Streamlit, asyncio and MongoDB

Enabling async MongoDB operations in Streamlit.

Handmade Software
7 min read · May 21, 2024


Streamlit, with its peculiar execution model, is a wonderful tool for building dashboards, but using asyncio data sources with it can be a real pain. This article shows how to use these two technologies together correctly.

Streamlit and Beanie: what are they?

Streamlit is a Python library designed to make it easy for developers to create beautiful, interactive web applications quickly, all using only Python code. It’s popular for data science projects, allowing for rapid prototyping and sharing of results.
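To give a sense of scale, a complete Streamlit app can be just a few lines; this is a minimal sketch (run it with streamlit run app.py):

import streamlit as st

st.title("Hello")
number = st.slider("Pick a number", 0, 10)
st.write("You picked", number)

Every interaction, such as moving the slider, reruns the whole script from top to bottom. That execution model is exactly what makes asyncio integration tricky later on.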

We started using it for developing expert systems and dashboards, for which the library is perfect: quick prototyping, nice testing utils and responsiveness, everything a backend-heavy project’s interface needs.

Beanie is an asynchronous Python Object-Document Mapper (ODM) for MongoDB. It leverages the power of Pydantic for data validation and the async/await syntax of modern Python. Beanie makes it straightforward to work with MongoDB documents as Python objects while handling asynchronous operations seamlessly.
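A minimal sketch of the Beanie workflow, mirroring the Product model used throughout this article (the database name here is illustrative):

import asyncio

from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient


class Product(Document):
    name: str


async def demo():
    # Beanie must be initialized once, inside a running event loop, before use
    client = AsyncIOMotorClient("mongodb://127.0.0.1:27017")
    await init_beanie(database=client.demo_db, document_models=[Product])
    await Product(name="test").insert()
    print(await Product.find_all().to_list())


asyncio.run(demo())

Note the init_beanie call: it has to run inside a working event loop, and that small detail is the root of most of the trouble below.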

Problem

As amazing as Streamlit is, there is one major problem with it: it entirely ignores asyncio. And don't get me wrong: in my not-so-humble opinion, the asyncio implementation in Python is extremely unfortunate and bulky, and just not pythonic in so many ways. But it gives you the benefit of particularly swift input/output, which isn't so easy to achieve with other tools.

The funny part is that Streamlit itself is based on asyncio and runs Tornado with the asyncio engine in the background, but the event loop is not exposed to the script runs and therefore cannot be used to start coroutines. The frontend elements are rendered from top to bottom, with protobuf-encoded data and WebSockets as transport: every session has an event stream in both directions, synchronized in real time. Isn't that the web 3.0?

This entire edifice seems to be screaming for asyncio, but the framework was created to be accessible to scientists first, and let's be honest: asyncio as it stands today would confuse even a professional software developer who has never touched it before. So the entire API for creating interface elements was kept synchronous. With Tornado's execution model that's not much of a problem; in fact, it's how asyncio should have been done in Python in the first place.

Possible solutions

So let's say you need to execute an asyncio routine that fetches some MongoDB data and takes a significant amount of time:

async def fetch_data():
    count = await Product.find(fetch_links=True).count()
    if not count:
        await Product(name='test').save()
    return await Product.find(fetch_links=True).to_list()

Solution 1: asyncio.run(), okay for simple cases

An obvious approach would be to use the sync-async bridge API of asyncio and just execute the code where it's needed. asyncio.run() executes the coroutine in a new event loop every time a request comes in, and that's alright for simple cases. Full example:

import asyncio

from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
import streamlit as st


def get_client(event_loop=None):
    if event_loop:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
            io_loop=event_loop,
        )
    else:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
        )
    return client


class Product(Document):
    name: str


async def init_database(client):
    database = client.get_database(name='asyncio_streamlit_db')
    await init_beanie(database=database, document_models=[Product])


async def fetch_data():
    count = await Product.find(fetch_links=True).count()
    if not count:
        await Product(name='test').save()
    return await Product.find(fetch_links=True).to_list()


async def main():
    client = get_client()
    await init_database(client)
    products = await fetch_data()
    for product in products:
        st.write(product)


if __name__ == '__main__':
    asyncio.run(main())

Where it breaks:

  • creating a separate connection for every WebSocket request clearly becomes a luxury once your project scales
  • init_beanie can also take a while if you have many models to take into account
  • it breaks with concurrent operations: Tornado is a multithreaded application, and multithreading doesn’t mix simply with asyncio

Solution 2: optimized database initialization, one client per session?

So instead of creating a separate database connection and initializing Beanie on every request, we could cache them. Streamlit runs are isolated, so no context is preserved between runs; the solution is session state. So what if we cache our connection for each session?
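Session state survives reruns within a single browser session, which makes it a natural cache slot; a minimal illustration (the counter key is just for demonstration):

import streamlit as st

# st.session_state persists across reruns of the same browser session
if "counter" not in st.session_state:
    st.session_state.counter = 0
st.session_state.counter += 1
st.write("Reruns this session:", st.session_state.counter)

Applied to our case, the client goes into the session state: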

import asyncio

from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
import streamlit as st


def get_client(event_loop=None):
    if event_loop:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
            io_loop=event_loop,
        )
    else:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
        )
    return client


class Product(Document):
    name: str


async def init_database(client):
    database = client.get_database(name='asyncio_streamlit_db')
    await init_beanie(database=database, document_models=[Product])


COUNT = 10


async def fetch_data():
    count = await Product.find(fetch_links=True).count()
    if count < COUNT:
        for i in range(COUNT):
            await Product(name='test').save()
    return await Product.find(fetch_links=True).limit(COUNT).to_list()


async def main():
    if not st.session_state.get('client'):
        st.session_state.client = get_client()
    await init_database(st.session_state.client)
    products = await fetch_data()
    for product in products:
        st.write(product)
    st.button("Quick rerun")


if __name__ == '__main__':
    asyncio.run(main())

For static data this solution is just fine, and it's quicker than the previous one. But once you add buttons, especially with the default setting of quick reruns enabled, code re-execution leads to:

  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 600, in _run_script
    exec(code, module.__dict__)
  File "/home/thorin/PycharmProjects/streamlit_asyncio/solution2.py", line 49, in <module>
    asyncio.run(main())
  File "/home/thorin/.pyenv/versions/3.12.3/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/thorin/.pyenv/versions/3.12.3/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thorin/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/solution2.py", line 43, in main
    products = await fetch_data()
               ^^^^^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/solution2.py", line 32, in fetch_data
    count = await Product.find(fetch_links=True).count()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/beanie/odm/interfaces/find.py", line 273, in find
    return cls.find_many(
           ^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/beanie/odm/interfaces/find.py", line 197, in find_many
    args = cls._add_class_id_filter(args, with_children)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/beanie/odm/interfaces/find.py", line 458, in _add_class_id_filter
    and cls._inheritance_inited
        ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/pydantic/_internal/_model_construction.py", line 242, in __getattr__
    raise AttributeError(item)

This is the cryptic error Beanie raises when it hasn't been initialized. That's logical: we cache the connection, but not the event loop, which asyncio.run() creates anew every single time. Should I cache the event loop too?

Solution 3: cache the event loop

import asyncio

from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
import streamlit as st


def get_client(event_loop=None):
    if event_loop:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
            io_loop=event_loop,
        )
    else:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
        )
    return client


class Product(Document):
    name: str


async def init_database(client):
    database = client.get_database(name='asyncio_streamlit_db')
    await init_beanie(database=database, document_models=[Product])


COUNT = 10


async def fetch_data():
    count = await Product.find(fetch_links=True).count()
    if count < COUNT:
        for i in range(COUNT):
            await Product(name='test').save()
    return await Product.find(fetch_links=True).limit(COUNT).to_list()


def get_event_loop():
    return asyncio.new_event_loop()


if not st.session_state.get('event_loop'):
    st.session_state.event_loop = get_event_loop()

if not st.session_state.get('client'):
    st.session_state.client = get_client(event_loop=st.session_state.event_loop)


async def main():
    await init_database(st.session_state.client)
    products = await fetch_data()
    for product in products:
        st.write(product)
    st.button("Quick rerun")


if __name__ == '__main__':
    st.session_state.event_loop.run_until_complete(main())

What I've done here is cache the event loop in the session state and pass it to the Motor client. asyncio.run() was replaced with

st.session_state.event_loop.run_until_complete(main())

Now every session has its own event loop. Unfortunately, this seemingly working solution breaks once you try to execute multiple actions in parallel, which you can easily trigger by clicking the button rapidly several times.

RuntimeError: This event loop is already running
Traceback:
  File "/home/thorin/PycharmProjects/streamlit_asyncio/venv/lib/python3.12/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 600, in _run_script
    exec(code, module.__dict__)
  File "/home/thorin/PycharmProjects/streamlit_asyncio/solution3.py", line 57, in <module>
    st.session_state.event_loop.run_until_complete(main())
  File "/home/thorin/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 663, in run_until_complete
    self._check_running()
  File "/home/thorin/.pyenv/versions/3.12.3/lib/python3.12/asyncio/base_events.py", line 622, in _check_running
    raise RuntimeError('This event loop is already running')

Where it breaks:

  • we still have to call init_beanie every single time, which slows down operations significantly
  • once the event loop is started with run_until_complete, no other script run can use it. You can, of course, wait until the task is finished, but this is not the path of a true samurai.

Solution 4: cache everything globally and execute in a separate thread (best so far)

Besides the session state, Streamlit also has a function to cache resources globally: st.cache_resource. If I cached the event loop globally just like that, I would run into the same problem as with the previous solution, only the collision would be even more probable: now all sessions share the same event loop.

The recommended way to enable asyncio properly in a multithreaded application is a worker thread. The thread is started with the event loop's run_forever(), and other threads submit coroutines to it with asyncio.run_coroutine_threadsafe(), which schedules the task on the worker's loop and waits for the result.
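Stripped of Streamlit and MongoDB, the pattern looks roughly like this (a minimal sketch, with a toy coroutine standing in for a real data source):

import asyncio
from threading import Thread

# Dedicated worker thread that runs an event loop forever.
# daemon=True so this demo process can exit; solution 4 below uses a non-daemon thread.
loop = asyncio.new_event_loop()
Thread(target=loop.run_forever, daemon=True).start()


async def work():
    await asyncio.sleep(1)
    return "done"


# Any synchronous thread can now submit coroutines to the worker loop
# and block on the returned concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(work(), loop)
print(future.result())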

For now, this solution works best for me. It ensures the rest of the code remains synchronous and only invokes the so-unwanted asyncio functions where they are needed.

Besides that, if you move your Beanie code (models and the init function) out of the script directory, its module state won't be reset on each rerun, so the result of init_beanie persists. Full solution:

The script itself:

import asyncio
from asyncio import run_coroutine_threadsafe
from threading import Thread
import os
import sys

from motor.motor_asyncio import AsyncIOMotorClient
import streamlit as st

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils import Product, init_database


def get_client(event_loop=None):
    if event_loop:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
            io_loop=event_loop,
        )
    else:
        client = AsyncIOMotorClient(
            "mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.2",
        )
    return client


COUNT = 10


async def fetch_data():
    count = await Product.find(fetch_links=True).count()
    if count < COUNT:
        for i in range(COUNT):
            await Product(name='test').save()
    await asyncio.sleep(5)  # simulate a slow data source
    return await Product.find(fetch_links=True).limit(COUNT).to_list()


def get_event_loop():
    return asyncio.new_event_loop()


@st.cache_resource(show_spinner=False)
def create_loop():
    # one event loop per server process, running forever in a worker thread
    loop = asyncio.new_event_loop()
    thread = Thread(target=loop.run_forever)
    thread.start()
    return loop, thread


st.session_state.event_loop, worker_thread = create_loop()


def run_async(coroutine):
    # bridge: schedule the coroutine on the worker loop, block until the result
    return run_coroutine_threadsafe(coroutine, st.session_state.event_loop).result()


@st.cache_resource(show_spinner=False)
def setup_database():
    if client := st.session_state.get("db_client"):
        return client
    client = get_client(event_loop=st.session_state.event_loop)
    run_async(init_database(client=client))
    return client


st.session_state.db_client = setup_database()


def main():
    products = run_async(fetch_data())
    for product in products:
        st.write(product)
    st.button("Quick rerun")


if __name__ == '__main__':
    main()
And utils.py:

from beanie import init_beanie, Document


class Product(Document):
    name: str


async def init_database(client):
    database = client.get_database(name='asyncio_streamlit_db')
    await init_beanie(database=database, document_models=[Product])

Where it breaks:

  • Streamlit doesn’t have a way to catch application or session shutdown of any kind, so joining the worker thread is not possible. The thread remains alive and keeps the process running, so port 8501 stays busy and the next Streamlit start connects to the next available port. Not a problem for Docker, but still a potential source of problems (a possible mitigation is sketched below).
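The sketch below is one possible mitigation; it assumes, and this is my assumption rather than something Streamlit guarantees, that the process-level atexit hook fires when the server exits:

import asyncio
import atexit
from threading import Thread

import streamlit as st


@st.cache_resource(show_spinner=False)
def create_loop():
    loop = asyncio.new_event_loop()
    thread = Thread(target=loop.run_forever)
    thread.start()

    def shutdown():
        # Stop run_forever from outside the loop's thread, then join it
        loop.call_soon_threadsafe(loop.stop)
        thread.join()

    atexit.register(shutdown)
    return loop, thread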

Final words

All the examples can be found here: https://github.com/thorin-schiffer/streamlit_asyncio/tree/master

Don’t make the mistake I made of trying to keep the entire Streamlit app async: it will break on multiple levels. I’ve reached out to the Streamlit developers to encourage them to consider native asyncio support in some form; a utility like run_async could easily become part of the framework. The silence surprises me, because the Streamlit devs are usually quite responsive to community needs, and an asyncio-supercharged data source for data science dashboards, which is what the framework was created for, doesn’t seem such an unrealistic scenario.

In its current state, Streamlit doesn’t expect your code to be async, and this shows on many levels. One of the most annoying: decorators like cache_resource, and the recently added majestic fragments, don’t work with asyncio coroutines. The same is unfortunately true for callbacks.
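The practical workaround is to keep every decorated function and callback synchronous and bridge into asyncio inside it; a sketch assuming the run_async helper and fetch_data coroutine from solution 4 are in scope:

import streamlit as st


@st.cache_resource(show_spinner=False)
def cached_products():
    # The decorated function stays synchronous; the coroutine is bridged
    # through the run_async worker-thread helper from solution 4
    return run_async(fetch_data())


def on_refresh():
    # Callbacks must be synchronous too, so bridge here as well
    st.session_state.products = run_async(fetch_data())


st.button("Refresh", on_click=on_refresh)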

Please feel free to add your suggestions and comments, let’s find the best solution together! Ciao 🖖

Written by Thorin Schiffer.
