Notes on codes, projects and everything

Learning asyncio by redoing my messaging bot

A while ago I built a bot, partly for work, that fetches the latest retail petrol price in Malaysia. It was really an experiment, but it worked well at the time. A few years later, out of boredom, I revisited the project after finding out that the telegram bot library was moving towards asyncio. That was great news (at least a lot of people rave about asyncio), but also intimidating: I had learned about coroutines and used gevent in the past, but never asyncio itself.

While glancing through the documentation, I realized that discord’s python library already uses asyncio, so I thought maybe there was a way to run both together. I didn’t want to fix the original bot, so I started a new project, crammed everything from the tutorials into it, verified that both worked, then called it a day. It was mostly a hack, and I didn’t put any time or effort into understanding how everything worked. So I had something like this (I’m keeping only the asyncio-related code in all examples in this post):

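import logging
import os

import discord
from telegram.ext import ApplicationBuilder

logger = logging.getLogger(__name__)

# discord.py 2.x requires the intents to be passed explicitly
client = discord.Client(intents=discord.Intents.default())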

@client.event
async def on_ready():
    """
    Get telegram setup done here
    """
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()

    # -- snipped handlers registration --

    await application.initialize()
    await application.start()

    queue = await application.updater.start_polling()

    while True:
        update = await queue.get()
        logger.info("TG update=%s", update)
        queue.task_done()

if __name__ == "__main__":
    client.run(os.environ["DISCORD_TOKEN"])

It worked as intended, and I was happy. It really didn’t matter that the code did not make much sense, and I was glad I didn’t have to deal with all the asyncio calls.

Fast-forward to last weekend: after a few attempts at adding new useless features (well, it was an experimental bot anyway), I finally set aside some time to fix it properly.

Firstly, I needed to find a way to break through the abstraction both libraries put over asyncio, so I could place them on the event loop in a way that makes sense. So, a note to my future self: an event loop works somewhat like a timeline that holds tasks. Concurrency is achieved by strategically scheduling tasks on the event loop: whenever a task awaits something, the loop is free to run another one. By using an event loop, we can manage multiple tasks at the same time with the tools provided by asyncio.
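
To make the timeline idea concrete, here is a minimal sketch using only the standard library. The worker coroutine and its names are made up for illustration and have nothing to do with either bot library; asyncio.sleep stands in for real network I/O.

```python
import asyncio


async def worker(name: str, delay: float, log: list[str]) -> str:
    """Pretend to do I/O; every await hands control back to the event loop."""
    log.append(f"{name} started")
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name


async def main() -> list[str]:
    log: list[str] = []
    # create_task() schedules each coroutine on the running loop;
    # they start running as soon as main() awaits something.
    slow = asyncio.create_task(worker("slow", 0.2, log))
    fast = asyncio.create_task(worker("fast", 0.1, log))
    await slow
    await fast
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both workers start before either finishes, and the shorter one finishes first even though it was scheduled second, which is the whole point of letting the loop juggle tasks.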

So what happened in the first iteration is that the event loop was kind of abstracted away by discord’s python library. The hack still worked because on_ready is itself a coroutine running on the loop that client.run() started, so everything the telegram application awaited inside it ended up on that same loop. Obviously, the first thing to do was to remove the abstraction and start managing the event loop on my own.

Then for a few days, after reading a lot of documentation, random tutorials all across the internet, and stackoverflow, my code looked something like this:

import asyncio

client = discord.Client(..)

# must be a coroutine function, as it awaits inside
async def telegram_run() -> None:
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()

    # -- snipped handlers registration --

    await application.initialize()
    await application.start()

    if application.updater:
        queue = await application.updater.start_polling()

        while True:
            update = await queue.get()
            logger.info("TG update=%s", update)
            queue.task_done()

async def main():
    await asyncio.gather(
        client.start(os.environ["DISCORD_TOKEN"]),
        telegram_run(),
    )

if __name__ == "__main__":
    asyncio.run(main())

Apparently asyncio.run (added in Python 3.7), combined with asyncio.gather, means developers no longer have to deal with the event loop directly, as shown in the code above. So no more bizarre loop.run_forever() or loop.run_until_complete() calls in the code, and so far it reads somewhat like a script I wrote in the past that used ThreadPoolExecutor (off-topic: I kinda look forward to the nogil thing landing in the near future).
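
For comparison, here is a small sketch of the two styles side by side. The fetch coroutine and the function names are invented for this example; only the asyncio calls are real.

```python
import asyncio


async def fetch(label: str, delay: float) -> str:
    # stand-in for a long-running client coroutine
    await asyncio.sleep(delay)
    return label


async def main() -> list[str]:
    # gather() runs the awaitables concurrently and returns
    # the results in the order the arguments were given
    return await asyncio.gather(fetch("discord", 0.2), fetch("telegram", 0.1))


def run_with_explicit_loop() -> list[str]:
    # the older style: create, drive and close the loop by hand
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


def run_with_asyncio_run() -> list[str]:
    # asyncio.run() creates the loop, runs main() and closes the loop
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_with_explicit_loop())
    print(run_with_asyncio_run())
```

Both functions return the same list; asyncio.run simply hides the loop bookkeeping.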

The new code worked mostly fine. I didn’t like it, but I continued working on it, adding new features, and eventually decided to rewrite the telegram part with a custom webhook. More complexity, yay. I then found some tutorials online, picked one that seemed to make sense to me, and started coding away. Thankfully, there’s a section in the wiki of the telegram python library that briefly discusses how to run a custom webhook with it. So now we have a new coroutine added to the event loop.

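import asyncio
import os
from typing import NoReturn

from aiohttp import web
from telegram import Update
from telegram.ext import ApplicationBuilder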

client = discord.Client(..)

async def telegram_run() -> NoReturn:
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()

    # -- snipped handlers registration --

    await application.bot.set_webhook(
        f'{os.environ["WEBHOOK_URL"]}/telegram',
        allowed_updates=Update.ALL_TYPES,
        secret_token=SECRET_TOKEN,
    )

    await application.initialize()

    async with application:
        await application.start()
        while True:
            await asyncio.sleep(3600)


async def web_run() -> NoReturn:
    application = web.Application()

    runner = web.AppRunner(application)
    await runner.setup()

    site = web.TCPSite(runner, port=8080)
    await site.start()

    while True:
        await asyncio.sleep(3600)


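async def main():
    # the real code builds the aiohttp application in a snipped web_init()
    # helper; web_run() as shown here creates an empty one itself
    await asyncio.gather(
        web_run(),
        client.start(os.environ["DISCORD_TOKEN"]),
        telegram_run(),
    )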


if __name__ == "__main__":
    asyncio.run(main())

If you are paying attention, you will see two infinite loops in the code that practically do nothing. The one in web_run, which is the aiohttp server code, was added because I saw it in the tutorial. As for telegram, according to the example I should run the web server between the application.start() and application.stop() coroutines, but I would rather have the web server run directly in main.

Now that I was running a custom webhook (which can be a good thing, as I can get it to host other endpoints not exclusively for telegram) and had decided not to put the web server coroutine call between those two coroutines, I just put an infinite loop there, just like in the web server. This is because await application.start() returns immediately (unlike the .start() coroutine in discord, which runs indefinitely). If telegram_run exited right after that await call (which also means skipping the await application.stop() call altogether, as in the current iteration), the async with block would be left, the telegram bot would probably consider the application dead, and anything delivered to the webhook endpoint would fail.
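
The lifetime problem can be illustrated without any bot library. In this sketch FakeApplication is a hypothetical stand-in, not the telegram API: it only records which lifecycle steps ran, assuming an object whose start() returns immediately and whose async with block tears it down on exit.

```python
import asyncio


class FakeApplication:
    """Hypothetical service: start() returns at once, async with cleans up."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def __aenter__(self) -> "FakeApplication":
        self.events.append("initialize")
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.events.append("shutdown")

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")


async def run_and_return(app: FakeApplication) -> None:
    async with app:
        await app.start()
    # the block has been left, so the app is already shut down here


async def run_until_told(app: FakeApplication, keep_alive: asyncio.Event) -> None:
    async with app:
        await app.start()
        await keep_alive.wait()  # stay inside the block while serving
        await app.stop()


async def main() -> tuple[list[str], list[str]]:
    early = FakeApplication()
    await run_and_return(early)

    kept = FakeApplication()
    keep_alive = asyncio.Event()
    task = asyncio.create_task(run_until_told(kept, keep_alive))
    await asyncio.sleep(0)  # let the task run up to keep_alive.wait()
    keep_alive.set()
    await task
    return early.events, kept.events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first application is shut down right after it starts, while the second one stays alive until something explicitly lets it go. The infinite sleep loop is one crude way of achieving the same waiting.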

Another reason I didn’t like the state of the code was that there were some calls I needed to make to properly end the session for all these clients/applications. However, due to the infinite loops (for both the telegram and webhook applications) and the lack of control over the discord client’s start coroutine, I didn’t know what to do. I was quite close to rewriting all these components into 3 separate scripts, or throwing them into separate processes.

So eventually I came across this, and started looking into subscribing to SIGTERM and then somehow properly exiting the coroutines. In one of the stackoverflow answers, I saw a mention of this tutorial, which gave me some idea of how to do it properly. After some quick reading, I decided to cheat somewhat by catching asyncio.CancelledError. While it is not exactly what is discussed in the article, I think it was a good beginning. So the code became this (asyncio.TaskGroup and asyncio.Runner require Python 3.11 or newer):

import asyncio
import signal

client = discord.Client(..)

async def discord_run():
    global client

    try:
        logger.info("DISCORD: Starting")
        await client.start(os.environ["DISCORD_TOKEN"])

    except asyncio.CancelledError:
        if not client.is_closed():
            logger.info("DISCORD: Stopping")
            await client.close()

            logger.info("DISCORD: Stopped")

async def telegram_run() -> NoReturn:
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()

    # -- snipped handlers registration --

    await application.bot.set_webhook(
        f'{os.environ["WEBHOOK_URL"]}/telegram',
        allowed_updates=Update.ALL_TYPES,
        secret_token=settings.SECRET_TOKEN,
    )

    try:
        async with application:
            logger.info("TELEGRAM: Starting")
            await application.start()

            while True:
                await asyncio.sleep(3600)

    except (RuntimeError, asyncio.CancelledError):
        if application.running:
            logger.info("TELEGRAM: Stopping")
            await application.stop()

            logger.info("TELEGRAM: Stopped")

async def web_run() -> NoReturn:
    application = web.Application()

    logger.info("WEBHOOK: Starting", url=os.environ["WEBHOOK_URL"])
    web_runner = web.AppRunner(application)
    await web_runner.setup()

    web_site = web.TCPSite(web_runner, port=8080)
    await web_site.start()

    try:
        while True:
            await asyncio.sleep(3600)

    except asyncio.CancelledError:
        logger.info("WEBHOOK: Stopping")
        await web_site.stop()
        await web_runner.cleanup()

        logger.info("WEBHOOK: Stopped")

async def shutdown_handler(tasks, loop: asyncio.AbstractEventLoop) -> None:
    for task in tasks:
        if task is not asyncio.current_task():
            task.cancel()

async def main(loop: asyncio.AbstractEventLoop) -> None:
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(telegram_run()),
            tg.create_task(discord_run()),
            tg.create_task(web_run()),  # web_run() above takes no arguments
        ]

        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                s, lambda: asyncio.create_task(shutdown_handler(tasks, loop))
            )


if __name__ == "__main__":
    with asyncio.Runner() as runner:
        runner.run(main(runner.get_loop()))
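
Stripped of the bot libraries, the cancellation pattern used above boils down to roughly the following sketch. The service coroutine is invented for illustration. Unlike the code above, this version re-raises CancelledError after cleaning up, which is the usual recommendation so the task is still reported as cancelled.

```python
import asyncio


async def service(name: str, log: list[str]) -> None:
    log.append(f"{name}: starting")
    try:
        while True:
            await asyncio.sleep(3600)  # the "do nothing" loop
    except asyncio.CancelledError:
        # task.cancel() raises CancelledError at the pending await,
        # which gives the coroutine a chance to clean up
        log.append(f"{name}: stopping")
        raise


async def main() -> tuple[list[str], list[object]]:
    log: list[str] = []
    tasks = [
        asyncio.create_task(service(name, log))
        for name in ("telegram", "discord")
    ]
    await asyncio.sleep(0)  # let both services reach their sleep
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects the CancelledError of each task
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return log, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the real program the cancel() calls come from the signal handler rather than from main itself.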

This is closer to an ideal execution, in that terminating the process no longer throws exceptions at me. However, I still didn’t find it appealing; it still felt like a hack. So I finally put in the time and patience to read through the tutorial from the beginning and re-learn things the proper way. It is a very well-written article by Lynn Root, so please go have a look if you are interested in learning more about asyncio in general.

In short, the final iteration looks like this:

import asyncio
import os
import signal
from functools import partial
from typing import Any

async def discord_run(exit_event: asyncio.Event):
    global client

    logger.info("DISCORD: Starting")
    asyncio.create_task(client.start(os.environ["DISCORD_TOKEN"]))

    await exit_event.wait()

    logger.info("DISCORD: Stopping")
    await client.close()

async def telegram_run(exit_event: asyncio.Event) -> None:
    application = ApplicationBuilder().token(os.environ["TELEGRAM_TOKEN"]).build()

    # -- snipped handlers registration --

    asyncio.create_task(
        application.bot.set_webhook(
            f'{os.environ["WEBHOOK_URL"]}/telegram',
            allowed_updates=Update.ALL_TYPES,
            secret_token=settings.SECRET_TOKEN,
        )
    )

    async with application:
        logger.info("TELEGRAM: Starting")
        await application.start()

        await exit_event.wait()

        logger.info("TELEGRAM: Stopping")
        await application.stop()

async def web_run(exit_event: asyncio.Event) -> None:
    application = web.Application()

    logger.info("WEBHOOK: Starting", url=os.environ["WEBHOOK_URL"])
    web_runner = web.AppRunner(application)
    await web_runner.setup()

    web_site = web.TCPSite(web_runner, port=8080)
    await web_site.start()

    await exit_event.wait()

    logger.info("WEBHOOK: Stopping")
    await web_site.stop()
    await web_runner.cleanup()

def exception_handler(
    loop: asyncio.AbstractEventLoop, context: dict[str, Any], exit_event: asyncio.Event
) -> None:
    message = context.get("exception", context["message"])
    logger.error("Caught exception", message=message)

    logger.error("MAIN: Shutting down")
    asyncio.create_task(shutdown_handler(loop, exit_event))


async def shutdown_handler(
    loop: asyncio.AbstractEventLoop, exit_event: asyncio.Event
) -> None:
    logger.info("MAIN: Sending exit event")
    exit_event.set()

    await asyncio.sleep(5)

    tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

    loop.stop()

def main() -> None:
    # create the loop explicitly; relying on asyncio.get_event_loop()
    # to create one implicitly is deprecated
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    exit_event = asyncio.Event()

    loop.set_exception_handler(partial(exception_handler, exit_event=exit_event))

    for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            s,
            lambda: asyncio.create_task(shutdown_handler(loop, exit_event)),
        )

    try:
        loop.create_task(telegram_run(exit_event))
        loop.create_task(discord_run(exit_event))
        # web_run() above only takes the exit event
        loop.create_task(web_run(exit_event))
        loop.run_forever()
    finally:
        loop.close()


if __name__ == "__main__":
    main()
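
The shape of this iteration, without the bots, is roughly the sketch below. The service coroutine is made up, and a timer stands in for the signal handler so the example can run anywhere; the real code sets the event from shutdown_handler instead.

```python
import asyncio


async def service(name: str, exit_event: asyncio.Event, log: list[str]) -> None:
    log.append(f"{name}: starting")
    await exit_event.wait()          # park here until shutdown is requested
    log.append(f"{name}: stopping")  # per-service cleanup would go here


async def main(run_for: float = 0.1) -> list[str]:
    exit_event = asyncio.Event()
    log: list[str] = []
    tasks = [
        asyncio.create_task(service(name, exit_event, log))
        for name in ("telegram", "discord", "webhook")
    ]
    # stand-in for the signal handler: request shutdown after run_for seconds
    asyncio.get_running_loop().call_later(run_for, exit_event.set)
    await asyncio.gather(*tasks)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Every service wakes up from the same event and runs its own cleanup, so nothing has to be cancelled in the normal case.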

Firstly, I removed the asyncio.run(main()) call to have full control of the event loop in this iteration. The reason I stopped using TaskGroup is that it immediately cancels all the remaining tasks in the group (resulting in a CancelledError inside each of them) if any of them fails. Now that I am handling the quitting part on my own, this behavior is not ideal, especially since I removed all the CancelledError exception handling in the current iteration. Secondly, I removed the infinite loop eyesore (yeap, very subjective, I know) and used an exit Event instead to end the coroutines gracefully. Lastly, I also added a global exception handler as shown in the tutorial, which allows the program to exit properly if anything bad happens.

While asyncio is very powerful, so far I rarely see library developers providing much support for it. Instead, a lot of them offer a layer of abstraction that either makes them the only application that can run on the event loop, or wants to be the manager for other third-party coroutines. I find that rather unfortunate given how useful asyncio is, especially when one needs to develop a solution to a concurrency problem. I see there are efforts to make it easier to adopt (through asyncio.run, asyncio.gather etc.), but it would be nice if there were more tutorials available, and if library developers put in the effort to work with other libraries without trying to assert exclusive control over the event loop.

If you are interested in the silly bot project, please have a look at the github page. There’s not much it can do right now; the only thing it does that can be considered useful is fetching the latest retail petrol price in Malaysia, that’s it. Do add the bot to your channel/server if you are interested, and shoot me a ping somehow.
