学习内容:学习消息队列和Swagger2(Day73)
1、ActiveMQ的基本概念 2、RabbitMQ的基本概念 3、安装RabbitMQ 4、Java 客户端访问 5、Swagger2自动生成API文档
1、ActiveMQ的基本概念
(1)JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
(2)JMS优点 异步:JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。 可靠:JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题,只是避免而不是杜绝,所以在一些糟糕的环境下还是有可能会出现重复。
(3)JMS数据交互的两种方式 点对点消息模型: 1.消息只有一个接收者 2.息发送者和消息接受者并没有时间依赖性。 3.当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息; 4.接收者收到消息的时候,会发送确认收到通知(acknowledgement)。
发布/订阅消息模型: 1.消息可以传递给多个订阅者 2.布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。 3.了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
(4)消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 1.当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。 2.消息队列主要解决了应用耦合、异步处理、流量削锋等问题。 3.当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能。
2、RabbitMQ的基本概念
(1)RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
(2)RabbitMQ的特点 1.可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 2.灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 3.消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 4.高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 5.多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 6.多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 7.管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 8.跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 9.插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
(3)所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
1.Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 2.Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。 3.Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 4.Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 5.Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 6.Connection网络连接,比如一个TCP连接。 7.Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 8.Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 9.Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是 AMQP 概念的基础,必须在连接时指定,RabbitMQ默认的 vhost 是 / 。 10.Broker:表示消息队列服务器实体。
(4)AMQP 中的消息路由:AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
(5)Exchange 类型 Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型: 1.direct:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。 2.fanout:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。 3.topic:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。
主题交换机是非常强大的,当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。 所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
3、安装RabbitMQ
(1)Linux安装:官网地址 https://www.rabbitmq.com/download.html,下载安装Erlang环境http://www.erlang.org/downloads 文件上传,上传到/usr/local/rabbitmq目录下,安装文件 rpm -ivh erlang-21.3-1.el7.x86_64.rpm yum install socat -y rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm 添加开机启动 RabbitMQ 服务chkconfig rabbitmq-server on 启动服务 /sbin/service rabbitmq-server start 查看服务状态 /sbin/service rabbitmq-server status 停止服务(选择执行) /sbin/service rabbitmq-server stop 开启 web 管理插件 rabbitmq-plugins enable rabbitmq_management 开启后可以用浏览器访问,推荐使用谷歌浏览器访问,不然可能访问失败 用默认账号密码(guest)访问地址 http://192.168.7.5:15672/出现权限问题 添加一个新的用户 rabbitmqctl add_user admin 123 设置用户角色 rabbitmqctl set_user_tags admin administrator 设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 查看当前用户和角色 rabbitmqctl list_users 关闭应用的命令为 rabbitmqctl stop_app 清除的命令为 rabbitmqctl reset 重新启动命令为 rabbitmqctl start_app
4、Java 客户端访问
(1)RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。 maven工程的pom文件中添加依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
</dependencies>
消息生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123");
factory.setHost("192.168.7.5");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hello";
for (int i = 0; i < 1000; i++) {
byte[] messageBodyBytes = ("这是一段消息" + i).getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
conn.close();
}
}
消息消费者
@Slf4j
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123");
factory.setHost("192.168.7.5");
Connection conn = factory.newConnection();
final Channel channel = conn.createChannel();
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hello";
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
log.info("消费的路由键:" + routingKey);
log.info("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
log.info("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
log.info(bodyStr);
}
});
}
}
}
5、Swagger2自动生成API文档
(1)swagger可以让我们更好的书写API文档的框架。 Swagger优点 1.代码变,文档变。只需要少量的注解,Swagger 就可以根据代码自动生成 API 文档,很好的保证了文档的时效性。 2.跨语言性,支持 40 多种语言。 3.Swagger UI 呈现出来的是一份可交互式的 API 文档,我们可以直接在文档页面尝试 API 的调用,省去了准备复杂的调用参数的过程。 4.还可以将文档规范导入相关的工具(例如 SoapUI), 这些工具将会为我们自动地创建自动化测试。
(2)常用注解 @Api:修饰整个类,描述Controller的作用 @ApiOperation:描述一个类的一个方法,或者说一个接口 @ApiParam:单个参数描述 @ApiModel:用对象来接收参数 @ApiProperty:用对象接收参数时,描述对象的一个字段 @ApiResponse:HTTP响应其中1个描述 @ApiResponses:HTTP响应整体描述 @ApiIgnore:使用该注解忽略这个API @ApiError :发生错误返回的信息 @ApiImplicitParam:一个请求参数 @ApiImplicitParams:多个请求参数 @Api:用在请求的类上,表示对类的说明。tags=“说明该类的作用,可以在UI界面上看到的注解”,value=“该参数没什么意义,在UI界面上也看到,所以不需要配置” @ApiOperation:用在请求的方法上,说明方法的用途、作用。value=“说明方法的用途、作用”,notes=“方法的备注说明” @ApiImplicitParams:用在请求的方法上,表示一组参数说明 @ApiImplicitParam:用在@ApiImplicitParams注解中,指定一个请求参数的各个方面。name:参数名,value:参数的汉字说明、解释,required:参数是否必须传,paramType:参数放在哪个地方。header --> 请求参数的获取:@RequestHeader。query --> 请求参数的获取:@RequestParam。path(用于restful接口)–> 请求参数的获取:@PathVariable body。form(不常用)。dataType:参数类型,默认String,其它值dataType=“Integer” 。defaultValue:参数的默认值 @ApiResponses:用在请求的方法上,表示一组响应 @ApiResponse:用在@ApiResponses中,一般用于表达一个错误的响应信息。code:数字,例如400。message:信息,例如"请求参数没填好"。response:抛出异常的类 @ApiModel:用于响应类上,表示一个返回响应数据的信息(这种一般用在post创建的时候,使用@RequestBody这样的场景,请求参数无法使用@ApiImplicitParam注解进行描述的时候) @ApiModelProperty:用在属性上,描述响应类的属性
(3)Springboot项目pom中添加
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
SwaggerConfig配置文件
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.swagger.swaggerdemo"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("标题")
.description("描述")
.contact(new Contact("笑脸???!", null, null))
.version("版本")
.build();
}
}
使用案例
@Api(value = "SwaggerTestController",description = "Swagger")
@RequestMapping("/test")
@RestController
public class SwaggerTestController {
@ApiOperation(value = "Swagger2入门")
@ApiImplicitParams(
@ApiImplicitParam(name = "name",value = "这是一个参数",required = true,paramType = "query")
)
@RequestMapping(value = "/helloSwagger2", method = RequestMethod.GET)
public Object helloSwagger2(@RequestParam String name){
Map map = new HashMap();
map.put("hello",name);
return map;
}
}
启动项目,浏览器访问http://localhost:8080/swagger-ui.html
|