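Compiling Doris
The build procedure is documented on the official site; I will write a summary when I have time. If you need the compiled package (version 1.0.0), message me privately.
Configuring Doris
BEs can be added to or removed from the FE manually by executing SQL statements.
FE configuration
Note: priority_networks = 192.168.0.149/16 must be changed to match the IP of each node!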
vi /opt/doris/fe/conf/fe.conf
LOG_DIR = ${DORIS_HOME}/log
DATE = `date +%Y%m%d-%H%M%S`
JAVA_OPTS="-Xmx8192m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:$DORIS_HOME/log/fe.gc.log.$DATE"
JAVA_OPTS_FOR_JDK_9="-Xmx8192m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xlog:gc*:$DORIS_HOME/log/fe.gc.log.$DATE:time"
sys_log_level = INFO
http_port = 8033
rpc_port = 9020
query_port = 9030
edit_log_port = 9010
mysql_service_nio_enabled = true
priority_networks = 192.168.0.149/16
meta_dir = /data/doris-meta
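Because priority_networks has to be edited on every node, the change can be scripted. What follows is only a minimal sketch and not part of the original setup: the helper name set_priority_networks is hypothetical, it assumes the key already exists in the file, and it is demonstrated on a scratch file rather than on /opt/doris/fe/conf/fe.conf.

```shell
# Hypothetical helper: rewrite the priority_networks line of a Doris conf file.
# Usage: set_priority_networks <conf-file> <cidr>
set_priority_networks() {
    local conf="$1" cidr="$2"
    local tmp
    tmp="$(mktemp)"
    # Replace any existing "priority_networks = ..." line; all other lines are kept.
    sed "s|^priority_networks[[:space:]]*=.*|priority_networks = ${cidr}|" "$conf" > "$tmp"
    # Write back in place so the permissions of the original file are preserved.
    cat "$tmp" > "$conf"
    rm -f "$tmp"
}

# Demonstration on a scratch file (assumed values, not a real node):
demo_conf="$(mktemp)"
printf 'http_port = 8033\npriority_networks = 192.168.0.149/16\n' > "$demo_conf"
set_priority_networks "$demo_conf" "192.168.0.150/16"
cat "$demo_conf"
```

On a real node the first argument would be the fe.conf or be.conf path, and the FE or BE has to be restarted for the change to take effect.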
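BE configuration (on a multi-node cluster, change the IP on each node)
Note: priority_networks = 192.168.0.149/16 must be changed to match the IP of each node!
vi /opt/doris/be/conf/be.conf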
PPROF_TMPDIR="$DORIS_HOME/log/"
sys_log_level = INFO
be_port = 9060
webserver_port = 8041
heartbeat_service_port = 9050
brpc_port = 8060
priority_networks = 192.168.0.149/16
storage_root_path = /data/disk1;
exec_mem_limit = 64G
streaming_load_max_mb=1024000
Starting with systemctl
Start the FE nodes first, then the BE nodes.
To register them as systemctl services:
systemd unit file for the FE
vi /usr/lib/systemd/system/fe.service
[Unit]
Description=cosmo-bdp doris-fe
After=network.target
[Service]
Type=forking
User=root
Group=root
LimitCORE=infinity
LimitNOFILE=1024000
LimitNPROC=1024000
ExecStart=/bin/bash -c "JAVA_HOME=/usr/local/java/ exec /opt/doris/fe/bin/start_fe.sh --daemon"
ExecStop=/bin/bash -c "JAVA_HOME=/usr/local/java/ exec /opt/doris/fe/bin/stop_fe.sh --daemon"
Restart=no
[Install]
WantedBy=multi-user.target
After that, start, stop, and status can be used directly:
systemctl daemon-reload
systemctl start/stop/status fe
systemd unit file for the BE
vi /usr/lib/systemd/system/be.service
[Unit]
Description=cosmo-bdp doris-be
After=network.target
[Service]
Type=forking
User=root
Group=root
LimitCORE=infinity
LimitNOFILE=1024000
LimitNPROC=1024000
ExecStart=/bin/bash -c "JAVA_HOME=/usr/local/java/ exec /opt/doris/be/bin/start_be.sh --daemon"
ExecStop=/bin/bash -c "JAVA_HOME=/usr/local/java/ exec /opt/doris/be/bin/stop_be.sh --daemon"
Restart=no
[Install]
WantedBy=multi-user.target
After that, start, stop, and status can be used directly:
systemctl daemon-reload
systemctl start/stop/status be
Starting the cluster from the shell
Starting the FE
/opt/doris/fe/bin/start_fe.sh --daemon
Starting the BE
/opt/doris/be/bin/start_be.sh --daemon
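Accessing the Doris web UI
192.168.0.149 is the FE node. Open http://192.168.0.149:8030 (8030 is the Doris default; the port has to match http_port in fe.conf). User: admin, password: empty.
Operating with SQL statements
Interactive operation
Run the following from the shell; the value after -h (vm-CentOS75-0-149 here) is the host name or IP address of the FE.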
/usr/local/mysql/bin/mysql -hvm-CentOS75-0-149 -P 9030 -uroot
Adding a user
Add the user test
mysql> create user 'test' identified by 'test';
Query OK, 0 rows affected (0.06 sec)
Log in as the test user
mysql -hvm-CentOS75-0-149 -P 9030 -utest -ptest
Adding BEs to the FE
mysql> ALTER SYSTEM ADD BACKEND "vm-CentOS75-0-149:9050";
Query OK, 0 rows affected (0.05 sec)
mysql> ALTER SYSTEM ADD BACKEND "vm-CentOS75-0-150:9050";
Query OK, 0 rows affected (0.01 sec)
mysql> ALTER SYSTEM ADD BACKEND "vm-CentOS75-0-151:9050";
Query OK, 0 rows affected (0.00 sec)
mysql> SHOW PROC '/backends';
The output lists each backend with its status; the Alive column should be true.
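With more than a few BEs, the ADD BACKEND statements can be generated instead of typed. This is a small sketch under stated assumptions: gen_add_backend_sql is a hypothetical helper name, and 9050 is the heartbeat_service_port from the be.conf shown earlier.

```shell
# Hypothetical helper: print one ALTER SYSTEM ADD BACKEND statement per host.
# Usage: gen_add_backend_sql <heartbeat_port> <host>...
gen_add_backend_sql() {
    local port="$1"
    shift
    local host
    for host in "$@"; do
        printf 'ALTER SYSTEM ADD BACKEND "%s:%s";\n' "$host" "$port"
    done
}

# Prints the three statements used in this post.
gen_add_backend_sql 9050 vm-CentOS75-0-149 vm-CentOS75-0-150 vm-CentOS75-0-151
```

The printed statements can be pasted into the mysql prompt, or piped into the client, for example by appending `| /usr/local/mysql/bin/mysql -hvm-CentOS75-0-149 -P 9030 -uroot` to the last line.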
Operating from the web UI
Create a table and insert data
CREATE TABLE site_visit
(
siteid INT,
city SMALLINT,
username VARCHAR(32),
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, city, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10;
insert into site_visit values(1,1,'name1',10);
insert into site_visit values(1,1,'name1',20);
Importing data
Stream Load:
Stream Load submits the file over the HTTP protocol
curl --location-trusted -u root -H "label:123" -H"column_separator:," -T /opt/bdp/wy/dorisTest.csv -X PUT http://vm-CentOS75-0-149:8030/api/test_db/student_result/_stream_load
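A fixed label such as 123 can only be used once, because Doris rejects a label that has already been used by a successful load in the same database. One way around that is to derive the label from the current time. The following is a sketch, not the author's script: make_label and stream_load_url are hypothetical helper names, and the final command is only printed for inspection, not executed.

```shell
# Hypothetical helpers for building a Stream Load request.

# make_label <prefix>: prefix plus a timestamp.
# Resolution is one second, so two loads within the same second would collide.
make_label() {
    printf '%s_%s\n' "$1" "$(date +%Y%m%d_%H%M%S)"
}

# stream_load_url <fe_host> <http_port> <db> <table>
stream_load_url() {
    printf 'http://%s:%s/api/%s/%s/_stream_load\n' "$1" "$2" "$3" "$4"
}

label="$(make_label student_result)"
url="$(stream_load_url vm-CentOS75-0-149 8030 test_db student_result)"

# Print the command instead of running it, so nothing is sent by accident.
echo curl --location-trusted -u root -H "label:${label}" -H "column_separator:," \
    -T /opt/bdp/wy/dorisTest.csv -X PUT "$url"
```

Removing the leading echo turns the last statement into the actual request.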
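Broker Load:
(1) Start the HDFS cluster
(2) In Hive, create the student_tmp_h table. The official site says columnar storage formats are supported, but in my tests they were not and the load reported an error, so create the Hive table with row-oriented (delimited text) storage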
hive
create table student_tmp_h(
id int,
name string,
age int,
score decimal(10,4))
partitioned by (
`dt` string)
row format delimited fields terminated by '\t';
(3) Insert data
hive (default)> set hive.exec.dynamic.partition=true;
hive (default)> set hive.exec.dynamic.partition.mode=nonstrict;
insert into student_tmp_h values(1,'张三',11,99.8,20200908),(2,'李四',12,99.9,20200908),(3,'王五',13,100,20200908),
(4,'赵六',14,55.5,20200908),(5,'test1',13,66.5,20200908),(7,'test2',14,80,20200908),(8,'test3',19,75,20200908);
(4) Edit the hosts file on every node and add entries for
hadoop101, hadoop102, hadoop103
root@doris1:~
ip hostname
。。。。。。
。。。。。。
(5) Copy the Hadoop cluster's configuration file to the broker on every node of the Doris cluster
scp hdfs-site.xml ip:/opt/software/
(6) Log in to Doris with the MySQL client and create the matching table student_result
mysql -hdoris1 -P 9030 -uroot
mysql> use test_db;
create table student_result
(
id int ,
name varchar(50),
age int ,
score decimal(10,4),
dt varchar(20)
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
(7) Write the load statement. dt is the partition column and is not stored in the data files, so it is set to a fixed value
LOAD LABEL test_db.student_result_h_2
(
DATA INFILE("hdfs://mycluster/user/hive/warehouse/student_tmp_h/dt=20200908/*")
INTO TABLE student_result
COLUMNS TERMINATED BY "\t"
(co1,co2,co3,co4)
set(
id=co1,
name=co2,
age=co3,
score=co4,
dt='20200908'
)
)
WITH BROKER "broker_name"
(
"dfs.nameservices"="mycluster",
"dfs.ha.namenodes.mycluster"="nn1,nn2,nn3",
"dfs.namenode.rpc-address.mycluster.nn1"= "hadoop101:8020",
"dfs.namenode.rpc-address.mycluster.nn2"= "hadoop102:8020",
"dfs.namenode.rpc-address.mycluster.nn3"="hadoop103:8020",
"dfs.client.failover.proxy.provider.mycluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
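The broker properties above follow the usual HDFS HA client pattern: one nameservice, a list of namenode IDs, and one rpc-address entry per ID. For a cluster with a different set of namenodes they can be generated. This is only a sketch: gen_ha_props is a hypothetical helper name, and it assumes the IDs are numbered nn1, nn2, ... as in this post.

```shell
# Hypothetical helper: print HDFS HA client properties for one nameservice.
# Usage: gen_ha_props <nameservice> <rpc_port> <namenode-host>...
gen_ha_props() {
    local ns="$1" port="$2"
    shift 2
    local ids="" i=1 host
    # Build the comma-separated ID list: nn1,nn2,...
    for host in "$@"; do
        ids="${ids:+$ids,}nn$i"
        i=$((i + 1))
    done
    printf '"dfs.nameservices"="%s",\n' "$ns"
    printf '"dfs.ha.namenodes.%s"="%s",\n' "$ns" "$ids"
    # One rpc-address property per namenode, in the order given.
    i=1
    for host in "$@"; do
        printf '"dfs.namenode.rpc-address.%s.nn%s"="%s:%s",\n' "$ns" "$i" "$host" "$port"
        i=$((i + 1))
    done
    printf '"dfs.client.failover.proxy.provider.%s"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"\n' "$ns"
}

# Reproduces the six properties used in the LOAD statement.
gen_ha_props mycluster 8020 hadoop101 hadoop102 hadoop103
```

The values must agree with the hdfs-site.xml of the Hadoop cluster; the helper only formats them.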
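(8) After the load succeeds, query the data in Doris
Routine Load
Routine Load lets users import data automatically and continuously from a specified data source. Currently only Kafka is supported as a source. Limitations:
- Supports unauthenticated Kafka access as well as Kafka clusters authenticated via SSL
- Only Kafka 0.10.0.0 and later are supported
First install ZooKeeper and Kafka, create the topic, and write a batch of data into it. The topic is named test2 here to match the producer and the load job below:
root@doris1:~# /opt/module/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper doris1:2181/kafka_2.4 --create --replication-factor 2 --partitions 3 --topic test2
(1) Write a Java producer that sends a batch of test data to the test2 topic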
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "doris1:9092,doris2:9092,doris3:9092");
        // Wait for all in-sync replicas to acknowledge each batch
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Each record value is one tab-separated row: id, name, age
        for (int i = 0; i < 100000; i++) {
            producer.send(new ProducerRecord<String, String>("test2", i + "\tname" + i + "\t18"));
        }
        producer.flush();
        producer.close();
    }
}
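The row layout the producer sends (id, name, age separated by tabs) can be previewed without Kafka or a Java build. This is a sketch only: gen_rows is a hypothetical helper name, and it reproduces just the record value, not the producer settings.

```shell
# Hypothetical helper: print <count> tab-separated rows (id, name, age),
# matching the value built by the Java producer: i + "\tname" + i + "\t18".
gen_rows() {
    local count="$1" i=0
    while [ "$i" -lt "$count" ]; do
        printf '%s\tname%s\t18\n' "$i" "$i"
        i=$((i + 1))
    done
}

# Preview the first three rows.
gen_rows 3
```

For a quick test, the output could also be piped into the kafka-console-producer.sh script from the same Kafka bin directory, assuming it is pointed at one of the brokers and at the test2 topic.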
(2) Create the matching table in Doris
create table student_kafka
(
id int,
name varchar(50),
age int
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
(3) Create the load job; desired_concurrent_number sets the degree of parallelism
CREATE ROUTINE LOAD test_db.kafka_test ON student_kafka
PROPERTIES
(
"desired_concurrent_number"="3",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list"= "doris1:9092,doris2:9092,doris3:9092",
"kafka_topic" = "test2",
"property.group.id"="test_group_2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.enable.auto.commit"="false"
);
(4) After the load job has been created, query the data in Doris