目录
需求:查询给定的若干表在代码中有几个被使用
转换概念:给定的关键
词,在日志文件中命中了几种
选型:Spark解析关键词列表rdd1;Spark解析文件目录数据rdd2;rdd1 join rdd2(broadcast)
上代码:
Step 1:创建SparkContext
Step 2:读取目标路径下的文件
Step 3:读取关键词列表
Step 4:把若干文件的数据整合到一起
Step 6:核心计算【广播Join,计数,筛选,去重】
Step 7:关闭SparksContext
总结
技巧1:sc.textFile读文件,返回内容的数组,sc.wholeTextFiles读文件,返回的是[(path, content)],即文件路径和文件内容的元组的数组二者都可以正则匹配目录及文件,比如
技巧2:正则匹配不支持深度+2的递归,比如指定/opt/*;
技巧3:如果想覆盖的目录更全面些,可以自己弄一个数组,拼接成一串path
技巧4:如果想用多种分隔符可以用|隔开
需求:查询给定的若干表在代码中有几个被使用
转换概念:给定的关键
词,在日志文件中命中了几种
选型:Spark解析关键词列表rdd1;Spark解析文件目录数据rdd2;rdd1 join rdd2(broadcast)
(计划用flume采集目录下的文件,sink到HDFS,再用Spark计算,后来发现Spark就能全覆盖)
从一个列表中去若干文件中查找有多少个地方,用到了那几个单词
list.txt文件内容如下
table_a
table_b
table_c
被查询目录:
game
├── lagou
│?? ├── notes
│?? │?? ├── Kafka.pdf
│?? │?? ├── Redis01.pdf
│?? │?? └── Redis06.pdf
│?? ├── servers
│?? │?? ├── apache-maven-3.6.3
│?? │?? │?? ├── bin
│?? │?? │?? ├── conf
│?? │?? │?? └── README.txt
│?? │?? ├── flume-1.9.0
│?? │?? │?? ├── bin
│?? │?? │?? ├── conf
│?? │?? │?? └── tools
│?? │?? ├── hadoop-2.9.2
│?? │?? │?? ├── bin
│?? │?? │?? ├── etc
│?? │?? │?? └── share
│?? │?? ├── hbase-1.3.1
│?? │?? │?? ├── bin
│?? │?? │?? ├── conf
│?? │?? │?? └── README.txt
│?? │?? ├── hive-2.3.7
│?? │?? │?? ├── bin
│?? │?? │?? ├── conf
│?? │?? │?? └── scripts
│?? │?? ├── kafka_2.12-1.0.2
│?? │?? │?? ├── bin
│?? │?? │?? ├── config
│?? │?? │?? └── site-docs
│?? │?? ├── spark-2.4.5
│?? │?? │?? ├── bin
│?? │?? │?? ├── conf
│?? │?? │?? └── yarn
│?? │?? └── zookeeper-3.4.14
│?? │?? ├── bin
│?? │?? ├── conf
│?? │?? └── zookeeper-server
│?? └── software
│?? ├── azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz
│?? ├── cdh
│?? │?? ├── 5 -> 5.7.6
│?? │?? ├── 5.7 -> 5.7.6
│?? │?? └── 5.7.6
│?? ├── clickhouse2
│?? ├── flink-1.11.1-bin-scala_2.11.tgz
│?? └── nohup.out
└── rh
└── devtoolset-8
├── enable
└── root
├── bin -> usr/bin
├── etc
├── home
├── opt
├── root
├── usr
└── var
实验环境
IntelliJ IDEA
scala 2.11.8
spark 2.4.5
hadoop 2.9.2
maven 3.6.3
上代码:
Step 1:创建SparkContext
val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
Step 2:读取目标路径下的文件
val srcFile0 = "file:///G:\\client\\TsProj\\src\\game\\*";
val srcFile1 = "file:///G:\\client\\TsProj\\src\\game\\*\\*.ts";
val srcFile2 = "file:///G:\\client\\TsProj\\src\\game\\*\\*\\*.ts";
val srcFile3 = "file:///G:\\client\\TsProj\\src\\game\\*\\*\\*\\*.ts";
val srcFile4 = "file:///G:\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*.ts";
val srcFile5 = "file:///G:\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*\\*.ts";
val srcFile6 = "file:///G:\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*\\*\\*.ts";
val fileList = Array(srcFile2, srcFile3, srcFile4, srcFile5, srcFile6);
val content_map: RDD[(String, String)] = sc.wholeTextFiles(fileList.mkString(","))
Step 3:读取关键词列表
val listFile = "file:///C:\\Users\\pc\\Desktop\\table_list.txt";
val listData = sc.textFile(listFile).map(x => (x.split("\\|")(0))).collect();
listData.foreach(println)
Step 4:把若干文件的数据整合到一起
val arr_lines: Array[String] = content_map.collect().map {
content =>
println(s"path = ${content._1}")
content._2
}.flatMap(_.split("\n+"))
/**
* path = file:/G:/client/TsProj/src/game/entity/component/BagComponent.ts
* path = file:/G://client/TsProj/src/game/entity/component/BattleArrayComponent.ts
* path = file:/G://client/TsProj/src/game/entity/component/CardBaoQiComponent.ts
*/
arr_lines.take(5).foreach { l => println(s"每一行==> ${l}") }
/**
* 每一行==> import { Component } from "../../../framework/entity/Component";
* 每一行==> import { ysj_ts } from "../../../data/pb/gen/pb";
* 每一行==> import { S } from '../../../global/GameConfig';
* 每一行==> import { itemconfigTR } from "data/fb/itemconfig-t-r";
* 每一行==> import { ArrayMap } from "framework/common/ArrayMap";
*/
println(s"行数 = ${arr_lines.length}");
/**
* 行数 = 68243
*/
Step 5:单词处理【分割,前缀过滤,后缀过滤】
//用分隔符处理【空格,点,等于号,分号】
val words = arr_lines.flatMap(_.trim.split("\\s+|\\.|=|;"))
words.take(3).foreach { w => println(s"每单词==> ${w}") }
/**
* 每单词==> import
* 每单词==> {
* 每单词==> Component
*/
println(s"单词数 = ${words.length}");
/**
* 单词数 = 230893
*/
//过滤关键词前缀
val filerStartWord = words.filter(_.startsWith("get"))
println(s"滤前缀==> ${filerStartWord.length}");
/**
* 滤前缀==> 5664
*/
filerStartWord.take(3).foreach { w => println(s"每前缀==> ${w}") }
/**
* 每前缀==> getInstance()
* 每前缀==> getInstance()
* 每前缀==> getInstance()
*/
//过滤关键词后缀
val filerEndWord = words.filter(_.endsWith("TBS()"))
println(s"滤后缀==> ${filerEndWord.length}");
/** 滤后缀==> 197 */
filerEndWord.foreach { w => println(s"每后缀==> ${w}") }
/**
* 每后缀==> getmainlevelconfigTBS()
* 每后缀==> getmainlevelsettingconfigTBS()
* 每后缀==> getelementalresonanceconfigTBS()
*/
Step 6:核心计算【广播Join,计数,筛选,去重】
//广播变量
val listBC = sc.broadcast(listData)
//Join,计数,筛选,去重
val tuples = filerEndWord.map { iter =>
val tabInfo: Array[String] = listBC.value
//println(iter)
var cnt = 0;
for (tab <- tabInfo) {
if (iter.toLowerCase.contains("get" + tab.toLowerCase + "tbs")) {
cnt = cnt + 1;
}
}
(iter, cnt);
}.filter(_._2 > 0).distinct
tuples.take(3).foreach(println(_))
/**
* (getleveldifficultyconfigTBS(),1)
* (getmonsterviewconfigTBS(),1)
*/
println(s"匹配 = ${tuples.length}")
/**
* 匹配 = 2
*/
Step 7:关闭SparksContext
// 关闭SparkContext
sc.stop()
总结
技巧1:sc.textFile读文件,返回内容的数组, sc.wholeTextFiles读文件,返回的是[(path, content)],即文件路径和文件内容的元组的数组 二者都可以正则匹配目录及文件,比如
c:/dir/*/*
技巧2:正则匹配不支持深度+2的递归, 比如指定/opt/*;
opt
├── lagou
│???├── notes
│???│???├── Kafka.pdf
│???│???└── 267.pdf
│???├── servers
│???│???├── apache-maven-3.6.3
│???│???└── zookeeper-3.4.14
│???└── software
│??? ? ?├── apache-maven-3.6.3-bin.tar.gz
│??? ? ?├── azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz
│??? ? ?├── cdh
│??? ? ?├── clickhouse
│??? ? ?├── derby.log
│??? ? ?├── mysql-community-test-5.7.26-1.el7.x86_64.rpm
│??? ? ?└── nohup.out
└── rh
? ? └── devtoolset-8
? ? ? ? ├── enable
? ? ? ? └── root
只能查到【opt】目录下的文件 和【opt.lagou, opt.rh】里面的文件,查不到更深一层的目录[note,servers,software],后续补充
技巧3:如果想覆盖的目录更全面些,可以自己弄一个数组,拼接成一串path
val srcFile2 = "file:///G:\\ysj\\client\\TsProj\\src\\game\\*\\*\\*.ts";
val srcFile3 = "file:///G:\\ysj\\client\\TsProj\\src\\game\\*\\*\\*\\*.ts";
val srcFile4 = "file:///G:\\ysj\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*.ts";
val srcFile5 = "file:///G:\\ysj\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*\\*.ts";
val srcFile6 = "file:///G:\\ysj\\client\\TsProj\\src\\game\\*\\*\\*\\*\\*\\*\\*.ts";
val fileList = Array(srcFile2, srcFile3, srcFile4, srcFile5, srcFile6);
val value: RDD[(String, String)] = sc.wholeTextFiles(fileList.mkString(","))
技巧4:如果想用多种分隔符可以用|隔开
val words = parts.flatMap(_.trim.split("\\S+|\\.|=|;"))
这里支持的分隔符有四种【多个空格】【.】【=】【;】,任意一种匹配上即切割单词
|