Testing async MongoDB AWS applications with pytest

Practical guide and snippets

Handmade Software
6 min read · Jun 7, 2024


This article shows real-life techniques and fixtures needed to make the test suite of your MongoDB- and AWS-based application usable and performant.

Unit tests

Unit tests are the foundation of reliable software. They allow us to test individual components in isolation. For async applications, pytest supports async tests using the pytest-asyncio plugin.

pytest-asyncio handles the boilerplate of your async tests needing a running event loop, since they are coroutines, unlike the usual pytest tests. The plugin can be configured further, but in most cases the auto mode is enough.
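
For reference, a minimal sketch of what such a test looks like in auto mode (the test body is just an illustration):

import asyncio

# with asyncio_mode = "auto" set in pytest.ini or pyproject.toml,
# pytest-asyncio collects plain async test functions without an explicit marker
async def test_event_loop_is_running():
    # the plugin runs this coroutine on its own managed event loop
    assert asyncio.get_running_loop() is not None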

Parallel execution

For parallel execution of tests we use the well-known pytest-xdist plugin, which starts multiple interpreter processes and thereby speeds up the test run.

This means, first of all, that the entire execution structure of the test suite revolves around the idea of a bucket: the set of tests executed in one process and therefore sharing the same set of resources.

Database

Beanie, the async ODM built on Motor, the async driver for MongoDB, will set up the database and collections for you, including the indexes (that behavior is enabled by default and can be disabled).

Here are the pytest fixtures you could use in your test suite:

import asyncio
import os

import pytest
from motor.motor_asyncio import AsyncIOMotorClient


@pytest.fixture(scope="function")
async def db_client(event_loop):
    client = AsyncIOMotorClient(os.environ["MONGODB_URI"], io_loop=event_loop)
    client.get_io_loop = asyncio.get_running_loop
    return client


@pytest.fixture(scope="session")
async def db_name(worker_id):
    return f"test_{DEFAULT_DATABASE}_{worker_id}"

The db_name fixture makes sure the databases of the xdist workers are isolated from each other and that there is no overhead from creating them again and again.

Initialize beanie with this fixture:

@pytest.fixture(scope="function")
async def beanie(event_loop, db_client, db_name: str):
    await flush_database(db_name, db_client)
    yield await init_database(database_name=db_name, client=db_client)

init_database is your own function that calls init_beanie. Use the fixture wherever your Beanie models are used in tests, like this:


@pytest.fixture
async def organization(beanie):
    return await Organization(name="ProductFlight").save()
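
Neither flush_database nor init_database is shown here; a minimal sketch of both, assuming your Beanie document models are collected in a DOCUMENT_MODELS list, could look like this:

from beanie import init_beanie


async def flush_database(database_name, client):
    # drop the per-worker test database so every test starts from a clean slate
    await client.drop_database(database_name)


async def init_database(database_name, client):
    # init_beanie creates the collections and indexes for the registered models
    return await init_beanie(
        database=client[database_name],
        document_models=DOCUMENT_MODELS,  # assumption: a list of your Document classes
    )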

Mocking AWS resources with moto

Moto is a remarkable Python package for mocking AWS resources in local unit tests, but unfortunately it doesn't support asyncio. In general, boto's asyncio support is limited, and wrapper packages like aioboto3 fill the gap. Moto's built-in patchers and fixtures are incompatible with that wrapper, so the only way is the standalone moto server, running either in a separate process or as a threaded server. You can start a threaded moto server once for the entire test suite, or once per xdist process, as done in the following fixture:

from moto.server import ThreadedMotoServer


@pytest.fixture(scope="session")
def moto_server(worker_id):
    port_offset = 0 if worker_id == "master" else int(worker_id.replace("gw", ""))
    port = 5000 + port_offset
    server = ThreadedMotoServer(ip_address="localhost", port=port)
    server.start()
    aws_url = f"http://localhost:{port}"
    os.environ["AWS_URL"] = aws_url
    yield aws_url
    server.stop()

worker_id is an xdist fixture that allows you to distinguish the worker processes from each other while running the test suite in parallel. Extracting the worker number gives each process its own port and lets us set the correct AWS URL for the underlying libraries.

The following three fixtures create a single account and STS role per test, making sure the tests are isolated from each other while executing:

account_number = 111111111110  # module-level counter; pick any 12-digit base value


@pytest.fixture
async def aws_account_number():
    global account_number
    account_number += 1
    return account_number


@pytest.fixture
async def sts_role(sts_client, aws_account_number):
    return await sts_client.assume_role(
        RoleArn=f"arn:aws:iam::{aws_account_number}:role/my-role",
        RoleSessionName="test-session-name",
        ExternalId="test-external-id",
    )

@pytest.fixture
async def aws_credentials(sts_role, monkeypatch):
    credentials = sts_role["Credentials"]
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", credentials["AccessKeyId"])
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", credentials["SecretAccessKey"])
    monkeypatch.setenv("AWS_SESSION_TOKEN", credentials["SessionToken"])
    return credentials

The credentials are applied with this utility function:

aioboto_session = None  # lazily created, shared aioboto3 session


async def get_async_client(service, credentials=None):
    global aioboto_session
    aioboto_session = aioboto_session or aioboto3.Session(region_name=REGION)
    if credentials:
        return aioboto_session.client(
            service,
            region_name=REGION,
            aws_access_key_id=credentials["AccessKeyId"],
            aws_secret_access_key=credentials["SecretAccessKey"],
            aws_session_token=credentials["SessionToken"],
            endpoint_url=os.getenv("AWS_URL") or None,
        )
    else:
        return aioboto_session.client(
            service,
            region_name=REGION,
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
            endpoint_url=os.getenv("AWS_URL") or None,
        )

You can write a fixture for every client you need:

@pytest.fixture
async def sns_client(aws_credentials):
    async with await get_async_client("sns", credentials=aws_credentials) as client:
        yield client


@pytest.fixture
async def sqs_client(aws_credentials):
    async with await get_async_client("sqs", credentials=aws_credentials) as client:
        yield client


@pytest.fixture
async def sf_client(aws_credentials):
    async with await get_async_client("stepfunctions", credentials=aws_credentials) as client:
        yield client
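
One client the article does not show is sts_client, which the sts_role fixture above depends on. Since it runs before any role is assumed, a sketch could bootstrap it with dummy credentials, which the moto server accepts:

@pytest.fixture
async def sts_client(moto_server, monkeypatch):
    # assumption: the moto server accepts any non-empty credentials,
    # so dummy values are enough to bootstrap the first STS call
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
    async with await get_async_client("sts") as client:
        yield client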

Those fixtures are especially useful in the context of a lambda-based architecture.
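
To give an idea of how the pieces fit together, here is a minimal sketch of a unit test using these fixtures (queue name and assertions are illustrative):

async def test_message_round_trip(sqs_client):
    # moto serves the request locally through the AWS_URL endpoint
    queue = await sqs_client.create_queue(QueueName="test-queue")
    await sqs_client.send_message(QueueUrl=queue["QueueUrl"], MessageBody="hello")

    response = await sqs_client.receive_message(QueueUrl=queue["QueueUrl"])
    assert response["Messages"][0]["Body"] == "hello"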

Integration tests

Alongside the functionality-checking unit tests, we also accompany our projects with integration tests. The concept has relatively blurry borders to other kinds of tests in my perception, so here are the key learnings I've taken from dozens of projects in the past:

  • end-to-end tests don't work: they are always broken and incredibly slow, and it's challenging to understand whether the test is broken or the code is.
  • integration tests should be small and easy to repeat; they should test the smaller pieces of software individually and clearly identify the culprit.
  • integration tests should use isolated data.
  • no functional testing should be done at this point; the unit tests already cover that.

Event log

So for event-oriented, lambda-based AWS apps, we have come up with the concept of an event log. These are records in the backend of your choice (MongoDB, DynamoDB, Redis, whatever) with a reasonable TTL, holding information about the events happening inside your backend. Such a log as a Beanie model could look like the following:

class Event(BaseDocument):
    status: EventStatus = EventStatus.STARTED
    batch_id: str | None = None
    error_message: str | None = None
    message: type[Message] | Message | None = None
    scope: str | None = None

    class Settings:
        indexes = [
            IndexModel(["created_at"], expireAfterSeconds=EVENT_EXPIRE_AFTER),
            IndexModel([("batch_id", ASCENDING), ("scope", ASCENDING), ("created_at", ASCENDING)]),
        ]

Mind the TTL index with expireAfterSeconds and the batch_id, a field that tracks all events associated with a particular external trigger.
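
The EventStatus enum and the Event.get_for_batch_id helper used further below are not shown in the article; a minimal sketch could be:

from enum import Enum


class EventStatus(str, Enum):
    STARTED = "started"
    OK = "ok"
    ERROR = "error"

And on the Event model, a query helper such as:

@classmethod
async def get_for_batch_id(cls, batch_id: str, scope: str) -> list["Event"]:
    # returning all scopes of the batch keeps the failure log informative;
    # you could also narrow the query down to the given scope
    return await cls.find(cls.batch_id == batch_id).sort("+created_at").to_list()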

Once that is established, we can use an async context manager like this:

@asynccontextmanager
async def event_log(message: Message, scope: str):
    event_message = EventMessage.from_message(message)
    await Event(batch_id=event_message.batch_id, message=event_message, scope=scope).save()
    try:
        yield
    except Exception as ex:
        await Event(
            batch_id=event_message.batch_id,
            status=EventStatus.ERROR,
            error_message=f"{ex.__class__.__name__}: {ex}",
            message=event_message,
            scope=scope,
        ).save()
        raise
    else:
        await Event(batch_id=event_message.batch_id, status=EventStatus.OK, message=event_message, scope=scope).save()

to track the events within a lambda function like this:

async with event_log(message, scope=scope):
    message.context = context
    return await function(message)

where the message looks like this:

message = Message(
    gtin=product.gtin,
    batch_id=f"downloader-test-{integration}-{uuid.uuid4()}",
    organization_id=organization.id,
    product_id=product.id,
    domain_id=domains[integration].id,
)

In general, if you practice a single entry point for all lambdas, it's easier to introduce such middleware behavior as event log creation.

Once we can guarantee that all calls are wrapped like that, we know every call will create a log entry.
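
As an illustration, such a single entry point could be a decorator that wraps every async handler with the event log. This is a hypothetical sketch; the decorator name and the deserialization via Message.model_validate are assumptions, the Message model and scope come from the article:

import asyncio
import functools


def lambda_entrypoint(scope: str):
    # hypothetical decorator: turns an async handler into a synchronous lambda handler
    # and wraps every invocation in the event log
    def decorator(function):
        @functools.wraps(function)
        def handler(event, context):
            async def run():
                message = Message.model_validate(event)  # assumes a pydantic v2 Message model
                async with event_log(message, scope=scope):
                    message.context = context
                    return await function(message)

            return asyncio.run(run())

        return handler

    return decorator


@lambda_entrypoint(scope="download_product")
async def download_product(message: Message):
    ...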

Fixtures

In contrast to unit test fixtures, integration test fixtures are not ephemeral: the created data does not disappear before each test execution, which can pose quite a challenge for test isolation. Your fixtures have to be provisioned before the test starts. Compare two fixtures, one from a unit test and one from an integration test:

# unit test: ephemeral object created per test
@pytest.fixture
async def organization(beanie):
    return await Organization(name="ProductFlight").save()


# integration test: pre-provisioned object looked up once per session
@pytest.fixture(scope="session")
async def organization(database):
    return await Organization.find(Organization.slug == DEFAULT_ORGANIZATION).first_or_none()

Integration tests persist their data, and each of your tests should use a different pre-existing object in the test environment to run in an isolated manner, especially if you execute the tests in parallel with pytest-xdist.
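
For example, a hypothetical get_product factory fixture (used in the test below) could hand each test a pre-provisioned product per integration; SAMPLE_GTINS and the Product model are assumptions:

@pytest.fixture(scope="session")
async def get_product(database):
    # hypothetical factory: looks up a pre-provisioned product for the given integration,
    # so parallel tests don't compete for the same object
    async def _get_product(integration: str, sample_gtin_index: int = 0):
        gtin = SAMPLE_GTINS[integration][sample_gtin_index]  # assumed mapping of test GTINs
        return await Product.find(Product.gtin == gtin).first_or_none()

    return _get_product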

Success and failure assertion

Using the event log approach from above, we can come up with the following assertion function:

async def assert_success_or_failure(
    batch_id,
    scope,
    success_status=EventStatus.OK,
    failure_status=EventStatus.ERROR,
    timeout=60,
):
    start = datetime.now()
    events = []
    while (elapsed := (datetime.now() - start).total_seconds()) < timeout:
        logger.debug(f"{batch_id} waiting for {success_status} for {elapsed:.1f} sec")
        events = await Event.get_for_batch_id(batch_id, scope)
        failure = [e for e in events if e.status == failure_status]
        if failure:
            event_log = [f"[{e.status.value}][{e.scope}] {e.error_message or ''}" for e in events]
            event_log = "\n".join(event_log)
            pytest.fail(
                f"[{batch_id}] [{scope}] ERROR,\nlogs available:\n\n{event_log}",
            )
        success = [e for e in events if e.status == success_status]
        if success:
            logger.info(f"[{batch_id}] {scope} test OK")
            return
        await asyncio.sleep(0.5)

    pytest.fail(f"State assert failed, timeout, {batch_id} {scope}: {events}")

This function polls for event logs with a predefined batch_id. You could theoretically stream those events, but for integration tests, which run on the scale of minutes, polling every 0.5 seconds is not a big deal.

Once that is set up, you can trigger the component under test through the boto API for lambdas or step functions, as in the following example:

@pytest.mark.asyncio(scope="session")
@pytest.mark.enable_socket
@pytest.mark.parametrize(
    "integration",
    [integration for integration in source_integrations],
)
async def test_download_product(pulumi_outputs, lambda_client, organization, integration, get_product, domains):
    function_arn_output = f"{integration.upper()}_DOWNLOAD_PRODUCT_ARN"
    product = await get_product(integration, sample_gtin_index=0)

    message = ProductMessage(
        gtin=product.gtin,
        batch_id=f"downloader-test-{integration}-{uuid.uuid4()}",
        organization_id=organization.id,
        product_id=product.id,
        domain_id=domains[integration].id,
    )
    lambda_client.invoke(
        FunctionName=pulumi_outputs[function_arn_output],
        InvocationType="RequestResponse",
        Payload=message.model_dump_json(),
    )
    await assert_success_or_failure(batch_id=message.batch_id, scope="download_product")

We use Pulumi to get the function ARN from the corresponding test environment; you could use your own deployment system to achieve the same.
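
pulumi_outputs itself is not shown in the article; a hypothetical version could simply shell out to the Pulumi CLI and read the stack outputs of the test environment (the stack name is an assumption):

import json
import subprocess

import pytest


@pytest.fixture(scope="session")
def pulumi_outputs():
    # reads the outputs of the deployed test stack, e.g. {"ACME_DOWNLOAD_PRODUCT_ARN": "..."}
    raw = subprocess.check_output(["pulumi", "stack", "output", "--json", "--stack", "test"])
    return json.loads(raw)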

Conclusion

Testing software is as important as creating it. That becomes even more important in cloud environments, where many infrastructure objects cannot be imitated locally, and a swift, effective test suite is what guarantees your software stays functional. Share your techniques and experiences in the comment section below. Ciao 🖖

Written by Thorin Schiffer
