Being new to asyncio, after publishing the previous post on running multiple applications in one event loop, I also cross posted it to the discussion board for feedback. So apparently instead of cramming everything to the same event loop, it would be better if each application run on a separate thread. That makes sense, considering all the code that was written for that.
So, for a very long time, the code read as follows:
def done_handler(task: Future, exit_event: settings.Event) -> None:
    if task.exception() is not None:
        logger.exception(task.exception())
    shutdown_handler(None, None, exit_event)
def shutdown_handler_threadpool(_signum, _frame, exit_event: settings.Event) -> None:
    logger.info("MAIN: Sending exit event")
    exit_event.set()
def main():
    with ThreadPoolExecutor(max_workers=10) as executor:
        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            signal.signal(
                s, partial(shutdown_handler_threadpool, exit_event=exit_event)
            )
        tasks = [
            executor.submit(lambda: asyncio.run(telegram_run(exit_event))),
            executor.submit(lambda: asyncio.run(discord_run(exit_event))),
            executor.submit(lambda: asyncio.run(web_run(exit_event))),
        ]
        for task in tasks:
            task.add_done_callback(partial(done_handler, exit_event=exit_event))
As this is not really a mission-critical application, and the main goal of the exercise is simplicity, I removed most of the code managing coroutines, and went with just asyncio.run for now. Everything then worked properly, and all I only needed to do in addition to the above was to port all the events, queues, and locks to not block while running in this setup (with use of asyncio.get_event_loop().run_in_executor()).
No big deal.
I continued adding experimental features to it. And one day I realized I could get IFTTT to return some LLM generated text. The way it works is rather complicated, as it involves a few http requests. So in my setup, I had another web application running, separate from this application, just to handle IFTTT requests. So if I were to plug the bot to the service, the flow would work as follows:
- The user sends a command through the bot
- The bot receives the command, and relay it to IFTTT through their webhook
- IFTTT receives the command, process it and generate result
- IFTTT sends the result to my web application endpoint
- My web endpoint collects the result, and send it back to the chatroom
So with the setup, I would need to store another set of bot tokens and secrets in a separate application, which isn’t ideal, and I also needed to rewrite quite a portion of that application because it was not asyncio-ready. Then I realized my bot runs a web application too at the moment, to purely receive update messages from telegram.
It might be a good idea to just merge these two projects, I figured.
There were some existing applets that weren’t really in use, and were set up for experimental purpose. I started cleaning and archiving a lot of them, then I started porting the code over to my bot. Everything worked fine, and I was quite happy with it.
Until I decided to adopt FastAPI.
You see, FastAPI has been a new tool everyone is talking about these days. So I got curious, and started to explore how easy it was to port the code, which was written for aiohttp. The bot’s web application did not really need a database, so porting was trivial (still prefer Django for any web projects requiring a database), and I like how easy it is to read input from query strings, headers, and even the request body, through function parameters.
After finishing the web application, I put in code to run it through uvicorn.run. This was when I started to see a lot of weird problems. Coroutines started to be scheduled to the wrong event loop, which worked perfectly fine before. After a lot of struggle, I found out in a bug report that says uvicorn does not like to run in a thread. There perhaps were fixes to the problem, but I eventually decided to just went a step further and started putting it in a process.
The plan was to run 2 different processes, one for the FastAPI web application served by uvicorn, another runs the telegram and discord bot, each in a separate thread, similar to the previous setup. Surprisingly, the resulting code isn’t too complicated.
def done_handler(
    future: Future,
    name: str,
    exit_event: settings.Event | settings.PEvent,
    is_process=False,
) -> None:
    logger.info(
        "MAIN: Task is done, prompting others to quit",
        name=name,
        is_process=is_process,
        future=future,
    )
    if future.exception() is not None:
        logger.exception(future.exception())
    shutdown_handler(None, None, exit_event, is_process)
def shutdown_handler(
    _signum, _frame, exit_event: settings.Event | settings.PEvent, is_process=False
) -> None:
    logger.info("MAIN: Sending exit event to all tasks in pool")
    exit_event.set()
async def bot_run(pexit_event: settings.PEvent) -> None:
    exit_event = settings.Event()
    with ThreadPoolExecutor(max_workers=10) as executor:
        task_submit(
            executor,
            exit_event,
            "bot.telegram",
            lambda: asyncio.run(telegram_run(exit_event)),
        )
        task_submit(
            executor,
            exit_event,
            "bot.discord",
            lambda: asyncio.run(discord_run(exit_event)),
        )
        await pexit_event.wait()
        logger.info("MAIN: Received process exit signal, sending exit event to threads")
        exit_event.set()
def process_run(func, pexit_event: settings.PEvent) -> None:
    asyncio.run(func(pexit_event))
def task_submit(
    executor: ProcessPoolExecutor | ThreadPoolExecutor,
    exit_event: settings.PEvent | settings.Event,
    name: str,
    *task,
) -> Future:
    is_process, future = (
        isinstance(executor, ProcessPoolExecutor),
        executor.submit(*task),
    )
    future.add_done_callback(
        partial(
            done_handler,
            name=name,
            is_process=is_process,
            exit_event=exit_event,
        )
    )
    logger.info(
        "MAIN: Task is submitted", name=name, is_process=is_process, future=future
    )
    return future
def main():
    manager = multiprocessing.Manager()
    pexit_event = settings.PEvent(manager.Event())
    with ProcessPoolExecutor(max_workers=3) as executor:
        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            signal.signal(s, partial(shutdown_handler, exit_event=pexit_event))
        task_submit(executor, pexit_event, "bot", process_run, bot_run, pexit_event)
        task_submit(executor, pexit_event, "web", process_run, web_run, pexit_event)
Nothing much noteworthy, except I made pexit_event.wait a non-blocking operation (which is why bot_run is also a coroutine). I did not foresee the complexity of this project to grow to this extent, but I am happy that at least for now it still remains rather readable. Next, I am planning to introduce database operations to it, but am not quite happy with sqlmodel as it doesn’t have much information/tutorials about migrations.
