IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka在ASP.Net Core上的应用 -> 正文阅读

[大数据]Kafka在ASP.Net Core上的应用

Kafka在ASP.Net Core上的应用**
ASP.Net Core中使用的最多的是Confluent.Kafka这个包,以下用实例展示应用
1.下载Nuget包
? 首先是下载Confluent.Kafka这个包
2.创建Producer消息生产者
发送者
public class KafkaProducer
{
public static async Task SendAsync(string topic, T value) where T: KafkaMessage
{
var config = new ProducerConfig { BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers };//服务器IP
ProducerBuilder<Null, string> producerBuilder = new ProducerBuilder<Null, string>(config);
using (var p = producerBuilder.Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(value) });
Console.WriteLine(KaTeX parse error: Expected 'EOF', got '}' at position 75: … }? …“Delivery failed: {e.Error.Reason}”);
}
}
}
}
其中要注意的一点ProducerBuilder<TKey,TValue>中的TValue类型只能是Confluent.Kafka.Null, int, long, string, float, double, byte[]. 这7种类型, 否则在调用producerBuilder.Build()时会抛出 ArgumentNullException(Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name})

消息体中包含你的消息必须的内容
public class KafkaMessage
{
}

3.创建Consumer消息消费者
public class KafkaConsumer where T : KafkaMessage
{
public string Topic { get; set; }
public string ConsumerGroup { get; set; }

    public void Subscribe(Action<T> dealMessage)
    {
        var config = new ConsumerConfig
        {
            GroupId = ConsumerGroup,
            BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Latest
        };
        Task.Run(() =>
       {
           var builder = new ConsumerBuilder<string, string>(config);
           using (var consumer = builder.Build())
           {
               consumer.Subscribe(Topic);
               while (true)
               {
                   var result = consumer.Consume();
                   try
                   {
                       var message = JsonConvert.DeserializeObject<T>(result.Value);
                       dealMessage(message);
                   }
                   catch (Exception)
                   {
                       Console.WriteLine($"Topic : {result.Topic}, Message : {result.Value}");
                   }
               }
           }
       });
    }
}

子消费类
interface ITestKafkaConsumer
{
void DealMessage(TestKafkaEntity message);
void Subscribe();
}

public class TestKafkaConsumer : ITestKafkaConsumer
{

    private KafkaConsumer<TestKafkaEntity> consumer { get; set; }

    public TestKafkaConsumer()
    {
        consumer = new KafkaConsumer<TestKafkaEntity>
        {
            Topic = "test",
            ConsumerGroup = "console-consumer-63873",
        };

    }
    public void DealMessage(TestKafkaEntity message)
    {
        Console.WriteLine("-------------------------------------------------------------");
        Console.WriteLine("这是一个消费者!!!" + message.ConsumerValue);
        Console.WriteLine("-------------------------------------------------------------");
    }

    public void Subscribe()
    {
        consumer.Subscribe(DealMessage);
    }
}

通过回调方法的方式, 将子消息类中的方法传入总消息类中
4.注入消费者
在Startup.cs类中的ConfigureServices方法中注入子消费类:

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<ITestKafkaConsumer, TestKafkaConsumer>();
}

然后在Program.cs类中的Main方法启动消费者:
public static void Main(string[] args)
{
var hostBuilder = CreateHostBuilder(args);
var host = hostBuilder.Build();
using (var scope = host.Services.CreateScope())
{
var testConsumer = scope.ServiceProvider.GetService();
testConsumer.Subscribe();
}
host.Run(); ;
}

以上就是kafka在ASP.Net Core中的简单实现

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-20 15:11:21  更:2021-08-20 15:12:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:05:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码