IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据课程——Storm综合应用 -> 正文阅读

[大数据]大数据课程——Storm综合应用

作者:token keyword

大数据课程——Storm综合应用

实验内容以及要求

假设在某一搜索应用中,需要实时统计搜索次数最多的热门关键词,并按照实时统计搜索次数输出最热门的20大热门关键词及被搜索次数。用户搜索的日志通过Flume采集,并写入Kafka,Storm从Kafka中实时读取数据,并完成实时统计工作,并进行输出。
在这里插入图片描述
提示:
(1)搜索日志可以采用实验5的数据(搜狗搜索数据),一行代表一次用户搜索;
(2)Flume Agent的Source可以配置为syslogudp类型(端口5640),监控搜索日志;
(3)输出形式自定。

问题总结

Agent配置时的Source类型

Client包是模拟数据包的产生的,将数据发往5640端口。因此Flume的Agent配置要写明Source是syslogudp类型,并且监控5640端口。

Storm的UI界面端口问题

Storm的UI界面端口是可以改动的,默认为8080。但在实验室里做实验的话,就发现老师已经吧端口改了,改成了8099,因此要访问localhost:8099才可以进入Storm的UI界面,否则会出现404错误,这个要注意。

Key、Value问题

写代码的时候,一开始照着书上的Kafka整合Storm的代码来写,但其实获取数据的时候,其实Key是没东西的,Value才能获得具体数据。Key得到的全是null。如下图所示。

在这里插入图片描述

吐槽一下

顺便吐槽一下,实验数据输出的排行榜略少儿不宜。但没办法,只能说数据太真实了哈哈哈。
另外具体代码放在本文最后面。

实验步骤

Flume Agent 配置

在这里插入图片描述
本次将Centos01作为Flume的Agent,负责监控5640端口,拦截收到的数据包,并将其中的数据存储到Kafka集群的topictest话题中。
所以将Source的类型设置为syslogudp,监控5640端口,接受其中产生的数据包。
Sink设置为Kafka集群,将得到的数据包传入kafka集群中。

Storm代码编写思路

在这里插入图片描述
如上图所示,我的项目中主体由KafkaSpout、SplitDataBolt、WordCountBolt、ReportBolt四项组成,数据在其中依次流动,StormTopology对他们进行设置、调用。(代码在文末)。
KafkaSpout作为数据源,从kafka集群中提取需要的数据,并且发送给SplitDataBolt进行处理。
SplitOutBolt作为一个处理Bolt,将接收到的数据进行分割处理,从中提取出关键字字段,并且设置值为1(因为出现一次),发送给WordCountBolt进行统计。
WordCountBolt作为第二个处理Bolt,对关键词的出现次数进行统计和存储,并将统计结果发送给ReportBolt。
ReportBolt作为第三个Bolt,将接收到的“关键词-出现次数”数据进行存储,并且按照“出现次数”进行排序,将排序结果进行输出,得到最终的结果排行榜。

代码展示

首先依次开启Zookeeper、HDFS、HA集群、Kafka、Flume、Storm。并且可以进入Storm的UI界面来测试Storm是否开启成功,其他的集群开启这里不予以赘述。

在这里插入图片描述
系统开启完毕后,就可以运行Client.jar包,该JAR包的作用是负责从数据集中提取数据,整合成数据包,并且随机间隔一定时间向5640端口发送该数据包。这样就可以模拟真实的数据产生,并且被Flume抓住该数据包,发送到Kafka集群中。

在这里插入图片描述
如果一切顺利的话,可以开启一个消费者,查看topictest话题,能看到数据正常被发送到Kafka集群中。如下图所示。
在这里插入图片描述
项目代码写好后,打包成Storm.jar包,在Centos01中运行,成功运行后如下图所示。

在这里插入图片描述
每间隔一段时间,都会打印出排行榜信息,如下图所示。可以看到随着时间的增加,排行榜是在根据数据不断变化的,不断输出排名前20的关键词排行信息,可以看到关键词以及其出现次数统计。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

代码

KafkaSpout

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author: 冰冷灬泡面
 * @date: 2021/5/7 13:31
 * @description:
 * @modifiedBy:
 */
public class KafkaSpout extends BaseRichSpout {
    private static final long serialVersionUID = 7582771881226024741L;
    private KafkaConsumer<String, String> consumer;
    SpoutOutputCollector collector;


    /*---------------设置Kafka配置-------------*/
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        //Kafka属性信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "centos01:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化消费者,设置消费主题
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("topictest"));
    }
    /*------------从Kafka集群中获取数据-------------*/
    public void nextTuple() {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            String value = record.value();
            //其实KEY值是没有的,Value值才是有的
            //所以这里吧value的值赋给了Key
            key = value;
            System.out.println("key " + key);
//            System.out.println("value " + value);

            collector.emit(new Values(key));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("key"));
    }
}

SplitDataBolt

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author: 冰冷灬泡面
 * @date: 2021/5/7 13:23
 * @description:
 * @modifiedBy:
 */
public class SplitDataBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector outputcollector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputcollector = outputCollector;
    }

    /*--------------对接收到的Tuple进行处理--------------------*/
    @Override
    public void execute(Tuple tuple) {
        String key =tuple.getStringByField("key");
        //分割数据
        String[] words =key.split("\t");
        //提取关键词字段
        String keyword = words[2].trim();
        Integer cnt = 1;
        //发送给下一bolt
        this.outputcollector.emit(new Values(keyword, cnt));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("keyword", "cnt"));
    }
}

WordCountBolt

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: 冰冷灬泡面
 * @date: 2021/5/7 13:23
 * @description:
 * @modifiedBy:
 */
public class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2374950653902413273L;
    private OutputCollector outputcollector;


    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputcollector = outputCollector;
        this.keywordMap = new HashMap<String, Integer>();
    }



    /*----------存储、统计关键词和出现次数------------*/
    //定义存放单词与词频的Map
    private HashMap<String, Integer> keywordMap = null;
    @Override
    public void execute(Tuple tuple) {
        String keyword = tuple.getStringByField("keyword");
        int cnt = tuple.getIntegerByField("cnt");
        Integer sum = keywordMap.get(keyword);
        if (sum == null) {
            sum = 0;
        }
        sum++;
        keywordMap.put(keyword, sum);
        this.outputcollector.emit(new Values(keyword, sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("keyword", "sum"));
    }
}

ReportBolt


import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.*;
import java.util.Map.Entry;

/**
 * @author: 冰冷灬泡面
 * @date: 2021/5/7 13:23
 * @description:
 * @modifiedBy:
 */
public class ReportBolt extends BaseRichBolt {
    private static final long serialVersionUID = -1512537746316594950L;


    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.keywordMap = new HashMap<String, Integer>();
    }

    /*-------------对存储的数据进行排序,并且输出出现次数最多的前20位关键词------------*/
    private HashMap<String, Integer> keywordMap = null;
    @Override
    public void execute(Tuple tuple) {
        String keyword = tuple.getStringByField("keyword");
        int count = tuple.getIntegerByField("sum");
        keywordMap.put(keyword, count);
        List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(keywordMap.entrySet());
        Collections.sort(list, new Comparator<Entry<String, Integer>>() {
            //升序排列
            @Override
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                return (o2.getValue() - o1.getValue());
            }
        });
        //输出排名前20的数据
        int n = list.size() <= 20 ? list.size() : 20;
        String result = "";
        for (int i = 0; i < n; i++) {
            Entry<String, Integer> entry = list.get(i);
            String sortWord = entry.getKey();
            Integer sortCount = entry.getValue();
            result += sortWord + " ------- " + sortCount +"\n";
        }
        System.out.println("--------(关键词)--------搜索关键词排行榜--------(搜索次数)--------");
        System.out.println(result);

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

StormTopology

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author: 冰冷灬泡面
 * @date: 2021/5/7 14:25
 * @description:
 * @modifiedBy:
 */
public class StormTopology {
    public static void main(String[] args) throws Exception {
        KafkaSpout kafkaSpout = new KafkaSpout();
        SplitDataBolt splitDataBolt = new SplitDataBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        //创建一个拓扑
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //TODO 这些命名还有fields的参数可能会出问题,可以看看
        //设置Spout,名称为"kafka-spout",并行度为2(也就是线程数),任务数为4(也就是实例数)。默认是1个线程,1个任务。  如果不指定Task数量,则一个线程执行一个Task,Task数量与线程数量一样。
        topologyBuilder.setSpout("kafka-spout", kafkaSpout,2).setNumTasks(4);
        //设置bolt,名称为"split-bolt",数据来源是名称为"sentence-spout"的spout,
        //ShuffleGrouping:随机选择一个Task来发送,对Task的分配比较均匀。
        topologyBuilder.setBolt("split-bolt", splitDataBolt,2).setNumTasks(4).shuffleGrouping("kafka-spout");
        //FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
        topologyBuilder.setBolt("count-bolt", wordCountBolt,2).setNumTasks(4).fieldsGrouping("split-bolt", new Fields("keyword"));
        //GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task,此时不管有多少个Task,只发往一个Task
        topologyBuilder.setBolt("report-bolt", reportBolt,2).setNumTasks(4).globalGrouping("count-bolt");

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        //本地模式 ,第一个参数为定义拓扑名称
//      cluster.submitTopology("word-count-topology", config, topologyBuilder.createTopology());
       /* Utils.sleep(5000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();  */

        //集群模式,需要打包jar上传到集群,然后使用命令 :storm jar storm_demo-0.0.1-SNAPSHOT.jar com.zwy.storm.demo.wordcount.WordCountTopology
        config.setNumWorkers(2); //设置Worker进程数量
        config.setNumAckers(0);//设置acker并发数,0代表取消acker任务。Acker任务默认是每个worker进程启动一个executor线程来执行,该实例启动了2个worker,则默认会启动2个executor线程,2个acker任务
        StormSubmitter.submitTopology("keyword-rank-topology",config,topologyBuilder.createTopology());
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-13 11:48:46  更:2022-05-13 11:49:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:48:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码