rabbitmq多个消费者同时接收_RabbitMQ实现公平派遣任务的方法(rabbitmq,编程语言)

时间:2024-05-03 10:24:51 作者 : 石家庄SEO 分类 : 编程语言
  • TAG :

    rabbitmq%E5%A4%9A%E4%B8%AA%E6%B6%88%E8%B4%B9%E8%80%85%E5%90%8C%E6%97%B6%E6%8E%A5%E6%94%B6_RabbitMQ%E5%AE%9E%E7%8E%B0%E5%85%AC%E5%B9%B3%E6%B4%BE%E9%81%A3%E4%BB%BB%E5%8A%A1%E7%9A%84%E6%96%B9%E6%B3%95

先来看一下 RabbitMQ 工作队列和竞争消费者简化模型:


P:(producer/ publisher):生产者,一个发送消息的用户应用程序。
C1:消费者1,一个主要用来等待接收消息的用户应用程序1。
C2:消费者2,一个主要用来等待接收消息的用户应用程序2。
(C1 和 C2 指代多个消费者(工作人员))
工作队列(任务队列):主要用于在多个工作人员(消费者)之间分配耗时的任务,主要思想是避免必须等待某些立即执行的资源密集型任务完成的尴尬局面发生。将任务封装为消息并将其发送到队列,这样,安排的任务可以在以后完成。当有很多消费者时,任务将在他们之间共享,一个消息只能被一个消费者获取。

这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求窗口内处理复杂的任务。

生产者发送新任务消息:

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//循环发送 50 条信息
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for(int i = 0; i < 50; i++) {
String message = "task..." + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x]Sent:" + message);
try {
Thread.sleep(i * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

channel.close();
connection.close();

}
}
消费者1(消费者1设置消费耗时):

假设消耗 1 秒来处理消费过程

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 设置每个消费者同时只能处理一条消息
//channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消费者1] Received '" + msg + "'");
try{
Thread.sleep(1000); // 模拟消费者耗时
}catch (InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
消费者2(消费者2不设置消费耗时):

public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 设置每个消费者同时只能处理一条消息
//channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消费者2] Received '" + msg + "'");
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
首先启动消费者1(Worker)和消费者2(Worker2)应用程序,再启动生产者(NewTask)。

运行结果如下:

消费者1:


消费者2:


由结果可见,两个消费者各自消费了 25 条信息,消息各不相同,从而实现了任务的分发。

公平派遣任务

由结果,我们注意到,工作者1(消费者1)的处理任务比较忙碌(消费耗时),而消费者2却有大量时间处于空闲状态不做任何任务。而 RabbitMQ 对此一无所知,进行平均分配消息,这是因为 RabbitMQ 在消息进入队列才进行信息调度,不会进行未确认消息数,只是盲目地将消息发送给消费者。

为了解决这一点,可以使用 basicQos 方法并将入参参数设置为 1,这样做的意义是:RabbitMQ 一次不要给消费者一个以上的消息。换句话即是在消费者处理并确认上一条信息之前,不要向其发送新的消息,而是分派给不繁忙的消费者进行消费。

在消费者1和消费者2加入一段代码:

// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
再次运行结果:

消费者1:


消费者2:


因此,通过设置 basicQos ,可以让 RabbitMQ 实现分轻重地对任务进行分派。

本文:rabbitmq多个消费者同时接收_RabbitMQ实现公平派遣任务的方法的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:Web前端中JavaScript常见BUG及修复方法是怎样的下一篇:

21 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18