How can I recover unacknowledged AMQP messages from other channels than my connection's own?

Tags: RabbitMQ, Celery, AMQP, Pika, celeryd

Rabbitmq Problem Overview


The longer I keep my RabbitMQ server running, the more trouble I have with unacknowledged messages. I would love to requeue them. There is an AMQP command for this (basic.recover), but it only applies to the channel that your own connection is using. I built a little pika script to at least try it out, but either I am missing something or it cannot be done this way. Could it be done with rabbitmqctl instead?

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost', port=5672,
    credentials=credentials, virtual_host='***')

def handle_recover_ok(frame):
    """Called when RabbitMQ confirms the recover request (Basic.RecoverOk)"""
    print(frame)

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    # basic.recover only covers messages delivered on this channel
    channel.basic_recover(requeue=True, callback=handle_recover_ok)

# Create the connection outside the try block so that `connection`
# is always defined when the except clause runs
connection = pika.SelectConnection(parameters=parameters,
    on_open_callback=on_connected)

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

Rabbitmq Solutions


Solution 1 - Rabbitmq

Unacknowledged messages are those that have been delivered across the network to a consumer but have not yet been ack'ed or rejected, while that consumer still holds open the channel or connection over which it originally received them. The broker therefore can't figure out whether the consumer is just taking a long time to process those messages or has forgotten about them. So it leaves them in an unacknowledged state until either the consumer dies or they get ack'ed or rejected.

Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers to make decisions about each message as they get processed rather than leaving old messages unacknowledged.
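As a minimal sketch of what "making a decision about each message" can look like, the callback below acks a delivery on success and nacks it with requeue on failure, so nothing is left unacknowledged. The names `process` and `handle_message` are hypothetical, and the callback signature assumes pika 1.x's `on_message_callback` convention (channel, method, properties, body).

```python
def process(body):
    """Hypothetical application logic; raises if the message cannot be handled."""
    if not body:
        raise ValueError("empty message")
    return body.upper()


def handle_message(channel, method, properties, body):
    """Settle every delivery so that it never stays unacknowledged."""
    try:
        process(body)
    except Exception:
        # Hand the message back to the queue for another attempt.
        # For messages that can never succeed, requeue=False avoids a retry loop.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)
```

With pika 1.x this would typically be registered through `channel.basic_consume(queue='tasks', on_message_callback=handle_message)`, where `'tasks'` is a placeholder queue name.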

Solution 2 - Rabbitmq

If messages are unacked, there are only two ways to get them back into the queue:

  1. basic.nack

    With requeue set, this command (like basic.reject) causes the message to be placed back into the queue and redelivered. It has to be sent on the channel that received the message.

  2. Disconnect from the broker

    Closing the channel or connection forces all of its unacked messages to be put back into the queue.

NOTE: basic.recover asks the broker to redeliver unacked messages on the same channel (to the same consumer), which is sometimes the desired behaviour.
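Before disconnecting anything, it helps to see which channels are actually holding unacked messages. The sketch below parses the tab-separated output of `rabbitmqctl list_channels name messages_unacknowledged`. The helper names are hypothetical, and it assumes `rabbitmqctl` is on the PATH and is run with sufficient privileges.

```python
import subprocess


def channels_with_unacked(listing):
    """Return (channel_name, unacked_count) pairs with a non-zero count.

    `listing` is the text printed by
    `rabbitmqctl list_channels name messages_unacknowledged`.
    """
    result = []
    for line in listing.splitlines():
        parts = line.rsplit("\t", 1)
        # Skip banners and header rows, whose last column is not a number.
        if len(parts) != 2 or not parts[1].strip().isdigit():
            continue
        count = int(parts[1])
        if count > 0:
            result.append((parts[0].strip(), count))
    return result


def list_unacked_channels():
    """Run rabbitmqctl (assumed to be on PATH) and parse its output."""
    output = subprocess.check_output(
        ["rabbitmqctl", "list_channels", "name", "messages_unacknowledged"],
        universal_newlines=True,
    )
    return channels_with_unacked(output)
```

Each returned name identifies the client connection behind the channel, which narrows down the consumer that has to ack, nack, or disconnect.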

RabbitMQ spec for basic.recover and basic.nack


The real question is: Why are the messages unacknowledged?

Possible scenarios to cause unacked messages:

  1. Consumer fetching too many messages, then not processing and acking them quickly enough.

    Solution: Prefetch as few messages as appropriate.

  2. Buggy client library. I currently have this issue with pika 0.9.13: if the queue has a lot of messages, a certain number of them get stuck unacked, even hours later.

    Workaround: I have to restart the consumer several times until all unacked messages are gone from the queue.
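For the first scenario, prefetch is limited with basic.qos. A minimal sketch, assuming pika 1.x and a broker reachable on localhost; `limit_prefetch` and `open_limited_channel` are hypothetical helper names.

```python
def limit_prefetch(channel, prefetch_count=1):
    """Limit how many unacknowledged deliveries the broker sends ahead."""
    if prefetch_count < 1:
        # In AMQP a prefetch count of 0 means "no limit",
        # which defeats the purpose here.
        raise ValueError("prefetch_count must be at least 1")
    channel.basic_qos(prefetch_count=prefetch_count)
    return channel


def open_limited_channel(host="localhost", prefetch_count=1):
    """Open a blocking connection and return (connection, channel).

    Assumes pika is installed and a broker is reachable at `host`.
    """
    import pika  # imported here so limit_prefetch works without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    return connection, limit_prefetch(connection.channel(), prefetch_count)
```

The limit should be set before consumers are registered on the channel. A value of 1 is the most conservative choice; a somewhat higher value trades a larger unacked backlog for better throughput.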

Solution 3 - Rabbitmq

All the unacknowledged messages will return to the ready state once all the workers/consumers are stopped.

Confirm that all workers are stopped by grepping the output of ps aux, and stop or kill any that are still running.

If you manage workers with supervisor, check for zombies even when it reports the worker as stopped: grepping the ps aux output may still show leftover worker processes. Killing those processes will bring the messages back to the ready state.
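The ps aux check can be sketched as a small filter. `find_worker_processes` and `running_workers` are hypothetical helpers, and matching on the substring `celery` is an assumption about how the workers appear in the process list.

```python
import subprocess


def find_worker_processes(ps_output, pattern="celery"):
    """Return (pid, command) pairs from `ps aux` output whose command
    contains `pattern`."""
    matches = []
    lines = ps_output.splitlines()
    for line in lines[1:]:  # the first line is the column header
        # ps aux prints 11 columns; the last one (COMMAND) may contain spaces.
        fields = line.split(None, 10)
        if len(fields) < 11:
            continue
        pid, command = int(fields[1]), fields[10]
        if pattern in command:
            matches.append((pid, command))
    return matches


def running_workers(pattern="celery"):
    """Run `ps aux` and return the matching worker processes."""
    output = subprocess.check_output(["ps", "aux"], universal_newlines=True)
    return find_worker_processes(output, pattern)
```

An empty result from `running_workers()` suggests no matching worker is left holding a connection. Any PIDs it does return are the candidates to stop or kill.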

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Will Olbrys | View Question on Stackoverflow
Solution 1 - Rabbitmq | Brian Kelly | View Answer on Stackoverflow
Solution 2 - Rabbitmq | IvanD | View Answer on Stackoverflow
Solution 3 - Rabbitmq | Venkat Kotra | View Answer on Stackoverflow