Interceptors - Python SDK
The behavior of the Python SDK can be customized in many useful ways by modifying inbound and outbound calls using Interceptors. This is similar to the use of "middleware" in web frameworks such as Django, Starlette, and Flask.
The methods you implement on your Interceptor classes can perform arbitrary side effects. Interceptors can also perform arbitrary modifications to incoming and outgoing data before it is received by the SDK's "real" implementation.
There are five categories of inbound and outbound calls that you can modify in this way:
Outbound Client calls
start_workflow()
signal_workflow()
list_workflows()
update_schedule()
This is not an exhaustive list; refer to the Python SDK methods for more.
Inbound Workflow calls
execute_workflow()
(i.e. handle a Workflow Task that is starting a new Workflow Execution)handle_query()
handle_signal()
handle_update_handler()
handle_update_validator()
Outbound Workflow calls
start_activity()
start_child_workflow()
signal_child_workflow()
signal_external_workflow()
start_nexus_operation()
start_local_activity()
Inbound Activity calls
execute_activity()
- i.e. handle a task to execute an Activity (this is the only Inbound Activity call)
Outbound Activity calls
info()
heartbeat()
The first of these categories is a Client call, and the remaining 4 are Worker calls.
Client call Interceptors
To modify outbound Client calls, define a class inheriting from client.Interceptor
, and implement the method intercept_client()
to return an instance of OutboundInterceptor
that implements the subset of outbound Client calls that you wish to modify.
This example implements an Interceptor on outbound Client calls that sets a certain key in the outbound headers
field.
A User ID is context-propagated by being sent in a header field with outbound requests:
class ContextPropagationInterceptor(
temporalio.client.Interceptor, temporalio.worker.Interceptor
):
def __init__(
self,
payload_converter: temporalio.converter.PayloadConverter = temporalio.converter.default().payload_converter,
) -> None:
self._payload_converter = payload_converter
def intercept_client(
self, next: temporalio.client.OutboundInterceptor
) -> temporalio.client.OutboundInterceptor:
return _ContextPropagationClientOutboundInterceptor(
next, self._payload_converter
)
def set_header_from_context(
input: _InputWithHeaders, payload_converter: temporalio.converter.PayloadConverter
) -> None:
user_id_val = user_id.get()
if user_id_val:
input.headers = {
**input.headers,
HEADER_KEY: payload_converter.to_payload(user_id_val),
}
class _ContextPropagationClientOutboundInterceptor(
temporalio.client.OutboundInterceptor
):
def __init__(
self,
next: temporalio.client.OutboundInterceptor,
payload_converter: temporalio.converter.PayloadConverter,
) -> None:
super().__init__(next)
self._payload_converter = payload_converter
async def start_workflow(
self, input: temporalio.client.StartWorkflowInput
) -> temporalio.client.WorkflowHandle[Any, Any]:
set_header_from_context(input, self._payload_converter)
return await super().start_workflow(input)
It often happens that your Worker and Client interceptors will share code because they implement closely related logic.
In the Python SDK, you will typically want to create an interceptor class that inherits from both client.Interceptor
and worker.Interceptor
as above, since their method sets do not overlap.
You can then pass this in the interceptors
argument of Client.connect()
in your client/starter code:
client = await Client.connect(
"localhost:7233",
interceptors=[interceptor.ContextPropagationInterceptor()],
)
The interceptors
list can contain multiple interceptors.
In this case they form a chain: a method implemented on an interceptor instance in the list can perform side effects, and modify the data, before passing it on to the corresponding method on the next interceptor in the list.
Your interceptor classes need not implement every method; the default implementation is always to pass the data on to the next method in the interceptor chain. During execution, when the SDK encounters an Inbound Activity call, it will look to the first Interceptor instance, get hold of the appropriate intercepted method, and call it. The intercepted method will perform its function then call the same method on the next Interceptor in the chain. At the end of the chain the SDK will call the "real" SDK method.
Worker call Interceptors
To modify inbound and outbound Workflow and Activity calls, define a class inheriting from worker.Interceptor
.
This is an interface with two methods named intercept_activity
and workflow_interceptor_class
, which you can use to configure interceptions of Activity and Workflow calls, respectively.
intercept_activity
returns an ActivityInboundInterceptor
.
This example demonstrates using an interceptor to measure Schedule-To-Start latency:
from temporalio.worker import (
ActivityInboundInterceptor,
ExecuteActivityInput,
Interceptor,
Worker,
)
class SimpleWorkerInterceptor(Interceptor):
def intercept_activity(
self, next: ActivityInboundInterceptor
) -> ActivityInboundInterceptor:
return CustomScheduleToStartInterceptor(next)
class CustomScheduleToStartInterceptor(ActivityInboundInterceptor):
async def execute_activity(self, input: ExecuteActivityInput):
schedule_to_start = (
activity.info().started_time
- activity.info().current_attempt_scheduled_time
)
meter = activity.metric_meter()
histogram = meter.create_histogram_timedelta(
"custom_activity_schedule_to_start_latency",
description="Time between activity scheduling and start",
unit="duration",
)
histogram.record(
schedule_to_start, {"workflow_type": activity.info().workflow_type}
)
return await self.next.execute_activity(input)
client = await Client.connect(
"localhost:7233",
)
worker = Worker(
client,
interceptors=[SimpleWorkerInterceptor()],
# ...
)
If you are inheriting methods from both client.Interceptor
and worker.Interceptor
, you should not pass your Interceptors directly to the Worker()
constructor — instead, pass it to Client.connect()
.
This will allow a Worker to use them from the underlying Client.
In other words, only pass the Interceptors to the Worker()
if you are not using Client methods.
The workflow_interceptor_class
returns a WorkflowInboundInterceptor
that works similarly to ActivityInboundInterceptor
.