Testing - Python SDK feature guide

The Testing section of the Temporal Application development guide describes the frameworks that facilitate Workflow and integration testing.

In the context of Temporal, you can create these types of automated tests:

  • End-to-end: Running a Temporal Server and Worker with all its Workflows and Activities; starting and interacting with Workflows from a Client.
  • Integration: Anything between end-to-end and unit testing.
    • Running Activities with mocked Context and other SDK imports (and usually network requests).
    • Running Workers with mock Activities, and using a Client to start Workflows.
    • Running Workflows with mocked SDK imports.
  • Unit: Running a piece of Workflow or Activity code (a function or method) and mocking any code it calls.

We generally recommend writing the majority of your tests as integration tests.

Because the test server supports skipping time, use the test server for both end-to-end and integration tests with Workers.

Test frameworks

Some SDKs have support or examples for popular test frameworks, runners, or libraries.

One recommended framework for testing in Python for the Temporal SDK is pytest, which can help with fixtures to stand up and tear down test environments, provide useful test discovery, and make it easy to write parameterized tests.

Testing Activities

An Activity can be tested with a mock Activity environment, which provides a way to mock the Activity context, listen to Heartbeats, and cancel the Activity. This behavior allows you to test the Activity in isolation by calling it directly, without needing to create a Worker to run the Activity.

Run an Activity

If an Activity references its context, you need to mock that context when testing in isolation.

To run an Activity in a test, use the ActivityEnvironment class.

This class allows you to run any callable inside an Activity context. Use it to test the behavior of your code under various conditions.

Listen to Heartbeats

When an Activity sends a Heartbeat, be sure that you can see the Heartbeats in your test code so that you can verify them.

To test a Heartbeat in an Activity, use the on_heartbeat property of the ActivityEnvironment class. This property holds a custom function that is called every time activity.heartbeat() is called within the Activity.

from temporalio import activity
from temporalio.testing import ActivityEnvironment

@activity.defn
async def activity_with_heartbeats(param: str):
    activity.heartbeat(f"param: {param}")
    activity.heartbeat("second heartbeat")

async def test_activity_heartbeats():
    env = ActivityEnvironment()
    heartbeats = []
    # Set the `on_heartbeat` property to a callback function that will be called for each Heartbeat sent by the Activity.
    env.on_heartbeat = lambda *args: heartbeats.append(args[0])
    # Use the run method to start the Activity, passing in the function that contains the Heartbeats and any necessary parameters.
    await env.run(activity_with_heartbeats, "test")
    # Verify that the expected Heartbeats are received by the callback function.
    assert heartbeats == ["param: test", "second heartbeat"]

Testing Workflows

How to mock Activities

Mock the Activity invocation when unit testing your Workflows.

When integration testing Workflows with a Worker, you can mock Activities by providing mock Activity implementations to the Worker in place of the real ones.

import uuid
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker

# Import your Activity Definition and real implementation
from hello.hello_activity import (
    ComposeGreetingInput,
    GreetingWorkflow,
    compose_greeting,
)

# Define your mocked Activity implementation
@activity.defn(name="compose_greeting")
async def compose_greeting_mocked(input: ComposeGreetingInput) -> str:
    return f"{input.greeting}, {input.name} from mocked activity!"

async def test_mock_activity(client: Client):
    task_queue_name = str(uuid.uuid4())
    # Provide the mocked Activity implementation to the Worker
    async with Worker(
        client,
        task_queue=task_queue_name,
        workflows=[GreetingWorkflow],
        activities=[compose_greeting_mocked],
    ):
        # Execute your Workflow as usual
        assert "Hello, World from mocked activity!" == await client.execute_workflow(
            GreetingWorkflow.run,
            "World",
            id=str(uuid.uuid4()),
            task_queue=task_queue_name,
        )

The mocked Activity implementation should have the same name and the same signature (including the input and output types) as the real implementation. When the Workflow invokes the Activity, it invokes the mocked implementation instead of the real one, which lets you test your Workflow in isolation.

How to skip time

Some long-running Workflows can persist for months or even years. The test framework lets your Workflow code skip time, so your tests complete in seconds rather than in the time the Workflow would actually take.

For example, if a Workflow sleeps for a day, or an Activity fails with a long retry interval, you don't need to wait out the entire period to test whether the sleep works. Instead, test the logic that happens after the sleep by skipping forward in time, so that your tests complete in a timely manner.

The test framework included in most SDKs is an in-memory implementation of Temporal Server that supports skipping time. Time is a global property of a test environment instance (WorkflowEnvironment in the Python SDK): skipping time, either automatically or manually, applies to all currently running tests. If you need different time behaviors for different tests, run your tests in series or with separate instances of the test server. For example, you could run all tests with automatic time skipping in parallel, then all tests with manual time skipping in series, and then all tests without time skipping in parallel.

Skip time automatically

You can skip time automatically in the SDK of your choice. Start a test server process that skips time as needed. For example, in the time-skipping mode, Timers, which include sleeps and conditional timeouts, are fast-forwarded except when Activities are running.

Use the start_time_skipping() method to start a test server process and skip time automatically.

Use the start_local() method for a full local Temporal Server.

Use the from_client() method for an existing Temporal Server.

Skip time manually

To skip time manually, start the test environment with the start_time_skipping() static method, and then advance time explicitly with the environment's sleep() method.

from temporalio.testing import WorkflowEnvironment

async def test_manual_time_skipping():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Your code here
        # You can use the env.sleep(seconds) method to manually advance time
        await env.sleep(3)  # This will advance time by 3 seconds
        # Your code here

Assert in Workflow

The assert statement is a convenient way to insert debugging assertions into the Workflow context.

Assertions in Workflow code are available in the Python and TypeScript SDKs. In Python, assert is a statement, not a method.

For information about assert statements in Python, see assert in the Python Language Reference.

How to Replay a Workflow Execution

Replay recreates the exact state of a Workflow Execution. You can replay a Workflow from the beginning of its Event History.

Replay succeeds only if the Workflow Definition is compatible with the provided history from a deterministic point of view.

When you test changes to your Workflow Definitions, we recommend doing the following as part of your CI checks:

  1. Determine which Workflow Types or Task Queues (or both) will be targeted by the Worker code under test.
  2. Download the Event Histories of a representative set of recent open and closed Workflows from each Task Queue, either programmatically using the SDK client or via tctl.
  3. Run the Event Histories through replay.
  4. Fail CI if any error is encountered during replay.

To replay Workflow Executions, use the replay_workflows or replay_workflow methods of the Replayer class, passing one or more Event Histories as arguments. The following examples fetch and replay Event Histories.

In the following example (which, as of server v1.18, requires Advanced Visibility to be enabled), Event Histories are downloaded from the server and then replayed. If any replay fails, the code raises an exception.

from temporalio.worker import Replayer

# `client` is an already-connected temporalio.client.Client.
workflows = client.list_workflows("TaskQueue=foo and StartTime > '2022-01-01T12:00:00'")
histories = workflows.map_histories()
replayer = Replayer(
    workflows=[MyWorkflowA, MyWorkflowB, MyWorkflowC]
)
await replayer.replay_workflows(histories)

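In the next example, a single history is loaded from a JSON string. WorkflowHistory.from_json() takes the Workflow Id that the history belongs to as its first argument; "your-workflow-id" is a placeholder:

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer
replayer = Replayer(workflows=[YourWorkflow])
await replayer.replay_workflow(WorkflowHistory.from_json("your-workflow-id", history_json_str))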

In both examples, if the Workflow code is non-deterministic with respect to the Event History, an error is raised. With replay_workflows, you can choose to wait until all histories have been replayed by setting the fail_fast option to False.

Note: If the Workflow History is exported from the Temporal Web UI or through tctl, you can pass its JSON content either as a JSON string or as a Python dictionary. To get a dictionary, use the json.load() function, which takes a file object and returns the parsed JSON object.
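
The two ways of reading an exported history file can be sketched with the standard library alone. The helper names and the file name history.json are hypothetical, and the file content written in the demo is a stand-in, not a real export.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def load_history_dict(path: Path) -> Dict[str, Any]:
    """Parse an exported history file into a dictionary with json.load()."""
    with path.open() as history_file:
        return json.load(history_file)


def load_history_str(path: Path) -> str:
    """Read the same file as a raw JSON string instead."""
    return path.read_text()


if __name__ == "__main__":
    # Stand-in file; a real export contains the full list of events.
    with tempfile.TemporaryDirectory() as tmp:
        history_path = Path(tmp) / "history.json"
        history_path.write_text('{"events": []}')
        print(type(load_history_dict(history_path)).__name__)
        print(type(load_history_str(history_path)).__name__)
```

Either return value can then be handed to the SDK when building the history to replay.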