地址
官方地址:https://www.rabbitmq.com 官方代码demo地址:https://www.rabbitmq.com/getstarted.html
使用Docker安装
docker安装RabbitMQ
docker pull rabbitmq:3.7.7-management
docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.7.7-management
个人简单总结
Hello World:简单方式,使用默认转发(默认交换机),声明一个队列即可。一个生产者对应一个消费者,也可以对应多个消费者
work queue 一个生产者对应多个消费者 使用非自动确认&basicQos 限制每个通道的消费数量,达到视消费能力来分配资源
Publish/Subscribe(发布订阅模式–一般一个消费者一个队列) 创建一个路由器exchange ,生产者将消息全部发送到这个路由器 消费者创建随机队列(独占、自动删除、不持久化):当创建的时候就在监听
Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 Topic:对key进行模式匹配后进行投递,符号”#”匹配零个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。 这里 * 和 # 的匹配是以.号分隔的单词为单位的:22.22.22 是三个词,22.* 匹配的是22.22,不会匹配222,也不会匹配22.22.22
Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 Headers:我们可以不考虑它。
Routing–在发布订阅模式中加入routing key Direct Exchange:直接路由
Topics-在Routing模式中加入模式匹配(类似*这种符号)
java代码使用
Hello World
流程
生产者
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Hello World producer: declares the "hello" queue and publishes an increasing counter to it
 * through the default exchange, one message every 500 ms for roughly ten seconds.
 */
public class RabbitMQProducter {
    /**
     * Submits the publishing task to a thread pool and then requests pool shutdown; the task that
     * was already submitted still runs to completion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            connectionFactory.setConnectionTimeout(10000);
            // try-with-resources closes the channel and then the connection even when
            // publishing fails part-way through.
            try (Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel()) {
                channel.queueDeclare("hello", true, false, false, null);
                long start = System.currentTimeMillis();
                int i = 0;
                while (true) {
                    String message = String.valueOf(i++);
                    // Empty exchange name = default exchange; the routing key is the queue name.
                    channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
                    System.out.println("send:\t" + message);
                    Thread.sleep(500);
                    if ((System.currentTimeMillis() - start) > 10 * 1000) {
                        break;
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}
消费者
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Hello World consumer: subscribes to the "hello" queue with automatic acknowledgement and prints
 * every message body it receives.
 */
public class RabbitMQCustomer {
    /**
     * Opens the connection on a pool thread, registers the consumer and returns; deliveries are
     * then handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare("hello", true, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8, so decode with the same charset
                        // instead of the platform default.
                        String message = new String(body, "UTF-8");
                        System.out.println("get\t" + message);
                    }
                };
                // Second argument true = auto-ack, so no explicit basicAck is needed here.
                channel.basicConsume("hello", true, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Work queues
流程
生产者
代码使用 Hello World 的生产者即可
消费者
消费者1
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Work-queue consumer 1: consumes from the "hello" queue with manual acknowledgement and a
 * prefetch limit of 2, simulating a slow worker (6 s per message).
 */
public class RabbitMQCustomerWork_1 {
    /**
     * Opens the connection on a pool thread and registers the consumer; deliveries are then
     * handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare("hello", true, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8; decode with the same charset.
                        String message = new String(body, "UTF-8");
                        System.out.println("work-1 get\t" + message);
                        doWork();
                        // Acknowledge only this delivery, and only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // At most 2 unacknowledged messages are handed to this consumer at a time.
                channel.basicQos(2, false);
                channel.basicConsume("hello", false, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Simulates a slow unit of work by printing a marker and sleeping for six seconds. */
    public static void doWork() {
        try {
            System.out.println("work_1-----");
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
消费者2
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Work-queue consumer 2: consumes from the "hello" queue with manual acknowledgement and a
 * prefetch limit of 2, simulating a fast worker (1 s per message).
 */
public class RabbitMQCustomerWork_2 {
    /**
     * Opens the connection on a pool thread and registers the consumer; deliveries are then
     * handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare("hello", true, false, false, null);
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8; decode with the same charset.
                        String message = new String(body, "UTF-8");
                        System.out.println("work-2 get\t" + message);
                        doWork();
                        // Acknowledge only this delivery, and only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // At most 2 unacknowledged messages are handed to this consumer at a time.
                channel.basicQos(2, false);
                channel.basicConsume("hello", false, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Simulates a quick unit of work by printing a marker and sleeping for one second. */
    public static void doWork() {
        try {
            System.out.println("work_2-----");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Publish/Subscribe
流程
生产者
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Publish/Subscribe producer: declares a fanout exchange and publishes an increasing counter to
 * it, one message every 500 ms for roughly ten seconds.
 */
public class ExchangeProducter {
    /** Name of the fanout exchange; consumers in this package bind their queues to it. */
    static final String EXCHANGE_NAME = "test_exchange";

    /**
     * Submits the publishing task to a thread pool and then requests pool shutdown; the task that
     * was already submitted still runs to completion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            connectionFactory.setConnectionTimeout(10000);
            // try-with-resources closes the channel and then the connection even when
            // publishing fails part-way through.
            try (Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                long start = System.currentTimeMillis();
                int i = 0;
                while (true) {
                    String message = String.valueOf(i++);
                    // The routing key is left empty for the fanout exchange.
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println("send:\t" + message);
                    Thread.sleep(500);
                    if ((System.currentTimeMillis() - start) > 10 * 1000) {
                        break;
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}
消费者
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Publish/Subscribe consumer: creates a server-named queue, binds it to the fanout exchange of
 * {@code ExchangeProducter} and processes messages with manual acknowledgement.
 */
public class ExchangeConsumer_1 {
    /**
     * Opens the connection on a pool thread and registers the consumer; deliveries are then
     * handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                // Declare the exchange here as well (same type as the producer) so that binding
                // does not fail when this consumer is started before the producer.
                channel.exchangeDeclare(ExchangeProducter.EXCHANGE_NAME, "fanout");
                String queueName = channel.queueDeclare().getQueue();
                channel.queueBind(queueName, ExchangeProducter.EXCHANGE_NAME, "");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8; decode with the same charset.
                        String message = new String(body, "UTF-8");
                        System.out.println("Consumer-1 get\t" + message);
                        doWork();
                        // Acknowledge only this delivery, and only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // At most 2 unacknowledged messages are handed to this consumer at a time.
                channel.basicQos(2, false);
                channel.basicConsume(queueName, false, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Simulates a slow unit of work by printing a marker and sleeping for six seconds. */
    public static void doWork() {
        try {
            System.out.println("Consumer-----");
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Routing
流程
生产者
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Routing producer: declares a direct exchange and publishes an increasing counter to it, cycling
 * through the routing keys "0", "1" and "2", one message every 500 ms for roughly ten seconds.
 */
public class DirectExchangeProducter {
    /** Name of the direct exchange; consumers in this package bind their queues to it. */
    static final String EXCHANGE_NAME = "test_direct_exchange";

    /**
     * Submits the publishing task to a thread pool and then requests pool shutdown; the task that
     * was already submitted still runs to completion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            connectionFactory.setConnectionTimeout(10000);
            // try-with-resources closes the channel and then the connection even when
            // publishing fails part-way through.
            try (Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                long start = System.currentTimeMillis();
                int i = 0;
                while (true) {
                    String message = String.valueOf(i++);
                    // The key is derived from the counter after the increment, so message n is
                    // published with key (n + 1) % 3; the result is always "0", "1" or "2".
                    String roule = String.valueOf(i % 3);
                    channel.basicPublish(EXCHANGE_NAME, roule, null, message.getBytes("UTF-8"));
                    System.out.println("send:\t" + message);
                    Thread.sleep(500);
                    if ((System.currentTimeMillis() - start) > 10 * 1000) {
                        break;
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}
消费者
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Routing consumer: creates a server-named queue, binds it to the direct exchange of
 * {@code DirectExchangeProducter} with the keys "1" and "2", and processes messages with manual
 * acknowledgement.
 */
public class DirectExchangeConsumer_1 {
    /**
     * Opens the connection on a pool thread and registers the consumer; deliveries are then
     * handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                // Declare the exchange here as well (same type as the producer) so that binding
                // does not fail when this consumer is started before the producer.
                channel.exchangeDeclare(DirectExchangeProducter.EXCHANGE_NAME, "direct");
                String queueName = channel.queueDeclare().getQueue();
                // One binding per routing key this consumer is interested in.
                String[] bindRoultKeys = {"1", "2"};
                for (String key : bindRoultKeys) {
                    channel.queueBind(queueName, DirectExchangeProducter.EXCHANGE_NAME, key);
                }
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8; decode with the same charset.
                        String message = new String(body, "UTF-8");
                        System.out.println("Consumer-1 get\t" + message);
                        doWork();
                        // Acknowledge only this delivery, and only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // At most 2 unacknowledged messages are handed to this consumer at a time.
                channel.basicQos(2, false);
                channel.basicConsume(queueName, false, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Simulates a slow unit of work by printing a marker and sleeping for six seconds. */
    public static void doWork() {
        try {
            System.out.println("Consumer-----");
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Topics
流程
生产者
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Topics producer: declares a topic exchange and publishes an increasing counter to it, cycling
 * through a fixed set of dotted routing keys, one message every 500 ms for roughly ten seconds.
 */
public class TopicExchangeProducter {
    /** Name of the topic exchange; consumers in this package bind their queues to it. */
    static final String EXCHANGE_NAME = "test_topic_exchange";

    /** Routing keys indexed by {@code counter % 3}. */
    private static final String[] ROUTING_KEYS = {"1.1.1", "1.2", "2.1"};

    /**
     * Submits the publishing task to a thread pool and then requests pool shutdown; the task that
     * was already submitted still runs to completion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            connectionFactory.setConnectionTimeout(10000);
            // try-with-resources closes the channel and then the connection even when
            // publishing fails part-way through.
            try (Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                long start = System.currentTimeMillis();
                int i = 0;
                while (true) {
                    String message = String.valueOf(i++);
                    // The key is picked from the counter after the increment, so message n is
                    // published with ROUTING_KEYS[(n + 1) % 3].
                    String roule = ROUTING_KEYS[i % 3];
                    channel.basicPublish(EXCHANGE_NAME, roule, null, message.getBytes("UTF-8"));
                    System.out.println("send:\t" + message + "\t roult:" + roule);
                    Thread.sleep(500);
                    if ((System.currentTimeMillis() - start) > 10 * 1000) {
                        break;
                    }
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the pool thread can observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}
消费者
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Topics consumer: creates a server-named queue, binds it to the topic exchange of
 * {@code TopicExchangeProducter} with the pattern "1.*", and processes messages with manual
 * acknowledgement.
 */
public class TopicExchangeConsumer_1 {
    /**
     * Opens the connection on a pool thread and registers the consumer; deliveries are then
     * handled by the client library through {@code handleDelivery}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(() -> {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost("192.168.81.132");
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                // Declare the exchange here as well (same type as the producer) so that binding
                // does not fail when this consumer is started before the producer.
                channel.exchangeDeclare(TopicExchangeProducter.EXCHANGE_NAME, "topic");
                String queueName = channel.queueDeclare().getQueue();
                // "*" stands for exactly one dot-separated word in a topic binding key.
                channel.queueBind(queueName, TopicExchangeProducter.EXCHANGE_NAME, "1.*");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            BasicProperties properties, byte[] body) throws IOException {
                        // The producer encodes with UTF-8; decode with the same charset.
                        String message = new String(body, "UTF-8");
                        System.out.println("Consumer-1 get\t" + message);
                        doWork();
                        // Acknowledge only this delivery, and only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };
                // At most 2 unacknowledged messages are handed to this consumer at a time.
                channel.basicQos(2, false);
                channel.basicConsume(queueName, false, consumer);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                closeAfterFailure(connection);
            }
        });
        executorService.shutdown();
    }

    /** Simulates a unit of work by printing a marker and sleeping for two seconds. */
    public static void doWork() {
        try {
            System.out.println("Consumer-----");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Closes the connection after a failed setup so a half-initialised consumer is not left open. */
    private static void closeAfterFailure(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
|