Olso amqpdriver doesn't always honor reply timeout

Bug #1379394 reported by Brent Tang
10
This bug affects 1 person
Affects Status Importance Assigned to Milestone
oslo.messaging
Fix Released
Undecided
Joshua Harlow

Bug Description

The way the Oslo amqpdriver is set up for callers that are waiting for replies/responses from a message is that the first thread in will sit in the _poll_connection method in the amqpdriver.py module calling down to the underlying messaging driver with the timeout that that caller specified, and then all other callers waiting for replies on the same reply queue will just get blocked waiting for the given reply.

This ReplyWaiter's _poll_connection method will on a reply check to see if this is the message this caller is waiting for and return if it is otherwise will notify the other callers waiting of the message and will go back waiting with the original timeout specified, and will only timeout if no messages come in to that reply queue within the overall timeout specified. The issue with this is that the original timeout specified is only really honored if no other messages come into that reply queue in the period of time, otherwise at minimum it will end up waiting a little longer than specified and at worst will hang forever.

This is the case that happens in for example nova-compute where you have a thread doing the service updates to the conductor and also have periodic tasks doing updates to the conductor. If the reply to one of those threads (says the service update thread) never responds (the services or message broker get restarted in the middle, some other action hangs, network issue, etc) or even takes a really long time, that thread will be stuck in the _poll_connection method, and then the periodic task thread comes in and makes calls to the conductor and the replies are received. If the interval for that other thread is less than the timeout is set to for replies, the service update thread will be stuck in _poll_connection forever.

To fix this issue, I think a similar fix needs to be done as is done in the AMQPListener's "poll" method in the same module, where it keeps track of the time that it has waited in that method and only will try to wait for the amount of time delta, so that it will honor the original timeout. This isn't ideal in a situation where the time changes (forward or backwards) on the system, but there seems to already be precedence for this in the poll method.

Here is the code snippet for the change that seems to work to me:

   def _poll_connection(self, msg_id, timeout):
        if timeout is not None and timeout > 0: <===== Line Added
            deadline = time.time() + timeout <===== Line Added
        else: <===== Line Added
            deadline = None <===== Line Added
        while True:
            while self.incoming:
                message_data = self.incoming.pop(0)

                incoming_msg_id = message_data.pop('_msg_id', None)
                if incoming_msg_id == msg_id:
                    return self._process_reply(message_data)

                self.waiters.put(incoming_msg_id, message_data)

            try:
                if deadline is not None: <===== Line Added
                    timeout = deadline - time.time() <===== Line Added
                    if timeout <= 0: <===== Line Added
                        raise rpc_common.Timeout() <===== Line Added
                self.conn.consume(limit=1, timeout=timeout)
            except rpc_common.Timeout:
                raise messaging.MessagingTimeout('Timed out waiting for a '
                                                 'reply to message ID %s'
                                                 % msg_id)

Changed in oslo.messaging:
assignee: nobody → Mehdi Abaakouk (sileht)
status: New → In Progress
Changed in oslo.messaging:
assignee: Mehdi Abaakouk (sileht) → Joshua Harlow (harlowja)
Revision history for this message
Vish Ishaya (vishvananda) wrote :
Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Fix merged to oslo.messaging (master)

Reviewed: https://review.openstack.org/137456
Committed: https://git.openstack.org/cgit/openstack/oslo.messaging/commit/?id=f1c7e78a56794f016ad63c28b036dff12e259953
Submitter: Jenkins
Branch: master

commit f1c7e78a56794f016ad63c28b036dff12e259953
Author: Joshua Harlow <email address hidden>
Date: Wed Nov 26 11:40:02 2014 -0800

    Have the timeout decrement inside the wait() method

    Currently it appears the timeout provided to the wait()
    method will be reused X number of times (where X may be
    indeterminate depending on reconnections, loss of messages
    and so-on) so instead of reusing it which can potentially
    result in a infinite number of calls have a new object be
    used that will cause the subsequent timeouts used elsewhere
    in the wait function to actually decay correctly.

    Closes-bug: #1379394

    Change-Id: I12c4ea1eef6b857d12246db0483adaf7c87e740c

Changed in oslo.messaging:
status: In Progress → Fix Committed
Mehdi Abaakouk (sileht)
Changed in oslo.messaging:
milestone: none → 1.5.0
status: Fix Committed → Fix Released
To post a comment you must log in.
This report contains Public information  
Everyone can see this information.

Other bug subscribers

Remote bug watches

Bug watches keep track of this bug in other bug trackers.