IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Azure云平台 GPS大数据解决方案 EventHub+Azure Databricks+Azure Cosmos DB Cassandra -> 正文阅读

[大数据]Azure云平台 GPS大数据解决方案 EventHub+Azure Databricks+Azure Cosmos DB Cassandra

场景

有时候需要处理并存储大量流式数据,以Gps数据为例,面对一天千万级别的数据,时常要分库分表,数据偶尔会存在丢失的情况,很难做到既要保证大量数据处理的同时又要保证数据查询的实时性。这时借助云平台就是一个很好的做法。

数据流向

在这里插入图片描述

大致数据流向数据源->缓冲区->数据库处理中心->云数据库

所用Azure云平台介绍

1.Azure EventHub (云Kakfa)
Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。
2.Azure Databricks (云Spark)
是一个已针对 Microsoft Azure 云服务平台进行优化的数据分析平台。 Azure Databricks开发数据密集型应用程序的三个环境:Databricks SQL、Databricks Data Science Engineering 和 & Databricks 机器学习。Databricks SQL 为想要针对数据湖运行 SQL 查询、创建多种可视化类型以从不同角度探索查询结果,以及生成和共享仪表板的分析员提供了一个易于使用的平台。
3.Azure Cosmos DB Cassandra(云数据库)
Azure Cosmos DB Cassandra API 可以充当为 Apache Cassandra 编写的应用的数据存储。 这意味着通过使用现有的符合 CQLv4 的 Apache 驱动程序,现有 Cassandra 应用程序现在可以与 Azure Cosmos DB Cassandra API 通信。 在许多情况下,只需更改连接字符串,就可以从使用 Apache Cassandra 切换为使用 Azure Cosmos DB Cassandra API。通过 Cassandra API 可以使用 Cassandra 查询语言 (CQL)、基于 Cassandra 的工具(如 cqlsh)和熟悉的 Cassandra 客户端驱动程序与 Azure Cosmos DB 中存储的数据进行交互。

技术方案优势

1.高效实时,高吞吐(理论上只要预算够无论多大的数据都能处理)
2.只要部署好,根本不用担心自己搭的服务器会爆炸
3.减少各种环境搭建过程,减少人力机器维护成本
4.后续查询扩展快速方便,可以根据业务需求做出各种定制

具体讲解

  1. 数据发送部分
    数据发送部分主要将数据源发送至EventHub,那么这里以.net为例附上一部分发送代码,因为是在老项目上改的,用的是.net fk,所以没有太多的操作空间(不然不兼容),EventHub使用环境在.net fk4.6以上,大家记得更新框架,推荐使用.net core或者.net 5的BackGroundService。
    大概说一下思路,一个并发队列,4线程发送,而且做了分区,可以根据自身语言环境或者需要更改,最好自己写
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.AppCenter.Crashes;

namespace EHiGPSSocket.FormsApp
{
    public static class EventHub
    {
        private const string ConnectionString =
            "Your ConnectionString ";

        private const string EventHubName = "EventHubName";
        private const int Capacity = 500;
        private const int MacCapacity = 200000;
        private static EventHubProducerClient _producerClient = 
            new EventHubProducerClient(ConnectionString, EventHubName);
        private static ConcurrentDictionary<string, ConcurrentQueue<string>> DicQueues = new ConcurrentDictionary<string, ConcurrentQueue<string>>();

        static EventHub()
        {
            for (int i = 0; i < 4; i++)
            {
                Task.Factory.StartNew(SendAsync, CancellationToken.None, TaskCreationOptions.LongRunning);
            }
        }

        public static void Enqueue(string message, string topic)
        {
            if (!DicQueues.Keys.Contains(topic))
            {
                var queue = new ConcurrentQueue<string>();
                queue.Enqueue(message);
                DicQueues.TryAdd(topic, queue);
            }
            else
            {
                DicQueues.TryGetValue(topic, out var queue);
                if (queue?.Count < MacCapacity)
                {
                    queue?.Enqueue(message);
                }
            }
        }

        static async Task SendAsync(object state)
        {
            while (true)
            {
                try
                {
                    foreach (var dicQueue in DicQueues)
                    {
                        var queue = dicQueue.Value;
                        if (queue.Count > Capacity)
                        {
                            var count = 0;
                            var list = new List<EventData>();
                            while (count < Capacity && count < dicQueue.Value.Count)
                            {
                                queue.TryDequeue(out var data);
                                if (!string.IsNullOrEmpty(data))
                                {
                                    list.Add(new EventData(data));
                                    count++;
                                }
                            }
                            if (_producerClient.IsClosed)
                            {
                                _producerClient = new EventHubProducerClient(ConnectionString, EventHubName);
                            }
                            await _producerClient.SendAsync(list, new SendEventOptions() { PartitionKey = dicQueue.Key }).ConfigureAwait(false);
                           
                        }
                    }
                }
                catch (Exception e)
                {
                    Crashes.TrackError(e);
                }
            }
        }
    }
}

2.EventHub
数据发送至EventHub后,可以看到大致的数据量
在这里插入图片描述

3.Azure Databricks
Azure Databricks从EventHub读取数据
大致的读取代码如下Scala

def EventHandle(): Unit = {
  val namespaceName = "namespaceName "
  val eventHubName = "eventHubName "
  val sasKeyName = "sasKeyName "
  val sasKey = "sasKey "
  val domainName="domainName.chinacloudapi.cn"
  val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
      .setEndpoint(namespaceName,domainName)
      .setEventHubName(eventHubName)
      .setSasKeyName(sasKeyName)
      .setSasKey(sasKey)
  val customEventhubParameters =EventHubsConf(connStr.toString()).setMaxEventsPerTrigger(100)
  customEventhubParameters.setConsumerGroup("$Default")
  println(customEventhubParameters.consumerGroup)
  println(customEventhubParameters.connectionString)
  val carDF = spark.sql("select * from xxxx")
  val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
  val messages =incomingStream.withColumn("Body", $"body".cast(StringType)).withColumn("PartitionKey", $"PartitionKey".cast(StringType)).select("Body","PartitionKey")
  val query= messages.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long)=>
     batchDF.filter("PartitionKey='DBQ'")
     //这里处理DF
    .write.cassandraFormat("gpsdata", "gpsprofile").mode("append").save()
  }
  query.start().awaitTermination()
}

4.Azure Cosmos DB Cassandra
最后丢入Azure Cosmos DB Cassandra
在这里插入图片描述

总结

EventHub Azure Databricks Azure Cosmos DB Cassandra
这三个云平台中需要有很多可以配置的地方,大家参阅官方文档应该都可以解决,这里只 、提供一些思路和解决方案,具体操作与选型还是要看个人业务需求与技术选型

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-22 20:40:12  更:2022-02-22 20:40:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:01:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码