Skip to main content

Features - Python SDK feature guide

The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.

In this section you can find the following:

How to develop with Signals

A Signal is a message sent to a running Workflow Execution.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

A Signal has a name and can have arguments.

  • The name, also called a Signal type, is a string.
  • The arguments must be serializable. To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.

Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.

Return values from Signal methods are ignored.

Customize names

You can have a name parameter to customize the Signal's name, otherwise it defaults to the name of the Signal method.

View the source code in the context of the rest of the application code.

from temporalio import workflow
# ...
# ...
@workflow.signal
async def submit_greeting(self, name: str) -> None:
await self._pending_greetings.put(name)

@workflow.signal
def exit(self) -> None:
# ...
@workflow.signal(name="Custom Signal Name")
async def custom_signal(self, name: str) -> None:
await self._pending_greetings.put(name)

Workflows listen for Signals by the Signal's name.

To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.

View the source code in the context of the rest of the application code.

from temporalio.client import Client
# ...
# ...
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

How to send a Signal from a Temporal Client

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

To send a Signal from the Client, use the signal() function on the Workflow handle.

To get the Workflow handle, you can use any of the following options.

View the source code in the context of the rest of the application code.

from temporalio.client import Client
# ...
# ...
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
GreetingWorkflow.run,
id="your-greeting-workflow",
task_queue="signal-tq",
)
await handle.signal(GreetingWorkflow.submit_greeting, "User 1")

How to send a Signal from a Workflow

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

When an External Signal is sent:

Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier. Use get_external_workflow_handle when you don't know the type of the other Workflow.

note

The Workflow Type passed is only for type annotations and not for validation.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class WorkflowB:
@workflow.run
async def run(self) -> None:
handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
await handle.signal(WorkflowA.your_signal, "signal argument")

How to Signal-With-Start

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.

View the source code in the context of the rest of the application code.

from temporalio.client import Client
# ...
# ...
async def main():
client = await Client.connect("localhost:7233")
await client.start_workflow(
GreetingWorkflow.run,
id="your-signal-with-start-workflow",
task_queue="signal-tq",
start_signal="submit_greeting",
start_signal_args=["User Signal with Start"],
)

How to develop with Queries

A Query is a synchronous operation that is used to get the state of a Workflow Execution.

How to define a Query

A Query has a name and can have arguments.

  • The name, also called a Query type, is a string.
  • The arguments must be serializable.

To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.

Customize names

You can have a name parameter to customize the Query's name, otherwise it defaults to the name of the Query method.

note

You can either set the name or the dynamic parameter in a Query's decorator, but not both.

View the source code in the context of the rest of the application code.

# ...
@workflow.query
def greeting(self) -> str:
return self._greeting

How to handle a Query

Queries are handled by your Workflow.

Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

To send a Query to the Workflow, use the query method from the WorkflowHandle class.

View the source code in the context of the rest of the application code.


# ...
result = await handle.query(GreetingWorkflow.greeting)

How to send a Query

Queries are sent from a Temporal Client.

To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.

View the source code in the context of the rest of the application code.

# ...
result = await handle.query(GreetingWorkflow.greeting)

How to develop with Updates

An Update is an operation that can mutate the state of a Workflow Execution and return a response.

How to define an Update

Workflow Updates handlers are methods in your Workflow Definition designed to handle updates. These updates can be triggered during the lifecycle of a Workflow Execution.

Define an Update Handler

To define an update handler, use the @workflow.update decorator on a method within your Workflow. This decorator can be applied to both asynchronous and synchronous methods.

  • Decorator Usage: Apply @workflow.update to the method intended to handle updates.
  • Overriding: If a method with this decorator is overridden, the overriding method should also be decorated with @workflow.update.
  • Validator Method: Optionally, you can define a validator method for the update handler. This validator is specified using @update_handler_method_name.validator and is invoked before the update handler.
  • Method Parameters: Update handlers should only use positional parameters. For non-dynamic methods, it's recommended to use a single parameter that is an object or data class, which allows for future expansion of fields.
  • Return Values: The update handler can return a serializable value. This value is sent back to the caller of the update.
# ...
@workflow.update
async def update_workflow_status(self) -> str:
self.is_complete = True
return "Workflow status updated"

How to send an Update from a Temporal Client

To send a Workflow Update from a Temporal Client, call the execute_update method on the WorkflowHandle class.

# ...
update_result = await handle.execute_update(
HelloWorldWorkflow.update_workflow_status
)
print(f"Update Result: {update_result}")

What is a Dynamic Handler

Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.

Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known at run time.

caution

Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.

Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.

Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.

How to set a Dynamic Workflow

A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered. A Workflow can be made dynamic by adding dynamic=True to the @workflow.defn decorator. You must register the Workflow with the Worker before it can be invoked.

The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
@workflow.run
async def run(self, args: Sequence[RawValue]) -> str:
name = workflow.payload_converter().from_payload(args[0].payload, str)
return await workflow.execute_activity(
default_greeting,
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)

How to set a Dynamic Activity

A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered. An Activity can be made dynamic by adding dynamic=True to the @activity.defn decorator. You must register the Activity with the Worker before it can be invoked.

The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

View the source code in the context of the rest of the application code.

# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
return (
f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
)
# ...
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
"unregistered_activity",
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)

How to set a Dynamic Signal

A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered. A Signal can be made dynamic by adding dynamic=True to the @signal.defn decorator.

The Signal Handler should accept self, a string input, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

View the source code in the context of the rest of the application code.

# ...
@workflow.signal(dynamic=True)
async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
await self._pending_greetings.put(name)

How to set a Dynamic Query

A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered. A Query can be made dynamic by adding dynamic=True to the @query.defn decorator.

The Query Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

View the source code in the context of the rest of the application code.

# ...
@workflow.query(dynamic=True)
def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str:
return self._greeting

Workflow timeouts

Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.

Workflow timeouts are set when starting the Workflow Execution.

Set the timeout to either the start_workflow() or execute_workflow() asynchronous methods.

Available timeouts are:

  • execution_timeout
  • run_timeout
  • task_timeout
View the source code in the context of the rest of the application code.

# ...
result = await client.execute_workflow(
YourWorkflow.run,
"your timeout argument",
id="your-workflow-id",
task_queue="your-task-queue",
# Set Workflow Timeout duration
execution_timeout=timedelta(seconds=2),
# run_timeout=timedelta(seconds=2),
# task_timeout=timedelta(seconds=2),
)

Workflow retries

A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Use a Retry Policy to retry a Workflow Execution in the event of a failure.

Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.

Set the Retry Policy to either the start_workflow() or execute_workflow() asynchronous methods.

View the source code in the context of the rest of the application code.

# ...
handle = await client.execute_workflow(
YourWorkflow.run,
"your retry policy argument",
id="your-workflow-id",
task_queue="your-task-queue",
retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)

How to set Activity timeouts

Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.

The following timeouts are available in the Activity Options.

An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.

Activity options are set as keyword arguments after the Activity arguments.

Available timeouts are:

  • schedule_to_close_timeout
  • schedule_to_start_timeout
  • start_to_close_timeout
View the source code in the context of the rest of the application code.

# ...
activity_timeout_result = await workflow.execute_activity(
your_activity,
YourParams(greeting, "Activity Timeout option"),
# Activity Execution Timeout
start_to_close_timeout=timedelta(seconds=10),
# schedule_to_start_timeout=timedelta(seconds=10),
# schedule_to_close_timeout=timedelta(seconds=10),
)

How to set an Activity Retry Policy

A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.

To create an Activity Retry Policy in Python, set the RetryPolicy class within the start_activity() or execute_activity() function.

View the source code in the context of the rest of the application code.

from temporalio.common import RetryPolicy
# ...
activity_result = await workflow.execute_activity(
your_activity,
YourParams(greeting, "Retry Policy options"),
start_to_close_timeout=timedelta(seconds=10),
# Retry Policy
retry_policy=RetryPolicy(
backoff_coefficient=2.0,
maximum_attempts=5,
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=2),
# non_retryable_error_types=["ValueError"],
),
)

How to Heartbeat an Activity

An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster—they may be throttled by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.

Heartbeats can contain a details field describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.

To Heartbeat an Activity Execution in Python, use the heartbeat() API.

@activity.defn
async def your_activity_definition() -> str:
activity.heartbeat("heartbeat details!")

In addition to obtaining cancellation information, Heartbeats also support detail data that persists on the server for retrieval during Activity retry. If an Activity calls heartbeat(123, 456) and then fails and is retried, heartbeat_details returns an iterable containing 123 and 456 on the next Run.

How to set a Heartbeat Timeout

A Heartbeat Timeout works in conjunction with Activity Heartbeats.

heartbeat_timeout is a class variable for the start_activity() function used to set the maximum time between Activity Heartbeats.

workflow.start_activity(
activity="your-activity",
schedule_to_close_timeout=timedelta(seconds=5),
heartbeat_timeout=timedelta(seconds=1),
)

execute_activity() is a shortcut for start_activity() that waits on its result.

To get just the handle to wait and cancel separately, use start_activity(). execute_activity() should be used in most cases unless advanced task capabilities are needed.

workflow.execute_activity(
activity="your-activity",
name,
schedule_to_close_timeout=timedelta(seconds=5),
heartbeat_timeout=timedelta(seconds=1),
)

How to asynchronously complete an Activity

Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.

There are three steps to follow:

  1. The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
  2. The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
  3. The Temporal Client is used to Heartbeat and complete the Activity.

To mark an Activity as completing asynchronously, do the following inside the Activity.

# Capture token for later completion
captured_token = activity.info().task_token
activity.raise_complete_async()

To update an Activity outside the Activity, use the get_async_activity_handle() method to get the handle of the Activity.

handle = my_client.get_async_activity_handle(task_token=captured_token)

Then, on that handle, you can call the results of the Activity, heartbeat, complete, fail, or report_cancellation method to update the Activity.

await handle.complete("Completion value.")

Cancel an Activity from a Workflow

Canceling an Activity from within a Workflow requires that the Activity Execution sends Heartbeats and sets a Heartbeat Timeout. If the Heartbeat is not invoked, the Activity cannot receive a cancellation request. When any non-immediate Activity is executed, the Activity Execution should send Heartbeats and set a Heartbeat Timeout to ensure that the server knows it is still working.

When an Activity is canceled, an error is raised in the Activity at the next available opportunity. If cleanup logic needs to be performed, it can be done in a finally clause or inside a caught cancel error. However, for the Activity to appear canceled the exception needs to be re-raised.

note

Unlike regular Activities, Local Activities can be canceled if they don't send Heartbeats. Local Activities are handled locally, and all the information needed to handle the cancellation logic is available in the same Worker process.

To cancel an Activity from a Workflow Execution, call the cancel() method on the Activity handle that is returned from start_activity().

@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
try:
while True:
print("Heartbeating cancel activity")
await asyncio.sleep(0.5)
activity.heartbeat("some details")
except asyncio.CancelledError:
print("Activity cancelled")
raise


@activity.defn
async def run_activity(input: ComposeArgsInput):
print("Executing activity")
return input.arg1 + input.arg2

@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, input: ComposeArgsInput) -> None:
activity_handle = workflow.start_activity(
cancellable_activity,
ComposeArgsInput(input.arg1, input.arg2),
start_to_close_timeout=timedelta(minutes=5),
heartbeat_timeout=timedelta(seconds=30),
)

await asyncio.sleep(3)
activity_handle.cancel()
note

The Activity handle is a Python task. By calling cancel(), you're essentially requesting the task to be canceled.

How to interrupt a Workflow Execution

You can interrupt a Workflow Execution in one of the following ways:

The following are the main differences between canceling and terminating a Workflow in Temporal:

Cancel

Canceling a Workflow provides a graceful way to stop Workflow Execution. This action resembles sending a SIGTERM to a process.

  • The system records a WorkflowExecutionCancelRequested event in the Workflow History.
  • A Workflow Task gets scheduled to process the cancelation.
  • The Workflow code can handle the cancelation and execute any cleanup logic.
  • The system doesn't forcefully stop the Workflow.

For more information, see How to cancel a Workflow Execution.

Terminate

Terminating a Workflow forcefully stops Workflow Execution. This action resembles killing a process.

  • The system records a WorkflowExecutionTerminated event in the Workflow History.
  • The termination forcefully and immediately stops the Workflow Execution.
  • The Workflow code gets no chance to handle termination.
  • A Workflow Task doesn't get scheduled.

For more information, see How to terminate a Workflow Execution.

Summary

In summary:

  • Canceling provides a graceful way to stop the Workflow and allows it to handle cancelation logic.
  • Termination forcefully stops the Workflow and prevents any further events.

In most cases, canceling is preferable because it allows the Workflow to finish gracefully. Terminate only if the Workflow is stuck and cannot be canceled normally.

How to cancel a Workflow Execution in Python

To cancel a Workflow Execution in Python, use the cancel() function on the Workflow handle.

await client.get_workflow_handle("your_workflow_id").cancel()

How to terminate a Workflow Execution in Python

To terminate a Workflow Execution in Python, use the terminate() function on the Workflow handle.

await client.get_workflow_handle("your_workflow_id").terminate()

How to start a Child Workflow Execution

A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.

When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, etc...) are logged in the Workflow Execution Event History.

Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started. After that, Child Workflow Executions may be abandoned using the default Abandon Parent Close Policy set in the Child Workflow Options.

To be sure that the Child Workflow Execution has started, first call the Child Workflow Execution method on the instance of Child Workflow future, which returns a different future.

Then get the value of an object that acts as a proxy for a result that is initially unknown, which is what waits until the Child Workflow Execution has spawned.

To spawn a Child Workflow Execution in Python, use the execute_child_workflow() function which starts the Child Workflow and waits for completion or use the start_child_workflow() function to start a Child Workflow and return its handle. This is useful if you want to do something after it has only started, or to get the Workflow/Run ID, or to be able to signal it while running.

note

execute_child_workflow() is a helper function for start_child_workflow() plus await handle.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class ComposeGreetingWorkflow:
@workflow.run
async def run(self, input: ComposeGreetingInput) -> str:
return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_child_workflow(
ComposeGreetingWorkflow.run,
ComposeGreetingInput("Hello", name),
id="hello-child-workflow-workflow-child-id",
# ...
)

How to set a Parent Close Policy

A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).

The default Parent Close Policy option is set to terminate the Child Workflow Execution.

Set the parent_close_policy parameter inside the start_child_workflow function or the execute_child_workflow() function to specify the behavior of the Child Workflow when the Parent Workflow closes.

View the source code in the context of the rest of the application code.

from temporalio.workflow import ParentClosePolicy
# ...
# ...
@workflow.defn
class ComposeGreetingWorkflow:
@workflow.run
async def run(self, input: ComposeGreetingInput) -> str:
return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_child_workflow(
ComposeGreetingWorkflow.run,
ComposeGreetingInput("Hello", name),
id="hello-child-workflow-workflow-child-id",
parent_close_policy=ParentClosePolicy.ABANDON,
)

How to Continue-As-New

Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.

To Continue-As-New in Python, call the continue_as_new() function from inside your Workflow, which will stop the Workflow immediately and Continue-As-New.

View the source code in the context of the rest of the application code.

# ...
@workflow.defn
class LoopingWorkflow:
@workflow.run
async def run(self, iteration: int) -> None:
if iteration == 5:
return
await asyncio.sleep(10)
workflow.continue_as_new(iteration + 1)

What is a Timer?

A Workflow can set a durable timer for a fixed time period. In some SDKs, the function is called sleep(), and in others, it's called timer().

A Workflow can sleep for months. Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.

Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.

To set a Timer in Python, call the asyncio.sleep() function and pass the duration in seconds you want to wait before continuing.

View the source code in the context of the rest of the application code.

# ...
await asyncio.sleep(10)

How to Schedule a Workflow

Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes

Use any of the following action to help Schedule a Workflow Execution and take control over your automation process.

How to Create a Scheduled Workflow

The create action enables you to create a new Schedule. When you create a new Schedule, a unique Schedule ID is generated, which you can use to reference the Schedule in other Schedule commands.

To create a Scheduled Workflow Execution in Python, use the create_schedule() asynchronous method on the Client. Then pass the Schedule ID and the Schedule object to the method to create a Scheduled Workflow Execution. Set the action parameter to ScheduleActionStartWorkflow to start a Workflow Execution. Optionally, you can set the spec parameter to ScheduleSpec to specify the schedule or set the intervals parameter to ScheduleIntervalSpec to specify the interval. Other options include: cron_expressions, skip, start_at, and jitter.

View the source code in the context of the rest of the application code.

# ...
async def main():
client = await Client.connect("localhost:7233")

await client.create_schedule(
"workflow-schedule-id",
Schedule(
action=ScheduleActionStartWorkflow(
YourSchedulesWorkflow.run,
"my schedule arg",
id="schedules-workflow-id",
task_queue="schedules-task-queue",
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
),
state=ScheduleState(note="Here's a note on my Schedule."),
),
)

How to Backfill a Scheduled Workflow

The backfill action executes Actions ahead of their specified time range. This command is useful when you need to execute a missed or delayed Action, or when you want to test the Workflow before its scheduled time.

To Backfill a Scheduled Workflow Execution in Python, use the backfill() asynchronous method on the Schedule Handle.

View the source code in the context of the rest of the application code.

import asyncio
from datetime import datetime, timedelta

from temporalio.client import Client, ScheduleBackfill, ScheduleOverlapPolicy



async def main():
client = await Client.connect("localhost:7233")
handle = client.get_schedule_handle(
"workflow-schedule-id",
)
now = datetime.utcnow()
(
await handle.backfill(
ScheduleBackfill(
start_at=now - timedelta(minutes=10),
end_at=now - timedelta(minutes=9),
overlap=ScheduleOverlapPolicy.ALLOW_ALL,
),
),
)

How to Delete a Scheduled Workflow

The delete action enables you to delete a Schedule. When you delete a Schedule, it does not affect any Workflows that were started by the Schedule.

To delete a Scheduled Workflow Execution in Python, use the delete() asynchronous method on the Schedule Handle.

View the source code in the context of the rest of the application code.



async def main():
client = await Client.connect("localhost:7233")
handle = client.get_schedule_handle(
"workflow-schedule-id",
)

await handle.delete()

How to Describe a Scheduled Workflow

The describe action shows the current Schedule configuration, including information about past, current, and future Workflow Runs. This command is helpful when you want to get a detailed view of the Schedule and its associated Workflow Runs.

To describe a Scheduled Workflow Execution in Python, use the describe() asynchronous method on the Schedule Handle. You can get a complete list of the attributes of the Scheduled Workflow Execution from the ScheduleDescription class.

View the source code in the context of the rest of the application code.

# ...
async def main():
client = await Client.connect("localhost:7233")
handle = client.get_schedule_handle(
"workflow-schedule-id",
)

desc = await handle.describe()

print(f"Returns the note: {desc.schedule.state.note}")

How to List a Scheduled Workflow

The list action lists all the available Schedules. This command is useful when you want to view a list of all the Schedules and their respective Schedule IDs.

To list all schedules, use the list_schedules() asynchronous method on the Client. If a schedule is added or deleted, it may not be available in the list immediately.

View the source code in the context of the rest of the application code.

# ...
async def main() -> None:
client = await Client.connect("localhost:7233")
async for schedule in await client.list_schedules():
print(f"List Schedule Info: {schedule.info}.")

How to Pause a Scheduled Workflow

The pause action enables you to pause and unpause a Schedule. When you pause a Schedule, all the future Workflow Runs associated with the Schedule are temporarily stopped. This command is useful when you want to temporarily halt a Workflow due to maintenance or any other reason.

To pause a Scheduled Workflow Execution in Python, use the pause() asynchronous method on the Schedule Handle. You can pass a note to the pause() method to provide a reason for pausing the schedule.

View the source code in the context of the rest of the application code.

# ...
async def main():
client = await Client.connect("localhost:7233")
handle = client.get_schedule_handle(
"workflow-schedule-id",
)

await handle.pause(note="Pausing the schedule for now")

How to Trigger a Scheduled Workflow

The trigger action triggers an immediate action with a given Schedule. By default, this action is subject to the Overlap Policy of the Schedule. This command is helpful when you want to execute a Workflow outside of its scheduled time.

To trigger a Scheduled Workflow Execution in Python, use the trigger() asynchronous method on the Schedule Handle.

View the source code in the context of the rest of the application code.

# ...
async def main():
client = await Client.connect("localhost:7233")
handle = client.get_schedule_handle(
"workflow-schedule-id",
)

await handle.trigger()

How to Update a Scheduled Workflow

The update action enables you to update an existing Schedule. This command is useful when you need to modify the Schedule's configuration, such as changing the start time, end time, or interval.

Create a function that takes ScheduleUpdateInput and returns ScheduleUpdate. To update a Schedule, use a callback to build the update from the description. The following example updates the Schedule to use a new argument.

View the source code in the context of the rest of the application code.

# ...
async def update_schedule_simple(input: ScheduleUpdateInput) -> ScheduleUpdate:
schedule_action = input.description.schedule.action

if isinstance(schedule_action, ScheduleActionStartWorkflow):
schedule_action.args = ["my new schedule arg"]
return ScheduleUpdate(schedule=input.description.schedule)

How to use Temporal Cron Jobs

A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.

A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.

You can set each Workflow to repeat on a schedule with the cron_schedule option from either the start_workflow() or execute_workflow() asynchronous methods.

View the source code in the context of the rest of the application code.

# ...
result = await client.execute_workflow(
CronWorkflow.run,
id="your-workflow-id",
task_queue="your-task-queue",
cron_schedule="* * * * *",
)
print(f"Results: {result}")

How to use Start Delay

Use the start_delay to schedule a Workflow Execution at a specific one-time future point rather than on a recurring schedule.

Use the start_delay option in either the start_workflow() or execute_workflow() asynchronous methods in the Client.

async def main():
client = await Client.connect("localhost:7233")

result = await client.execute_workflow(
YourWorkflow.run,
"your name",
id="your-workflow-id",
task_queue="your-task-queue",
start_delay=timedelta(hours=1, minutes=20, seconds=30)
)

print(f"Result: {result}")


if __name__ == "__main__":
asyncio.run(main())