Consumer is not receiving message from MQ when message is sent before consumer is listening

I am using MQs for the first time and attempting to implement a logging system with RabbitMQ. My implementation involves a 'sender'

/*
 * This class sends messages over MQ
 */
public class MQSender {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String[] levels = {"green", "orange", "red", "black"};
        for (String log_level : levels) {
            String message = "This is a " + log_level + " message";
            System.out.println("Sending " + log_level + " message");
            //publish the message with each of the bindings in levels
            channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
        }

        channel.close();
        connection.close();
    }
}

Which sends one message for each of my colors to the exchange, where the color will be used as bindings. And it involves a 'receiver'

public class MQReceiver {
    private final static String EXCHANGE_NAME = "mm_exchange";
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        receiveMessagesFromQueue(2);
    }

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        /*
         * Boilerplate stuff
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //declare the exchange that messages pass through, type=direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //generate random queue
        String queueName = channel.queueDeclare().getQueue();

        //set bindings from 0 to maxLevel for the queue
        for (int level = 0; level <= maxLevel; level++) {
            channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
        }

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while(true) {
            //waits until a message is delivered then gets that message
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

which is given as a parameter a number representing which color bindings I would like it to be fed from the exchange.

In my implementation, and in RabbitMQ in general, it seems like messages are stored in the exchange until the Consumer asks for them, at which point they are distributed to their respective queues and then sent one at a time to the client (or consumer in MQ lingo). My problem is that when I run the MQSender class before running the MQReceiver class the messages never get delivered. But when I run the MQReceiver class first the messages are received. From my understanding of MQ I would think that the messages should be stored on the server until the MQReceiver class is run, then the messages should be delivered to their consumers, however this is not what is happening. My main question is whether these messages can be stored in an exchange and if not, where should they be stored so that they will be delivered once a consumer (i.e. my MQReceiver class) is called?

Thanks for your help!

Answers


RabbitMQ discards messages if their routing key doesn't match any queues bound to the exchange. When you start MQSender first, no queues are bound, so the messages it sends are lost. When you start MQReceiver, it binds queues to the exchange, so RabbitMQ has a place to put the message from MQSender. When you stop MQReceiver, since you created an anonymous queue, the queue and all bindings are removed from the exchange.

If you want messages to be stored on the server while MQReceiver is not running, you need the receiver to create a named queue, and bind the routing keys to that queue. Note that creating a named queue is idempotent, and the queue won't be created if it already exists. Then you need the receiver to pull messages off the named queue.

Change your code to look something like this:

MQSender

....
String namedQueue = "logqueue";
//declare named queue and bind log level routing keys to it.
//RabbitMQ will put messages with matching routing keys in this queue
channel.queueDeclare(namedQueue, false, false, false, null);
for (int level = 0; level < LOG_LEVELS.length; level++) {
   channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]);
}
...

MQReceiver

...
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

QueueingConsumer consumer = new QueueingConsumer(channel);

//Consume messages off named queue instead of anonymous queue
String namedQueue = "logqueue";
channel.basicConsume(namedQueue, true, consumer);

while(true) {
...

Need Your Help

Find non-english channels in YouTube API

c# youtube youtube-api

I'm trying to find Dutch YouTube channels using the YouTube api v3.

How to bypass “Adobe Flash Player has stopped a potentially unsafe operation.”?

flash actionscript-3 urlloader urlrequest

I have created an advent calendar for the blog of a friend, and to fetch/get/download the gifts of each day, the flash animation requests a file located at "http://domain.com/folder/etcetc/gifts.ph...

About UNIX Resources Network

Original, collect and organize Developers related documents, information and materials, contains jQuery, Html, CSS, MySQL, .NET, ASP.NET, SQL, objective-c, iPhone, Ruby on Rails, C, SQL Server, Ruby, Arrays, Regex, ASP.NET MVC, WPF, XML, Ajax, DataBase, and so on.