Comment 0 for bug 1338732

Noel Burton-Krahn (noelbk) wrote :

Icehouse
oslo-messaging 1.3.0 (stable/icehouse branch from github, c21a2c2)
rabbitmq-server 3.1.3

Nova RPC calls often fail after RabbitMQ restarts. I've tracked it down to oslo.messaging (rabbit driver, kombu) timing out when it is forced to reconnect to RabbitMQ. The code below times out waiting for a reply if the topic has been used in a previous run. The reply always arrives the first time a topic is used, or if the topic is None. But the second run with the same topic hangs and then fails with this error:

MessagingTimeout: Timed out waiting for a reply to message ID ...

This problem seems too basic not to have been caught earlier in oslo, but the program below really does reproduce the symptoms we see in nova when it is run against a live RabbitMQ server.

Here's a log from a test run: https://gist.github.com/noelbk/12adcfe188d9f54f971d

#! /usr/bin/python

import logging
import threading

from oslo.config import cfg
from oslo import messaging

log = logging.getLogger(__name__)

class OsloTest(object):
    def test(self):
        # The code below times out waiting for a reply if the topic
        # has been used in a previous run. The reply always arrives
        # the first time a topic is used, or if the topic is None.
        # But, the second run with the same topic will hang with this
        # error:
        #
        # MessagingTimeout: Timed out waiting for a reply to message ID ...
        #
        topic = 'will_hang_on_second_usage'
        #topic = None # never hangs

        url = "%(proto)s://%(user)s:%(password)s@%(host)s/" % dict(
            proto='rabbit',
            host='1.2.3.4',
            password='xxxxxxxx',
            user='rabbit-mq-user',
        )
        transport = messaging.get_transport(cfg.CONF, url)
        driver = transport._driver

        target = messaging.Target(topic=topic)
        listener = driver.listen(target)
        ctxt = {"context": True}
        timeout = 10

        def send_main():
            log.debug("sending msg")
            reply = driver.send(target,
                                ctxt,
                                {'send': 1},
                                wait_for_reply=True,
                                timeout=timeout)

            # driver.send() times out if the topic is not None and was
            # used in a previous run
            log.debug("received reply=%r" % (reply,))

        send_thread = threading.Thread(target=send_main)
        send_thread.daemon = True
        send_thread.start()

        msg = listener.poll()
        log.debug("received msg=%r" % (msg,))

        msg.reply({'reply': 1})

        log.debug("sent reply")

        send_thread.join()

if __name__ == '__main__':
    FORMAT = '%(asctime)-15s %(process)5d %(thread)5d %(filename)s %(funcName)s %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)
    OsloTest().test()
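
To check that the hang really depends on reusing a topic name, one option is to give every run a fresh topic. The sketch below is only an illustration: `unique_topic` and its `prefix` argument are hypothetical names, not part of oslo.messaging.

```python
import uuid


def unique_topic(prefix='oslo_test'):
    """Return a topic name that no previous run can have used.

    Hypothetical helper for the repro script above; it only builds a
    string and does not talk to RabbitMQ.
    """
    # uuid4() is random, so each call yields a different suffix.
    return '%s_%s' % (prefix, uuid.uuid4().hex)
```

Replacing `topic = 'will_hang_on_second_usage'` with `topic = unique_topic()` in the script should make every run behave like a first run. If the timeout then disappears, that would be consistent with state left behind on the named topic by an earlier run.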