IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark_黑名单过滤 -> 正文阅读

[大数据]spark_黑名单过滤

wedasasda

数据如下:

3333 flume
4444 ooize
5555 flume
4444 ooize
5555 flume
2222 hive
3333 hadoop
4444 hbase
3333 flume
4444 ooize
5555 flume
flume 1
hadoop 2

import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import org.apache.spark.streaming.Durations;

public class finaltest {

	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		//1.获取实时数据
		SparkConf sparkConf = new SparkConf().setAppName("Streaming").setMaster("local[2]");
		JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(60));
		JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
		//2.处理数据,获得每天对某个广告点击超过N次的用户
		JavaPairDStream<String,String> data = lines.mapToPair(f -> new Tuple2<>(f.split(",")[0],f.split(",")[1]));
		data.foreachRDD(rdd -> {
			JavaRDD<Advert> adRDD = rdd.map(f -> {
				Advert ad = new Advert();
				ad.setUserId(f._1);
				ad.setAdvertId(f._2);
				return ad;
			});
		
		SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
		Dataset<Row> words = spark.createDataFrame(adRDD,Advert.class);
		words.createOrReplaceGlobalTempView("words");
		Dataset<Row> result = spark.sql("select userId from (select userId,advertId,count(*) from words group by userId,advertId having count(*) > 2 a");
		//3.将实时产生的黑名单存入MYSQL数据库	
		result.write().format("jdbc").option
			("url","jdbc:mysql://localhost:3306/studentInfo").option
			("driver","com.mysql.jdbc.Driver").option
			("dbtable","lists").option
			("user","debian-sys-maint").option
			("password","6KCiLZuGt5t8EuZU").mode("append").save();
		});
		//4.实时从MYSQL中读取黑名单
		JavaPairDStream<String,Integer> data2 = data.transformToPair(rdd -> {
			SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
			Dataset<Row> jdbcDF = spark.read().format("jdbc").option
					("url", "jdbc:mysql://localhost:3306/studentInfo").option
					("driver","com.mysql.jdbc.Driver").option
					("dbtable","lists").option
					("user","debian-sys-maint").option
					("password","6KCiLZuGt5t8EuZU").load();
					JavaRDD<Row> stu = ssc.sparkContext().parallelize(jdbcDF.collectAsList());
					JavaRDD<String> rdd1 = stu.map(f -> f.toString());
					List<String> rdd2 = rdd1.distinct().collect();
					//5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户
					JavaPairRDD<String,String> rdd3 = rdd.filter(f -> !(rdd2.contains(f._1)));
					//6.实时统计广告点击数 7.输出前三点击量的广告到文件
					JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f -> new Tuple2<String,Integer>(f._2,1)).reduceByKey((x,y) -> x+y);
					JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f -> f.swap()).sortByKey(false).mapToPair(f -> f.swap());
					JavaPairRDD<String,Integer> rdd6 = ssc.sparkContext().parallelizePairs(rdd5.take(3));
					return rdd6;			
		});
		data2.dstream().repartition(1).saveAsTextFiles("/home/yyl/data/top3","spark");
		ssc.start();
		ssc.awaitTermination();
		
		}	

	//2.处理数据,获得每天对某个广告点击超过N次的用户
			public static class JavaSparkSessionSingleton{
				private static transient SparkSession instance = null;
				public static SparkSession getInstance(SparkConf sparkConf) {
					if(instance == null) {
						instance = SparkSession.builder().config(sparkConf).getOrCreate();
					}
					return instance;
				}
			}	
			public static class Advert implements Serializable{
				private String userId;
				private String advertId;
				public String getUserId() {
					return userId;
				}
				public void setUserId(String userId) {
					this.userId = userId;
				}
				public String getAdvertId() {
					return advertId;
				}
				public void setAdvertId(String advertId) {
					this.advertId = advertId;
				}
			}
				
	
}

-=============================================================================================================
package thisterm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class filterblock {
	private static final Pattern SPACE = Pattern.compile(" ");
	public static void main(String[] args) throws IOException, InterruptedException {
		if (args.length < 2) {
			System.err.println("需要传入参数:主机名端口号");
			System.exit(1);
		}
		SparkConf sparkConf = new SparkConf().setAppName("JavaNetWorkWordCount").setMaster("local[2]");
		JavaStreamingContext scc = new JavaStreamingContext(sparkConf,Durations.seconds(10));
//		 JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
	        SQLContext sqlContext = new SQLContext(scc.sparkContext());
		String url = "jdbc:mysql://localhost:3306/name";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.cj.jdbc.Driver");
		JavaReceiverInputDStream<String> lines = scc.socketTextStream(args[0],Integer.parseInt(args[1]),StorageLevels.MEMORY_AND_DISK_SER);
		JavaDStream<String> words = lines.flatMap(f -> {
			return Arrays.asList(f).iterator();
		});	
		JavaPairDStream<String,Integer> wordCounts = words.mapToPair(f -> new Tuple2<>(f,1)).reduceByKey((x,y) -> x + y);
		JavaPairDStream<String,Integer> wordCountfiltter=wordCounts.filter(f->f._2>2);//333 flume  1
		JavaDStream<String> wordC=wordCountfiltter.map(f->f._1.split(" ")[1]+" "+f._2);//flume  1
 		JavaDStream<Row> personsRDD = wordC.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(splited[0],Integer.valueOf(splited[1]));
            }
        });
	        List structFields = new ArrayList();
	        structFields.add(DataTypes.createStructField("bname",DataTypes.StringType,true));
	        structFields.add(DataTypes.createStructField("number",DataTypes.IntegerType,true));
	        StructType structType = DataTypes.createStructType(structFields);
	        personsRDD.foreachRDD(f->{
	        	 Dataset personsDF = sqlContext.createDataFrame(f,structType);
	        	 personsDF.write().mode("append").jdbc(url,"blockname1",connectionProperties);
	        });
	        List<String> listblock=new ArrayList<String>();
	        personsRDD.foreachRDD(f->{
	        Dataset<Row> personsDF = sqlContext.createDataFrame(f,structType);
	        Dataset<Row> readfile= sqlContext.read().jdbc(url,"blockname1",connectionProperties);
	        JavaRDD<Row> stu=scc.sparkContext().parallelize(readfile.collectAsList());
	        JavaRDD<String> rdd1=stu.map(f1->f1.toString().split(",")[0].substring(1));
	        
	        rdd1.foreach(f2->System.err.println(f2));
	        List<String> list = rdd1.distinct().collect();
			//5.根据黑名单内容对获得的广告点击数据进行过滤,去掉黑名单中的用户
	        listblock.addAll(list);
			
	       // System.out.println(stu.toString());
	       // readfile.show();
	        
	        });
	       
	        words.foreachRDD(f->{
	        	JavaPairRDD<String,String> rdd=f.mapToPair(s->new Tuple2<String,String>(s.split(" ")[0],s.split(" ")[1]));
	        	JavaPairRDD<String,String> rdd3 = rdd.filter(ff -> !(listblock.contains(ff._1)));
				//6.实时统计广告点击数 7.输出前三点击量的广告到文件
				JavaPairRDD<String,Integer> rdd4 = rdd3.mapToPair(f3 -> new Tuple2<String,Integer>(f3._2,1)).reduceByKey((x,y) -> x+y);
				JavaPairRDD<String,Integer> rdd5 = rdd4.mapToPair(f4 -> f4.swap()).sortByKey(false).mapToPair(f4 -> f4.swap());
				JavaPairRDD<String,Integer> rdd6 = scc.sparkContext().parallelizePairs(rdd5.take(3));
				
	        });
	      
		wordCountfiltter.print();
		scc.start();
		scc.awaitTermination();
		}
	
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:46:25  更:2022-05-09 12:49:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:30:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码