import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public final class 过滤黑名单 {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Required arguments: <hostname> <port>");
System.exit(1);
}
// Set the batch interval, i.e. how often pulled data is processed as one batch.
// Durations.seconds(10) means results are printed to the console every 10 seconds.
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
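// Load the blacklist file as a static RDD; each line is expected to hold a name and a flag, e.g. "<name> true".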
JavaRDD<String> blackname = ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
// JavaDStream<String> blackname=ssc.textFileStream("file:///homeq/eclipse-workspace/t33");
// Identity map: expose the raw input lines as the DStream to be filtered.
JavaDStream<String> words = filedata.map(f -> f);
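// Key the blacklist by name; the value is the flag ("true" marks a name to be filtered out).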
JavaPairRDD<String, String> prd = blackname.mapToPair(f -> new Tuple2<>(f.split(" ")[0], f.split(" ")[1]));
// Per-batch operations such as reduceByKey only emit results for the current batch
// and keep no record of previous batches: they see only this batch's keys and values.
// To carry state forward across batches, use updateStateByKey instead
// (a minimal sketch is given in the helper method after main).
JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard = words
.mapToPair(f -> new Tuple2<String, Tuple2<String, String>>(f.split(" ")[1],
new Tuple2<>(f.split(" ")[0], f.split(" ")[1])))
.transformToPair(fs -> {
// Left-outer-join each batch against the blacklist RDD, keyed by name.
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> joined = fs.leftOuterJoin(prd);
// Keep only records whose name is not flagged "true" in the blacklist.
return joined.filter(f -> !(f._2._2().isPresent() && "true".equals(f._2._2().get())));
});
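// Strip the join payload and keep only the original two-field record for output.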
JavaPairDStream<String, String> prd4 = pard.mapToPair(f -> f._2._1);
prd4.print();
ssc.start(); // Start the Spark Streaming computation
ssc.awaitTermination(); // Wait for the computation to terminate
}
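// The comment before the transform step above mentions updateStateByKey as the stateful
// alternative to per-batch operations. The helper below is only a minimal sketch
// (not used by the job above; the method name is made up for illustration):
// it shows how a running count per key could be carried across batches.
// updateStateByKey also requires a checkpoint directory, e.g. ssc.checkpoint(...),
// which the job above does not set.
private static JavaPairDStream<String, Integer> runningCountSketch(JavaPairDStream<String, Integer> pairs) {
org.apache.spark.api.java.function.Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
// Start from the previous state (0 if absent) and add this batch's values.
int sum = state.isPresent() ? state.get() : 0;
for (Integer v : values) {
sum += v;
}
return Optional.of(sum);
};
return pairs.updateStateByKey(updateFunction);
}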
}