Skip to main content

Workflows in Go

What is a Workflow?#

In the Temporal Go SDK programming model, a Workflow is an exportable function.

package app
import (    "go.temporal.io/sdk/workflow")
func SimpleWorkflow(ctx workflow.Context, value string) (string, error) {  // Do something  if err != nil {    return "", err  }  return "success", nil}

The first parameter, workflow.Context is a requirement for all Workflow functions as it is used by the Temporal Go SDK to pass around an execution context, and virtually all the Go SDK APIs that are callable from the Workflow require it.

note

This workflow.Context entity operates similarly to the standard context.Context entity provided by Go. The only difference is that the Done() function provided by workflow.Context returns workflow.Channel instead of the standard Go chan.

The second parameter, string, is a custom parameter that can be used to pass data into the Workflow when it starts. A Workflow can have one or more such parameters. However, we recommend having a single parameter that is of a struct type to support backward compatibility if new parameters are added.

note

All Workflow function parameters must be serializable, which essentially means that params can’t be channels, functions, variadic, or unsafe pointers.

A Workflow can return an err or a value, err. Again, if there is a chance that the return value might change, use a struct type to hold the values. Returning an error from a Workflow is used to indicate that an error was encountered during its execution and the Workflow should be terminated.

How to write Workflow code#

There is a single requirement for how the code inside a Workflow is written. Workflow code must be "deterministic". This requirement stems from how the Temporal Server tracks the state of code execution and its need to be able to replay an execution.

In practical terms, this means the following:

  • Workflow code can only read and manipulate local variables or variables received as return values from Temporal Go SDK APIs. For example, Workflows should never read a configuration directly as it may change in the middle of a Workflow Execution, thus breaking "determinism". Use a SideEffect, MutableSideEffect, or an Activity to load configuration values.
  • Workflow code can not affect changes in external systems directly.
  • Workflow code must use Go SDK APIs to handle things like time, logging, and goroutines.
  • Workflow code can not directly iterate over maps using range because the order of the map's iteration is randomized.

However, the Go SDK provides a number of features to handle these restrictions with ease.

  1. To interact with external systems and nondeterministic code, Workflows can execute Activities.
  2. To handle things like time, logging, and goroutines, as mentioned above, there are specific Go SDK APIs available, such as:
    • workflow.Now() This is a replacement for time.Now().
    • workflow.Sleep() This is a replacement for time.Sleep().
    • workflow.GetLogger() This is to ensure that the provided logger does not duplicate logs during a replay.
    • workflow.Go() This is a replacement for the the go statement.
    • workflow.Channel This is a replacement for the native chan type. Temporal provides support for both buffered and unbuffered channels.
    • workflow.Selector This is a replacement for the select statement. Learn more on the Go SDK Selectors page
    • workflow.Context This is a replacement for context.Context. Learn more on the Go SDK Context Propagation page.
  3. Additionally, for executing very small pieces of nondeterministic logic within the Workflow, you can use the workflow.SideEffect API.

Below is a sample Workflow that is treated as a cron job by the Temporal Server. It executes a single Activity and uses workflow.Now().

cron/workflow.go

package cron
import (    "context"    "time"
    "go.temporal.io/sdk/activity"    "go.temporal.io/sdk/workflow")
// CronResult is used to return data from one cron run to the nexttype CronResult struct {    RunTime time.Time}
// SampleCronWorkflow executes on the given schedule// The schedule is provided when starting the Workflowfunc SampleCronWorkflow(ctx workflow.Context) (*CronResult, error) {
    workflow.GetLogger(ctx).Info("Cron workflow started.", "StartTime", workflow.Now(ctx))
    ao := workflow.ActivityOptions{        StartToCloseTimeout: 10 * time.Second,    }    ctx1 := workflow.WithActivityOptions(ctx, ao)
    // Start from 0 for first cron job    lastRunTime := time.Time{}    // Check to see if there was a previous cron job    if workflow.HasLastCompletionResult(ctx) {        var lastResult CronResult        if err := workflow.GetLastCompletionResult(ctx, &lastResult); err == nil {            lastRunTime = lastResult.RunTime        }    }    thisRunTime := workflow.Now(ctx)
    err := workflow.ExecuteActivity(ctx1, DoSomething, lastRunTime, thisRunTime).Get(ctx, nil)    if err != nil {        // Cron job failed        // Next cron will still be scheduled by the Server        workflow.GetLogger(ctx).Error("Cron job failed.", "Error", err)        return nil, err    }
    return &CronResult{RunTime: thisRunTime}, nil}
// DoSomething is an Activityfunc DoSomething(ctx context.Context, lastRunTime, thisRunTime time.Time) error {    activity.GetLogger(ctx).Info("Cron job running.", "lastRunTime_exclude", lastRunTime, "thisRunTime_include", thisRunTime)    // Query database, call external API, or do any other non-deterministic action.    return nil}

How to start a Workflow#

With the Go SDK, there are two ways that you can start a Workflow:

  1. Use the Go SDK client to start a Workflow from a Go process, as described below.
  2. Start a Workflow from an already running Workflow, which is known as a Child Workflow.
note

Starting a Workflow is not the same as executing a Workflow. Starting a Workflow means that you are telling the Server to begin tracking the state of the Workflow execution. In a Temporal application, you do not run Workflow code directly, instead Workflow code is hosted and executed by a Worker.

To start a Workflow you need to create the Temporal Go SDK client, call ExecuteWorkflow(), and pass it the following:

  1. context.Context, which is a normal Go Context type.
  2. client.StartWorkflowOptions Which can include timeout settings and a RetryPolicy.
  3. The name of the Workflow function.
  4. Variables that should be passed to the Workflow when it begins execution.
package main
import (    "context"
    "go.temporal.io/sdk/client")
func main() {    // Create the client object just once per process    c, err := client.NewClient(client.Options{})    if err != nil {        // Handle failure    }    defer c.Close()
    // TaskQueue is the only required option to start a Workflow    options := client.StartWorkflowOptions{        TaskQueue: "your_task_queue",    }
    we, err := c.ExecuteWorkflow(context.Background(), options, YourWorkflow, workflowParm1, workflowParam2)    if err != nil {        // Handle failure    }
    // Use the WorkflowExecution to get the result    // Get is blocking call and will wait for the Workflow to complete    var workflowResult string    err = we.Get(context.Background(), &workflowResult)    if err != nil {        // Handle failure    }
    // Do something with the result    printResults(workflowResult, we.GetID(), we.GetRunID())}

You can start Workflows asynchronously or synchronously. In most use cases it is better to execute the Workflow asynchronously. In Go, the only difference is whether the code waits for the result of the Workflow in the same process in which you started it, so you should not synchronously block the process if you don't have a good reason to. Workflows do not rely on the process that invoked it, and will continue executing even if the waiting process crashes or stops.

note

In most use cases it is better to execute the Workflow asynchronously. You can also start a Workflow Execution on a regular schedule with the CronSchedule option.

Scheduling Cron Workflows#

You can also start a Workflow Execution on a regular schedule with the CronSchedule option.

workflowOptions := client.StartWorkflowOptions{    ID:           workflowID,    TaskQueue:    "cron",    CronSchedule: "* * * * *",}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, cron.SampleCronWorkflow)

More info in the Distributed Cron docs.

External Workflows#

You can execute Workflows (including those from other language SDKs) by their type name:

workflowID := "myworkflow_" + uuid.New()workflowOptions := client.StartWorkflowOptions{  ID:        workflowID,  TaskQueue: "mytaskqueue",}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, "MySimpleWorkflow")if err != nil {  log.Fatalln("Unable to execute workflow", err)}log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

Here we execute a workflow by its type name, namely MySimpleWorkflow. By default, the Workflow type is the name of the Workflow function, for example:

func MySimpleWorkflow(ctx workflow.Context) error { // Workflow code here...}

Note that you can also set the Workflow type via RegisterWorkflowOptions when registering your Workflow with the Worker, for example:

rwo := workflow.RegisterOptions {   Name: "MyWorkflow", // Set "MyWorkflow" as the Workflow type}w.RegisterWorkflowWithOptions(dynamic.SampleGreetingsWorkflow, rwo)

Inside Workflow code you can also signal other workflows using their workflow type using SignalExternalWorkflow:

// Send 10 signals to PHP workflowfor i := 0; i < 10; i++ {    err :=  workflow.SignalExternalWorkflow(ctx, "simple-workflow-php", "", "goMessage", "Hello from Go workflow: "+strconv.Itoa(i)).Get(ctx, nil)}

Here we are sending a signal to a Workflow with type "simple-workflow-php" and signal name "goMessage".

See our Signals docs and Temporal Polyglot example for more.

Child Workflows#

To spawn a Child Workflow Execution in Go, use the ExecuteChildWorkflow API, which is available from the go.temporal.io/sdk/workflow package.

The ExecuteChildWorkflow call requires an instance of workflow.Context, with an instance of workflow.ChildWorkflowOptions applied to it, the Workflow Type, and any parameters that should be passed to the Child Workflow Execution.

workflow.ChildWorkflowOptions contain the same fields as client.StartWorkflowOptions. Workflow Option fields automatically inherit their values from the Parent Workflow Options if they are not explicitly set. If a custom WorkflowID is not set, one is generated when the Child Workflow Execution is spawned. Use the WithChildOptions API to apply Child Workflow Options to the instance of workflow.Context.

The ExecuteChildWorkflow call returns an instance of a ChildWorkflowFuture.

Call the .Get() method on the instance of ChildWorkflowFuture to wait for the result.

func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
  childWorkflowOptions := workflow.ChildWorkflowOptions{}  ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
  var result ChildResp  err := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{}).Get(ctx, &result)  if err != nil {    // ...  }  // ...  return resp, nil}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {  // ...  return resp, nil}

Parent Close Policy#

In Go, a Parent Close Policy is set on the ParentClosePolicy field of an instance of workflow.ChildWorkflowOptions. The possible values can be obtained from the go.temporal.io/api/enums/v1 package.

  • PARENT_CLOSE_POLICY_ABANDON
  • PARENT_CLOSE_POLICY_TERMINATE
  • PARENT_CLOSE_POLICY_REQUEST_CANCEL

The Child Workflow Options are then applied to the the instance of workflow.Context by using the WithChildOptions API, which is then passed to the ExecuteChildWorkflow() call.

See the Asynchronous execution section below for an example.

Asynchronous execution#

To asynchronously spawn a Child Workflow Execution, the Child Workflow must have an "Abandon" Parent Close Policy set in the Child Workflow Options. Additionally, the Parent Workflow Execution must wait for the "ChildWorkflowExecutionStarted" event to appear in its event history before it completes.

If the Parent makes the ExecuteChildWorkflow call and then immediately completes, the Child Workflow Execution will not spawn.

To be sure that the Child Workflow Execution has started, first call the GetChildWorkflowExecution method on the instance of the ChildWorkflowFuture, which will return a different Future. Then call the Get() method on that Future, which is what will wait until the Child Workflow Execution has spawned.

import (  // ...  "go.temporal.io/api/enums/v1")
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
  childWorkflowOptions := workflow.ChildWorkflowOptions{    ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,  }  ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
  childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})  // Wait for the Child Workflow Execution to spawn  var childWE WorkflowExecution  if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err != nil {     return err  }  // ...  return resp, nil}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {  // ...  return resp, nil}

Querying Workflow State#

When you start a Workflow with ExecuteWorkflow, a WorkflowExecution is returned (which is the we variable above). The WorkflowExecution can be used to get the result or capture the WorkflowId. You can retrieve the result of the Workflow from a completely different process, as long as you have the WorkflowId, by using client.GetWorkflow.

we = client.GetWorkflow(workflowID)var result stringwe.Get(ctx, &result)

How to cancel a Workflow Execution#

Use the CancelWorkflow API to cancel a Workflow Execution using its Id.

cancellation/cancel/main.go

func main() {    var workflowID string    flag.StringVar(&workflowID, "wid", "workflowID-to-cancel", "workflowID of the Workflow Execution to be canceled.")    flag.Parse()
    if workflowID == "" {        flag.PrintDefaults()        return    }
    // The client is a heavyweight object that should be created once per process.    c, err := client.NewClient(client.Options{        HostPort: client.DefaultHostPort,    })    if err != nil {        log.Fatalln("Unable to create client", err)    }    defer c.Close()
    err = c.CancelWorkflow(context.Background(), workflowID, "")    if err != nil {        log.Fatalln("Unable to cancel Workflow Execution", err)    }    log.Println("Workflow Execution cancelled", "WorkflowID", workflowID)}

How to clean up after a Workflow is cancelled#

Workflow Definitions can be written to handle execution cancellation requests with Go's defer and the workflow.NewDisconnectedContext API. In the Workflow Definition below, there is a special Activity that handles clean up should the execution be cancelled.

cancellation/workflow.go

// YourWorkflow is a Workflow Definition that shows how it can be canceled.func YourWorkflow(ctx workflow.Context) error {    ao := workflow.ActivityOptions{        StartToCloseTimeout: 30 * time.Minute,        HeartbeatTimeout:    5 * time.Second,        WaitForCancellation: true,    }    ctx = workflow.WithActivityOptions(ctx, ao)    logger := workflow.GetLogger(ctx)    logger.Info("cancel workflow started")    var a *Activities // Used to call Activities by function pointer    defer func() {                if !errors.Is(ctx.Err(), workflow.ErrCanceled) {            return        }                // When the Workflow is canceled, it has to get a new disconnected context to execute any Activities        newCtx, _ := workflow.NewDisconnectedContext(ctx)        err := workflow.ExecuteActivity(newCtx, a.CleanupActivity).Get(ctx, nil)        if err != nil {            logger.Error("CleanupActivity failed", "Error", err)        }    }()
    var result string    err := workflow.ExecuteActivity(ctx, a.ActivityToBeCanceled).Get(ctx, &result)    logger.Info(fmt.Sprintf("ActivityToBeCanceled returns %v, %v", result, err))
    err = workflow.ExecuteActivity(ctx, a.ActivityToBeSkipped).Get(ctx, nil)    logger.Error("Error from ActivityToBeSkipped", "Error", err)
    logger.Info("Workflow Execution complete.")
    return nil}

How to get data in or out of a running Workflow#

Signals are the mechanism by which you can get data into an already running Workflow.

Queries are the mechanism by which you can get data out of a currently running Workflow.

Custom Serialization and Workflow Security#

Workflow method arguments and return values are serializable to a Payload protobuf that contains a bytearray as well as metadata map. You can use the SDK's DataConverter interface to do this. The default implementation uses JSON serializer, but you can use any alternative serialization mechanism.

The values passed to Workflows through invocation parameters or returned through a result value are recorded in the execution history.

Even though Workflow execution history is cached in the Workers, in the case of Worker failure, the full execution history has to be transferred from the Temporal service to the Workflow Workers.

In those cases a large execution history could adversely impact the performance of your Workflow. Be mindful of the amount of data that you transfer via Activity invocation parameters or return values. Otherwise, no additional limitations exist on Activity implementations.

We discuss how to work around the history size limitations with ContinueAsNew in the Large Event Histories section.

Large Event Histories#

Temporal stores the execution history of all Workflows. There is a maximum limit of this execution history (50,000 events). Even though Temporal Server emits warnings while your workflow are approaching this limit (every 10,000 events), you should make sure your workflows don't reach it.

Workflows that periodically execute a number of Activities, for a long time, have the potential of running into this execution history size limit.

One way of dealing with this issue is to use ContinueAsNew. This feature allows you to complete the current Workflow execution and start a new one atomically. This new execution has the same Workflow Id, but a different Run Id, and as such will get its own execution history.

If your Workflows are running periodically using a Cron definition, the ContinueAsNew feature is used internally by Temporal. In this case, each Workflow execution as defined by the Cron definition will have its own Run Id and execution history.

To trigger this behavior, the Workflow function should terminate by returning the special ContinueAsNewError error:

func SimpleWorkflow(ctx workflow.Context, value string) error {    ...    return workflow.NewContinueAsNewError(ctx, SimpleWorkflow, value)}

If you need to know whether a Workflow was started via continueAsNew, you can check if workflow.GetInfo(ctx).ContinuedExecutionRunID is not nil.