Adding complexity to the bot: threads and processes

Being new to asyncio, after publishing the previous post on running multiple applications in one event loop, I also cross-posted it to the discussion board for feedback. Apparently, instead of cramming everything into the same event loop, it would be better to run each application in its own thread. That makes sense, considering how much code I had to write just to keep them all coexisting in one loop.

So, for a very long time, the code read as follows:

import asyncio
import signal
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial

# settings, logger and the *_run coroutines come from the project's own modules

def done_handler(task: Future, exit_event: settings.Event) -> None:
    # Log whatever exception ended the task, then ask everything else to quit
    if task.exception() is not None:
        logger.exception(task.exception())

    shutdown_handler_threadpool(None, None, exit_event)

def shutdown_handler_threadpool(_signum, _frame, exit_event: settings.Event) -> None:
    logger.info("MAIN: Sending exit event")
    exit_event.set()

def main():
    exit_event = settings.Event()

    with ThreadPoolExecutor(max_workers=10) as executor:
        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            signal.signal(
                s, partial(shutdown_handler_threadpool, exit_event=exit_event)
            )

        # One thread per application, each running its own event loop
        tasks = [
            executor.submit(lambda: asyncio.run(telegram_run(exit_event))),
            executor.submit(lambda: asyncio.run(discord_run(exit_event))),
            executor.submit(lambda: asyncio.run(web_run(exit_event))),
        ]

        for task in tasks:
            task.add_done_callback(partial(done_handler, exit_event=exit_event))

As this is not really a mission-critical application, and the main goal of the exercise is simplicity, I removed most of the code managing coroutines and went with plain asyncio.run for now. Everything then worked properly, and all I needed to do in addition to the above was to port the events, queues, and locks so they would not block while running in this setup (with the use of asyncio.get_event_loop().run_in_executor()).
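
For illustration, the wrapping looks roughly like this (a simplified sketch, not the exact settings.Event in the codebase; later on the same trick is applied to a multiprocessing manager event as settings.PEvent):

import asyncio
import threading

class Event:
    def __init__(self) -> None:
        self._event = threading.Event()

    def set(self) -> None:
        self._event.set()

    def is_set(self) -> bool:
        return self._event.is_set()

    async def wait(self) -> bool:
        # Run the blocking wait in the default executor, so the event
        # loop owning this coroutine stays responsive
        return await asyncio.get_event_loop().run_in_executor(
            None, self._event.wait
        )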

No big deal.

I continued adding experimental features to it. And one day I realized I could get IFTTT to return some LLM-generated text. The way it works is rather complicated, as it involves a few HTTP requests. In my setup, I had another web application running, separate from this one, just to handle IFTTT requests. So if I were to plug the bot into the service, the flow would work as follows:

  1. The user sends a command through the bot
  2. The bot receives the command and relays it to IFTTT through their webhook (see the sketch after this list)
  3. IFTTT receives the command, processes it, and generates a result
  4. IFTTT sends the result to my web application endpoint
  5. My web endpoint collects the result and sends it back to the chatroom
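
Step 2 is just a webhook call; a hypothetical sketch with aiohttp (the event name bot_command and the payload key are made up for illustration) would be:

import aiohttp

async def relay_to_ifttt(command: str, key: str) -> None:
    # IFTTT Maker webhooks accept up to three values, value1 to value3
    url = f"https://maker.ifttt.com/trigger/bot_command/with/key/{key}"

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json={"value1": command}) as response:
            response.raise_for_status()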

So with this setup, I would need to store another set of bot tokens and secrets in a separate application, which isn't ideal, and I would also need to rewrite quite a portion of that application because it was not asyncio-ready. Then I realized my bot currently runs a web application too, purely to receive update messages from Telegram.

It might be a good idea to just merge these two projects, I figured.

There were some existing applets that weren't really in use and were only set up for experimental purposes. I started cleaning and archiving a lot of them, then started porting the code over to my bot. Everything worked fine, and I was quite happy with it.

Until I decided to adopt FastAPI.

You see, FastAPI is the new tool everyone has been talking about these days. So I got curious, and started to explore how easy it would be to port the code, which was originally written for aiohttp. The bot's web application did not really need a database, so porting was trivial (I still prefer Django for any web project requiring a database), and I like how easy it is to read input from query strings, headers, and even the request body, all through function parameters.
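
As a quick illustration (a minimal sketch, not the bot's actual handlers), a single endpoint can pull from all three sources at once:

from fastapi import FastAPI, Header
from pydantic import BaseModel

app = FastAPI()

class Payload(BaseModel):
    message: str

@app.post("/relay")
async def relay(
    payload: Payload,                      # parsed from the JSON request body
    channel: str = "general",              # ?channel=... in the query string
    x_api_key: str | None = Header(None),  # the X-Api-Key request header
):
    return {"channel": channel, "message": payload.message}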

After finishing the web application, I put in code to run it through uvicorn.run. This was when I started to see a lot of weird problems: coroutines that used to work perfectly fine were now being scheduled onto the wrong event loop. After a lot of struggle, I found a bug report saying that uvicorn does not like running in a thread. There may well be fixes for the problem, but I eventually decided to go a step further and put it in its own process.

The plan was to run two different processes: one for the FastAPI web application served by uvicorn, and another running the Telegram and Discord bots, each in a separate thread, similar to the previous setup. Surprisingly, the resulting code isn't too complicated.

import asyncio
import multiprocessing
import signal
from concurrent.futures import Future, ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial


def done_handler(
    future: Future,
    name: str,
    exit_event: settings.Event | settings.PEvent,
    is_process=False,
) -> None:
    logger.info(
        "MAIN: Task is done, prompting others to quit",
        name=name,
        is_process=is_process,
        future=future,
    )

    if future.exception() is not None:
        logger.exception(future.exception())

    shutdown_handler(None, None, exit_event, is_process)


def shutdown_handler(
    _signum, _frame, exit_event: settings.Event | settings.PEvent, is_process=False
) -> None:
    logger.info("MAIN: Sending exit event to all tasks in pool")
    exit_event.set()


async def bot_run(pexit_event: settings.PEvent) -> None:
    # Translate the process-level exit event into a thread-level one,
    # shared by the telegram and discord coroutines below
    exit_event = settings.Event()

    with ThreadPoolExecutor(max_workers=10) as executor:
        task_submit(
            executor,
            exit_event,
            "bot.telegram",
            lambda: asyncio.run(telegram_run(exit_event)),
        )
        task_submit(
            executor,
            exit_event,
            "bot.discord",
            lambda: asyncio.run(discord_run(exit_event)),
        )

        await pexit_event.wait()

        logger.info("MAIN: Received process exit signal, sending exit event to threads")
        exit_event.set()


def process_run(func, pexit_event: settings.PEvent) -> None:
    # Entry point for each child process: each gets a fresh event loop
    asyncio.run(func(pexit_event))


def task_submit(
    executor: ProcessPoolExecutor | ThreadPoolExecutor,
    exit_event: settings.PEvent | settings.Event,
    name: str,
    *task,
) -> Future:
    is_process = isinstance(executor, ProcessPoolExecutor)
    future = executor.submit(*task)

    future.add_done_callback(
        partial(
            done_handler,
            name=name,
            is_process=is_process,
            exit_event=exit_event,
        )
    )
    logger.info(
        "MAIN: Task is submitted", name=name, is_process=is_process, future=future
    )

    return future


def main():
    # Unlike a plain threading.Event, a manager event can be shared
    # across processes
    manager = multiprocessing.Manager()
    pexit_event = settings.PEvent(manager.Event())

    with ProcessPoolExecutor(max_workers=3) as executor:
        for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
            signal.signal(s, partial(shutdown_handler, exit_event=pexit_event))

        task_submit(executor, pexit_event, "bot", process_run, bot_run, pexit_event)
        task_submit(executor, pexit_event, "web", process_run, web_run, pexit_event)
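
One piece not shown above is web_run; a hypothetical sketch (the uvicorn.Server usage, module path, and port are assumptions) could look like this:

import asyncio
import uvicorn

from web import app  # the FastAPI application; hypothetical module path

async def web_run(pexit_event: settings.PEvent) -> None:
    server = uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=8000))

    # Serve in a background task so we can keep waiting on the exit event
    serve_task = asyncio.create_task(server.serve())
    await pexit_event.wait()

    server.should_exit = True  # ask uvicorn for a graceful shutdown
    await serve_task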

Nothing much is noteworthy here, except that I made pexit_event.wait a non-blocking operation (which is why bot_run is also a coroutine). I did not foresee the complexity of this project growing to this extent, but I am happy that, at least for now, it still remains rather readable. Next, I am planning to introduce database operations, but I am not quite happy with sqlmodel, as there isn't much information or many tutorials about migrations.
