Being new to asyncio, after publishing the previous post on running multiple applications in one event loop, I also cross posted it to the discussion board for feedback. So apparently instead of cramming everything to the same event loop, it would be better if each application run on a separate thread. That makes sense, considering all the code that was written for that.
So, for a very long time, the code read as follows:
def done_handler(task: Future, exit_event: settings.Event) -> None:
if task.exception() is not None:
logger.exception(task.exception())
shutdown_handler(None, None, exit_event)
def shutdown_handler_threadpool(_signum, _frame, exit_event: settings.Event) -> None:
logger.info("MAIN: Sending exit event")
exit_event.set()
def main():
with ThreadPoolExecutor(max_workers=10) as executor:
for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
signal.signal(
s, partial(shutdown_handler_threadpool, exit_event=exit_event)
)
tasks = [
executor.submit(lambda: asyncio.run(telegram_run(exit_event))),
executor.submit(lambda: asyncio.run(discord_run(exit_event))),
executor.submit(lambda: asyncio.run(web_run(exit_event))),
]
for task in tasks:
task.add_done_callback(partial(done_handler, exit_event=exit_event))
As this is not really a mission-critical application, and the main goal of the exercise is simplicity, I removed most of the code managing coroutines, and went with just asyncio.run
for now. Everything then worked properly, and all I only needed to do in addition to the above was to port all the events, queues, and locks to not block while running in this setup (with use of asyncio.get_event_loop().run_in_executor()
).
No big deal.
I continued adding experimental features to it. And one day I realized I could get IFTTT to return some LLM generated text. The way it works is rather complicated, as it involves a few http requests. So in my setup, I had another web application running, separate from this application, just to handle IFTTT requests. So if I were to plug the bot to the service, the flow would work as follows:
- The user sends a command through the bot
- The bot receives the command, and relay it to IFTTT through their webhook
- IFTTT receives the command, process it and generate result
- IFTTT sends the result to my web application endpoint
- My web endpoint collects the result, and send it back to the chatroom
So with the setup, I would need to store another set of bot tokens and secrets in a separate application, which isn’t ideal, and I also needed to rewrite quite a portion of that application because it was not asyncio-ready. Then I realized my bot runs a web application too at the moment, to purely receive update messages from telegram.
It might be a good idea to just merge these two projects, I figured.
There were some existing applets that weren’t really in use, and were set up for experimental purpose. I started cleaning and archiving a lot of them, then I started porting the code over to my bot. Everything worked fine, and I was quite happy with it.
Until I decided to adopt FastAPI.
You see, FastAPI has been a new tool everyone is talking about these days. So I got curious, and started to explore how easy it was to port the code, which was written for aiohttp. The bot’s web application did not really need a database, so porting was trivial (still prefer Django for any web projects requiring a database), and I like how easy it is to read input from query strings, headers, and even the request body, through function parameters.
After finishing the web application, I put in code to run it through uvicorn.run
. This was when I started to see a lot of weird problems. Coroutines started to be scheduled to the wrong event loop, which worked perfectly fine before. After a lot of struggle, I found out in a bug report that says uvicorn does not like to run in a thread. There perhaps were fixes to the problem, but I eventually decided to just went a step further and started putting it in a process.
The plan was to run 2 different processes, one for the FastAPI web application served by uvicorn
, another runs the telegram and discord bot, each in a separate thread, similar to the previous setup. Surprisingly, the resulting code isn’t too complicated.
def done_handler(
future: Future,
name: str,
exit_event: settings.Event | settings.PEvent,
is_process=False,
) -> None:
logger.info(
"MAIN: Task is done, prompting others to quit",
name=name,
is_process=is_process,
future=future,
)
if future.exception() is not None:
logger.exception(future.exception())
shutdown_handler(None, None, exit_event, is_process)
def shutdown_handler(
_signum, _frame, exit_event: settings.Event | settings.PEvent, is_process=False
) -> None:
logger.info("MAIN: Sending exit event to all tasks in pool")
exit_event.set()
async def bot_run(pexit_event: settings.PEvent) -> None:
exit_event = settings.Event()
with ThreadPoolExecutor(max_workers=10) as executor:
task_submit(
executor,
exit_event,
"bot.telegram",
lambda: asyncio.run(telegram_run(exit_event)),
)
task_submit(
executor,
exit_event,
"bot.discord",
lambda: asyncio.run(discord_run(exit_event)),
)
await pexit_event.wait()
logger.info("MAIN: Received process exit signal, sending exit event to threads")
exit_event.set()
def process_run(func, pexit_event: settings.PEvent) -> None:
asyncio.run(func(pexit_event))
def task_submit(
executor: ProcessPoolExecutor | ThreadPoolExecutor,
exit_event: settings.PEvent | settings.Event,
name: str,
*task,
) -> Future:
is_process, future = (
isinstance(executor, ProcessPoolExecutor),
executor.submit(*task),
)
future.add_done_callback(
partial(
done_handler,
name=name,
is_process=is_process,
exit_event=exit_event,
)
)
logger.info(
"MAIN: Task is submitted", name=name, is_process=is_process, future=future
)
return future
def main():
manager = multiprocessing.Manager()
pexit_event = settings.PEvent(manager.Event())
with ProcessPoolExecutor(max_workers=3) as executor:
for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
signal.signal(s, partial(shutdown_handler, exit_event=pexit_event))
task_submit(executor, pexit_event, "bot", process_run, bot_run, pexit_event)
task_submit(executor, pexit_event, "web", process_run, web_run, pexit_event)
Nothing much noteworthy, except I made pexit_event.wait
a non-blocking operation (which is why bot_run
is also a coroutine). I did not foresee the complexity of this project to grow to this extent, but I am happy that at least for now it still remains rather readable. Next, I am planning to introduce database operations to it, but am not quite happy with sqlmodel as it doesn’t have much information/tutorials about migrations.