IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Golang操作kafka -> 正文阅读

[大数据]Golang操作kafka

kafka cluster: kafka集群,一台或多台服务器组成
????????Broker: Broker是指部署了Kafka实例的服务器节点。每个服务器上有一个或多个kafka的实
例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等......
????????Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。实际应用中通常是一个业务线建一个topic。
????????Partition: Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
????????Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的
时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。

consumer:消费者,即消息的消费方,是消息的出口。
????????Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分
区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
工作流程
我们看上面的架构图中,producer就是生产者,是数据的入口。Producer在写入数据的时候会把数据写入到leader中,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:


1.Kafka集群的架构
????????1.broker: 节点
????????2.topic: 消息主题
????????3.partition: 分区,把同一个topic分成不同的分区,提高负载
????????????????1.leader: 分区的主节点(老大)
????????????????2. follower: 分区的从节点(小弟)
????????4.Consumer Group
2.生产者往Kafka发送数据的流程(6步)
????????1. 生产者从Kafka集群获取分区leader信息
????????2. 生产者将消息发送给leader
????????3. leader将消息写入本地磁盘
????????4. follower从leader拉取消息数据
????????5. follower将消息写入本地磁盘后向leader发送ACK
????????6. leader收到所有的follower的ACK之后向生产者发送ACK
3.Kafka选择分区的模式(3种)
????????1.指定往哪个分区写
????????2.指定key,kafka根据key做hash然后决定写哪个分区
????????3.轮询方式
4.生产者往kafka发送数据的模式(3种)
????????1. 0?:把数据发给leader就成功,效率最高、安全性最低。
????????2. 1?:把数据发送给leader,等待leader回ACK
????????3. all?:把数据发给leader,确保follower从leader拉取数据回复ack给leader,leader再回复ACK,安全性最高

选择partition的原则

????????那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
????????1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。? ? ? ??
? ? ? ? 2.如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
????????3.如果既没指定partition,又没有设置key,则会采用轮询方式,即每次取一小段时间的数据写入某个partition,下一小段的时间写入下一个partition。

下载并启动kafka,需先配置jdk环境

??下载好kafka后修改zookeeper.properties和server.properties两个配置文件的日志生成路径?

启动?

linux和mac执行 bin下的kafka-server-start.sh和zookeeper-server-start.sh

windows以管理员身份打开cmd终端执行bin下的windows下的kafka-server-start.bat和zookeeper-server-start.bat

?运行并指定上面修改的配置文件,先启动zookeeper?

LogAgent的工作流程:

1.读日志 --tailf第三方库
2.往kafka写日志

1.读日志,使用tailf库

go get github.com/hpcloud/tail

tailf文档代码示范

package main

import (
   "fmt"
   "github.com/hpcloud/tail"
   "time"
)

func main() {
   filename := "./my.log"
   config := tail.Config{
      ReOpen:    true,  //重新打开
      Follow:    true, // 是否跟随
      Location:    &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
      MustExist: false,              //文件不存在不报错
      Poll:      true,          //
   }
   tails,err := tail.TailFile(filename,config)
   if err != nil {
      fmt.Println("tail file failed, err",err)
      return
   }
   var (
      line *tail.Line
      ok bool
   )
   for {
      line,ok = <- tails.Lines
      if !ok {
         fmt.Printf("tail file close reopen, filename:%s\n",tails.Filename)
         time.Sleep(time.Second)
         continue
      }
      fmt.Println("line:",line.Text)
   }
}

????????log agent开发

????????sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会提示类似如下错误:

github.com/ DataDog/zstd
exec: "gcc" :executable file not found in %PATH%

????????所以在Windows平台请使用v1.19版本的sarama。
????????在go.mod中写入

module kafka/kafka_demo //创建的包名 根据自己需求更改

go 1.17

require (                          
   github.com/Shopify/sarama v1.19  //windows指定版本  mac,linux不用  
                                    
)

? ? ? ? ?然后打开在终端执行go mod download下载,linux,mac可直接go get github.com/Shopify/sarama下载。

go get github.com/Shopify/sarama

????????文档代码示范

package main

import (
   "fmt"
   "github.com/Shopify/sarama"
)
//基于sarama第三方库开发的kafka client
func main() {
   config := sarama.NewConfig()
   //tailf包使用
   config.Producer.RequiredAcks = sarama.WaitForAll //Producer生产者    发送完数据需要leader和follow都确认
   config.Producer.Partitioner = sarama.NewRandomPartitioner //Partitioner分区   新选出一个partition
   config.Producer.Return.Successes = true //成功交付的消息将在success channel返回
   //构造消息
   msg := &sarama.ProducerMessage{}
   msg.Topic = "web_log"
   msg.Value = sarama.StringEncoder("this is a test log")
   //链接kafka
   client , err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)
   if err != nil {
      fmt.Println("producer closed ,err:",err)
      return
   }
   fmt.Println("链接kafka成功!")
   defer client.Close()
   //发送消息
   pid, offset ,err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send msg failed, err:",err)
      return
   }
   fmt.Printf("pid:%v, offset:%v\n",pid, offset)

}

执行会在kafka日志配置文件生成web_log文件夹


?

设计LogAgent

目录结构

?函数入口src/logagent/main.go

package main

import (
	"fmt"
	"logagent/kafka"
	"logagent/taillog"
	"time"

)

//logAgent程序入口
func run() {
	//1.读取日志
	for {
		select {
		case line := <- taillog.ReadChan():
			//2. 发送到kafka
			kafka.SendToKafka("web_log", line.Text) //line.Text
		default:
			time.Sleep(time.Second)
		}
	}
}

func main() {
	//1.初始化kafka链接
	err := kafka.Init([]string{"127.0.0.1:9092"})
	if err != nil {
		fmt.Printf("Init Kafka failed, err:%v", err)
		return
	}
	fmt.Println("Init kafka success")

	//2.打开日志准备收集日志
	err = taillog.Init("./my.log")
	if err != nil {
		fmt.Printf("Init taillog failed ,err: %v", err)
		return
	}
	fmt.Println("Init taillog success...")
	run()
}

收集日志模块src/logagent/taillog.go

package taillog

import (
	"fmt"
	"github.com/hpcloud/tail"

)

//专门从日志收集日志的模块
var (
	tailObj *tail.Tail
	LogChan chan string
)


func Init(fileName string) (err error) {
	filename := "./my.log"
	config := tail.Config{
		ReOpen:    true,  //重新打开
		Follow:    true, // 是否跟随
		Location:    &tail.SeekInfo{Offset: 0, Whence: 2}, //从文件哪个地方开始读
		MustExist: false,              //文件不存在不报错
		Poll:      true,          //
	}
	tailObj,err = tail.TailFile(filename,config)
	if err != nil {
		fmt.Println("tail file failed, err",err)
		return
	}
	return
}

func ReadChan() <- chan *tail.Line {
	return tailObj.Lines
}
往kafka写日志的模块src/logagent/kafka.go
package kafka

import (
	"fmt"
	"github.com/Shopify/sarama"
)

//专门往kafka写日志的模块

var (
	client sarama.SyncProducer //声明一个全局的连接kafka的生产者client
)

func Init(addrs []string) (err error) {
	config := sarama.NewConfig()
	//tailf包使用
	config.Producer.RequiredAcks = sarama.WaitForAll //Producer生产者    发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner //Partitioner分区   新选出一个partition
	config.Producer.Return.Successes = true //成功交付的消息将在success channel返回
	//链接kafka
	client , err = sarama.NewSyncProducer(addrs ,config)
	if err != nil {
		fmt.Println("producer closed ,err:",err)
		return
	}
	fmt.Println("链接kafka成功!")
	return
}

func SendToKafka(topic, data string) {
	//构造消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic //"web_log"
	msg.Value = sarama.StringEncoder(data) //"this is a test log"
	//发送消息
	pid, offset ,err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed, err:",err)
		return
	}
	fmt.Printf("pid:%v, offset:%v\n",pid, offset)
}

?


?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-12 19:39:55  更:2021-11-12 19:42:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 0:29:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码