Getting comfortable to asyncio takes a bit of practice, so I revisited a practice project I did when I was working for my previous company. Suppose I want to build a very simple websocket application, without use of any web application library/framework. In order to keep it simple, I also opt to just build the frontend with minimal setup (just plain ES6 without webpack/vite).
Just in case I don’t get confused, I am following the definition of coroutine function and object from the documentation. Also for the sake of simplicity in explanining, I am assuming each visitor makes exactly 1 websocket connection to the application.
To start, I need to figure out how to build the application itself. The corresponding specification I am looking for is ASGI – Asynchronous Server Gateway Interface. For the scope of the project, my only concern is on the application side, so essentially it is just a coroutine function with 3 arguments:
async def application(scope, receive, send):
match scope["type"]:
case "lifespan":
await lifespan(scope, receive, send)
case "http":
await http(scope, receive, send)
case "websocket":
await websocket(scope, receive, send)
case rest:
logger.info(rest)
For the sake of completeness, I add a rest
case, but for our very simple experimental project we only care about both http
and websocket
part. However, let’s take a brief moment and talk about lifespan
first.
Lifespan
The lifespan
case is mostly for setting up things in a real web application, for example a database and/or cache server connection. For every lifespan event we received through the receive
coroutine function, we can send responses to the server to indicate support for lifespan events.
LIFESPAN_STARTUP = "lifespan.startup"
LIFESPAN_SHUTDOWN = "lifespan.shutdown"
async def lifespan(scope, receive, send):
while event := await receive():
if event["type"] == LIFESPAN_STARTUP:
logger.info("lifespan startup")
await send({"type": "lifespan.startup.complete"})
elif event["type"] == LIFESPAN_SHUTDOWN:
logger.info("lifespan shutdown")
await send({"type": "lifespan.shutdown.complete"})
break
Some servers may issue a warning if it fails to receive responses for lifespan events, for example uvicorn which is the one I am using for this example. In our case, it doesn’t really matter, I am just including the code as a stub, which essentially does nothing remarkable. Also notice how I am putting this into a loop, as we are really expecting lifespan
coroutine function to be called exactly once.
HTTP
Next, let’s cover http, as it is relatively simpler. Previously, in WSGI – Web Server Gateway Interface it is just a function with two arguments (example taken from PEP 3333)
def simple_app(environ, start_response):
"""Simplest possible application object"""
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
Shifting to ASGI further breaks the simplicity a little bit, it is still a 2-step process, but they are now all done through the send
coroutine function. In our case, it would look something like this
HTTP_REQUEST = "http.request"
HTTP_DISCONNECT = "http.disconnect"
async def http(scope, receive, send):
while event := await receive():
logger.info("Incoming event", **event, path=scope["path"].lstrip(scope["root_path"]))
if event["type"] == HTTP_REQUEST and scope["path"].lstrip(scope["root_path"]) == "/":
await index(send)
elif event["type"] == HTTP_REQUEST:
await not_found(send)
elif event["type"] == HTTP_DISCONNECT:
break
async def not_found(send):
await send({"type": "http.response.start", "status": 404})
await send({"type": "http.response.body", "body": None})
async def index(send):
with open(Path(__file__).parent / "index.html", "rb") as template:
await send({"type": "http.response.start", "status": 200})
await send({"type": "http.response.body", "body": template.read()})
Like lifespan, the server calls the application once for each http request. We then figure out when the request starts, and ends through the receive
coroutine function. Then through the use of send
coroutine function, we return the header (as well as status code) and body separately.
For the record, the html file is just simply:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<link
href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css"
rel="stylesheet"
integrity="sha384-QWTKZyjpPEjISv5WaRU9OFeRpok6YctnYmDr5pNlyT2bRjXh0JMhjY6hW+ALEwIH"
crossorigin="anonymous"
/>
<title>Test Chatroom</title>
</head>
<body>
<div class="container">
<div class="d-flex flex-column vh-100">
<h1 class="flex-shrink-0">Test chatroom</h1>
<div id="messages" class="flex-grow-1 overflow-y-scroll"></div>
<form id="send" class="flex-shrink-0">
<input
class="form-control form-control-lg"
type="text"
aria-label=".form-control-lg example"
/>
</form>
</div>
</div>
<script src=" https://cdn.jsdelivr.net/npm/[email protected]/chance.min.js "></script>
<script
type="text/javascript"
src="https://code.jquery.com/jquery-3.7.1.js"
></script>
<script src=" https://cdn.jsdelivr.net/npm/[email protected]/lodash.min.js "></script>
<script
src="https://cdn.jsdelivr.net/npm/@popperjs/[email protected]/dist/umd/popper.min.js"
integrity="sha384-I7E8VVD/ismYTF4hNIPjVp/Zjvgyol6VFvRkX/vR+Vc4jQkC+hVqc2pM8ODewa9r"
crossorigin="anonymous"
></script>
<script
src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js"
integrity="sha384-0pUGZvbkm6XF6gxjEnlmuGrJXVbNuzT9qBBavbLwCsOGabYfZo0T0to5eqruptLy"
crossorigin="anonymous"
></script>
<script id="message" type="text/template">
<div>
<p><strong><%= sender %>:</strong> <%= message %></p>
</div>
</script>
<script async type="text/javascript">
document.addEventListener("DOMContentLoaded", () => {
const ws = new WebSocket("/chat");
const sender = chance.name();
ws.addEventListener("message", (e) => {
document
.getElementById("messages")
.dispatchEvent(
new CustomEvent("update", { detail: JSON.parse(e.data) })
);
});
const elem = document.querySelector("#send input");
elem.setAttribute("placeholder", sender.concat(":"));
elem.focus();
document.getElementById("send").addEventListener("submit", (e) => {
e.preventDefault();
ws.send(
JSON.stringify({
sender,
message: $("input", e.target).val(),
})
);
e.target.querySelector("input").value = "";
});
document.getElementById("messages").addEventListener("update", (e) => {
const elem = new DOMParser().parseFromString(
_.template(document.getElementById("message").innerHTML)({
sender: e.detail.sender,
message: e.detail.message,
}),
"text/html"
).body.firstChild;
e.target.appendChild(elem);
e.target.lastElementChild.scrollIntoView(false);
jQuery(elem).hide().fadeIn();
});
});
</script>
</body>
</html>
I probably don’t need popper.js and bootstrap.js, but keeping them there anyway. I could have also skipped jQuery, but I couldn’t find a simple way to fade text into the chatroom with just one line, so I am keeping it there. Just a sidenote, I did the first iteration of the script with jQuery, and eventually port them over to the new DOM methods and es6 syntax, it really makes me appreciate how things are made easier these days (yes I feel old).
Nothing much to talk about in the frontend, it is just purely establishing connection, as well as sending and receiving chat messages.
Websocket
So let say in our fictional project, we are expecting multiple users to come to the chatroom and chat their problems away. So we are expecting our server, in this case uvicorn, can serve them efficiently. In order to do that, we run multiple workers, which in turns each running in their own process. Hence we need to set up our application to listen to chat messages submitted by visitors, and broadcast them to everyone. We would then need 2 things to keep track of them
manager = multiprocessing.Manager()
queue = manager.Queue()
subscribers = manager.dict()
The first is a queue
that collects the chat messages from our visitors. Secondly, is a list of subscribers
, which is the list of currently connected visitors. While we are going to consume the messages in the main process, the worker processes would be the one producing the messages, hence we need multiprocessing.Manager()
here to ensure process-safety.
The corresponding application code is here as follows:
WEBSOCKET_CONNECT = "websocket.connect"
WEBSOCKET_ACCEPT = "websocket.accept"
WEBSOCKET_RECEIVE = "websocket.receive"
WEBSOCKET_SEND = "websocket.send"
WEBSOCKET_DISCONNECT = "websocket.disconnect"
async def websocket(scope, receive, send):
subscriber = "".join(random.choices(string.ascii_letters, k=7))
subqueue = manager.Queue()
exit_event = asyncio.Event()
asyncio.create_task(consume(subqueue, send, exit_event))
while event := await receive():
logger.info("Incoming event", **event, path=get_path(scope))
if event["type"] == WEBSOCKET_CONNECT and get_path(scope) == "/chat":
await asyncio.to_thread(partial(setitem, subscribers, subscriber, subqueue))
await accept(send)
elif event["type"] == WEBSOCKET_RECEIVE and get_path(scope) == "/chat":
asyncio.create_task(asyncio.to_thread(partial(queue.put, event["text"])))
elif event["type"] == WEBSOCKET_DISCONNECT:
exit_event.set()
asyncio.create_task(
asyncio.to_thread(partial(delitem, subscribers, subscriber))
)
break
I am being lazy here, so for each websocket connection I am creating a new queue. Based on our assumption, this new queue is exclusively made for the visitor, to receive the broadcast of all chat messages. We consume the queue through this coroutine function:
async def consume(subqueue, send, exit_event):
while not exit_event.is_set():
with contextlib.suppress(Empty):
text = await asyncio.to_thread(partial(subqueue.get, timeout=3))
asyncio.create_task(send({"type": WEBSOCKET_SEND, "text": text}))
logger.info("Stopped subscription to messages")
While the data structure is process-safe, but waiting for a value is going to block the asynchronous code, so we are wrapping the work and throw it to another thread through asyncio.to_thread
here. A timeout
is set to allow other scheduled coroutines in the event loop to run, when we have nothing to read from the queue. The exit_event
is simply there to let the coroutine to exit when the websocket connection closes.
Like HTTP, each connection is one function call, hence I use a loop to listen to events throughout the connection. Every time we receive a connection request to our endpoint /chat
, we first add the previously created queue to the collection of subscribers we created previously. Then, we respond to the connection request, indicating that we are ready by sending the first message:
async def accept(send):
await send({"type": WEBSOCKET_ACCEPT})
await send(
{
"type": WEBSOCKET_SEND,
"text": json.dumps({"sender": "server", "message": "You are connected"}),
}
)
For subsequent messages we receives from our visitors, we would love to queue them to the broadcast queue. In this case we just want to make sure it is scheduled, but don’t really care when it ends, so we just let asyncio to schedule it for us, through the asyncio.create_task
function (yay asynchronous programming!).
Lastly, when the connection closes, we send an exit event to stop the consume
coroutine function above. We also remove the subscriber queue from the collection of subscribers.
Now let’s run the application
async def repeat(coro_function: Callable[[], Awaitable[None]]):
while True:
await coro_function()
async def broadcast():
with contextlib.suppress(Empty):
text = await asyncio.to_thread(partial(queue.get, timeout=3))
for name, subscriber in await asyncio.to_thread(subscribers.items):
logger.info("broadcasting to consumers", name=name, text=text)
asyncio.create_task(asyncio.to_thread(subscriber.put, text))
async def main(app):
asyncio.create_task(repeat(broadcast))
server = uvicorn.Server(
uvicorn.Config(
app=app,
host="0.0.0.0",
port=8080,
log_level="info",
workers=4,
)
)
await server.serve()
if __name__ == "__main__":
asyncio.run(main(application))
This is a toy project, so I am keeping this simple. So I started by creating a task to broadcast received chat messages to other visitors. Then while that is scheduled and running in the event loop repeatedly, we start our server to serve our application through uvicorn. The good thing about ASGI is that, if I am not happy with uvicorn, theoretically I should be able to swap it with something else like Hypercorn.