Skip to main content

Developer's guide - Features

The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.

WORK IN PROGRESS

This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.

If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.

In this section you can find the following:

Signals

A SignalLink preview iconWhat is a Signal?

A Signal is an asynchronous request to a Workflow Execution.

Learn more is a message sent to a running Workflow Execution.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

Define Signal

A Signal has a name and can have arguments.

Structs should be used to define Signals and carry data, as long as the struct is serializable via the Data Converter. The Receive() method on the Data Converter decodes the data into the Struct within the Workflow. Only public fields are serializable.

MySignal struct {
Message string // serializable
message string // not serializable
}

Handle Signal

Workflows listen for Signals by the Signal's name.

Use the GetSignalChannel() API from the go.temporal.io/sdk/workflow package to get the Signal Channel.

func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
signalChan.Receive(ctx, &signal)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}

In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.Channel for the Signal type (identified by the Signal name).

Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost.

Send Signal from Client

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

Use the SignalWorkflow() method on an instance of the Go SDK Temporal Client to send a SignalLink preview iconWhat is a Signal?

A Signal is an asynchronous request to a Workflow Execution.

Learn more to a Workflow Execution.

Pass in both the Workflow IdLink preview iconWhat is a Workflow Id?

A Workflow Id is a customizable, application-level identifier for a Workflow Execution that is unique to an Open Workflow Execution within a Namespace.

Learn more and Run IdLink preview iconWhat is a Run Id?

A Run Id is a globally unique, platform-level identifier for a Workflow Execution.

Learn more to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is Running receives the Signal.

// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
// ...

Possible errors:

  • serviceerror.NotFound
  • serviceerror.Internal
  • serviceerror.Unavailable

Send Signal from Workflow

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

When an External Signal is sent:

A Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow API from the go.temporal.io/sdk/workflow package.

// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}

Signal-With-Start

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

Use the SignalWithStartWorkflow() API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.

Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter

// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}

Queries

A QueryLink preview iconWhat is a Query?

A Query is a synchronous operation that is used to report the state of a Workflow Execution.

Learn more is a synchronous operation that is used to get the state of a Workflow Execution.

Define Query

A Query has a name and can have arguments.

In Go, a Query type, also called a Query name, is a string value.

queryType := "your_query_name"

Handle Query

Queries are handled by your Workflow.

Don’t include any logic that causes CommandLink preview iconWhat is a Command?

A Command is a requested action issued by a Worker to the Temporal Cluster after a Workflow Task Execution completes.

Learn more generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

Use the SetQueryHandler API from the go.temporal.io/sdk/workflow package to set a Query Handler that listens for a Query by name.

The handler must be a function that returns two values:

  1. A serializable result
  2. An error

The handler function can receive any number of input parameters, but all input parameters must be serializable. The following sample code sets up a Query Handler that handles the current_state Query type:

func YourWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = WithActivityOptions(ctx, yourActivityOptions)
err = ExecuteActivity(ctx, YourActivity, "your_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}

For example, suppose your query handler function takes two parameters:

err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})

Send Query

Queries are sent from a Temporal Client.

Use the QueryWorkflow() API or the QueryWorkflowWithOptions API on the Temporal Client to send a Query to a Workflow Execution.

// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...

You can pass an arbitrary number of arguments to the QueryWorkflow() function.

// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...

The QueryWorkflowWithOptions() API provides similar functionality, but with the ability to set additional configurations through QueryWorkflowWithOptionsRequest. When using this API, you will also receive a structured response of type QueryWorkflowWithOptionsResponse.

// ...
response, err := temporalClient.QueryWorkflowWithOptions(context.Background(), &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryType: queryType,
Args: args,
})
if err != nil {
// ...
}

Workflow timeouts

Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.

Workflow timeouts are set when starting the Workflow ExecutionLink preview iconWorkflow timeouts

Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.

Learn more.

Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set a timeout, and pass the instance to the ExecuteWorkflow call.

Available timeouts are:

  • WorkflowExecutionTimeout
  • WorkflowRunTimeout
  • WorkflowTaskTimeout
workflowOptions := client.StartWorkflowOptions{
// ...
// Set Workflow Timeout duration
WorkflowExecutionTimeout: time.Hours * 24 * 365 * 10,
// WorkflowRunTimeout: time.Hours * 24 * 365 * 10,
// WorkflowTaskTimeout: time.Second * 10,
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}

Workflow retries

A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Use a Retry PolicyLink preview iconWhat is a Retry Policy?

A Retry Policy is a collection of attributes that instructs the Temporal Server how to retry a failure of a Workflow Execution or an Activity Task Execution.

Learn more to retry a Workflow Execution in the event of a failure.

Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.

Create an instance of a RetryPolicy from the go.temporal.io/sdk/temporal package and provide it as the value to the RetryPolicy field of the instance of StartWorkflowOptions.

retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
workflowOptions := client.StartWorkflowOptions{
RetryPolicy: retrypolicy,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}

Activity timeouts

Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.

The following timeouts are available in the Activity Options.

An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.

To set an Activity Timeout in Go, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the Activity Timeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.

Available timeouts are:

  • StartToCloseTimeout
  • ScheduleToClose
  • ScheduleToStartTimeout
activityoptions := workflow.ActivityOptions{
// Set Activity Timeout duration
ScheduleToCloseTimeout: 10 * time.Second,
// StartToCloseTimeout: 10 * time.Second,
// ScheduleToStartTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}

Activity retries

A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Activity Executions are automatically associated with a default Retry PolicyLink preview iconWhat is a Retry Policy?

A Retry Policy is a collection of attributes that instructs the Temporal Server how to retry a failure of a Workflow Execution or an Activity Task Execution.

Learn more if a custom one is not provided.

To set a RetryPolicyLink preview iconWhat is a Retry Policy?

A Retry Policy is a collection of attributes that instructs the Temporal Server how to retry a failure of a Workflow Execution or an Activity Task Execution.

Learn more, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the RetryPolicy field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.

retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100, // 100 * InitialInterval
MaximumAttempts: 0, // Unlimited
NonRetryableErrorTypes: []string, // empty
}

Providing a Retry Policy here is a customization, and overwrites individual Field defaults.

retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}

activityoptions := workflow.ActivityOptions{
RetryPolicy: retrypolicy,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}

Activity retry simulator

Use this tool to visualize total Activity Execution times and experiment with different Activity timeouts and Retry Policies.

The simulator is based on a common Activity use-case, which is to call a third party HTTP API and return the results. See the example code snippets below.

Use the Activity Retries settings to configure how long the API request takes to succeed or fail. There is an option to generate scenarios. The Task Time in Queue simulates the time the Activity Task might be waiting in the Task Queue.

Use the Activity Timeouts and Retry Policy settings to see how they impact the success or failure of an Activity Execution.

Sample Activity

import axios from 'axios';

async function testActivity(url: string): Promise<void> {
await axios.get(url);
}

export default testActivity;

Activity Retries (in ms)

×

Activity Timeouts (in ms)

Retry Policy (in ms)

Success after 1 ms

{
"startToCloseTimeout": 10000,
"retryPolicy": {
"backoffCoefficient": 2,
"initialInterval": 1000
}
}

Activity Heartbeats

An Activity HeartbeatLink preview iconWhat is an Activity Heartbeat?

An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.

Learn more is a ping from the Worker ProcessLink preview iconWhat is a Worker Process?

A Worker Process is responsible for polling a Task Queue, dequeueing a Task, executing your code in response to a Task, and responding to the Temporal Server with the results.

Learn more that is executing the Activity to the Temporal ClusterLink preview iconWhat is a Temporal Cluster?

A Temporal Cluster is the Temporal Server paired with persistence.

Learn more. Each Heartbeat informs the Temporal Cluster that the Activity ExecutionLink preview iconWhat is an Activity Execution?

An Activity Execution is the full chain of Activity Task Executions.

Learn more is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat TimeoutLink preview iconWhat is a Heartbeat Timeout?

A Heartbeat Timeout is the maximum time between Activity Heartbeats.

Learn more time period, the Activity will be considered failed and another Activity Task ExecutionLink preview iconWhat is an Activity Task Execution?

An Activity Task Execution occurs when a Worker uses the context provided from the Activity Task and executes the Activity Definition.

Learn more may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster—they may be throttledLink preview iconWhat is an Activity Heartbeat?

An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.

Learn more by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.

Heartbeats can contain a details field describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.

To HeartbeatLink preview iconWhat is an Activity Heartbeat?

An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.

Learn more in an Activity in Go, use the RecordHeartbeat API.

import (
// ...
"go.temporal.io/sdk/workflow"
// ...
)

func YourActivityDefinition(ctx, YourActivityDefinitionParam) (YourActivityDefinitionResult, error) {
// ...
activity.RecordHeartbeat(ctx, details)
// ...
}

When an Activity Task Execution times out due to a missed Heartbeat, the last value of the details variable above is returned to the calling Workflow in the details field of TimeoutError with TimeoutType set to Heartbeat.

You can also Heartbeat an Activity from an external source:

// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
// Record heartbeat.
err := temporalClient.RecordActivityHeartbeat(ctx, taskToken, details)

The parameters of the RecordActivityHeartbeat function are:

  • taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
  • details: The serializable payload containing progress information.

If an Activity Execution Heartbeats its progress before it failed, the retry attempt will have access to the progress information, so that the Activity Execution can resume from the failed state. Here's an example of how this can be implemented:

func SampleActivity(ctx context.Context, inputArg InputParams) error {
startIdx := inputArg.StartIndex
if activity.HasHeartbeatDetails(ctx) {
// Recover from finished progress.
var finishedIndex int
if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil {
startIdx = finishedIndex + 1 // Start from next one.
}
}

// Normal Activity logic...
for i:=startIdx; i<inputArg.EndIdx; i++ {
// Code for processing item i goes here...
activity.RecordHeartbeat(ctx, i) // Report progress.
}
}