Deploying StreamX
Official documentation: http://www.streamxhub.com/docs/intro
Extract the StreamX package:
tar -zxvf streamx-console-service-1.2.2-bin.tar.gz -C /opt/module
Deploy the StreamX platform
Create the database in MySQL:
mysql> create database streamx charset utf8 collate utf8_general_ci;
Initialize the tables:
mysql> use streamx;
mysql> source /opt/module/streamx-console-service-1.2.2/script/final.sql
Configure the database connection info: vim streamx-console-service-1.2.2/conf/application.yml
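The part to edit is the datasource block. The sketch below assumes this version's Spring dynamic-datasource layout; the host, username, and password are placeholders that must match your own MySQL setup, so verify the exact keys against the shipped file:

spring:
  datasource:
    dynamic:
      datasource:
        primary:
          # Placeholders: replace with your MySQL host and credentials
          username: root
          password: your_password
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://hadoop102:3306/streamx?useUnicode=true&characterEncoding=UTF-8&useSSL=false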
Start the server:
/opt/module/streamx-console-service-1.2.2/bin/startup.sh
Access the web UI at http://<server-host>:10000/. The default username is admin and the default password is streamx.
System configuration
Configure Flink Home (point it at the local Flink installation directory)
Integrating Flink Stream
Write a Flink project and push the code to a remote repository.
POM dependencies and packaging. The Flink and logging dependencies are marked provided because the cluster supplies them at runtime; the maven-shade-plugin packages everything else into a fat jar:
<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers combine.children="append">
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Write a minimal test job, such as the sketch below, and push it to the remote code repository.
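A minimal sketch of such a job; the class name StreamXTest and the socket source on hadoop102:9999 are placeholders, and any job that produces visible output works:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamXTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines from a socket and print them; start `nc -lk 9999` on hadoop102 first
        env.socketTextStream("hadoop102", 9999).print();
        env.execute("StreamXTest");
    }
}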
Configure the project (register the Git repository address on StreamX's project page).
Build the project.
The first build takes quite a while because many dependencies must be downloaded; when the build succeeds, the project is marked with a green SUCCESSFUL tag.
Submit the application.
Verify the execution result.
Integrating Flink SQL
Write the Flink SQL:
create table s1 (
    id string,
    ts bigint,
    vc int
) with (
    'connector' = 'kafka',
    'topic' = 's1',
    'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',
    'properties.group.id' = 'czs',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv'
);

create table s2 (
    id string,
    ts bigint,
    vc int
) with (
    'connector' = 'print'
);

insert into s2 select * from s1;
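To verify the pipeline, produce a few CSV rows into topic s1 and watch them appear on the job's stdout via the print table s2. A sketch, assuming a Kafka 2.x console producer script on the PATH; the sample row values are arbitrary but match the id,ts,vc schema:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic s1
> sensor_1,1000,10
> sensor_2,2000,20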
Prepare the dependency. The Kafka connector is not bundled with Flink, so it must be declared for the SQL job, and its version should match the Flink version in use:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>