IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python rabbitmq demo -> 正文阅读

[Python知识库]python rabbitmq demo

#!/usr/bin/env python
# -*- coding=utf8 -*-

import asyncio
from typing import Union

from yarl import URL
from pamqp import specification as spec
from aio_pika.exceptions import ChannelClosed
from aio_pika import RobustConnection, RobustChannel, Message
from aiormq.types import ConsumerCallback, ArgumentsType, TimeoutType

from common import config, logger


class AChannel(RobustChannel):

    async def publish(
        self,
        message: Message,
        routing_key: str,
        *,
        mandatory: bool = True,
        immediate: bool = False,
        timeout: Union[int, float] = None
    ):
        default_exchange = self.default_exchange
        if default_exchange is None:
            logger.error("PublishMessageFailed: channel not connected")
            return None
        return await default_exchange.channel.publish(
            message,
            routing_key,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout
        )

    async def basic_publish(
        self,
        body: bytes,
        *,
        exchange: str = "",
        routing_key: str = "",
        properties: spec.Basic.Properties = None,
        mandatory: bool = False,
        immediate: bool = False,
        timeout: TimeoutType = None
    ):
        default_exchange = self.default_exchange
        if default_exchange is None:
            logger.error("PublishMessageFailed: channel not connected")
            return None
        return await default_exchange.channel.basic_publish(
            body,
            exchange=exchange,
            routing_key=routing_key,
            properties=properties,
            mandatory=mandatory,
            immediate=immediate,
            timeout=timeout
        )

    async def basic_consume(
        self,
        queue: str,
        consumer_callback: ConsumerCallback,
        *,
        no_ack: bool = False,
        exclusive: bool = False,
        arguments: ArgumentsType = None,
        consumer_tag: str = None,
        timeout: TimeoutType = None
    ):
        default_exchange = self.default_exchange
        if default_exchange is None:
            logger.error("ConsumeMessageFailed: channel not connected")
            return None
        return await default_exchange.channel.basic_consume(
            queue,
            consumer_callback,
            no_ack=no_ack,
            exclusive=exclusive,
            arguments=arguments,
            consumer_tag=consumer_tag,
            timeout=timeout
        )

    async def basic_get(
        self,
        queue: str = "",
        no_ack: bool = False,
        timeout: TimeoutType = None
    ):
        default_exchange = self.default_exchange
        if default_exchange is None:
            logger.error("ConsumeMessageFailed: channel not connected")
            return None
        return await default_exchange.channel.basic_get(
            queue,
            no_ack,
            timeout
        )

    async def ensure_queue(self, name: str):
        try:
            queue = await self.get_queue(name, ensure=True)
            return queue
        except ChannelClosed:
            logger.error("GetQueueFailed: queue %s not found" % name)
            await self.reopen()
            return None

    async def iter_queue(self, name: str, **kwargs):
        queue = await self.ensure_queue(name)
        if queue is None:
            return
        async with queue.iterator(**kwargs) as q:
            async for message in q:
                yield message


class ARabbitmq(RobustConnection):
    instance = None
    CHANNEL_CLASS = AChannel

    async def channel(
        self,
        channel_number: int = None,
        publisher_confirms: bool = True,
        on_return_raises=False,
        timeout: Union[int, float] = None
    ):
        if self.connection is None:
            await self.connect(
                timeout=self.kwargs.get("timeout"),
                client_properties=self.kwargs.get("client_properties"),
                loop=self.loop
            )
        channel = super().channel(channel_number, publisher_confirms, on_return_raises)
        if channel._channel is None:
            await channel.initialize(timeout)
        return channel

    @classmethod
    def create_rabbitmq(cls):
        if cls.instance is None:
            # query params
            kw = {
                "heartbeat": 10
            }
            # build url
            url = URL.build(
                scheme="amqp",
                host=config.rabbitmq_host,
                port=config.rabbitmq_port,
                user=config.rabbitmq_username,
                password=config.rabbitmq_password,
                # yarl >= 1.3.0 requires path beginning with slash
                path="/" + (config.rabbitmq_vhost != "/" and config.rabbitmq_vhost or ""),
                query=kw,
            )
            # instantiation rabbitmq connection
            cls.instance = cls(
                url=url,
                loop=asyncio.get_event_loop(),
            )
        return cls.instance

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-10-16 19:36:13  更:2021-10-16 19:38:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 10:36:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计