目的
? ??????不停的去获取当前最近一次访问的话,会发现数据是在不停改变,不停更新的,如果我们实时的把数据做一个输出去进行监控的话,就可以看到当前每一个用户实时的最近一次访问的变化
1、代码展示
package com.atguigu.chapter0905;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkToRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//因为是clickSource数据源,不会停止,无限循环
DataStreamSource<Event> stream = env.addSource(new ClickSource());
//创建一个jedis连接配置
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("10.0.81.155")
.setPort(6379)
.setDatabase(5)
.setPassword("sungrow2011")
.build();
//写入redis
stream.addSink(new RedisSink<>(config,new MyRedisMapper()));
env.execute();
}
//自定义类实现RedisMapper接口
public static class MyRedisMapper implements RedisMapper<Event>{
@Override
//getCommandDescription()方法主要是返回当前Redis操作命令的描述,
//我们当前想要做的操作是想要把当前每一个用户的访问事件这个数据写入到Redis保存
public RedisCommandDescription getCommandDescription() {
//希望把所有用户的访问信息都保存在一张表里面,接下来要操作的是一张hash表
//第一个参数是像Redis里面一张hash表去写入的命令,第二个参数是表的名称
return new RedisCommandDescription(RedisCommand.HSET,"clicks");
}
@Override
//针对每一个用户去写入,相当于同一个用户的数据要不停的更新
//当前的key是user
public String getKeyFromData(Event data) {
return data.user;
}
@Override
//还需要定义当前的data是什么
public String getValueFromData(Event data) {
//保存的是当前用户最近一次访问的页面地址
return data.url;
}
}
}
2、ClickSource数据源代码展示
package com.atguigu.chapter0905;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class ClickSource implements SourceFunction<Event> {
//声明一个标志位,用来控制数据的发生
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
//循环生成数据
Random random = new Random();
//定义字段选取的数据集
String[] users = {"Mary", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10"};
//循环生成数据
while (running) {
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
long timestamp = Calendar.getInstance().getTimeInMillis();
ctx.collect(new Event(user, url, timestamp));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running=false;
}
}
3、运行代码
数据源源不断的向redis传送
4、启动Redis集群
ssh 10.0.81.155
cd /soft/redis-3.2.11
启动redis客户端的命令:redis-cli -h 10.0.81.155 -p 6379 -a sungrow2011
选择第五个数据库:select? 5
?keys *:查看所有的数据
5、查看数据
?查看clicks这个topic下的数据:hgetall? clicks
?再次输入hgetall clicks,数据又会发生变化,不停的去获取当前最近一次访问的话,会发现数据是在不停改变,不停更新的,如果我们实时的把数据做一个输出去进行监控的话,就可以看到当前每一个用户实时的最近一次访问的变化
在可视化界面上查看,数据已经写进来
?
?
|