
交换机和队列已经支持持久化。现在我们还需要给消息设置元数据 DeliveryMode:将其设置为 2,表示该消息支持持久化。


=======================================================================================
接上一篇博文。修改文件:
发送者:
package org.example.sender;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Message producer: builds a persistent message with custom properties and
 * publishes it through the injected {@link RabbitTemplate}.
 */
@Component
public class MessageSender {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code info} as a persistent message to the {@code amqp-topic}
     * exchange using the routing key {@code huawei.a}.
     *
     * @param info text payload; it is encoded as UTF-8 to form the message body
     */
    public void send(String info) {
        System.out.println("发送消息>>>" + info);

        MessageProperties messageProperties = new MessageProperties();
        // MessageDeliveryMode.toInt maps NON_PERSISTENT to 1 and PERSISTENT to 2;
        // delivery mode 2 marks the message itself as persistent.
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setExpiration("20000"); // expiration of 20 seconds, given in milliseconds
        messageProperties.setAppId("abc123456");
        messageProperties.setHeader("国家1", "中国1");
        messageProperties.setHeader("国家2", "中国2");

        // Encode with an explicit charset: the no-arg getBytes() depends on the
        // platform default and can corrupt non-ASCII text between different JVMs.
        Message message = new Message(info.getBytes(StandardCharsets.UTF_8), messageProperties);
        System.out.println(message);
        rabbitTemplate.convertAndSend("amqp-topic", "huawei.a", message);
    }
}



消费者:
package org.example.receiver;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class TopicReceiver {

    /**
     * Listener for the {@code xiaomi} queue: prints the payload, acknowledges
     * the delivery manually, then prints the delivery tag.
     *
     * @param message the received message, used to read the delivery tag
     * @param msg     the message payload as text
     * @param channel the channel on which the acknowledgement is sent
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "xiaomi")
    public void handlerXM(Message message, String msg, Channel channel) throws IOException {
        System.out.println("小米:" + msg);
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Manual acknowledgement of this single delivery; batch ack is disabled (false).
        channel.basicAck(deliveryTag, false);
        System.out.println(deliveryTag);
    }

    /**
     * Listener for the {@code huawei} queue: prints the payload and several
     * message properties (headers, expiration, app id), then acknowledges the
     * delivery manually.
     *
     * @param message the received message, used to read properties and the delivery tag
     * @param msg     the message payload as text
     * @param channel the channel on which the acknowledgement is sent
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "huawei")
    public void handlerHW(Message message, String msg, Channel channel) throws IOException {
        System.out.println("华为:" + msg);

        final Object allHeaders = message.getMessageProperties().getHeaders();
        System.out.println(allHeaders);

        final String country = message.getMessageProperties().getHeader("国家2");
        System.out.println(country);

        System.out.println(message);

        final String expiration = message.getMessageProperties().getExpiration();
        System.out.println(expiration);

        final String appId = message.getMessageProperties().getAppId();
        System.out.println(appId);

        // Manual acknowledgement: pass the broker the delivery tag of the message
        // being acknowledged, with false so that only this one delivery is acked.
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}



======================================================================================
参考:https://blog.csdn.net/qq_43623492/article/details/124259773




所有评论(0)