# Generate zk.yaml: the Kubernetes manifest for a single-node ZooKeeper.
#   * Service "zookeeper-service": exposes port 2181 and selects pods
#     labelled app=zookeeper (no type is set, so the cluster default applies).
#   * Deployment "zookeeper": one replica of the wurstmeister/zookeeper image,
#     container port 2181.
# The delimiter is quoted ('EOF') so the shell writes the body verbatim with
# no parameter or command expansion. YAML nesting is expressed purely through
# indentation, so the leading spaces below are significant.
cat > zk.yaml <<'EOF'
apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
spec:
  ports:
  - name: zookeeper-port
    port: 2181
    targetPort: 2181
  selector:
    app: zookeeper
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - image: wurstmeister/zookeeper
        imagePullPolicy: IfNotPresent
        name: zookeeper
        ports:
        - containerPort: 2181
EOF
# Generate kafka.yaml: the Kubernetes manifest for a single Kafka broker.
#   * Service "kafka-service": NodePort service mapping port 9092 to
#     node port 30092, selecting pods labelled app=kafka.
#   * Deployment "kafka": one replica of the wurstmeister/kafka image,
#     configured through KAFKA_* environment variables; it reaches ZooKeeper
#     at zookeeper-service:2181 and advertises itself as kafka-service:9092.
# The delimiter is quoted ('EOF') so the shell writes the body verbatim with
# no parameter or command expansion. YAML nesting is expressed purely through
# indentation, so the leading spaces below are significant.
cat > kafka.yaml <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    nodePort: 30092
    protocol: TCP
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "kafka-service" #[kafka的service的clusterIP]
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_BROKER_ID
          value: "1"
EOF
[root@master240 asd]# kubectl get svc && kubectl get pods
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-service NodePort 10.106.153.152 <none> 9092:30092/TCP 11h
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 3d19h
zookeeper-service ClusterIP 10.96.240.113 <none> 2181/TCP 11h
NAME READY STATUS RESTARTS AGE
kafka-6d5d8884fc-qmq68 1/1 Running 0 11h
zookeeper-75fd7fdb96-6dh6f 1/1 Running 0 11h
kubectl exec -it [Kafka的pod名称] -- bash
[root@master240 asd]# kubectl exec -it kafka-6d5d8884fc-qmq68 -- bash
bash-5.1# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic jbjb
>11111111111111
>2222222222222
>
- 另外打开一个 Linux 终端,执行相同的命令进入容器,或者直接在容器外部执行以下命令;这次将这个终端作为消费者。注意:上面的博客中写的创建消费者的方法在新版的 Kafka 中已经改变,需要执行下面的命令:
[root@master240 ~]# kubectl exec -it kafka-6d5d8884fc-qmq68 -- kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic jbjb --from-beginning
jbjbbbjb
11111111111111
2222222222222
#之后,在生产者输入信息,查看消费者是否能够接收到。如果接收到,说明运行成功
- 最后,还可以执行下面的命令以测试列出所有的消息主题:
[root@master240 ~]# kubectl exec -it kafka-6d5d8884fc-qmq68 -- kafka-topics.sh --list --zookeeper zookeeper-service:2181
__consumer_offsets
jbjb
test
#注意:连接 Kafka(如 --broker-list、--bootstrap-server)时使用 Kafka 的端口 9092,连接 Zookeeper(如 --zookeeper)时使用 Zookeeper 的端口 2181,应注意区分。
|