Skip to main content

Versioning - Python SDK feature guide

The definition code of a Temporal Workflow must be deterministic because Temporal uses event sourcing to reconstruct the Workflow state by replaying the saved history event data on the Workflow definition code. This means that any incompatible update to the Workflow Definition code could cause a non-deterministic issue if not handled correctly.

Introduction to Versioning

Because we design for potentially long running Workflows at scale, versioning with Temporal works differently. We explain more in this optional 30 minute introduction: https://www.youtube.com/watch?v=kkP899WxgzY

How to use the Python SDK Patching API

In principle, the Python SDK's patching mechanism operates similarly to other SDKs in a "feature-flag" fashion. However, the "versioning" API now uses the concept of "patching in" code.

To understand this, you can break it down into three steps, which reflect three stages of migration:

  • Running pre_patch_activity code while concurrently patching in post_patch_activity.
  • Running post_patch_activity code with deprecation markers for my-patch patches.
  • Running only the post_patch_activity code.

Let's walk through this process in sequence.

Suppose you have an initial Workflow version called pre_patch_activity:

View the source code in the context of the rest of the application code.

from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from activities import pre_patch_activity
# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
pre_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

Now, you want to update your code to run post_patch_activity instead. This represents your desired end state.

View the source code in the context of the rest of the application code.

from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from activities import post_patch_activity
# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

Problem: You cannot deploy post_patch_activity directly until you're certain there are no more running Workflows created using the pre_patch_activity code, otherwise you are likely to cause a non-deterministic error.

Instead, you'll need to deploy post_patched_activity and use the patched function to determine which version of the code to execute.

Implementing patching involves three steps:

  1. Use patched to patch in new code and run it alongside the old code.
  2. Remove the old code and apply deprecate_patch.
  3. Once you're confident that all old Workflows have finished executing, remove deprecate_patch.
View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

Patching in new code

Using patched inserts a marker into the Workflow History.

image

During replay, if a Worker encounters a history with that marker, it will fail the Workflow task when the Workflow code doesn't produce the same patch marker (in this case, my-patch). This ensures you can safely deploy code from post_patch_activity as a "feature flag" alongside the original version (pre_patch_activity).

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
if workflow.patched("my-patch"):
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)
else:
self._result = await workflow.execute_activity(
pre_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

Understanding deprecated Patches in the Python SDK

After ensuring that all Workflows started with pre_patch_activity code have finished, you can deprecate the patch.

Deprecated patches serve as a bridge between pre_patch_activity and post_patch_activity. They function similarly to regular patches by adding a marker to the Workflow History. However, this marker won't cause a replay failure when the Workflow code doesn't produce it.

If, during the deployment of post_patch_activity, there are still live Workers running pre_patch_activity code and these Workers pick up Workflow histories generated by post_patch_activity, they will safely use the patched branch.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
workflow.deprecate_patch("my-patch")
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

Safe Deployment of post_patch_activity

You can safely deploy post_patch_activity once all Workflows labeled my-patch or earlier are finished, based on the previously mentioned assertion.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self) -> None:
self._result = await workflow.execute_activity(
post_patch_activity,
schedule_to_close_timeout=timedelta(minutes=5),
)

How to use Worker Versioning in Python

caution

Worker Versioning is currently in Pre-release, and backwards-incompatible changes are coming to the Worker Versioning APIs. For now, you need to provide dynamic configuration parameters to your Cluster to enable Worker Versioning:

temporal server start-dev \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
--dynamic-config-value worker.buildIdScavengerEnabled=true

To use Worker Versioning in Python, you need to do the following:

  1. Determine and assign a Build ID to your built Worker code, and opt in to versioning.
  2. Tell the Task Queue your Worker is listening on that Build ID, and whether it's compatible with an existing Build ID.

Assign a Build ID to your Worker

Let's say you've chosen deadbeef as your Build ID, which might be a short git commit hash (a reasonable choice as Build ID). To assign it in your Worker code, assign at least the following Worker Options:

# ...
worker = Worker(
task_queue="your_task_queue_name",
build_id="deadbeef",
use_worker_versioning=True,
# ... register workflows & activities, etc
)
# ...

That's all you need to do in your Worker code. Importantly, if you start this Worker, it won't receive any tasks yet. That's because you need to tell the Task Queue about your Worker's Build ID first.

Tell the Task Queue about your Worker's Build ID

Now you can use the SDK (or the Temporal CLI) to tell the Task Queue about your Worker's Build ID. You might want to do this as part of your CI deployment process.

# ...
await client.update_worker_build_id_compatibility(
"your_task_queue_name", BuildIdOpAddNewDefault("deadbeef")
)

This code adds the deadbeef Build ID to the Task Queue as the sole version in a new version set, which becomes the default for the queue. New Workflows execute on Workers with this Build ID, and existing ones will continue to process by appropriately compatible Workers.

If, instead, you want to add the Build ID to an existing compatible set, you can do this:

# ...
await client.update_worker_build_id_compatibility(
"your_task_queue_name", BuildIdOpAddNewCompatible("deadbeef", "some-existing-build-id")
)

This code adds deadbeef to the existing compatible set containing some-existing-build-id and marks it as the new default Build ID for that set.

You can also promote an existing Build ID in a set to be the default for that set:

# ...
await client.update_worker_build_id_compatibility(
"your_task_queue_name", BuildIdOpPromoteBuildIdWithinSet("deadbeef")
)

You can also promote an entire set to become the default set for the queue. New Workflows will start using that set's default build.

# ...
await client.update_worker_build_id_compatibility(
"your_task_queue_name", BuildIdOpPromoteSetByBuildId("deadbeef")
)

Specify versions for Commands

By default, Activities, Child Workflows, and Continue-as-New use the same compatible version set as the Workflow that invoked them if they're also using the same Task Queue.

If you want to override this behavior, you can specify your intent via the versioning_intent argument available on the methods you use to invoke these Commands.

For more information refer to the conceptual documentation.

For example, if you want to use the latest default version for an Activity, you can call it like so:

# ...
await workflow.execute_activity(
say_hello,
"hi",
versioning_intent=VersioningIntent.DEFAULT,
start_to_close_timeout=timedelta(seconds=5),
)
# ...