Sample input data:
3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2
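Both programs below read this data from a TCP socket via socketTextStream, so something has to serve the lines. A minimal test feeder is sketched here purely as an assumption for local testing; the default port 9999 and the one-second pacing are arbitrary choices, and `nc -lk <port>` works just as well.

import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Replays the sample lines above to the first client that connects (local testing only).
public class SampleDataServer {
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999; // assumed default port
        String[] lines = {
            "3333 flume", "4444 ooize", "5555 flume", "4444 ooize", "5555 flume",
            "2222 hive", "3333 hadoop", "4444 hbase", "3333 flume", "4444 ooize", "5555 flume"
        };
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line);   // one record per line, space separated
                Thread.sleep(1000);  // pace the stream so several batches see data
            }
        }
    }
}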
import java.io.Serializable;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import org.apache.spark.streaming.Durations;
public class finaltest {
public static void main(String[] args) throws InterruptedException {
// 1. Obtain the real-time data from the socket stream
SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(60));
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
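// This job splits each incoming line on ',' so records must look like "userId,advertId";
// the space-separated sample data above is in the format consumed by the second program (filterblock).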
// 2. Process the data: find the users who click a given advert more than N times per day
JavaPairDStream<String,String> data = lines.mapToPair(f -> new Tuple2<>(f.split(",")[0],f.split(",")[1]));
data.foreachRDD(rdd -> {
JavaRDD<Advert> adRDD = rdd.map(f -> {
Advert ad = new Advert();
ad.setUserId(f._1);
ad.setAdvertId(f._2);
return ad;
});
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> words = spark.createDataFrame(adRDD,Advert.class);
words.createOrReplaceTempView("words");
// users who clicked the same advert more than twice in this batch form the blacklist
Dataset<Row> result = spark.sql("select userId from (select userId, advertId, count(*) as cnt from words group by userId, advertId having count(*) > 2) a");
// 3. Store the blacklist generated in real time into MySQL
result.write().format("jdbc").option
("url","jdbc:mysql://localhost:3306/studentInfo").option
("driver","com.mysql.jdbc.Driver").option
("dbtable","lists").option
("user","debian-sys-maint").option
("password","6KCiLZuGt5t8EuZU").mode("append").save();
});
// 4. Read the blacklist back from MySQL on every batch
JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd -> {
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> jdbcDF = spark.read().format("jdbc").option
("url", "jdbc:mysql://localhost:3306/studentInfo").option
("driver","com.mysql.jdbc.Driver").option
("dbtable","lists").option
("user","debian-sys-maint").option
("password","6KCiLZuGt5t8EuZU").load();
JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
JavaRDD<String> rdd1 = stu.map(f -> f.toString());
List<String> rdd2 = rdd1.distinct().collect();
// 5. Filter the click data against the blacklist, removing blacklisted users
JavaPairRDD<String,String> rdd3 = rdd.filter(f -> !(rdd2.contains(f._1)));
// 6. Count advert clicks in real time; 7. write the top-3 adverts to a file
JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f -> new Tuple2<String,Integer>(f._2,1)).reduceByKey((x,y) -> x+y);
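// Sort by click count descending: swap to (count, advertId), sortByKey(false), then swap back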
JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));
return rdd6;
});
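// saveAsTextFiles writes one output directory per batch, named top3-<batch time in ms>.spark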
data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3","spark");
ssc.start();
ssc.awaitTermination();
}
// Lazily initialised singleton SparkSession, shared by all batches
public static class JavaSparkSessionSingleton{
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if(instance == null) {
instance = SparkSession.builder().config(sparkConf).getOrCreate();
}
return instance;
}
}
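// JavaBean holding one click record; createDataFrame(adRDD, Advert.class) above derives the DataFrame schema from its getters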
public static class Advert implements Serializable{
private String userId;
private String advertId;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getAdvertId() {
return advertId;
}
public void setAdvertId(String advertId) {
this.advertId = advertId;
}
}
}
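The job above appends its blacklist to the MySQL table lists. A small sketch for inspecting what has accumulated there through plain JDBC follows; it reuses the connection settings from the job and assumes the table ends up with the single userId column produced by the SQL query above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Prints the distinct blacklisted users written by the streaming job (assumed single-column schema).
// Requires the MySQL JDBC driver on the classpath, same as the job itself.
public class BlacklistInspector {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://localhost:3306/studentInfo"; // same database as the job above
        try (Connection conn = DriverManager.getConnection(url, "debian-sys-maint", "6KCiLZuGt5t8EuZU");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT DISTINCT userId FROM lists")) {
            while (rs.next()) {
                System.out.println(rs.getString("userId")); // one blacklisted user per row
            }
        }
    }
}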
=============================================================================================================
package thisterm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class filterblock {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws IOException, InterruptedException {
if (args.length < 2) {
System.err.println("Usage: <hostname> <port>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");
JavaStreamingContext scc = new JavaStreamingContext(sparkConf,Durations.seconds(10));
// JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
SQLContext sqlContext = new SQLContext(scc.sparkContext());
String url = "jdbc:mysql://localhost:3306/name";
Properties connectionProperties = new Properties();
connectionProperties.put("user","root");
connectionProperties.put("password","123456");
connectionProperties.put("driver","com.mysql.cj.jdbc.Driver");
JavaReceiverInputDStream<String> lines = scc.socketTextStream(args[0],Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
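// Input lines are space separated ("userId advertName"); the flatMap below keeps each whole line
// as a single token, so identical (user, advert) lines are counted together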
JavaDStream<String> words = lines.flatMap(f -> {
return Arrays.asList(f).iterator();
});
JavaPairDStream<String,Integer> wordCounts = words.mapToPair(f -> new Tuple2<>(f,1)).reduceByKey((x,y) -> x + y);
JavaPairDStream<String,Integer> wordCountfiltter=wordCounts.filter(f->f._2>2);// keep lines seen more than twice, e.g. ("5555 flume", 3)
JavaDStream<String> wordC=wordCountfiltter.map(f->f._1.split(" ")[1]+" "+f._2);// drop the user id, keep "advert count", e.g. "flume 3"
JavaDStream<Row> personsRDD = wordC.map(new Function<String,Row>(){
public Row call(String line) throws Exception {
String[] splited = line.split(" ");
return RowFactory.create(splited[0],Integer.valueOf(splited[1]));
}
});
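// Schema matching the rows built above: bname (the advert name) and number (its count)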
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("bname",DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("number",DataTypes.IntegerType,true));
StructType structType = DataTypes.createStructType(structFields);
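// Append each micro-batch of (bname, number) rows to the MySQL table blockname1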
personsRDD.foreachRDD(f->{
Dataset<Row> personsDF = sqlContext.createDataFrame(f,structType);
personsDF.write().mode("append").jdbc(url,"blockname1",connectionProperties);
});
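// listblock lives on the driver and accumulates the bname values read back from MySQL each batch
// (foreachRDD bodies run on the driver, so the addAll below is safe in this local setup).
// NOTE: bname holds advert names such as "flume", while the filter further down compares against
// the user id field; to blacklist users as in the first program, store split(" ")[0] in wordC instead.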
List<String> listblock=new ArrayList<String>();
personsRDD.foreachRDD(f->{
Dataset<Row> personsDF = sqlContext.createDataFrame(f,structType);
Dataset<Row> readfile= sqlContext.read().jdbc(url,"blockname1",connectionProperties);
JavaRDD<Row> stu=scc.sparkContext().parallelize(readfile.collectAsList());
JavaRDD<String> rdd1=stu.map(f1->f1.toString().split(",")[0].substring(1));
rdd1.foreach(f2->System.err.println(f2));
List<String> list = rdd1.distinct().collect();
// 5. Collect the blacklist here; the actual filtering of the click data happens in the next foreachRDD
listblock.addAll(list);
// System.out.println(stu.toString());
// readfile.show();
});
words.foreachRDD(f->{
JavaPairRDD<String,String> rdd=f.mapToPair(s->new Tuple2<String,String>(s.split(" ")[0],s.split(" ")[1]));
JavaPairRDD<String,String> rdd3 = rdd.filter(ff -> !(listblock.contains(ff._1)));
// 6. Count advert clicks in real time; 7. output the top-3 adverts to a file
JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f3 -> new Tuple2<String,Integer>(f3._2,1)).reduceByKey((x,y) -> x+y);
JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());
JavaPairRDD<String,Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));
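// NOTE: rdd6 (the per-batch top-3) is computed but never persisted in this version; something like
// rdd6.repartition(1).saveAsTextFile("/home/yyl/data/top3-" + System.currentTimeMillis())
// (path assumed, mirroring the first program) would write it out.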
});
wordCountfiltter.print();
scc.start();
scc.awaitTermination();
}
}