IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 嵌入式 -> MQTT 在 Elixir 中的应用 -> 正文阅读

[嵌入式]MQTT 在 Elixir 中的应用

简介

最近读到一本好书,书名是《通过 Elixir 和 Nerves 搭建气象站》, 书中介绍了如何引入 Elixir 作为构建嵌入式应用程序的工具。

通过使用 Nerves,我们可以在支持网络的设备上运行 Elixir 代码,并且与一些控制软件交互。

上面提到的书主要关注点在 Nerves,使用 HTTP 协议进行网络交互。 尽管在许多情况下这是一个合理的选择,但我想介绍另一个广泛用于生产物联网 (IoT) 设置的选择:MQTT。

MQTT 协议

MQTT 是一种专为物联网 (IoT) 设备通信而设计的消息传输协议。 它广泛应用于许多领域,例如银行、石油和天然气、制造业等。

MQTT 协议有很多优点,部分如下所示:

  • 它是一种轻量级的二进制协议,通常在 TCP/IP 协议之上运行。
  • 它专为网络不可靠的场景设计,是户外安装的理想选择。
  • 它遵循发布/订阅模式,简化客户端逻辑。

我们将在设置中演示 MQTT 的一些优势。

MQTT Broker

MQTT 的一个重要特征是它简化了客户端逻辑,这对于嵌入式设备至关重要。 这是通过发布/订阅模式实现的:在 MQTT 中,没有“服务器”的概念。 相反,所有参与实体都是连接到所谓 broker 的客户端。 客户端订阅主题向它们发布消息,broker 进行路由(以及许多其他事情)。

一个好的用于生产的 broker,如 EMQ X,通常不仅提供 MQTT 路由功能,还提供许多其他有趣的功能,例如

  • 其他类型的连接方法,如 WebSockets;
  • 不同的认证和授权模式;
  • 将数据流传输到数据库;
  • 基于消息特征的自定义路由规则。

传感器设置

为简单起见,我们的设备将由一个普通的 Mix 应用程序表示:它可以轻松转换为 Nerves 应用程序。

首先,我们创建一个 Mix 项目:

mix new --sup weather_sensor
cd weather_sensor

为了与 MQTT broker 交互,我们需要一个 MQTT 客户端。 我们采用 emqtt。 将其添加到 mix.exs 作为依赖项:

defp deps do
  [
    {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]}
  ]
end

我们将把所有的“传感器”代码放到主模块 WeatherSensor 中,所以我们需要将它添加到应用程序管理器 lib/weather_sensor/application.ex 中:

defmodule WeatherSensor.Application do
  @moduledoc false

  use Application

  @impl true
  def start(_type, _args) do
    children = [
      WeatherSensor
    ]

    opts = [strategy: :one_for_one, name: WeatherSensor.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

现在,让我们在 lib/weather_sensor.ex 中实现主模块:

defmodule WeatherSensor do
  @moduledoc false

  use GenServer

  def start_link([]) do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    interval = Application.get_env(:weather_sensor, :interval)
    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    report_topic = "reports/#{emqtt_opts[:clientid]}/temperature"
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    st = %{
      interval: interval,
      timer: nil,
      report_topic: report_topic,
      pid: pid
    }

    {:ok, set_timer(st), {:continue, :start_emqtt}}
  end

  def handle_continue(:start_emqtt, %{pid: pid} = st) do
    {:ok, _} = :emqtt.connect(pid)

    emqtt_opts = Application.get_env(:weather_sensor, :emqtt)
    clientid = emqtt_opts[:clientid]
    {:ok, _, _} = :emqtt.subscribe(pid, {"commands/#{clientid}/set_interval", 1})
    {:noreply, st}
  end

  def handle_info(:tick, %{report_topic: topic, pid: pid} = st) do
    report_temperature(pid, topic)
    {:noreply, set_timer(st)}
  end

  def handle_info({:publish, publish}, st) do
    handle_publish(parse_topic(publish), publish, st)
  end

  defp handle_publish(["commands", _, "set_interval"], %{payload: payload}, st) do
    new_st = %{st | interval: String.to_integer(payload)}
    {:noreply, set_timer(new_st)}
  end

  defp handle_publish(_, _, st) do
    {:noreply, st}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp set_timer(st) do
    if st.timer do
      Process.cancel_timer(st.timer)
    end
    timer = Process.send_after(self(), :tick, st.interval)
    %{st | timer: timer}
  end

  defp report_temperature(pid, topic) do
    temperature = 10.0 + 2.0 * :rand.normal()
    message = {System.system_time(:millisecond), temperature}
    payload = :erlang.term_to_binary(message)
    :emqtt.publish(pid, topic, payload)
  end
end

并在 config/config.exs 中添加一些选项:

import Config

config :weather_sensor, :emqtt,
  host: '127.0.0.1',
  port: 1883,
  clientid: "weather_sensor",
  clean_start: false,
  name: :emqtt

config :weather_sensor, :interval, 1000

让我们总结一下 WeatherSensor 中发生的事情:

  • 它实现了GenServer 行为。
  • 启动时,它有如下动作:
    • 打开一个 MQTT 连接;
    • 订阅 commands/weather_sensor/set_interval 主题以接收命令,将接收到的数据将通过 :emqtt 发送到进程,作为 {:publish, publish }消息。
    • 以预定义的时间间隔设置计时器。
  • 在定时器超时时,它发布 {Timestamp, Temperature} 元组到 reports/weather_sensor/temperature 主题。
  • 在收到来自 commands/weather_sensor/set_interval 主题的消息时,它会更新计时器间隔。

由于我们的应用程序不是真正的 Nerves 应用程序,它连接了 BMP280 之类的传感器,因此我们生成温度数据。

在这里我们已经可以看到相对于 HTTP 交互的一个优势:我们不仅可以发送数据,还可以实时接收一些命令。

我们需要一个 broker 来运行节点; 我们稍后会开始。

控制台设置

由于 MQTT 中没有“服务器”,因此我们的控制台也将是一个 MQTT 客户端。 但它会订阅reports/weather_sensor/temperature主题和发布命令到 commands/weather_sensor/set_interval。

对于控制台,我们将设置 Phoenix LiveView 应用程序。

创建过程如下:

mix phx.new --version
Phoenix installer v1.6.2
mix phx.new weather_dashboard --no-ecto --no-gettext --no-dashboard --live
cd weather_dashboard

向 mix.exs 添加依赖项

  defp deps do
    [
      ...
      {:jason, "~> 1.2"},
      {:plug_cowboy, "~> 2.5"},

      {:emqtt, github: "emqx/emqtt", tag: "1.4.4", system_env: [{"BUILD_WITHOUT_QUIC", "1"}]},
      {:contex, github: "mindok/contex"} # We will need this for SVG charts
    ]
  end

向 config/dev.exs 添加一些设置:

config :weather_dashboard, :emqtt,
  host: '127.0.0.1',
  port: 1883

config :weather_dashboard, :sensor_id, "weather_sensor"

# Period for chart
config :weather_dashboard, :timespan, 60

现在我们生成一个 LiveView 控制器:

mix phx.gen.live Measurements Temperature temperatures  --no-schema --no-context

这会生成很多文件,但并非都是必需的,我们需要的是一个带有图表的单页应用程序。

rm lib/weather_dashboard_web/live/temperature_live/form_component.*
rm lib/weather_dashboard_web/live/temperature_live/show.*
rm lib/weather_dashboard_web/live/live_helpers.ex

同时从lib/weather_dashboard_web.ex 中删除import WeatherDashboardWeb.LiveHelpers。

更新我们页面的模板(lib/weather_dashboard_web/live/temperature_live/index.html.heex):

<div>
  <%= if @plot do %>
    <%= @plot %>
  <% end %>
</div>

<div>
  <form phx-submit="set-interval">
    <label for="interval">Interval</label>
    <input type="text" name="interval" value={@interval}/>
    <input type="submit" value="Set interval"/>
  </form>
</div>

我们有一个图表和输入控件,用于向此页面上的“设备”发送命令。

现在更新主要部分 LiveView 控制器(lib/weather_dashboard_web/live/temperature_live/index.ex):

defmodule WeatherDashboardWeb.TemperatureLive.Index do
  use WeatherDashboardWeb, :live_view

  require Logger

  @impl true
  def mount(_params, _session, socket) do
    reports = []
    emqtt_opts = Application.get_env(:weather_dashboard, :emqtt)
    {:ok, pid} = :emqtt.start_link(emqtt_opts)
    {:ok, _} = :emqtt.connect(pid)
    # Listen reports
    {:ok, _, _} = :emqtt.subscribe(pid, "reports/#")
    {:ok, assign(socket,
      reports: reports,
      pid: pid,
      plot: nil,
      interval: nil
    )}
  end

  @impl true
  def handle_params(_params, _url, socket) do
    {:noreply, socket}
  end

  @impl true
  def handle_event("set-interval", %{"interval" => interval_s}, socket) do
    case Integer.parse(interval_s) do
      {interval, ""} ->
        id = Application.get_env(:weather_dashboard, :sensor_id)
        # Send command to device
        topic = "commands/#{id}/set_interval"
        :ok = :emqtt.publish(
          socket.assigns[:pid],
          topic,
          interval_s,
          retain: true
        )
        {:noreply, assign(socket, interval: interval)}
      _ ->
        {:noreply, socket}
    end
  end

  def handle_event(name, data, socket) do
    Logger.info("handle_event: #{inspect([name, data])}")
    {:noreply, socket}
  end

  @impl true
  def handle_info({:publish, packet}, socket) do
    handle_publish(parse_topic(packet), packet, socket)
  end

  defp handle_publish(["reports", id, "temperature"], %{payload: payload}, socket) do
    if id == Application.get_env(:weather_dashboard, :sensor_id) do
      report = :erlang.binary_to_term(payload)
      {reports, plot} = update_reports(report, socket)
      {:noreply, assign(socket, reports: reports, plot: plot)}
    else
      {:noreply, socket}
    end
  end

  defp update_reports({ts, val}, socket) do
    new_report = {DateTime.from_unix!(ts, :millisecond), val}
    now = DateTime.utc_now()
    deadline = DateTime.add(DateTime.utc_now(), - 2 * Application.get_env(:weather_dashboard, :timespan), :second)
    reports =
      [new_report | socket.assigns[:reports]]
      |> Enum.filter(fn {dt, _} -> DateTime.compare(dt, deadline) == :gt end)
      |> Enum.sort()

    {reports, plot(reports, deadline, now)}
  end

  defp parse_topic(%{topic: topic}) do
    String.split(topic, "/", trim: true)
  end

  defp plot(reports, deadline, now) do
    x_scale =
      Contex.TimeScale.new()
      |> Contex.TimeScale.domain(deadline, now)
      |> Contex.TimeScale.interval_count(10)

    y_scale =
      Contex.ContinuousLinearScale.new()
      |> Contex.ContinuousLinearScale.domain(0, 30)

    options = [
      smoothed: false,
      custom_x_scale: x_scale,
      custom_y_scale: y_scale,
      custom_x_formatter: &x_formatter/1,
      axis_label_rotation: 45
    ]

    reports
    |> Enum.map(fn {dt, val} -> [dt, val] end)
    |> Contex.Dataset.new()
    |> Contex.Plot.new(Contex.LinePlot, 600, 250, options)
    |> Contex.Plot.to_svg()
  end

  defp x_formatter(datetime) do
    datetime
    |> Calendar.strftime("%H:%M:%S")
  end

end

特别说明如下:

  • 我们创建了一个 LiveView 处理程序来为我们的应用程序的主页提供服务。
  • 通常,Phoenix.PubSub 用于更新 LiveView 进程状态。 但是,我们做了一个特殊设置:由于 MQTT broker 已经提供了一个发布订阅模式,我们直接从 LiveView 进程连接到它。
  • 收到新的温度数据后,服务器更新温度图表。
  • 收到用户的表单更新后,我们会向命令主题发送更新的时间间隔。

最后,在lib/weather_dashboard_web/router.ex 中设置路由,以便我们的控制器能够处理根页面:

  scope "/", WeatherDashboardWeb do
    pipe_through :browser

    live "/", TemperatureLive.Index
  end

模块集成

现在,我们已准备设置并运行所有内容。

我们运行一个 MQTT broker。 由于我们不需要任何特定的设置,最简单的方法是使用 docker 运行代 broker。

docker run -d --name emqx -p 1883:1883 emqx/emqx:4.3.10

现在运行我们的“设备”:

cd weather_sensor
export BUILD_WITHOUT_QUIC=1
iex -S mix
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

....

13:17:24.461 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/commands/weather_sensor/set_interval", %{nl: 0, qos: 1, rap: 0, rh: 0}}]}, :undefined}

13:17:24.463 [debug] emqtt(weather_sensor): RECV Data: <<144, 3, 0, 2, 1>>

13:17:25.427 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 179, 156, 178, 158, 125, 1, 70, 64, 38, 106, 91, 64, 234, 212, 185>>}

13:17:26.428 [debug] emqtt(weather_sensor): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 3, false, 0, false}, {:mqtt_packet_publish, "/reports/weather_sensor/temperature", :undefined, :undefined}, <<131, 104, 2, 110, 6, 0, 156, 160, 178, 158, 125, 1, 70, 64, 39, 115, 221, 187, 144, 192, 31>>}
...

我们看到我们的传感器立即开始发送报告。

现在运行我们的控制台:

cd weather_dashboard
export BUILD_WITHOUT_QUIC=1
iex -S mix phx.server
Erlang/OTP 24 [erts-12.1.2] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [jit] [dtrace]

[info] Running WeatherDashboardWeb.Endpoint with cowboy 2.9.0 at 127.0.0.1:4000 (http)
[info] Access WeatherDashboardWeb.Endpoint at http://localhost:4000
Interactive Elixir (1.12.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> [watch] build finished, watching for changes...

让我们导航到 http://localhost:4000

我们看到相应的 LiveView 进程挂载,连接到代理,并开始接收温度数据:

[info] GET /
[info] Sent 200 in 145ms
[info] CONNECTED TO Phoenix.LiveView.Socket in 129μs
  Transport: :websocket
  Serializer: Phoenix.Socket.V2.JSONSerializer
  Parameters: %{"_csrf_token" => "cwoROxAwKFo7NEcSdgMwFlgaZ1AlBxUa6FIRhAbjHA6XORIF-EUiIRqU", "_mounts" => "0", "_track_static" => %{"0" => "http://localhost:4000/assets/app.css", "1" => "http://localhost:4000/assets/app.js"}, "vsn" => "2.0.0"}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 1, false, 0, false}, {:mqtt_packet_connect, "MQTT", 4, false, true, false, 0, false, 60, %{}, "emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130", :undefined, :undefined, :undefined, :undefined, :undefined}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<32, 2, 0, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): SEND Data: {:mqtt_packet, {:mqtt_packet_header, 8, false, 1, false}, {:mqtt_packet_subscribe, 2, %{}, [{"/reports/#", %{nl: 0, qos: 0, rap: 0, rh: 0}}]}, :undefined}
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<144, 3, 0, 2, 0>>
[debug] emqtt(emqtt-MacBook-Pro-iaveryanov-86405372ddbf17052130): RECV Data: <<48, 58, 0, 35, 47, 114, 101, 112, 111, 114, 116, 115, 47, 119, 101, 97, 116,
  104, 101, 114, 95, 115, 101, 110, 115, 111, 114, 47, 116, 101, 109, 112, 101,
  114, 97, 116, 117, 114, 101, 131, 104, 2, 110, 6, 0, 180, 251, 188, 158, 125,
...

此外,该页面立即开始更新:

Phoenix

如果我们更新间隔,我们看到设备节点立即收到命令并开始更频繁地更新:

Phoenix

现在我们演示一件重要的事情:让我们停止我们的“设备”节点,稍等片刻,然后重新启动它。 我们看到节点继续以更新的频率发送数据。

Phoenix

怎么会这样? 其实很简单,秘诀就在于我们发送到命令主题的命令消息的 retain 标志。

:ok = :emqtt.publish(
  socket.assigns[:pid],
  topic,
  interval_s,
  retain: true
)

当我们向主题发送带有 retain 标志的消息时,该消息也成为“默认”消息,并保留在 broker 上。该主题的每个订阅者都会在订阅时收到此消息。

对于可能经常离线且没有任何易于使用的本地存储来保持其状态的嵌入式设备,此功能非常重要。 这是在连接时正确配置它们的方法。

结论

这篇文章介绍了如下内容:

  • 展示了一种与嵌入式设备交互的流行方式——MQTT 协议;
  • 我们介绍了它在 Elixir 中的用法;
  • 我们还展示了 MQTT 的一些优势,例如发布订阅模式和消息保留。

即使在简单的设置中,我们也可能想要使用的强大功能是:

  • 将主题数据流传输到数据库中,这样我们可以显示连接历史,无需“手动”保存;
  • 使用 MQTT.js 通过 WebSockets 从前端直接连接到 broker。

所有代码都可以在 https://github.com/savonarola/mqtt-article 上查阅。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/mqtt-for-elixir

  嵌入式 最新文章
基于高精度单片机开发红外测温仪方案
89C51单片机与DAC0832
基于51单片机宠物自动投料喂食器控制系统仿
《痞子衡嵌入式半月刊》 第 68 期
多思计组实验实验七 简单模型机实验
CSC7720
启明智显分享| ESP32学习笔记参考--PWM(脉冲
STM32初探
STM32 总结
【STM32】CubeMX例程四---定时器中断(附工
上一篇文章      下一篇文章      查看所有文章
加:2022-02-01 20:46:36  更:2022-02-01 20:48:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 3:39:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码