IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据 -> 正文阅读

[大数据]教材P164操作题。编写Spark Steaming程序,使用leftOuterJoin操作及filter方法过滤掉黑名单的数据

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
import scala.Tuple2;
 
public final class 过滤黑名单{
	
	     
	      public static void main(String[] args) throws Exception 
	{
		if (args.length< 2) {
			System.err.println("需要传入参数:主机名端口号");
			System.exit(1);
	            }
	// 设置拉取数据的频率,即批处理的时间间隔为1秒
	//控制台上显示的是每隔1000毫秒
		SparkConf  sparkConf = new SparkConf().setAppName
				("JavaNetworkWordCount").setMaster("local[2]");
		JavaStreamingContext  ssc = new 
			JavaStreamingContext(sparkConf, Durations.seconds(10));
	   JavaReceiverInputDStream<String> filedata = ssc.socketTextStream(
	args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
	   JavaRDD<String>  blackname= ssc.sparkContext().textFile("file:///homeq/eclipse-workspace/t33");
//	   JavaDStream<String> blackname=ssc.textFileStream("file:///homeq/eclipse-workspace/t33");
	   JavaDStream<String> words = filedata.map(f->f);
		 JavaPairRDD<String,String> prd=blackname.mapToPair(f->new Tuple2<String,String>(f.split(" ")[0],f.split(" ")[1]));
		
	   //此处的reduceByKey方法,每次只输出当次的操作记录,
	   //不保留上次的记录信息。对应的就是只针对本次的key,values。
	   //要保留前次的操作记录。相对应的方法就是updateStateByKey。
	   JavaPairDStream<String, Tuple2<Tuple2<String, String>, Optional<String>>> pard=words.mapToPair
				(f->new Tuple2<String,Tuple2<String,String>>(f.split(" ")[1],
						new Tuple2<>(f.split(" ")[0],f.split(" ")[1]))).transformToPair(fs->{
			JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd2= fs.leftOuterJoin(prd);
			JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<String>>> prd3=
					   prd2.filter(f->!f._2._2().toString().contains("true"));
			return prd3;
			
						}
								);
	   
	   
	   JavaPairDStream<String,String> prd4=pard.mapToPair(f->f._2._1);
 
prd4.print();
	   ssc.start();     // 启动Spark Streaming,开始计算
	ssc.awaitTermination();    // 等待计算结束
	
	  }
	}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-29 12:12:55  更:2022-04-29 12:13:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 1:01:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码