Be careful with Python asyncio.wait

Recently, I stumbled upon an annoying issue with Python's asyncio module: asyncio.wait can let your program exit with a "success" status even when an error occurs.

What is abnormal here?

Normally, when an exception occurs and you don't catch it (with try... except), your program exits with failure (a non-zero exit code). However, if you have this code:

asyncio.wait(futures)

and one of the futures raises an exception, your program will exit, but with status 0 (success).

Tip: One way to check your program's exit status is from the shell. I use zsh with one of the oh-my-zsh themes, which is configured to show the exit status of the last executed command.
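Without a fancy prompt, you can check the same thing from Python. The sketch below runs two tiny, hypothetical child programs with the current interpreter and returns their exit statuses: one raises an uncaught exception directly, the other raises it inside a task passed to asyncio.wait. It assumes Python 3.7+ (for asyncio.run in the child); the child scripts are illustrations, not the Collector itself. In any POSIX shell, echo $? shows the same number right after a command finishes.

```python
import subprocess
import sys
import textwrap

# Hypothetical child 1: an uncaught exception at top level.
PLAIN = "raise RuntimeError('boom')"

# Hypothetical child 2: the same exception, raised inside a task awaited via asyncio.wait.
WITH_WAIT = textwrap.dedent("""
    import asyncio

    async def worker():
        raise RuntimeError('boom')

    async def main():
        task = asyncio.ensure_future(worker())
        await asyncio.wait([task])  # returns (done, pending); does not raise

    asyncio.run(main())
    print('Bye!')
""")


def exit_status(source):
    """Run `source` in a fresh interpreter and return its exit status."""
    proc = subprocess.run([sys.executable, '-c', source],
                          stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return proc.returncode


if __name__ == '__main__':
    print('plain raise: ', exit_status(PLAIN))      # 1
    print('asyncio.wait:', exit_status(WITH_WAIT))  # 0
```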

[Screenshot: exit status shown in the zsh prompt]

Why does it matter?

Let me start with my story. At AgriConnect, I am developing an IoT application for farming. One module of the application, which I call Collector, captures sensor data via serial connection and MQTT. This module is written in an asynchronous, coroutine-based style using asyncio. In brief, the program is structured like this:

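import argparse
import asyncio


async def collect_serial():
    # Implementation here with pyserial.
    # Repeatedly read data from the serial connection.
    pass

async def collect_mqtt():
    # Implementation here with hbmqtt.
    # Subscribe to some topics and receive messages.
    pass


# -- Start main program -- #
if __name__ == '__main__':  # Avoid running the block below when imported (e.g. in tests)
    # Hypothetical flags, inferred from the args.no_serial / args.no_mqtt names.
    parser = argparse.ArgumentParser()
    parser.add_argument('--no-serial', action='store_true')
    parser.add_argument('--no-mqtt', action='store_true')
    args = parser.parse_args()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    coroutines = []
    if not args.no_serial:
        coroutines.append(collect_serial())
    if not args.no_mqtt:
        coroutines.append(collect_mqtt())
    if coroutines:
        # Wrap in tasks: Python 3.11+ no longer accepts bare coroutines in wait().
        tasks = [loop.create_task(c) for c in coroutines]
        try:
            loop.run_until_complete(asyncio.wait(tasks))
        except asyncio.CancelledError:
            pass

    print('\nBye!')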

To run several coroutines (collect_serial and collect_mqtt) at the same time, we have two options: asyncio.wait and asyncio.gather. At first, I used asyncio.wait because I didn't really need to collect the return values of the two coroutines; they actually run infinite while loops. Also, the call asyncio.wait(coroutines) looks nicer than its gather counterpart, asyncio.gather(*coroutines).
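For reference, here is a small sketch of how the two calls differ in shape, using a short hypothetical coroutine (job) instead of the Collector's infinite loops: asyncio.wait takes an iterable of tasks and returns two sets, (done, pending), while asyncio.gather takes the awaitables as separate arguments and returns their results as a list, in argument order.

```python
import asyncio


async def job(name, delay):
    # Hypothetical stand-in for a collector coroutine that eventually returns.
    await asyncio.sleep(delay)
    return name


async def with_wait():
    tasks = [asyncio.ensure_future(job('serial', 0.02)),
             asyncio.ensure_future(job('mqtt', 0.01))]
    done, pending = await asyncio.wait(tasks)  # two sets of tasks, no ordering
    return sorted(t.result() for t in done), len(pending)


async def with_gather():
    # Awaitables are passed as separate arguments; results keep argument order.
    return await asyncio.gather(job('serial', 0.02), job('mqtt', 0.01))


if __name__ == '__main__':
    print(asyncio.run(with_wait()))    # (['mqtt', 'serial'], 0)
    print(asyncio.run(with_gather()))  # ['serial', 'mqtt']
```

Because done is a set, the sketch sorts the results; with gather, the order is guaranteed to match the arguments.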

I'm aware that there will be errors, because the program processes data coming from outside. For errors I can foresee, I have code to handle them. But for errors I cannot foresee, or that would make my code complex to handle, I decided to let them interrupt my program and rely on a mechanism to restart it. I use systemd for this. Here is part of the *.service file that tells systemd to launch my app at startup:

[Service]
User=debian
Group=debian

Type=simple
WorkingDirectory=/home/debian/PlantingHouse/ControlView
ExecStart=/usr/bin/python3.6 -m collector
TimeoutStopSec=20
Restart=on-failure
RestartSec=2

With this, if an error makes my program stop with a failure status, systemd restarts it for me 2 seconds later.

One of the errors this setup is meant to cover is a bug in hbmqtt. However, in production, my Collector sometimes stopped and stayed down. This had a big impact on our product: we missed sensor data, and the missing data made the greenhouse controller act wrongly. Looking at the systemd-journald log, I could see that the bug above had happened and terminated my program. But mysteriously, systemd didn't do its job as I planned. Investigating further, I found that my Collector exited with 0 (success) in that case, so the systemd directive Restart=on-failure did not apply.

I know I could apply a workaround by changing that directive to Restart=always. But I don't like workarounds. I want a thorough solution, that is:

  1. The hbmqtt bug is fixed.
  2. The exit code has to be correct.

The first part is out of my reach: I haven't mastered asyncio yet, so I cannot fix hbmqtt myself (and neither has its author, so far). Actually, fixing the second part is already enough.

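After much research, trying one method after another, I finally found the weird behaviour of asyncio.wait: it does not raise the exception of a failed task. It just returns two sets, (done, pending), and the failed task sits in done with its exception stored inside. asyncio only logs that exception (as "Task exception was never retrieved") when the task is garbage-collected, so once run_until_complete returns, the program prints Bye! and exits with status 0.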
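The sketch below makes this visible, using a single hypothetical failing coroutine (flaky_collector) as a stand-in for the hbmqtt bug: asyncio.wait completes normally, the failed task lands in done, and its exception only surfaces when you ask for it with task.exception() (or task.result(), which re-raises it).

```python
import asyncio


async def flaky_collector():
    # Hypothetical stand-in for a coroutine that hits an unexpected bug.
    raise ValueError('unexpected packet')


async def main():
    task = asyncio.ensure_future(flaky_collector())
    done, pending = await asyncio.wait([task])  # no exception raised here
    failed = [t for t in done if t.exception() is not None]
    for t in failed:
        # The exception was stored on the task, not propagated to the caller.
        print('stored, not raised:', repr(t.exception()))
    return len(done), len(pending), len(failed)


if __name__ == '__main__':
    print(asyncio.run(main()))  # (1, 0, 1)
```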

What is the solution?

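Yes, once I found the root cause, the solution became simple: just replace asyncio.wait with asyncio.gather. With its default return_exceptions=False, asyncio.gather re-raises the first exception from the coroutines it runs, so the error escapes run_until_complete and Python exits with a non-zero status, which lets systemd restart the Collector.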
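Applied to the Collector skeleton, the change might look like the following sketch. To make the effect visible, collect_serial and collect_mqtt are hypothetical placeholders here (an endless loop and a simulated hbmqtt failure), and the coroutines are wrapped in tasks before being handed to asyncio.gather.

```python
import asyncio


async def collect_serial():
    # Hypothetical placeholder for the real serial collector's endless loop.
    while True:
        await asyncio.sleep(1)


async def collect_mqtt():
    # Hypothetical placeholder that simulates the hbmqtt bug after a moment.
    await asyncio.sleep(0.1)
    raise RuntimeError('simulated hbmqtt bug')


def main():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [loop.create_task(collect_serial()),
             loop.create_task(collect_mqtt())]
    try:
        # gather() re-raises the first exception from its tasks here,
        # unlike wait(), which leaves it stored on the failed task.
        loop.run_until_complete(asyncio.gather(*tasks))
    except asyncio.CancelledError:
        pass
    print('\nBye!')
```

Calling main() as the script's entry point (for example under if __name__ == '__main__':) lets the RuntimeError escape, so Python prints the traceback and exits with status 1, which Restart=on-failure treats as a failure. Note that gather does not cancel the other tasks when one fails; that is acceptable here because the whole process is about to exit.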

Before solving the exit status issue, I actually had another option: reverting my Collector to callback style, using the already-stable paho-mqtt. But I like to train myself in a new skill, coroutines. I need to practice more and more, adventurous enough to use them even in production code.