时间:2020-12-20 12:55:51 | 栏目:JAVA代码 | 点击:次
简介
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:
模式:
SpringBoot集成RabbitMQ
一、引入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.2.RELEASE</version> </dependency>
二、配置application.properties
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualHost = /message-test/
三、编写AmqpConfiguration配置文件
package message.test.configuration;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfiguration {
/**
* 消息编码
*/
public static final String MESSAGE_ENCODING = "UTF-8";
public static final String EXCHANGE_ISSUE = "exchange_message_issue";
public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";
public static final String EXCHANGE_PUSH = "exchange_message_push";
public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";
@Autowired
private RabbitProperties rabbitProperties;
@Bean
public Queue issueUserQueue() {
return new Queue(QUEUE_ISSUE_USER);
}
@Bean
public Queue issueAllUserQueue() {
return new Queue(QUEUE_ISSUE_ALL_USER);
}
@Bean
public Queue issueAllDeviceQueue() {
return new Queue(QUEUE_ISSUE_ALL_DEVICE);
}
@Bean
public Queue issueCityQueue() {
return new Queue(QUEUE_ISSUE_CITY);
}
@Bean
public Queue pushResultQueue() {
return new Queue(QUEUE_PUSH_RESULT);
}
@Bean
public DirectExchange issueExchange() {
return new DirectExchange(EXCHANGE_ISSUE);
}
@Bean
public DirectExchange pushExchange() {
// 参数1:队列
// 参数2:是否持久化
// 参数3:是否自动删除
return new DirectExchange(EXCHANGE_PUSH, true, true);
}
@Bean
public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
@Qualifier("issueExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
}
@Bean
public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
@Qualifier("issueExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
}
@Bean
public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
@Qualifier("issueExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
}
@Bean
public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
@Qualifier("issueExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
}
@Bean
public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
@Qualifier("pushExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).withQueueName();
}
@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
return connectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory)
{
return new RabbitTemplate(connectionFactory);
}
}
三、编写生产者
body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);
四、编写消费者
@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
}