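1. What is MQ
MQ (Message Queue):
A message queue follows the classic producer-consumer model: producers keep publishing messages to the queue, and consumers keep fetching messages from it. Because production and consumption are asynchronous, and each side only cares about sending or receiving messages, no business logic leaks across the boundary, which makes it easy to decouple systems. MQ is also known as message middleware: it uses an efficient, reliable message-passing mechanism for platform-independent data exchange, and integrates distributed systems on top of that data communication.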
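The producer-consumer decoupling described above can be sketched inside a single JVM. This is only an illustration under stated assumptions: `InMemoryQueueDemo`, the `POISON` sentinel, and the `message-N` payloads are hypothetical names, and a `BlockingQueue` stands in for a real broker, which would sit between separate processes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process stand-in for a message queue; not a real broker.
class InMemoryQueueDemo {
    // Sentinel value that tells the consumer to stop (an assumption of this sketch).
    private static final String POISON = "__STOP__";

    // Runs one producer thread and one consumer thread; returns what the consumer received.
    static List<String> run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // The producer only knows about the queue, never about the consumer.
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= messageCount; i++) {
                queue.add("message-" + i);
            }
            queue.add(POISON);
        });

        // The consumer only knows about the queue, never about the producer.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take(); // blocks until a message is available
                    if (POISON.equals(msg)) {
                        break;
                    }
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the demo threads", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Neither thread holds a reference to the other, so either side could be replaced without touching the other. That is the property a message broker provides across processes and machines.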
#1.ActiveMQ
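ActiveMQ is an open-source message bus from Apache, and one of the most popular and capable. It is message middleware that fully supports the JMS specification. Its rich API and multiple cluster architecture modes have made ActiveMQ a long-established message broker in the industry, and it is popular with small and medium-sized enterprises.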
#2.Kafka
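Kafka is a distributed publish-subscribe messaging system open-sourced by LinkedIn, and is now an Apache top-level project. Its main characteristic is pull-based message consumption in pursuit of high throughput; its original purpose was log collection and transport. Replication has been supported since version 0.8. The releases of that era did not support transactions (later versions added them) and made no strict guarantees about message duplication, loss, or errors, which makes Kafka a good fit for data collection in Internet services that generate large volumes of data.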
#3.RocketMQ
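RocketMQ is message middleware open-sourced by Alibaba. It is written in pure Java and offers high throughput and high availability, which makes it suitable for large-scale distributed systems. Its design was inspired by Kafka, but it is not a copy of Kafka: it optimizes reliable message delivery and transactional behavior. Within Alibaba Group it is widely used for trading, top-ups, stream computing, message push, log stream processing, binlog distribution, and similar scenarios.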
#4.RabbitMQ
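RabbitMQ is more reliable than Kafka. Kafka is better suited to high-throughput I/O processing and is generally used for big-data log processing, or in scenarios with somewhat lower requirements on real-time behavior (a small amount of latency is acceptable) and reliability (a small amount of data loss is acceptable), such as ELK log collection.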
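2. Installation and configuration
Official documentation: https://www.rabbitmq.com/download.html
2.1. Installing Erlang
The first model
The simplest consumption model: point-to-point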
package helloword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // Configure how to reach the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("47.");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("s");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
        channel.queueDeclare("hello",false,false,false,null);
        // Publish through the default exchange (""); the routing key is the queue name
        channel.basicPublish("","hello",null,"hello rabbitmq1".getBytes());
        channel.close();
        connection.close();
    }
}
package helloword;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // The declaration must use the same parameters as the producer's
        channel.queueDeclare("hello",false,false,false,null);
        // autoAck = true: a message counts as acknowledged as soon as it is delivered
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
        // The channel and connection stay open so the consumer keeps listening
    }
}
Notice that the code for obtaining a connection is identical in both classes, so it can be wrapped in a utility class.
package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {
    private static ConnectionFactory connectionFactory;

    // Build the factory once, when the class is loaded, and reuse it for every connection
    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");
    }

    // Returns a new connection, or null if the connection attempt fails
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // Closes the channel first, then the connection; either argument may be null
    public static void closeConnectionAndChanel(Channel channel,Connection conn){
        try {
            if (channel != null) channel.close();
            if (conn != null) conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package workquene;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // durable = true: the queue definition survives a broker restart
        channel.queueDeclare("work",true,false,false,null);
        // Publishes messages 1 through 19
        for (int i =1; i<20; i++) {
            channel.basicPublish("","work",null,(i + "hello work quene").getBytes());
        }
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}
package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        // autoAck = true: messages are acknowledged on delivery
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1: "+new String(body));
            }
        });
    }
}
package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        // autoAck = true: messages are acknowledged on delivery
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-2: "+new String(body));
            }
        });
    }
}
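The two consumers are handed messages in an even, round-robin split. If one consumer is slow, the other finishes its share and then sits idle while the slow one works through its backlog.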
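The even split can be modelled in a few lines. This is a simplified sketch, not broker code: `RoundRobinDemo` and `dispatch` are hypothetical names, and it assumes messages are handed out strictly in turn regardless of how fast each consumer is.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of round-robin dispatch; the class and method names are illustrative only.
class RoundRobinDemo {
    // Returns, for each consumer, the message numbers it would be handed.
    static List<List<Integer>> dispatch(int firstMessage, int lastMessage, int consumers) {
        List<List<Integer>> assigned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assigned.add(new ArrayList<>());
        }
        int next = 0;
        for (int m = firstMessage; m <= lastMessage; m++) {
            assigned.get(next).add(m);      // hand the message to the consumer whose turn it is
            next = (next + 1) % consumers;  // then move on, regardless of consumer speed
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Messages 1..19, as published by the Provider above, split across two consumers
        List<List<Integer>> result = dispatch(1, 19, 2);
        System.out.println("consumer 1: " + result.get(0));
        System.out.println("consumer 2: " + result.get(1));
    }
}
```

In this model the first consumer receives the odd-numbered messages and the second the even-numbered ones. Processing speed plays no part in the assignment, which is why a slow consumer ends up with a backlog.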
Fair dispatch: the more capable consumer does more work
package workquene;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        final Channel channel = connection.createChannel();
        // Allow at most one unacknowledged message at a time for this consumer
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
        // autoAck = false: the message is only removed once it is acknowledged manually
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Simulate slow processing
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer-1: "+new String(body));
                // Acknowledge this delivery only (multiple = false)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}
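The effect of `basicQos(1)` combined with manual acknowledgement can be illustrated with a small deterministic model. This is a sketch under assumptions, not broker behavior verified against a live server: `FairDispatchDemo` is a hypothetical name, the per-message costs (100 ms and 300 ms) are made up, and ties are broken in favor of the lower-indexed consumer.

```java
import java.util.Arrays;

// Simplified, deterministic model of fair dispatch (prefetch = 1 with manual ack).
// Timing values and tie-breaking are assumptions for illustration only.
class FairDispatchDemo {
    // costsMs[c] is how long consumer c needs per message.
    // Returns how many messages each consumer ends up handling.
    static int[] dispatch(int messageCount, long[] costsMs) {
        long[] freeAt = new long[costsMs.length]; // time at which each consumer acks its current message
        int[] handled = new int[costsMs.length];
        for (int m = 0; m < messageCount; m++) {
            int target = 0;
            for (int c = 1; c < costsMs.length; c++) {
                // The consumer that becomes idle first gets the next message;
                // on a tie the lower index wins.
                if (freeAt[c] < freeAt[target]) {
                    target = c;
                }
            }
            freeAt[target] += costsMs[target];
            handled[target]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // 19 messages, a fast consumer (100 ms) and a slow one (300 ms)
        System.out.println(Arrays.toString(dispatch(19, new long[] {100, 300})));
    }
}
```

Because a consumer only receives its next message after acknowledging the previous one, the faster consumer naturally takes the larger share. In this model the split for 19 messages is 14 to 5.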
package fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare an exchange named "logs" of type fanout
        channel.exchangeDeclare("logs","fanout");
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish("logs","",null,"fanout type message".getBytes());
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}
package fanout;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs","fanout");
        // Server-named temporary queue that belongs to this consumer
        String queue = channel.queueDeclare().getQueue();
        // Bind the temporary queue to the exchange; the routing key is unused for fanout
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer-1: "+ new String(body));
            }
        });
    }
}
Result: every consumer receives the broadcast message.
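The broadcast behavior can be modelled without a broker. In this sketch `FanoutDemo`, `bindNewQueue`, and `publish` are hypothetical names chosen for illustration; the only rule modelled is that a fanout exchange copies each message to every bound queue and ignores the routing key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory model of a fanout exchange; names are illustrative, not RabbitMQ API.
class FanoutDemo {
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Creates a fresh queue and binds it, like a consumer binding its temporary queue.
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    // Copies the message into every bound queue; the routing key is deliberately ignored.
    void publish(String routingKey, String body) {
        for (Queue<String> queue : boundQueues) {
            queue.add(body);
        }
    }

    public static void main(String[] args) {
        FanoutDemo logs = new FanoutDemo();
        Queue<String> consumer1 = logs.bindNewQueue();
        Queue<String> consumer2 = logs.bindNewQueue();
        logs.publish("", "fanout type message");
        System.out.println("consumer 1 got: " + consumer1.peek());
        System.out.println("consumer 2 got: " + consumer2.peek());
    }
}
```

Each consumer has its own queue, so they do not compete for messages as in the work-queue model: every queue holds its own copy.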
package direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare an exchange named "logs_direct" of type direct
        channel.exchangeDeclare("logs_direct","direct");
        // Only queues bound with exactly this key receive the message
        String routingkey = "error";
        channel.basicPublish("logs_direct",routingkey,null,("Published by the direct model: type ["+ routingkey+"]").getBytes());
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}
package direct;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs_direct","direct");
        // Server-named temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Receive only messages published with the routing key "error"
        channel.queueBind(queue,"logs_direct","error");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: "+new String(body));
            }
        });
    }
}
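The exact-match rule of a direct exchange can be sketched in memory as follows. `DirectExchangeDemo` and its methods are hypothetical names, and the second queue with the keys `info`, `error`, and `warning` is an assumption added for contrast; it does not appear in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory model of a direct exchange; names are illustrative, not RabbitMQ API.
class DirectExchangeDemo {
    // binding key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Creates a queue and binds it once per binding key.
    Queue<String> bindNewQueue(String... bindingKeys) {
        Queue<String> queue = new ArrayDeque<>();
        for (String key : bindingKeys) {
            bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
        }
        return queue;
    }

    // Delivers to queues whose binding key equals the routing key; returns how many matched.
    int publish(String routingKey, String body) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (Queue<String> queue : targets) {
            queue.add(body);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeDemo exchange = new DirectExchangeDemo();
        Queue<String> errorsOnly = exchange.bindNewQueue("error");
        Queue<String> everything = exchange.bindNewQueue("info", "error", "warning");
        exchange.publish("error", "disk full");
        exchange.publish("info", "user logged in");
        System.out.println("errorsOnly: " + errorsOnly);
        System.out.println("everything: " + everything);
    }
}
```

A message whose routing key matches no binding is delivered to no queue in this model.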
package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare an exchange named "topics" of type topic
        channel.exchangeDeclare("topics","topic");
        // Topic routing keys are dot-separated words
        String routekey = "user.find.save";
        channel.basicPublish("topics",routekey,null,("This is the topic (dynamic routing) model, routekey: ["+routekey+"]").getBytes());
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}
package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");
        // Server-named temporary queue
        String queue = channel.queueDeclare().getQueue();
        // "*" matches exactly one word, so "user.*" does not match "user.find.save";
        // a binding of "user.#" (zero or more words) would match it
        channel.queueBind(queue,"topics","user.*");
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: "+ new String(body));
            }
        });
    }
}
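The wildcard rules of a topic exchange are easy to get wrong, so here is a minimal sketch of the matching logic. `TopicMatcher` is a hypothetical helper written for illustration, not part of the RabbitMQ client; it assumes the usual rules that `*` stands for exactly one dot-separated word and `#` stands for zero or more words.

```java
// Illustrative matcher for topic binding patterns; not RabbitMQ's implementation.
class TopicMatcher {
    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    // Compares pattern words from index i against key words from index j.
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // the pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.find.save")); // false
        System.out.println(matches("user.*", "user.save"));      // true
        System.out.println(matches("user.#", "user.find.save")); // true
    }
}
```

With the routing key `user.find.save` used by the Provider above, a queue bound with `user.*` receives nothing, while a queue bound with `user.#` would receive the message.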
Integrating RabbitMQ with Spring Boot
Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
YAML configuration
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 47.94.156.204
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems
Approach 1:
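import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        // Two-argument form: routing key (here the queue name), then the message
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}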
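import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// Declares the "hello" queue if it does not exist and listens on it
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {
    // Invoked for each message whose payload converts to a String
    @RabbitHandler
    public void recivel(String message) {
        System.out.println("message = " + message);
    }
}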