
Activities in Go

An Activity is the implementation of a particular task in the business logic. You should have a conceptual understanding of Temporal Activities before proceeding.

Overview#

The following example demonstrates a simple Activity Definition that accepts a string parameter, logs it, and returns it unchanged.

package sample

import (
    "context"

    "go.temporal.io/sdk/activity"
)

// SimpleActivity is a sample Temporal Activity Definition that takes one parameter and
// returns a string containing the parameter value.
func SimpleActivity(ctx context.Context, value string) (string, error) {
    logger := activity.GetLogger(ctx)
    logger.Info("SimpleActivity called.", "Value:", value)
    return value, nil
}

In the Workflow Definition, invoke this Activity with workflow.ExecuteActivity(ctx, SimpleActivity, value).

Let's take a look at each component of this Activity.

Heart Beating#

For long-running Activities, Temporal provides an API for the Activity code to report both liveness and progress back to the Temporal managed service.

progress := 0
for hasWork {
    // Send heartbeat message to the server.
    activity.RecordHeartbeat(ctx, progress)
    // Do some work.
    ...
    progress++
}

When an Activity times out due to a missed heartbeat, the last value of the details (progress in the above sample) is returned from the workflow.ExecuteActivity function as the details field of TimeoutError with TimeoutType set to Heartbeat.

You can also heartbeat an Activity from an external source:

// The client is a heavyweight object that should be created once per process.
serviceClient, err := client.NewClient(client.Options{
    HostPort:     HostPort,
    Namespace:    Namespace,
    MetricsScope: scope,
})
// Handle err here before using serviceClient.

// Record heartbeat. err was already declared above, so assign with "=".
err = serviceClient.RecordActivityHeartbeat(ctx, taskToken, details)

The parameters of the RecordActivityHeartbeat function are:

  • taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
  • details: The serializable payload containing progress information.

Cancellation#

A Workflow can request to cancel an Activity. When an Activity is cancelled, or its Workflow Execution has completed or failed, the context passed into its function is cancelled, which closes the channel returned by the context's Done() method. An Activity can use that signal to perform any necessary cleanup and abort its execution.

Cancellation is only delivered to Activities that record heartbeats:

  • The heartbeat request fails with a special error indicating that the Activity was cancelled. Heartbeats can also fail when the Workflow that invoked the Activity is in a completed state.
  • The Activity should perform all necessary cleanup and report when it is done.
  • The Workflow can decide if it wants to wait for the Activity cancellation confirmation or proceed without waiting.

Synchronous Activity Execution#

The primary responsibility of a Workflow implementation is to schedule Activities for execution. The most straightforward way to do this is via the workflow.ExecuteActivity API. The following sample code demonstrates making this call:

ao := workflow.ActivityOptions{
    TaskQueue:              "sampleTaskQueue",
    ScheduleToCloseTimeout: time.Second * 60,
    ScheduleToStartTimeout: time.Second * 60,
    StartToCloseTimeout:    time.Second * 60,
    HeartbeatTimeout:       time.Second * 10,
    WaitForCancellation:    false,
}
ctx = workflow.WithActivityOptions(ctx, ao)

var result string
err := workflow.ExecuteActivity(ctx, SimpleActivity, value).Get(ctx, &result)
if err != nil {
    return err
}

Let's take a look at each component of this call.

Activity options#

Before calling workflow.ExecuteActivity(), you must configure ActivityOptions for the invocation. These options customize various execution timeouts, and are passed in by creating a child context from the initial context and overwriting the desired values. The child context is then passed into the workflow.ExecuteActivity() call. If multiple Activities are sharing the same option values, then the same context instance can be used when calling workflow.ExecuteActivity().

By default, Temporal retries failed Activities using the following Retry Policy values (exponential backoff starting at 1 second and capped at 100 seconds):

  • InitialInterval of 1 second
  • BackoffCoefficient of 2.0
  • MaximumInterval of 100 seconds (100x multiple of InitialInterval)
  • MaximumAttempts of 0 (in other words, unlimited retries)

You can refer to our Retry Policy documentation for guidance on customizing your retry behavior.
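To make the default values concrete, the sketch below computes the wait before each retry. It assumes plain exponential growth capped at MaximumInterval, and the RetryPolicy type here is a local stand-in written for this illustration, not the SDK's own struct.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryPolicy mirrors the four fields listed above. It is a local,
// illustrative type and not the SDK's RetryPolicy.
type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int // 0 means unlimited
}

// DefaultRetryPolicy returns the default values quoted in this section.
func DefaultRetryPolicy() RetryPolicy {
	return RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    100 * time.Second,
		MaximumAttempts:    0,
	}
}

// BackoffBefore returns the wait before retry number n (n = 1 is the first
// retry), assuming exponential growth capped at MaximumInterval.
func (p RetryPolicy) BackoffBefore(n int) time.Duration {
	if n < 1 {
		return 0
	}
	d := float64(p.InitialInterval) * math.Pow(p.BackoffCoefficient, float64(n-1))
	if limit := float64(p.MaximumInterval); d > limit {
		return p.MaximumInterval
	}
	return time.Duration(d)
}

func main() {
	p := DefaultRetryPolicy()
	for n := 1; n <= 9; n++ {
		fmt.Printf("retry %d after %v\n", n, p.BackoffBefore(n))
	}
}
```

Under these assumptions the waits are 1s, 2s, 4s, 8s, 16s, 32s, and 64s, after which every retry waits the 100-second maximum.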

Activity timeouts#

There can be various kinds of timeouts associated with an Activity. Temporal guarantees that Activities are executed at least once, so an Activity either succeeds or fails with one of the following timeouts:

| Timeout | Description |
| --- | --- |
| StartToCloseTimeout | Maximum time that a worker can take to process a task after it has received the task. |
| ScheduleToStartTimeout | Time a task can wait to be picked up by an Activity worker after a Workflow schedules it. If there are no workers available to process this task for the specified duration, the task will time out. |
| ScheduleToCloseTimeout | Time a task can take to complete after it is scheduled by a Workflow. This is usually greater than the sum of StartToClose and ScheduleToStart timeouts. |
| HeartbeatTimeout | If a task doesn't heartbeat to the Temporal service for this duration, it will be considered to have failed. This is useful for long-running tasks. |

ExecuteActivity call#

The first parameter in the call is the required workflow.Context object. This type is a copy of context.Context with the Done() method returning workflow.Channel instead of the native Go chan.

The second parameter is the function that we registered as an Activity function. This parameter can also be a string representing the name of the Activity function. The benefit of passing in the actual function object is that the framework can validate Activity parameters.

The remaining parameters are passed to the Activity as part of the call. In our example, we have a single parameter: value. This list of parameters must match the list of parameters declared by the Activity function. The Temporal Go SDK will validate this.

The method call returns immediately and returns a workflow.Future. This allows you to execute more code without having to wait for the scheduled Activity to complete.

When you are ready to process the results of the Activity, call the Get() method on the future object returned. The parameters to this method are the ctx object we passed to the workflow.ExecuteActivity() call and an output parameter that will receive the output of the Activity. The type of the output parameter must match the type of the return value declared by the Activity function. The Get() method will block until the Activity completes and results are available.

You can retrieve the result value returned by workflow.ExecuteActivity() from the future and use it like any normal result from a synchronous function call. The following sample code demonstrates how you can use the result if it is a string value:

future := workflow.ExecuteActivity(ctx, ActivityName, param1)

var result string
if err := future.Get(ctx, &result); err != nil {
    return err
}

switch result {
case "apple":
    // Do something.
case "banana":
    // Do something.
default:
    // Unexpected value: report it as an error.
    return fmt.Errorf("unexpected Activity result: %q", result)
}

In this example, we called the Get() method on the returned future immediately after workflow.ExecuteActivity(). However, this is not necessary. If you want to execute multiple Activities in parallel, you can repeatedly call workflow.ExecuteActivity(), store the returned futures, and then wait for all Activities to complete by calling the Get() method of each future at a later time.

To implement more complex wait conditions on returned future objects, use workflow.Selector. Learn more on the Go SDK Selectors page.

Asynchronous Activity Completion#

There are certain scenarios when you want to defer completing an Activity until much later. For example, you might have an application that requires user input in order to complete the Activity. You could do this with a polling mechanism, but a simpler and less resource-intensive implementation is to asynchronously complete a Temporal Activity.

There are two parts to implementing an asynchronously completed Activity:

  1. The Activity gives an external system the information necessary to complete it, and notifies the Temporal service that it is waiting for that outside callback (by returning activity.ErrResultPending).
  2. The external service calls the Temporal service to complete the Activity (with client.CompleteActivity).

The following example demonstrates the first part:

// Retrieve the Activity information needed to asynchronously complete the Activity.
activityInfo := activity.GetInfo(ctx)
taskToken := activityInfo.TaskToken

// Send the taskToken to the external service that will complete the Activity.
...

// Return activity.ErrResultPending to indicate that Temporal should wait for an
// async completion message.
return "", activity.ErrResultPending

The following code demonstrates how to complete the Activity successfully:

// Instantiate a Temporal service client.
// The same client can be used to complete or fail any number of Activities.
// The client is a heavyweight object that should be created once per process.
serviceClient, err := client.NewClient(client.Options{})
// Handle err here before using serviceClient.

// Complete the Activity.
err = serviceClient.CompleteActivity(context.Background(), taskToken, result, nil)

To fail the Activity, you would do the following:

// Fail the Activity.
serviceClient.CompleteActivity(context.Background(), taskToken, nil, err)

Following are the parameters of the CompleteActivity function:

  • taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
  • result: The return value to record for the Activity. The type of this value must match the type of the return value declared by the Activity function.
  • err: The error to return if the Activity terminates with an error.

If err is not nil, the value of the result parameter is ignored.
