5_RabbitMQ连接工具类封装
定义工具类package helloworld.utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurre
·
定义工具类
package helloworld.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author: xj0927
* @Description:
* @Date Created in 2021-01-08 15:54
* @Modified By:
*/
public class RabbitMQUtils {
private static ConnectionFactory factory;
//静态代码块:类加载时执行一次
static {
factory = new ConnectionFactory();
factory.setHost("192.168.77.138");
factory.setPort(5672);
factory.setUsername("ems");
factory.setPassword("123");
factory.setVirtualHost("/ems");
}
//获取连接对象
public static Connection getConnection() {
try {
return factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//关闭通道连接和连接对象
public static void closeConnection(Channel channel, Connection conn) {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
使用工具类
1. 开发生产者
public class Producer {
@Test
public void pro() throws IOException, TimeoutException {
//获取连接对象===>使用工具类
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1: 队列名称 如果队列不存在自动创建
//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
//参数3: exclusive 是否独占队列 true 独占队列 false 不独占
//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5: 额外附加参数
channel.queueDeclare("hello", true, false, false, null);
//发布消息
//参数1: 交换机名称 参数2:队列名称 参数3:传递息额外设置 参数4:消息的具体内容
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes());
//关闭连接===>使用工具类
RabbitMQUtils.closeConnection(channel, connection);
}
}
2. 开发消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象===>使用工具类
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定队列:与生产端一致
channel.queueDeclare("hello", true, false, false, null);
//获取消息
//参数1: 消费那个队列的消息 队列名称
//参数2: 开始消息的自动确认机制
//参数3: 消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("取出消息:===>" + new String(body));
}
});
}
}
更多推荐
所有评论(0)