Temporal Application development guide
This guide is meant to provide a comprehensive overview of the structures, primitives, and features used in Temporal Application development.
WORK IN PROGRESS
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
This guide is meant to be a comprehensive resource for developing a Temporal Application.
It is broken down into two large sections:
- Foundations: The minimum things required to build and run a simple Workflow with a single Activity.
- Features: All the general features available to a Temporal Application.
Foundations
This section covers the minimum set of concepts and implementation details needed to build and run a simple Temporal Application – that is, all the relevant steps to start a Workflow Execution that executes an Activity.
Run a dev Cluster
Whenever we are developing Temporal Applications, we want to have a Temporal Cluster up and running. We can interact with a Cluster through Temporal Client APIs and tctl commands.
There are four ways to quickly install and run a Temporal Cluster:
- Docker: Using Docker Compose makes it easy to develop your Temporal Application locally.
- Render: Our temporalio/docker-compose experience has been translated to Render's Blueprint format for an alternative cloud connection.
- Helm charts: Deploying a Cluster to Kubernetes is an easy way to test the system and develop Temporal Applications.
- Gitpod: One-click deployments are available for Go and TypeScript.
We do not recommend using any of these methods in a full (production) environment.
Helm charts
Use Temporal Helm charts to deploy the Temporal Server to a Kubernetes cluster.
Deploying the Temporal Cluster with Helm is not recommended for a production environment, but it is a great way to test the system while developing Workflows.
Docker Compose
Use Docker Compose and Temporal Cluster Docker images to quickly install and run a Temporal Cluster locally while developing Workflows.
You must have Docker and Docker Compose installed.
Then clone the temporalio/docker-compose repository and run docker-compose up
from the root of that repo:
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up
When the Temporal Cluster is running, the Temporal Web UI becomes available in your browser: localhost:8080
The preceding steps start and run a Temporal Cluster using a default configuration. To try other configurations (different dependencies and databases), or to try a custom Docker image, follow the temporalio/docker-compose README.
Render
temporal-render-simple translates our docker-compose to Render by using the Auto-Setup Docker image. We do not recommend using this technique for production because all four Temporal internal services (Frontend, Matching, History, and Worker) are run in one process, but the benefit is one-click deployments.
Gitpod
You can run a Temporal Cluster and develop Temporal Applications in your browser using Gitpod.
One-click deployments are available for the temporalio/samples-go repo and the temporalio/samples-typescript repo.
A one-click deployment starts a Temporal Cluster using a Temporal Cluster Docker image, starts a Worker Process, and starts one of the application's sample Workflows.
It can take up to a full minute for the one-click deployments to get fully up and running. When it is running, you can customize the application samples.
Add your SDK
Add a Temporal SDK to your project. Both TypeScript and JavaScript can be used with the TypeScript SDK.
- Go
- Java
- PHP
- TypeScript
The Temporal Go SDK provides a framework for Temporal Application development in the Go language. The SDK contains the following tools:
- A Temporal Client to communicate with a Temporal Cluster
- APIs to use within your Workflows
- APIs to create and manage Worker Entities and Worker Processes
Get the SDK
Add the Temporal Go SDK to your project:
go get -u go.temporal.io/sdk@latest
Or clone the Go SDK repo to your preferred location:
git clone git@github.com:temporalio/sdk-go.git
Are there executable code samples?
You can find a complete list of executable code samples in the samples library, which includes Temporal Go SDK code samples from the temporalio/samples-go repo. Additionally, each of the Go SDK Tutorials is backed by a fully executable template application.
Where is the Go SDK technical reference?
The Temporal Go SDK API reference is published on pkg.go.dev
Where can I find video demos?
The Temporal Java SDK provides a framework for Temporal Application development in Java. The SDK contains the following tools:
- A Temporal Client to communicate with a Temporal Cluster
- APIs to use within your Workflows
- APIs to create and manage Worker Entities and Worker Processes
Get the SDK
Add the Temporal Java SDK to your project as a dependency:
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.11.0</version>
</dependency>
implementation 'io.temporal:temporal-sdk:1.11.0'
Other:
Additional scripts for each SDK version are available here: https://search.maven.org/artifact/io.temporal/temporal-sdk. Select an SDK version to see available scripts.
Are there executable code samples?
You can find a complete list of executable code samples in the samples library, which includes Temporal Java SDK code samples from the temporalio/samples-java repo. Additionally, several of the Java SDK Tutorials are backed by a fully executable template application.
Where is the Java SDK technical reference?
The Java SDK API reference is published to javadoc.io.
Where can I find video demos?
The Temporal TypeScript PHP provides a framework for Temporal Application development in the PHP language. The SDK contains the following tools:
- A Temporal Client to communicate with a Temporal Cluster
- APIs to use within your Workflows
- APIs to create and manage Worker Entities and Worker Processes
Get the SDK
The Temporal PHP SDK is available as composer package and can be installed using the following command in a root of your project:
composer require temporal/sdk
The Temporal PHP SDK requires the RoadRunner 2.0 application server and supervisor to run Activities and Workflows in a scalable way.
Install RoadRunner manually by dowloading its binary from the release page.
Or install RoadRunner through the CLI:
composer require spiral/roadrunner:v2.0 nyholm/psr7
./vendor/bin/rr get-binary
The Temporal TypeScript SDK provides a framework for Temporal Application development in the TypeScript language. The SDK contains the following tools:
- A Temporal Client to communicate with a Temporal Cluster
- APIs to use within your Workflows
- APIs to create and manage Worker Entities and Worker Processes
Get the SDK
To download the latest version of the Temporal TypeScript Command, run the following command:
npm i temporalio
Or clone the TypeScript SDK repo to your preferred location:
git clone git@github.com:temporalio/sdk-typescript.git
This project requires Node.js 14 or later.
Are there executable code samples?
You can find a complete list of executable code samples in the samples' library, which includes Temporal TypeScript SDK code samples from the temporalio/samples-typescript repo. Additionally, each of the TypeScript SDK Tutorials is backed by a fully executable template application.
Where is the TypeScript SDK technical reference?
The Temporal TypeScript SDK API reference is published on typescript.temporal.io.
Develop Workflows
Workflows are the fundamental unit of a Temporal Application, and it all starts with the development of a Workflow Definition.
- Go
- Java
- PHP
- TypeScript
In the Temporal Go SDK programming model, a Workflow Definition is an exportable function.
func YourWorkflowDefinition(ctx workflow.Context) error {
// ...
return nil
}
In Go, by default, the Workflow Type name is the same as the function name.
In the Temporal Java SDK programming model, a Workflow definition comprises a Workflow interface annotated with @WorkflowInterface
and a Workflow implementation that implements the Workflow interface.
The Workflow interface is a Java interface and is annotated with @WorkflowInterface
.
Each Workflow interface must have only one method annotated with @WorkflowMethod
.
The method name can be used to denote the Workflow Type.
// Workflow interface
@WorkflowInterface
public interface YourWorkflow {
@WorkflowMethod
String yourWFMethod(Arguments args);
}
However, when using dynamic Workflows, do not specify a @WorkflowMethod
, and implement the DynamicWorkflow
directly in the Workflow implementation code.
The @WorkflowMethod
identifies the method that is the starting point of the Workflow Execution.
The Workflow Execution completes when this method completes.
You can create interface inheritance hierarchies to reuse components across other Workflow interfaces.
The interface inheritance approach does not apply to @WorkflowMethod
annotations.
A Workflow implementation implements a Workflow interface.
// Define the Workflow implementation which implements our getGreeting Workflow method.
public static class GreetingWorkflowImpl implements GreetingWorkflow {
...
}
}
To call Activities in your Workflow, call the Activity implementation.
Use ExternalWorkflowStub
to start or send Signals from within a Workflow to other running Workflow Executions.
You can also invoke other Workflows as Child Workflows with Workflow.newChildWorkflowStub()
or Workflow.newUntypedChildWorkflowStub()
within a Workflow Definition.
Use DynamicWorkflow
to implement Workflow Types dynamically.
Register a Workflow implementation type that extends DynamicWorkflow
to implement any Workflow Type that is not explicitly registered with the Worker.
The dynamic Workflow interface is implemented with the execute
method. This method takes in EncodedValues
that are inputs to the Workflow Execution.
These inputs can be specified by the Client when invoking the Workflow Execution.
public class MyDynamicWorkflow implements DynamicWorkflow {
@Override
public Object execute(EncodedValues args) {
}
}
In PHP, a Workflow is a class method. Classes must implement interfaces that are annotated with #[YourWorkflowInterface]
. The method that is the Workflow must be annotated with #[WorkflowMethod]
.
use Temporal\Workflow\YourWorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
#[YourWorkflowInterface]
interface FileProcessingWorkflow
{
#[WorkflowMethod]
public function processFile(Argument $args);
}
Workflow Definitions are just functions, which can store state and orchestrate Activity Functions.
The following code snippet uses proxyActivities
to schedule a greet
Activity in the system to say hello.
A Workflow Definition can have multiple parameters; however, we recommend using a single object parameter.
type ExampleArgs = {
name: string;
};
export async function example(args: ExampleArgs): Promise<{greeting: string}> {
const greeting = await greet(args.name);
return {greeting};
}
Workflow parameters
Temporal Workflows may have any number of custom parameters. However, it is strongly recommended that objects are used as parameters, so that the object's individual fields may be altered without breaking the signature of the Workflow. All Workflow Definition parameters must be serializable.
- Go
- Java
- PHP
- TypeScript
The first parameter of a Go-based Workflow Definition must be of the workflow.Context
type, as it is used by the Temporal Go SDK to pass around Workflow Execution context, and virtually all the Go SDK APIs that are callable from the Workflow require it.
It is acquired from the go.temporal.io/sdk/workflow
package.
import (
"go.temporal.io/sdk/workflow"
)
func YourWorkflowDefinition(ctx workflow.Context, param string) error {
// ...
}
The workflow.Context
entity operates similarly to the standard context.Context
entity provided by Go.
The only difference between workflow.Context
and context.Context
is that the Done()
function, provided by workflow.Context
, returns workflow.Channel
instead of the standard Go chan
.
The second parameter, string
, is a custom parameter that is passed to the Workflow when it is invoked.
A Workflow Definition may support multiple custom parameters, or none.
These parameters can be regular type variables or safe pointers.
However, the best practice is to pass a single parameter that is of a struct
type, so there can be some backward compatibility if new parameters are added.
type YourWorkflowParam struct {
WorkflowParamFieldOne string
WorkflowParamFieldTwo int
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
}
All Workflow Definition parameters must be serializable, regardless of whether pointers or regular type values are used. Parameters can’t be channels, functions, variadic, or unsafe pointers.
A method annotated with @WorkflowMethod
can have any number of parameters.
We recommend passing a single parameter that contains all the input fields to allow for adding fields in a backward-compatible manner.
Note that all inputs should be serializable by the default Jackson JSON Payload Converter.
You can create a custom object and pass it to the Workflow method, as shown in the following example.
//...
@WorkflowInterface
public interface YourWorkflow {
@WorkflowMethod
String yourWFMethod(CustomObj customobj);
// ...
}
Content is not available
You can define and pass parameters in your Workflow. In this example, you define your arguments in your client.ts
file and pass those parameters to workflow.ts
through your Workflow function.
Start a Workflow with the parameters that are in the client.ts
file. In this example we set the name
parameter to Temporal
and born
to 2019
. Then set the Task Queue and Workflow Id.
client.ts
import { example } from './workflows';
...
await client.start(example, {
args: [{ name: 'Temporal', born: 2019 }],
taskQueue: 'my-queue',
workflowId: 'business-meaningful-id',
});
In workflows.ts
define the type of the parameter that the Workflow function takes in. The interface ExampleParam
is a name we can now use to describe the requirement in the previous example. It still represents having the two properties called name
and born
that is of the type string
. Then define a function that takes in a parameter of the type ExampleParam
and return a Promise<string>
. The Promise
object represents the eventual completion, or failure, of await client.start()
and its resulting value.
- TypeScript
- JavaScript
interface ExampleParam {
name: string;
born: number;
}
export async function example({name, born}: ExampleParam): Promise<string> {
return `Hello ${name}, you were born in ${born}.`;
}
export async function example({ name, born }) {
return `Hello ${name}, you were born in ${born}.`;
}
Workflow return values
Workflow return values must also be serializable. Returning results, returning errors, or throwing exceptions is fairly idiomatic in each language that is supported. However, Temporal APIs that must be used to get the result of a Workflow Execution will only ever receive one of either the result or the error.
- Go
- Java
- PHP
- TypeScript
A Go-based Workflow Definition can return either just an error
or a customValue, error
combination.
Again, the best practice here is to use a struct
type to hold all custom values.
type YourWorkflowResponse struct{
WorkflowResultFieldOne string
WorkflowResultFieldTwo int
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
if err != nil {
return "", err
}
responseVar := YourWorkflowResponse {
FieldOne: "super",
FieldTwo: 1,
}
return responseVar, nil
}
A Workflow Definition written in Go can return both a custom value and an error.
However, it's not possible to receive both a custom value and an error in the calling process, as is normal in Go.
The caller will receive either one or the other.
Returning a non-nil error
from a Workflow indicates that an error was encountered during its execution and the Workflow Execution should be terminated, and any custom return values will be ignored by the system.
Workflow method arguments and return values must be serializable and deserializable using the provided DataConverter
.
The execute
method for DynamicWorkflow
can return type Object.
Ensure that your Client can handle an Object type return or is able to convert the Object type response.
Related references:
A Workflow method returns a Generator.
To properly typecast the Workflow's return value in the client code, use the #[ReturnType()]
annotation.
#[YourWorkflowInterface]
interface FileProcessingWorkflow {
#[WorkflowMethod]
#[ReturnType("string")]
public function processFile(Argument $args);
}
To return a value of the Workflow function, use Promise<something>
. The Promise
is used to make asynchronous calls and comes with guarantees.
The following example uses a Promise<string>
to eventually return a name
and born
parameter.
interface ExampleParam {
name: string;
born: number;
}
export async function example({name, born}: ExampleParam): Promise<string> {
return `Hello ${name}, you were born in ${born}.`;
}
Customize Workflow Type
You can set a custom name for your Workflow Type.
- Go
- Java
- PHP
- TypeScript
To customize the Workflow Type, set the Name
parameter with RegisterOptions
when registering your Workflow with a Worker.
- Type:
string
- Default: function name
// ...
w := worker.New(temporalClient, "your_task_queue_name", worker.Options{})
registerOptions := workflow.RegisterOptions{
Name: "CoolWorkflowTypeName",
// ...
}
w.RegisterWorkflowWithOptions(YourWorkflowDefinition, registerOptions)
// ...
The Workflow Type defaults to the short name of the Workflow interface. In the following example, the Workflow Type defaults to "NotifyUserAccounts".
@WorkflowInterface
public interface NotifyUserAccounts {
@WorkflowMethod
void notify(String[] accountIds);
}
To overwrite this default naming and assign a custom Workflow Type, use the @WorkflowMethod
annotation with the name
parameter.
In the following example, the Workflow Type is set to "Abc".
@WorkflowInterface
public interface NotifyUserAccounts {
@WorkflowMethod(name = "Abc")
void notify(String[] accountIds);
}
When you set the Workflow Type this way, the value of the name
parameter does not have to start with an uppercase letter.
Content is not available
Content is not available
Workflow logic requirements
Workflow logic is constrained by deterministic execution requirements. Therefore, each language is limited to the use of certain idiomatic techniques. However, each Temporal SDK provides a set of APIs that can be used inside your Workflow to interact with external (to the Workflow) application code.
- Go
- Java
- PHP
- TypeScript
In Go, Workflow Definition code cannot directly do the following:
- Iterate over maps using
range
, because withrange
the order of the map's iteration is randomized. Instead you can collect the keys of the map, sort them, and then iterate over the sorted keys to access the map. This technique provides deterministic results. You can also use a Side Effect or an Activity to process the map instead. - Call an external API, conduct a file I/O operation, talk to another service, etc. (Use an Activity for these.)
The Temporal Go SDK has APIs to handle equivalent Go constructs:
workflow.Now()
This is a replacement fortime.Now()
.workflow.Sleep()
This is a replacement fortime.Sleep()
.workflow.GetLogger()
This ensures that the provided logger does not duplicate logs during a replay.workflow.Go()
This is a replacement for thego
statement.workflow.Channel
This is a replacement for the nativechan
type. Temporal provides support for both buffered and unbuffered channels.workflow.Selector
This is a replacement for theselect
statement. Learn more on the Go SDK Selectors pageworkflow.Context
This is a replacement forcontext.Context
. Learn more on the Go SDK Context Propagation page.
When defining Workflows using the Temporal Java SDK, the Workflow code must be written to execute effectively once and to completion.
The following constraints apply when writing Workflow Definitions:
- Do not use mutable global variables in your Workflow implementations. This will ensure that multiple Workflow instances are fully isolated.
- Your Workflow code must be deterministic.
Do not call non-deterministic functions (such as non-seeded random or
UUID.randomUUID()
) directly from the Workflow code. The Temporal SDK provides specific API for calling non-deterministic code in your Workflows. - Do not use programming language constructs that rely on system time.
For example, only use
Workflow.currentTimeMillis()
to get the current time inside a Workflow. - Do not use native Java
Thread
or any other multi-threaded classes likeThreadPoolExecutor
. UseAsync.function
orAsync.procedure
, provided by the Temporal SDK, to execute code asynchronously. - Do not use synchronization, locks, or other standard Java blocking concurrency-related classes besides those provided by the Workflow class.
There is no need for explicit synchronization because multi-threaded code inside a Workflow is executed one thread at a time and under a global lock.
- Call
Workflow.sleep
instead ofThread.sleep
. - Use
Promise
andCompletablePromise
instead ofFuture
andCompletableFuture
. - Use
WorkflowQueue
instead ofBlockingQueue
.
- Call
- Use
Workflow.getVersion
when making any changes to the Workflow code. Without this, any deployment of updated Workflow code might break already running Workflows. - Do not access configuration APIs directly from a Workflow because changes in the configuration might affect a Workflow Execution path. Pass it as an argument to a Workflow function or use an Activity to load it.
- Use
DynamicWorkflow
when you need a default Workflow that can handle all Workflow Types that are not registered with a Worker. A single implementation can implement a Workflow Type which by definition is dynamically loaded from some external source. All standardWorkflowOptions
and determinism rules apply to Dynamic Workflow implementations.
Java Workflow reference: https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/package-summary.html
Content is not available
Content is not available
Develop Activities
One of the primary things that Workflows do is orchestrate the execution of Activities. Activities are normal function/method executions that can interact with the world. For the Workflow to be able to execute the Activity, we must define the Activity Definition
- Go
- Java
- PHP
- TypeScript
In the Temporal Go SDK programming model, an Activity Definition is an exportable function or a struct
method.
Function
// basic function signature
func YourActivityDefinition(ctx context.Context) error {
// ...
return nil
}
// with parameters and return values
func SimpleActivity(ctx context.Context, value string) (string, error)
Struct method
type YourActivityStruct struct {
ActivityFieldOne string
ActivityFieldTwo int
}
func(a *YourActivityStruct) YourActivityDefinition(ctx context.Context) error {
// ...
}
func(a *YourActivityStruct) YourActivityDefinitionTwo(ctx context.Context) error {
// ...
}
An Activity struct can have more than one method, with each method acting as a separate Activity Type. Activities written as struct methods can use shared struct variables, such as:
- an application level DB pool
- client connection to another service
- reusable utilities
- any other expensive resources that you only want to initialize once per process
Because this is such a common need, the rest of this guide shows Activities written as struct
methods.
An Activity Definition is a combination of the Temporal Java SDK Activity Class implementing a specially annotated interface.
An Activity interface is annotated with @ActivityInterface
and an Activity implementation implements this Activity interface.
TO handle Activity types that do not have an explicitly registered handler, you can directly implement a dynamic Activity.
@ActivityInterface
public interface GreetingActivities {
String composeGreeting(String greeting, String language);
}
Each method defined in the Actvity interface defines a separate Activity method.
You can annotate each method in the Activity interface with the @ActivityMethod
annotation, but this is completely optional.
The following example uses the @ActivityMethod
annotation for the method defined in the previous example.
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod
String composeGreeting(String greeting, String language);
}
An Activity implementation is a Java class that implements an Activity annotated interface.
// Implementation for the GreetingActivities interface example from in the previous section
static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public String composeGreeting(String greeting, String name) {
return greeting + " " + name + "!";
}
}
Use DynamicActivity
to implement any number of Activity types dynamically.
When an Activity implementation that extends DynamicActivity
is registered, it is called for any Activity type invocation that doesn't have an explicitly registered handler.
The dynamic Activity interface is implemented with the execute
method, as shown in the following example.
// Dynamic Activity implementation
public static class DynamicGreetingActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
String activityType = Activity.getExecutionContext().getInfo().getActivityType();
return activityType
+ ": "
+ args.get(0, String.class)
+ " "
+ args.get(1, String.class)
+ " from: "
+ args.get(2, String.class);
}
}
Use Activity.getExecutionContext()
to get information about the Activity type that should be implemented dynamically.
Activities are defined as methods of a plain PHP interface annotated with #[YourActivityInterface]
.
(You can also use PHP 8 attributes in PHP 7.)
Following is an example of an interface that defines four Activities:
#[YourActivityInterface]
// Defining an interface for the activities.
interface FileProcessingActivities
{
public function upload(string $bucketName, string $localName, string $targetName): void;
#[ActivityMethod("transcode_file")]
public function download(string $bucketName, string $remoteName): void;
public function processFile(): string;
public function deleteLocalFile(string $fileName): void;
}
How to customize an Activity type
We recommend to use a single value type argument for Activity methods. In this way, adding new arguments as fields to the value type is a backward-compatible change.
An optional #[ActivityMethod]
annotation can be used to override a default Activity name.
You can define your own prefix for all Activity names by adding the prefix
option to the YourActivityInterface
annotation.
(The default prefix is empty.)
#[YourActivityInterface("file_activities.")]
interface FileProcessingActivities
{
public function upload(string $bucketName, string $localName, string $targetName);
#[ActivityMethod("transcode_file")]
public function download(string $bucketName, string $remoteName);
public function processFile(): string;
public function deleteLocalFile(string $fileName);
}
The #[YourActivityInterface("file_activities.")]
is an annotation that tells the PHP SDK to generate a class to implement the FileProcessingActivities
interface. The functions define Activites that are used in the Workflow.
- Activities execute in the standard Node.js environment.
- Activities cannot be in the same file as Workflows and must be separately registered.
- Activities may be retried repeatedly, so you may need to use idempotency keys for critical side effects.
Activities are just functions. The following is an Activity that accepts a string parameter and returns a string.
export async function greet(name: string): Promise<string> {
return `Hello, ${name}!`;
}
Activity parameters
All Activity parameters must be serializable.
There is no explicit limit to the amount of parameter data that can be passed to an Activity, but keep in mind that all parameters and return values are recorded in a Workflow Execution Event History. A large Workflow Execution Event History can adversely impact the performance of your Workflow Executions, because the entire Event History is transferred to Worker Processes with every Workflow Task.
- Go
- Java
- PHP
- TypeScript
The first parameter of an Activity Definition is context.Context
.
This parameter is optional for an Activity Definition, though it is recommended, especially if the Activity is expected to use other Go SDK APIs.
An Activity Definition can support as many other custom parameters as needed. However, all parameters must be serializable (parameters can’t be channels, functions, variadic, or unsafe pointers), and it is recommended to pass a single struct that can be updated later.
type YourActivityParam struct {
ActivityParamFieldOne string
ActivityParamFieldTwo int
}
type YourActivityStruct struct {
// ...
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) error {
// ...
}
An Activity interface can have any number of parameters. All inputs should be serializable by the default Jackson JSON Payload Converter.
When implementing Activities, be mindful of the amount of data that you transfer using the Activity invocation parameters or return values as these are recorded in the Workflow Execution Events History. Large Events Histories can adversely impact performance.
You can create a custom object, and pass it to the Activity interface, as shown in the following example.
@ActivityInterface
public interface YourActivities {
String getCustomObject(CustomObj customobj);
void sendCustomObject(CustomObj customobj, String abc);
}
The execute
method in the dynamic Activity interface implementation takes in EncodedValues
that are inputs to the Activity Execution, as shown in the following example.
// Dynamic Activity implementation
public static class DynamicActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
String activityType = Activity.getExecutionContext().getInfo().getActivityType();
return activityType
+ ": "
+ args.get(0, String.class)
+ " "
+ args.get(1, String.class)
+ " from: "
+ args.get(2, String.class);
}
}
For more details, see Dynamic Activity Reference.
Content is not available
Content is not available
Activity return values
All Activity results must be serializable.
There is no explicit limit to the amount of data that can be returned by an Activity, but keep in mind that all return values are recorded in a Workflow Execution Event History
- Go
- Java
- PHP
- TypeScript
A Go-based Activity Definition can return either just an error
or a customValue, error
combination (same as a Workflow Definition).
You may wish to use a struct
type to hold all custom values, just keep in mind they must all be serializable.
type YourActivityResult struct{
ActivityResultFieldOne string
ActivityResultFieldTwo int
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) (YourActivityResult, error) {
// ...
result := YourActivityResult {
ActivityResultFieldOne: a.ActivityFieldOne,
ActivityResultFieldTwo: a.ActivityFieldTwo,
}
return result, nil
}
Activity return values must be serializable and deserializable by the provided DataConverter
.
The execute
method for `DynamicActivity can return type Object.
Ensure that your Workflow or Client can handle an Object type return or is able to convert the Object type response.
Content is not available
To import the types of the Activities defined in ./activities
, you must first retrieve an Activity from an Activity Handle before you can call it, then define Return Types in your Activity.
import type * as activities from "./activities";
const {greet} = proxyActivities<typeof activities>({
startToCloseTimeout: "1 minute",
});
// A workflow that simply calls an activity
export async function example(name: string): Promise<string> {
return await greet(name);
}
Customize Activity Type
You can set a custom name for your Activity Type.
- Go
- Java
- PHP
- TypeScript
To customize the Activity Type, set the Name
parameter with RegisterOptions
when registering your Activity with a Worker.
- Type:
string
- Default: function name
// ...
w := worker.New(temporalClient, "your_task_queue_name", worker.Options{})
registerOptions := activity.RegisterOptions{
Name: "CoolActivityTypeName",
// ...
}
w.RegisterActivityWithOptions(a.YourActivityDefinition, registerOptions)
// ...
The Activity Type defaults to method name, with the first letter of the method name capitalized, and can be customized using namePrefix()
or {ActivityMethod.name()}
to ensure they are distinct.
In the following example, the Activity Type defaults to ComposeGreeting
.
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod
String composeGreeting(String greeting, String language);
}
To overwrite this default naming and assign a custom Activity Type, use the @ActivityMethod
annotation with the name
parameter.
In the following example, the Activity Type is set to "greet".
@ActivityInterface
public interface GreetingActivities {
@ActivityMethod(name = "greet")
String composeGreeting(String greeting, String language);
}
You can also define a prefix for all of your Activity Types using the namePrefix
parameter with the @ActivityInterface
annotation.
The following example shows a namePrefix
parameter applied to the @ActivityInterface
, and two Activity methods, of which one is defined using the @ActivityMethod
annotation.
@ActivityInterface(namePrefix = "A_")
Public interface GreetingActivities {
String sendGreeting(String input);
@ActivityMethod(name = "abc")
String composeGreeting(String greeting, String language);
}
In this example, the Activity type for the first method is set to "A_SendGreeting".
The Activity type for the method annotated with @ActivityMethod
is set to "A_abc".
Content is not available
Content is not available
Start Activity Execution
Calls to spawn Activity Executions are written within a Workflow Definition. The call to spawn an Activity Execution generates the ScheduleActivityTask Command. This results in the set of three Activity Task related Events (ActivityTaskScheduled, ActivityTaskStarted, and ActivityTask[Closed])in your Workflow Execution Event History.
A single instance of the Activities implementation is shared across multiple simultaneous Activity invocations. Therefore, the Activity implementation code must be stateless.
The values passed to Activities through invocation parameters or returned through a result value are recorded in the Execution history. The entire Execution history is transferred from the Temporal service to Workflow Workers when a Workflow state needs to recover. A large Execution history can thus adversely impact the performance of your Workflow.
Therefore, be mindful of the amount of data you transfer through Activity invocation parameters or Return Values. Otherwise, no additional limitations exist on Activity implementations.
- Go
- Java
- PHP
- TypeScript
To spawn an Activity Execution, use the ExecuteActivity()
API call inside your Workflow Definition.
The API is available from the go.temporal.io/sdk/workflow
package.
The ExecuteActivity()
API call requires an instance of workflow.Context
, the Activity function name, and any variables to be passed to the Activity Execution.
import (
// ...
"go.temporal.io/sdk/workflow"
)
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
yourActivityParam := YourActivityParam{
// ...
}
var activities *YourActivityStruct
future := workflow.ExecuteActivity(ctx, activities.YourActivityDefinition, yourActivityParam)
// ...
}
func (a *YourActivityStruct) YourActivityDefinition(ctx context.Context, param YourActivityParam) error {
// ...
}
The Activity function name can be provided as a variable object (no quotations) or as a string.
// ...
future := workflow.ExecuteActivity(ctx, "YourActivityDefinition", yourActivityParam)
// ...
The benefit of passing the actual function object is that the framework can validate the parameters against the Activity Definition.
The ExecuteActivity
call returns a Future, which can be used to get the result of the Activity Execution.
Activities are remote procedure calls that must be invoked from within a Workflow using ActivityStub
.
Activities are not executable on their own. You cannot start an Activity Execution by itself.
Note that before an Activity Execution is invoked:
- Activity options (either
setStartToCloseTimeout
orScheduleToCloseTimeout
are required) must be set for the Activity. For details, see Set Activity Options and Activity Options reference. - The Activity must be registered with a Worker. See Worker Program
- Activity code must be thread-safe.
Activities should only be instantiated using stubs from within a Workflow.
An ActivityStub
returns a client-side stub that implements an Activity interface.
You can invoke Activities using Workflow.newActivityStub
(type-safe) or Workflow.newUntypedActivityStub
(untyped).
Calling a method on the Activity interface schedules the Activity invocation with the Temporal service, and generates an ActivityTaskScheduled
Event.
Activities can be invoked synchronously or asynchronously.
Invoking Activities Synchronously
In the following example, we use the type-safe Workflow.newActivityStub
within the "FileProcessingWorkflow" Workflow implementation to create a client-side stub of the FileProcessingActivities
class. We also define ActivityOptions
and set setStartToCloseTimeout
option to one hour.
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {
private final FileProcessingActivities activities;
public FileProcessingWorkflowImpl() {
this.activities = Workflow.newActivityStub(
FileProcessingActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.build());
}
@Override
public void processFile(Arguments args) {
String localName = null;
String processedName = null;
try {
localName = activities.download(args.getSourceBucketName(), args.getSourceFilename());
processedName = activities.processFile(localName);
activities.upload(args.getTargetBucketName(), args.getTargetFilename(), processedName);
} finally {
if (localName != null) {
activities.deleteLocalFile(localName);
}
if (processedName != null) {
activities.deleteLocalFile(processedName);
}
}
}
// ...
}
A Workflow can have multiple Activity stubs. Each Activity stub can have its own ActivityOptions
defined.
The following example shows a Workflow implementation with two typed Activity stubs.
public FileProcessingWorkflowImpl() {
ActivityOptions options1 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue1")
.setStartToCloseTimeout(Duration.ofMinutes(10))
.build();
this.store1 = Workflow.newActivityStub(FileProcessingActivities.class, options1);
ActivityOptions options2 = ActivityOptions.newBuilder()
.setTaskQueue("taskQueue2")
.setStartToCloseTimeout(Duration.ofMinutes(5))
.build();
this.store2 = Workflow.newActivityStub(FileProcessingActivities.class, options2);
}
To invoke Activities inside Workflows without referencing the interface it implements, use an untyped Activity stub Workflow.newUntypedActivityStub
.
This is useful when the Activity type is not known at compile time, or to invoke Activities implemented in different programming languages.
// Workflow code
ActivityOptions activityOptions =
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(3))
.setTaskQueue("simple-queue-node")
.build();
ActivityStub activity = Workflow.newUntypedActivityStub(activityOptions);
activity.execute("ComposeGreeting", String.class, "Hello World" , "Spanish");
Invoking Activities Asynchronously
Sometimes Workflows need to perform certain operations in parallel.
The Temporal Java SDK provides the Async
class which includes static methods used to invoke any Activity asynchronously.
The calls return a result of type Promise
which is similar to the Java Future
and CompletionStage
.
When invoking Activities, use Async.function
for Activities that return a result, and Async.procedure
for Activities that return void.
In the following asynchronous Activity invocation, the method reference is passed to Async.function
followed by Activity arguments.
Promise<String> localNamePromise = Async.function(activities::download, sourceBucket, sourceFile);
The following example shows how to call two Activity methods, "download" and "upload", in parallel on multiple files.
public void processFile(Arguments args) {
List<Promise<String>> localNamePromises = new ArrayList<>();
List<String> processedNames = null;
try {
// Download all files in parallel.
for (String sourceFilename : args.getSourceFilenames()) {
Promise<String> localName =
Async.function(activities::download, args.getSourceBucketName(), sourceFilename);
localNamePromises.add(localName);
}
List<String> localNames = new ArrayList<>();
for (Promise<String> localName : localNamePromises) {
localNames.add(localName.get());
}
processedNames = activities.processFiles(localNames);
// Upload all results in parallel.
List<Promise<Void>> uploadedList = new ArrayList<>();
for (String processedName : processedNames) {
Promise<Void> uploaded =
Async.procedure(
activities::upload,
args.getTargetBucketName(),
args.getTargetFilename(),
processedName);
uploadedList.add(uploaded);
}
// Wait for all uploads to complete.
Promise.allOf(uploadedList).get();
} finally {
for (Promise<String> localNamePromise : localNamePromises) {
// Skip files that haven't completed downloading.
if (localNamePromise.isCompleted()) {
activities.deleteLocalFile(localNamePromise.get());
}
}
if (processedNames != null) {
for (String processedName : processedNames) {
activities.deleteLocalFile(processedName);
}
}
}
}
Activity Execution Context
ActivityExecutionContext
is a context object passed to each Activity implementation by default.
You can access it in your Activity implementations via Activity.getExecutionContext()
.
It provides getters to access information about the Workflow that invoked the Activity.
Note that the Activity context information is stored in a thread-local variable.
Therefore, calls to getExecutionContext()
succeed only within the thread that invoked the Activity function.
Following is an example of using the ActivityExecutionContext
:
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
@Override
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
ActivityInfo info = ctx.getInfo();
log.info("namespace=" + info.getActivityNamespace());
log.info("workflowId=" + info.getWorkflowId());
log.info("runId=" + info.getRunId());
log.info("activityId=" + info.getActivityId());
log.info("activityTimeout=" + info.getStartToCloseTimeoutSeconds());
return downloadFileFromS3(bucketName, remoteName, localDirectory + localName);
}
...
}
For details on getting the results of an Activity Execution, see Activity Execution Result.
Activity implementation is an implementation of an Activity interface. The following code example, uses a constructor that takes an Amazon S3 client and a local directory, and uploads a file to the S3 bucket. Then, the code uses a function to download a file from the S3 bucket passing a bucket name, remote name, and local name as arguments. Finally, it uses a function that takes a local file name as an argument and returns a string.
// An implementation of an Activity interface.
class FileProcessingActivitiesImpl implements FileProcessingActivities {
private S3Client $s3Client;
private string $localDirectory;
public function __construct(S3Client $s3Client, string $localDirectory) {
$this->s3Client = $s3Client;
$this->localDirectory = $localDirectory;
}
// Uploading a file to S3.
public function upload(string $bucketName, string $localName, string $targetName): void
{
$this->s3Client->putObject(
$bucketName,
$targetName,
fopen($this->localDirectory . $localName, 'rb+')
);
}
// Downloading a file from S3.
public function download(
string $bucketName,
string $remoteName,
string $localName
): void
{
$this->s3Client->downloadObject(
$bucketName,
$remoteName,
fopen($this->localDirectory .$localName, 'wb+')
);
}
// A function that takes a local file name as an argument and returns a string.
public function processFile(string $localName): string
{
// Implementation omitted for brevity.
return compressFile($this->localDirectory . $localName);
}
public function deleteLocalFile(string $fileName): void
{
unlink($this->localDirectory . $fileName);
}
}
To spawn an Activity Execution, you must retrieve the Activity handle in your Workflow.
import {proxyActivities} from "@temporalio/workflow";
// Only import the activity types
import type * as activities from "./activities";
const {greet} = proxyActivities<typeof activities>({
startToCloseTimeout: "1 minute",
});
// A workflow that calls an activity
export async function example(name: string): Promise<string> {
return await greet(name);
}
This imports the individual Activities and declares the type alias for each Activity.
Get Activity results
The call to spawn an Activity Execution generates the ScheduleActivityTask Command and provides the Workflow with an Awaitable. Workflow Executions can either block progress until the result is available through the Awaitable or continue progressing, making use of the result when it becomes available.
- Go
- Java
- PHP
- TypeScript
The ExecuteActivity
API call returns an instance of workflow.Future
which has the following two methods:
Get()
: Takes an instance of theworkflow.Context
, that was passed to the Activity Execution, and a pointer as parameters. The variable associated with the pointer is populated with the Activity Execution result. This call blocks until the results are available.IsReady()
: Returnstrue
when the result of the Activity Execution is ready.
Call the Get()
method on the instance of workflow.Future
to get the result of the Activity Execution.
The type of the result parameter must match the type of the return value declared by the Activity function.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
future := workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam)
var yourActivityResult YourActivityResult
if err := future.Get(ctx, &yourActivityResult); err != nil {
// ...
}
// ...
}
Use the IsReady()
method first to make sure the Get()
call doesn't cause the Workflow Execution to wait on the result.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
future := workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam)
// ...
if(future.IsReady()) {
var yourActivityResult YourActivityResult
if err := future.Get(ctx, &yourActivityResult); err != nil {
// ...
}
}
// ...
}
It is idiomatic to invoke multiple Activity Executions from within a Workflow. Therefore, it is also idiomatic to either block on the results of the Activity Executions or continue on to execute additional logic, checking for the Activity Execution results at a later time.
To get the results of an asynchronously invoked Activity method, use the Promise
get
method to block until the Activity method result is available.
Sometimes an Activity Execution lifecycle goes beyond a synchronous method invocation. For example, a request can be put in a queue and later a reply comes and is picked up by a different Worker process. The whole request-reply interaction can be modeled as a single Activity.
To indicate that an Activity should not be completed upon its method return, call ActivityExecutionContext.doNotCompleteOnReturn()
from the original Activity thread.
Then later, when replies come, complete the Activity using the ActivityCompletionClient
.
To correlate Activity invocation with completion use either a TaskToken
or Workflow and Activity IDs.
Following is an example of using ActivityExecutionContext.doNotCompleteOnReturn()
:
public class FileProcessingActivitiesImpl implements FileProcessingActivities {
public String download(String bucketName, String remoteName, String localName) {
ActivityExecutionContext ctx = Activity.getExecutionContext();
// Used to correlate reply
byte[] taskToken = ctx.getInfo().getTaskToken();
asyncDownloadFileFromS3(taskToken, bucketName, remoteName, localDirectory + localName);
ctx.doNotCompleteOnReturn();
// Return value is ignored when doNotCompleteOnReturn was called.
return "ignored";
}
...
}
When the download is complete, the download service potentially can complete the Activity, or fail it from a different process, for example:
public <R> void completeActivity(byte[] taskToken, R result) {
completionClient.complete(taskToken, result);
}
public void failActivity(byte[] taskToken, Exception failure) {
completionClient.completeExceptionally(taskToken, failure);
}
Workflow::newActivityStub
returns a client-side stub an implements an Activity interface. The client-side stub can be used within the Workflow code. It takes the Activity's type andActivityOptions
as arguments.
Calling (via yield
) a method on this interface invokes an Activity that implements this method.
An Activity invocation synchronously blocks until the Activity completes, fails, or times out.
Even if Activity execution takes a few months, the Workflow code still sees it as a single synchronous invocation.
It doesn't matter what happens to the processes that host the Workflow.
The business logic code just sees a single method call.
class GreetingWorkflow implements GreetingWorkflowInterface
{
private $greetingActivity;
public function __construct()
{
$this->greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
}
public function greet(string $name): \Generator
{
// This is a blocking call that returns only after the activity has completed.
return yield $this->greetingActivity->composeGreeting('Hello', $name);
}
}
If different Activities need different options, like timeouts or a task queue, multiple client-side stubs can be created with different options.
$greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 seconds'))
);
$greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()->withStartToCloseTimeout(\DateInterval::createFromDateString('30 minutes'))
);
Since Activities are referenced by their string name, you can reference them dynamically to get the result of an Activity Execution.
export async function DynamicWorkflow(activityName, ...args) {
const acts = proxyActivities(/* activityOptions */);
// these are equivalent
await acts.activity1();
await acts["activity1"]();
let result = await acts[activityName](...args);
return result;
}
The proxyActivities()
returns an object that calls the Activities in the function. acts[activityName]()
references the Activity using the Activity name, then it returns the results.
Create Temporal Clients
A Temporal Client is needed to create Worker Entities and to communicate with a Temporal Cluster. Communication with the Temporal Cluster includes but is not limited to starting Workflow Executions, sending Signals to Workflow Executions, sending Queries to Workflow Executions, getting the result of a Workflow Execution.
A Temporal Client cannot be initialized and used inside Workflow code. However, it is acceptable and common to utilize a Temporal Client, to communicate with a Temporal Cluster, inside an Activity.
- Go
- Java
- PHP
- TypeScript
Use the NewClient()
API available in the go.temporal.io/sdk/client
package to create a new Client
import (
// ...
"go.temporal.io/sdk/client"
)
func main() {
temporalClient, err := client.NewClient(client.Options{})
if err != nil {
// ...
}
defer temporalClient.Close()
// ...
}
To initialize a Workflow Client, create an instance of a WorkflowClient
, create a client-side WorkflowStub
, and then call a Workflow method (annotated with the @WorkflowMethod
annotation).
To start a Workflow Execution, your Temporal Server must be running, and your front-end service must be accepting gRPC calls.
To establish a connection with the front-end service, use WorkflowServiceStubs
.
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
You can provide WorkflowServiceStubsOptions
to override the default values for the gRPC calls.
For example, the default front-end service gRPC address is set to 127.0.0.1:7233
, where 7233
is the default port for the Temporal frontend service. If your server is running on a different host or port from the default, you can set it as shown in the following example.
WorkflowServiceStubs service = WorkflowServiceStubs.newServiceStubs(
WorkflowServiceStubsOptions.newBuilder()
.setTarget(TARGET_ENDPOINT)
.build());
You can also provide certificates to be able to connect to your frontend service using mTLS.
The following example shows how to set up cetificates and pass the SSLContext
for the Client.
import io.temporal.serviceclient.SimpleSslContextBuilder;
...
// Load your client certificate, which should look like:
// -----BEGIN CERTIFICATE-----
// ...
// -----END CERTIFICATE-----
InputStream clientCert = new FileInputStream(System.getenv("TEMPORAL_CLIENT_CERT"));
// PKCS8 client key, which should look like:
// -----BEGIN PRIVATE KEY-----
// ...
// -----END PRIVATE KEY-----
InputStream clientKey = new FileInputStream(System.getenv("TEMPORAL_CLIENT_KEY"));
// For Temporal Cloud this would likely be ${namespace}.tmprl.cloud:7233
String targetEndpoint = System.getenv("TEMPORAL_ENDPOINT");
// Your registered Namespace.
String namespace = System.getenv("TEMPORAL_NAMESPACE");
// Create SSL enabled client by passing SslContext, created by SimpleSslContextBuilder.
WorkflowServiceStubs service =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setSslContext(SimpleSslContextBuilder.forPKCS8(clientCert, clientKey).build())
.setTarget(targetEndpoint)
.build());
For details, see Sample.
After the connection to the Temporal frontend service is established, create a Client for the service stub. The Workflow Client helps with client-side APIs and is required by Workers.
Create an instance of a WorkflowClient
for the Workflow service stub, and use WorkflowClientOptions
to set options for the Workflow Client.
The following example shows how to create a WorkflowClient
instance called "client" for the WorkflowServiceStubs
"service" that we created in the previous example, and set Namespace
option for the WorkflowClient
.
WorkflowClient client = WorkflowClient.newServiceStubs(
service,
WorkflowClientOptions.newBuilder()
.setNamespace(“Abc”)
.build());
See WorkflowClientOptions for details.
WorkflowService
and WorkflowClient
creation is a heavyweight operation, and will be resource-intensive if created each time you start a Workflow or send a Signal to it.
The recommended way is to create them once and reuse where possible.
With the Client defined, you can start interacting with the Temporal Frontend Service using the SDK APIs.
To initialize a Workflow in the Client, create a WorkflowStub
, and start the Workflow Execution with WorkflowClient.start()
.
Starting Workflows or sending Signals or Queries to Workflows from within a Client must be done using WorkflowStubs
.
WorkflowClient workflowClient = WorkflowClient.newInstance(service, clientOptions);
// Create a Workflow stub.
YourWorkflow workflow = workflowClient.newWorkflowStub(YourWorkflow.class);
// Start Workflow asynchronously and call its "yourWFMethod" Workflow method
WorkflowClient.start(workflow::yourWFMethod);
For details, see How to spawn a Workflow Execution in Java. See How to spawn a Workflow Execution in Java for details.
Content is not available
Use a new WorflowClient()
with the requisite gRPC Connection
to create a new Client.
import {Connection, WorkflowClient} from "@temporalio/client";
const connection = await Connection.connect(); // to configure for production
const client = new WorkflowClient({connection});
Declaring the WorflowClient()
creates a new connection to the Temporal service.
If you ommit the connection and just call the new WorkflowClient()
, you will create a default connection that works locally. However, configure your connection and Namespace when deploying to production.
The following example, creates a Client, connects to an account, and declares your Namespace.
import {Connection, WorkflowClient} from "@temporalio/client";
const connection = await Connection.connect({
address: "<Namespace ID>.tmprl.cloud", // defaults port to 7233 if not specified
tls: {
// set to true if TLS without mTLS
// See docs for other TLS options
clientCertPair: {
crt: clientCert,
key: clientKey,
},
},
});
const client = new WorkflowClient({
connection,
namespace: "your.namespace",
});
The Hello World mTLS sample demonstrates sample code used to connect to a Temporal Cloud account. When signing up to Temporal Cloud you should receive a Namespace, a Server address and a Client certificate and key. Use the following environment variables to set up the sample:
- TEMPORAL_ADDRESS: looks like
foo.bar.tmprl.cloud
(NOT web.foo.bar.tmprl.cloud) - TEMPORAL_NAMESPACE: looks like
foo.bar
- TEMPORAL_CLIENT_CERT_PATH:
/tls/ca.pem
(file contents start with -----BEGIN CERTIFICATE-----) - TEMPORAL_CLIENT_KEY_PATH:
/tls/ca.key
(file contents start with -----BEGIN PRIVATE KEY-----)
You can leave the remaining vars, like TEMPORAL_SERVER_NAME_OVERRIDE
and TEMPORAL_SERVER_ROOT_CA_CERT_PATH
blank.
There is another var, TEMPORAL_TASK_QUEUE
, which the example defaults to 'hello-world-mtls'
but you can customize as needed.
Example environment settings
export function getEnv(): Env {
return {
address: "web.<Namespace ID>.tmprl.cloud", // NOT web.foo.bar.tmprl.cloud
namespace: "your.namespace", // as assigned
clientCertPath: "foobar.pem", // in project root
clientKeyPath: "foobar.key", // in project root
taskQueue: process.env.TEMPORAL_TASK_QUEUE || "hello-world-mtls", // just to ensure task queue is same on client and worker, totally optional
// // not usually needed
// serverNameOverride: process.env.TEMPORAL_SERVER_NAME_OVERRIDE,
// serverRootCACertificatePath: process.env.TEMPORAL_SERVER_ROOT_CA_CERT_PATH,
};
}
If you have misconfigured your connection somehow, you will get an opaque [TransportError: transport error]
error. Read through your settings carefully and contact Temporal if you are sure you have checked everything.
Note the difference between the gRPC and Temporal Web endpoints:
- The gRPC endpoint has a DNS address of
<Namespace ID>.tmprl.cloud
, for example:accounting-production.f45a2.tmprl.cloud
. - The Temporal Web endpoint is
web.<Namespace ID>.tmprl.cloud
, for example:https://web.accounting-production.f45a2.tmprl.cloud
.
If you are using mTLS, it is completely up to you how to get the clientCert
and clientKey
pair into your code, whether it is reading from file system, secrets manager, or both. Just keep in mind that they are whitespace sensitive, and some environment variable systems have been known to cause frustration because they modify whitespace.
The following code example works for local development and for certifications hosted in an Amazon S3 bucket.
let serverRootCACertificate: Buffer | undefined;
let clientCertificate: Buffer | undefined;
let clientKey: Buffer | undefined;
if (certificateS3Bucket) {
const s3 = new S3client({region: certificateS3BucketRegion});
serverRootCACertificate = await s3.getObject({
bucket: certificateS3Bucket,
key: serverRootCACertificatePath,
});
clientCertificate = await s3.getObject({
bucket: certificateS3Bucket,
key: clientCertPath,
});
clientKey = await s3.getObject({
bucket: certificateS3Bucket,
key: clientKeyPath,
});
} else {
serverRootCACertificate = fs.readFileSync(serverRootCACertificatePath);
clientCertificate = fs.readFileSync(clientCertPath);
clientKey = fs.readFileSync(clientKeyPath);
}
Run Worker Processes
The Worker Process is where Workflow Functions and Activity Functions are executed. Each Worker Entity in the Worker Process must register the exact Workflow Types and Activity Types it may execute. Each Worker Entity must also associate itself with exactly one Task Queue. Each Worker Entity polling the same Task Queue must be registered with the same Workflow Types and Activity Types.
A Worker Entity is the component within a Worker Process that listens to a specific Task Queue.
Although multiple Worker Entities can be in a single Worker Process, a single Worker Entity Worker Process may be perfectly sufficient. For more information, see the Worker tuning guide.
A Worker Entity contains both a Workflow Worker and an Activity Worker so that it can make progress for either a Workflow Execution or an Activity Execution.
- Go
- Java
- PHP
- TypeScript
Create an instance of Worker
by calling worker.New()
, available via the go.temporal.io/sdk/worker
package, and pass it the following parameters:
- An instance of the Temporal Go SDK
Client
. - The name of the Task Queue that it will poll.
- An instance of
worker.Options
, which can be empty.
Then, register the Workflow Types and the Activity Types that the Worker will be capable of executing.
Lastly, call either the Start()
or the Run()
method on the instance of the Worker.
Run accepts an interrupt channel as a parameter, so that the Worker can be stopped in the terminal.
Otherwise, the Stop()
method must be called to stop the Worker.
package main
import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.NewClient(client.Options{})
if err != nil {
// ...
}
defer c.Close()
w := worker.New(c, "your-task-queue", worker.Options{})
w.RegisterWorkflow(YourWorkflowDefinition)
w.RegisterActivity(YourActivityDefinition)
err = w.Run(worker.InterruptCh())
if err != nil
// ...
}
// ...
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
}
func YourActivityDefinition(ctx context.Context, param YourActivityParam) (YourActivityResponse, error) {
// ...
}
Use the newWorker
method on an instance of a WorkerFactory
to create a new Worker in Java.
A single Worker Entity can contain many Worker Objects.
Call the start()
method on the instance of the WorkerFactory
to start all the Workers created in this process.
// ...
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
public class YourWorker {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker yourWorker = factory.newWorker("your_task_queue");
// Register Workflow
// and/or register Activities
factory.start();
}
}
After creating the Worker entity, register all Workflow Types and all Activity Types that the Worker can execute. A Worker can be registered with just Workflows, just Activities, or both.
Operation guides:
Content is not available
Create a Worker with Worker.create()
(which establishes the initial gRPC connection), then call worker.run()
on it (to start polling the Task Queue).
Below is an example of starting a Worker that polls the Task Queue named tutorial
.
taskQueue
is the only required option, but you will also use workflowsPath
and activities
to register Workflows and Activities with the Worker.
A full example for Workers looks like this:
import {Worker, NativeConnection} from "@temporalio/worker";
import * as activities from "./activities";
async function run() {
const connection = await NativeConnection.connect({
address: "foo.bar.tmprl.cloud", // defaults port to 7233 if not specified
tls: {
// set to true if TLS without mTLS
// See docs for other TLS options
clientCertPair: {
crt: clientCert,
key: clientKey,
},
},
});
const worker = await Worker.create({
connection,
namespace: "foo.bar", // as explained in Namespaces section
// ...
});
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
See below for more Worker options.
Workflow and Activity registration
Workers bundle Workflow code and node_modules
using Webpack v5 and execute them inside V8 isolates.
Activities are directly required and run by Workers in the Node.js environment.
Workers are very flexible – you can host any or all of your Workflows and Activities on a Worker, and you can host multiple Workers in a single machine.
There are three main things the Worker needs:
taskQueue
: the Task Queue to poll. This is the only required argument.activities
: Optional. Imported and supplied directly to the Worker. Not the path.- Workflow bundle:
- Either specify a
workflowsPath
to yourworkflows.ts
file to pass to Webpack, e.g.,require.resolve('./workflows')
. Workflows will be bundled with their dependencies, which you can fine-tune withnodeModulesPaths
. - Or pass a prebuilt bundle to
workflowBundle
instead if you prefer to handle the bundling yourself.
Additional Worker Options
This is a selected subset of options you are likely to use. Even more advanced options, particularly for performance tuning, are available in the API reference.
Options | Description |
---|---|
nodeModulesPaths | Array of paths of Workflow dependencies to pass to Webpack. Defaults to the first encountered node_modules directory when scanning the filesystem starting with workflowsPath . |
dataConverter | Encodes and decodes data entering and exiting a Temporal Server. Supports undefined , UintBArray , and JSON. |
sinks | Allows injection of Workflow Sinks (Advanced feature: see Logging docs) |
interceptors | A mapping of interceptor type to a list of factories or module paths (Advanced feature: see Interceptors) |
Operation guides:
Register multiple types
All Workers listening to the same Task Queue name must be registered to handle the exact same Workflows Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task. However, the failure of the Task does not cause the associated Workflow Execution to fail.
- Go
- Java
- PHP
- TypeScript
The RegisterWorkflow()
and RegisterActivity()
calls essentially create an in-memory mapping between the Workflow Types and their implementations, inside the Worker process.
Registering Activity structs
Per Activity Definition best practices, you might have an Activity struct that has multiple methods and fields.
When you use RegisterActivity()
for an Activity struct, that Worker has access to all exported methods.
Registering multiple Types
To register multiple Activity Types and/or Workflow Types with the Worker Entity, just make multiple Activity registration calls, but make sure each Type name is unique:
w.registerActivity(ActivityA)
w.registerActivity(ActivityB)
w.registerActivity(ActivityC)
w.registerWorkflow(WorkflowA)
w.registerWorkflow(WorkflowB)
w.registerWorkflow(WorkflowC)
Use worker.registerWorkflowImplementationTypes
to register Workflow type and worker.registerActivitiesImplementations
to register Activity implementation with Workers.
For Workflows, the Workflow Type is registered with a Worker. A Workflow Type can be registered only once per Worker entity. If you define multiple Workflow implementations of the same type, you get an exception at the time of registration.
For Activities, Activity implementation instances are registered with a Worker because they are stateless and thread-safe. You can pass any number of dependencies in the Activity implementation constructor, such as the database connections, services, etc.
The following example shows how to register a Workflow and an Activity with a Worker.
Worker worker = workerFactory.newWorker("your_task_queue");
...
// Register Workflow
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Register Activity
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
When you register a single instance of an Activity, you can have multiple instances of Workflow Executions calling the same Activity. Activity code must be thread-safe because the same instance of the Activity code is run for every Workflow Execution that calls it.
For DynamicWorkflow
, only one Workflow implementation that extends DynamicWorkflow
can be registered with a Worker.
The following example shows how to register the DynamicWorkflow
and DynamicActivity
implementation with a Worker.
public static void main(String[] arg) {
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(TASK_QUEUE);
/* Register the Dynamic Workflow implementation with the Worker. Workflow implementations
** must be known to the Worker at runtime to dispatch Workflow Tasks.
*/
worker.registerWorkflowImplementationTypes(DynamicGreetingWorkflowImpl.class);
// Start all the Workers that are in this process.
factory.start();
/* Create the Workflow stub. Note that the Workflow type is not explicitly registered with the Worker. */
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions);
/**
* Register Dynamic Activity implementation with the Worker. Since Activities are stateless
* and thread-safe, we need to register a shared instance.
*/
worker.registerActivitiesImplementations(new DynamicGreetingActivityImpl());
/* Start Workflow Execution and immmediately send Signal. Pass in the Workflow args and Signal args. */
workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"});
// Wait for the Workflow to finish getting the results.
String result = workflow.getResult(String.class);
System.out.println(result);
System.exit(0);
}
}
You can register multiple type-specific Workflow implementations alongside a single DynamicWorkflow
implementation.
You can register only one Activity instance that implements DynamicActivity
with a Worker.
Content is not available
Content is not available
Start Workflow Execution
Workflow Execution semantics rely on several parameters—that is, to start a Workflow Execution you must supply a Task Queue that will be used for the Tasks (one that a Worker is polling), the Workflow Type, language-specific contextual data, and Workflow Function parameters.
In the examples below, all Workflow Executions are started using a Temporal Client. To spawn Workflow Executions from within another Workflow Execution, use either the Child Workflow or External Workflow APIs.
See the Customize Workflow Type section to see how to customize the name of the Workflow Type.
A request to spawn a Workflow Execution causes the Temporal Cluster to create the first Event (WorkflowExecutionStarted) in the Workflow Execution Event History. The Temporal Cluster then creates the first Workflow Task, resulting in the first WorkflowTaskScheduled Event.
- Go
- Java
- PHP
- TypeScript
To spawn a Workflow Execution, use the ExecuteWorkflow()
method on the Go SDK Client
.
The ExecuteWorkflow()
API call requires an instance of context.Context
, an instance of StartWorkflowOptions
, a Workflow Type name, and all variables to be passed to the Workflow Execution.
The ExecuteWorkflow()
call returns a Future, which can be used to get the result of the Workflow Execution.
package main
import (
// ...
"go.temporal.io/sdk/client"
)
func main() {
temporalClient, err := client.NewClient(client.Options{})
if err != nil {
// ...
}
defer temporalClient.Close()
// ...
workflowOptions := client.StartWorkflowOptions{
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
if err != nil {
// ...
}
// ...
}
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) (YourWorkflowResponse, error) {
// ...
}
If the invocation process has access to the function directly, then the Workflow Type name parameter can be passed as if the function name were a variable, without quotations.
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
If the invocation process does not have direct access to the statically defined Workflow Definition, for example, if the Workflow Definition is in an un-importable package, or it is written in a completely different language, then the Workflow Type can be provided as a string
.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, "YourWorkflowDefinition", param)
Use WorkflowStub
to start a Workflow Execution from within a Client, and ExternalWorkflowStub
to start a different Workflow Execution from within a Workflow.
See SignalwithStart
to start a Workflow Execution to receive a Signal from within another Workflow.
Using WorkflowStub
WorkflowStub
is a proxy generated by the WorkflowClient
.
Each time a new Workflow Execution is started, an instance of the Workflow implementation object is created.
Then, one of the methods (depending on the Workflow Type of the instance) annotated with @WorkflowMethod
can be invoked.
As soon as this method returns, the Workflow Execution is considered to be complete.
You can use a typed or untyped WorkflowStub
in the client code.
- Typed
WorkflowStub
are useful because they are type safe and allow you to invoke your Workflow methods such as@WorkflowMethod
,@QueryMethod
, and@SignalMethod
directly. - An untyped
WorkflowStub
does not use the Workflow interface, and is not type safe. It is more flexible because it has methods from theWorkflowStub
interface, such asstart
,signalWithStart
,getResults
(sync and async),query
,signal
,cancel
andterminate
. Note that the Temporal Java SDK also provides typedWorkflowStub
versions for these methods. When using untypedWorkflowStub
, we rely on the Workflow Type, Activity Type, Child Workflow Type, as well as Query and Signal names. For details, see Temporal Client.
A Workflow Execution can be started either synchronously or asynchronously.
Synchronous invocation starts a Workflow and then waits for its completion. If the process that started the Workflow crashes or stops waiting, the Workflow continues executing. Because Workflows are potentially long-running, and Client crashes happen, it is not very commonly found in production use. The following example is a type-safe approach for starting a Workflow Execution synchronously.
NotifyUserAccounts workflow = client.newWorkflowStub(
NotifyUserAccounts.class,
WorkflowOptions.newBuilder()
.setWorkflowId("notifyAccounts")
.setTaskQueue(taskQueue)
.build()
);
// start the Workflow and wait for a result.
workflow.notify(new String[] { "Account1", "Account2", "Account3", "Account4", "Account5",
"Account6", "Account7", "Account8", "Account9", "Account10"});
}
// notify(String[] accountIds) is a Workflow method defined in the Workflow Definition.Asynchronous start initiates a Workflow Execution and immediately returns to the caller. This is the most common way to start Workflows in production code. The
WorkflowClient
https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java) provides some static methods, such asstart
,execute
,signalWithStart
etc., that help with starting your Workflows asynchronously.The following examples show how to start Workflow Executions asynchronously, with either typed or untyped
WorkflowStub
.Typed WorkflowStub Example
// create typed Workflow stub
FileProcessingWorkflow workflow = client.newWorkflowStub(FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(taskQueue)
.setWorkflowId(workflowId)
.build());
// use WorkflowClient.execute (if your Workflow takes in arguments) or WorkflowClient.start (for zero arguments)
WorkflowClient.start(workflow::greetCustomer);Untyped WorkflowStub Example
WorkflowStub untyped = client.newUntypedWorkflowStub("FileProcessingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
// blocks until Workflow Execution has been started (not until it completes)
untyped.start(argument);
You can call a Dynamic Workflow implementation using an untyped WorkflowStub
.
The following example shows how to call the Dynamic Workflow implementation in the Client code.
WorkflowClient client = WorkflowClient.newInstance(service);
/**
* Note that for this part of the client code, the dynamic Workflow implementation must
* be known to the Worker at runtime in order to dispatch Workflow tasks, and may be defined
* in the Worker definition as:*/
// worker.registerWorkflowImplementationTypes(DynamicGreetingWorkflowImpl.class);
/* Create the Workflow stub to call the dynamic Workflow.
* Note that the Workflow type is not explicitly registered with the Worker.*/
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions);
DynamicWorkflow
can be used to invoke different Workflow Types.
To check what type is running when your Dynamic Workflow execute
method runs, use getWorkflowType()
in the implementation code.
String type = Workflow.getInfo().getWorkflowType();
See Workflow Execution Result for details on how to get the results of the Workflow Execution.
Using ExternalWorkflowStub
Use ExternalWorkflowStub
within a Workflow to invoke, and send Signals to, other Workflows by type.
This helps particularly for executing Workflows written in other language SDKs, as shown in the following example.
@Override
public String yourWFMethod(String name) {
ExternalWorkflowStub callOtherWorkflow = Workflow.newUntypedExternalWorkflowStub("OtherWFId");
}
See the Temporal Polyglot code for examples of executing Workflows written in other language SDKs.
Recurring start
You can start a Workflow Execution on a regular schedule by using setCronSchedule
Workflow option in the Client code.
Content is not available
When you have a Workflow Client, you can schedule the start of a Workflow with client.start()
, specifying workflowId
, taskQueue
, and args
and returning a Workflow handle immediately after the Server acknowledges the receipt.
const handle = await client.start(example, {
workflowId: "your-workflow-id",
taskQueue: "your-task-queue",
args: ["argument01", "argument02", "argument03"], // this is typechecked against workflowFn's args
});
const handle = client.getHandle(workflowId);
const result = await handle.result();
Calling client.start()
and client.execute()
send a command to Temporal Server to schedule a new Workflow Execution on the specified Task Queue. It does not actually start until a Worker that has a matching Workflow Type, polling that Task Queue, picks it up.
You can test this by executing a Workflow Client command without a matching Worker. Temporal Server records the command in Event History, but does not make progress with the Workflow Execution until a Worker starts polling with a matching Task Queue and Workflow Definition.
Workflow Execution run in a separate V8 isolate context in order to provide a deterministic runtime.
Set Task Queue
The only Workflow Option that must be set is the name of the Task Queue.
For any code to execute, a Worker Process must be running that contains a Worker Entity that is polling the same Task Queue name.
- Go
- Java
- PHP
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the TaskQueue
field, and pass the instance to the ExecuteWorkflow
call.
- Type:
string
- Default: None, this is a required field to be set by the developer
workflowOptions := client.StartWorkflowOptions{
// ...
TaskQueue: "your-task-queue",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Task Queue with the WorkflowStub
instance in the Client code using WorkflowOptions.Builder.setTaskQueue
.
- Type:
String
- Default: none
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWF")
// Set the Task Queue
.setTaskQueue(WorkerGreet.TASK_QUEUE)
.build());
Content is not available
A Task Queue is a dynamic queue in Temporal polled by one or more Workers.
Workers bundle Workflow code and node modules using Webpack v5 and execute them inside V8 isolates. Activities are directly required and run by Workers in the Node.js environment.
Workers are flexible. You can host any or all of your Workflows and Activities on a Worker, and you can host multiple Workers on a single machine.
There are three main things the Worker needs:
taskQueue
: the Task Queue to poll. This is the only required argument.activities
: Optional. Imported and supplied directly to the Worker.- Workflow bundle, specify one of the following options:
- a
workflowsPath
to yourworkflows.ts
file to pass to Webpack. For example,require.resolve('./workflows')
. Workflows will be bundled with their dependencies, which you can finetune withnodeModulesPaths
. - Or pass a prebuilt bundle to
workflowBundle
, if you prefer to handle the bundling yourself.
- a
- TypeScript
- JavaScript
import {Worker} from "@temporalio/worker";
import * as activities from "./activities";
async function run() {
// Step 1: Register Workflows and Activities with the Worker and connect to
// the Temporal server.
const worker = await Worker.create({
workflowsPath: require.resolve("./workflows"),
activities,
taskQueue: "hello-world",
});
// Worker connects to localhost by default and uses console.error for logging.
// Customize the Worker by passing more options to create():
// https://typescript.temporal.io/api/classes/worker.Worker
// If you need to configure server connection parameters, see docs:
// /typescript/security#encryption-in-transit-with-mtls
// Step 2: Start accepting tasks on the `tutorial` queue
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
import { Worker } from "@temporalio/worker";
import * as activities from "./activities";
async function run() {
// Step 1: Register Workflows and Activities with the Worker and connect to
// the Temporal server.
const worker = await Worker.create({
workflowsPath: require.resolve("./workflows"),
activities,
taskQueue: "hello-world",
});
// Worker connects to localhost by default and uses console.error for logging.
// Customize the Worker by passing more options to create():
// https://typescript.temporal.io/api/classes/worker.Worker
// If you need to configure server connection parameters, see docs:
// /typescript/security#encryption-in-transit-with-mtls
// Step 2: Start accepting tasks on the `tutorial` queue
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
taskQueue
is the only required option; however, use workflowsPath
and activities
to register Workflows and Activities with the Worker.
When scheduling a Workflow, a taskQueue
must be specified.
- TypeScript
- JavaScript
import {Connection, WorkflowClient} from "@temporalio/client";
// This is the code that is used to start a workflow.
const connection = await Connection.create();
const client = new WorkflowClient({connection});
const result = await client.execute(myWorkflow, {
taskQueue: "your-task-queue", // required
workflowId: "your-workflow-id", // required
});
import { Connection, WorkflowClient } from "@temporalio/client";
// This is the code that is used to start a workflow.
const connection = await Connection.create();
const client = new WorkflowClient({ connection });
const result = await client.execute(myWorkflow, {
taskQueue: "your-task-queue",
workflowId: "your-workflow-id", // required
});
When creating a Worker, you must pass the taskQueue
option to the Worker.create()
function.
- TypeScript
- JavaScript
const worker = await Worker.create({
activities, // imported elsewhere
taskQueue: "your-task-queue",
});
const worker = await Worker.create({
activities,
taskQueue: "your-task-queue",
});
Optionally, in Workflow code, when calling an Activity, you can specify the Task Queue by passing the taskQueue
option to proxyActivities()
, startChild()
, or executeChild()
. If you do not specify a taskQueue
, then the TypeScript SDK places Activity and Child Workflow Tasks in the same Task Queue as the Workflow Task Queue.
Set Workflow Id
Although it is not required, we recommend providing your own Workflow Id that maps to a business process or business entity identifier, such as an order identifier or customer identifier.
- Go
- Java
- PHP
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the ID
field, and pass the instance to the ExecuteWorkflow
call.
- Type:
string
- Default: System generated UUID
workflowOptions := client.StartWorkflowOptions{
// ...
ID: "Your-Custom-Workflow-Id",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Id with the WorkflowStub
instance in the Client code using WorkflowOptions.Builder.setWorkflowId
.
- Type:
String
- Default: none
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
// Set the Workflow Id
.setWorkflowId("YourWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
.build());
Content is not available
Connect to a Client with client.start()
and any arguments. Then specify your taskQueue
and set your workflowId
to a meaningful business identifier.
const handle = await client.start(example, {
workflowId: "yourWorkflowId",
taskQueue: "yourTaskQueue",
args: ["your", "arg", "uments"],
});
This starts a new Client with the given Workflow Id, Task Queue name, and an argument.
Get Workflow results
If the call to start a Workflow Execution is successful, you will gain access to the Workflow Execution's Run Id.
The Workflow Id, Run Id, and Namespace may be used to uniquely identify a Workflow Execution in the system and get its result.
It's possible to both block progress on the result (synchronous execution) or get the result at some other point in time (asynchronous execution).
In the Temporal Platform, it's also acceptable to use Queries as the preferred method for accessing the state and results of Workflow Executions.
- Go
- Java
- PHP
- TypeScript
The ExecuteWorkflow
call returns an instance of WorkflowRun
, which is the workflowRun
variable in the following line.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, app.YourWorkflowDefinition, param)
if err != nil {
// ...
}
// ...
}
The instance of WorkflowRun
has the following three methods:
GetWorkflowID()
: Returns the Workflow Id of the invoked Workflow Execution.GetRunID()
: Always returns the Run Id of the initial Run (See Continue As New) in the series of Runs that make up the full Workflow Execution.Get
: Takes a pointer as a parameter and populates the associated variable with the Workflow Execution result.
To wait on the result of Workflow Execution in the same process that invoked it, call Get()
on the instance of WorkflowRun
that is returned by the ExecuteWorkflow()
call.
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition, param)
if err != nil {
// ...
}
var result YourWorkflowResponse
err = workflowRun.Get(context.Background(), &result)
if err != nil {
// ...
}
// ...
}
However, the result of a Workflow Execution can be obtained from a completely different process. All that is needed is the Workflow Id. (A Run Id is optional if more than one closed Workflow Execution has the same Workflow Id.) The result of the Workflow Execution is available for as long as the Workflow Execution Event History remains in the system.
Call the GetWorkflow()
method on an instance of the Go SDK Client and pass it the Workflow Id used to spawn the Workflow Execution.
Then call the Get()
method on the instance of WorkflowRun
that is returned, passing it a pointer to populate the result.
// ...
workflowID := "Your-Custom-Workflow-Id"
workflowRun := c.GetWorkflow(context.Background, workflowID)
var result YourWorkflowResponse
err = workflowRun.Get(context.Background(), &result)
if err != nil {
// ...
}
// ...
Get last completion result
In the case of a Temporal Cron Job, you might need to get the result of the previous Workflow Run and use it in the current Workflow Run.
To do this, use the HasLastCompletionResult
and GetLastCompletionResult
APIs, available from the go.temporal.io/sdk/workflow
package, directly in your Workflow code.
type CronResult struct {
Count int
}
func YourCronWorkflowDefinition(ctx workflow.Context) (CronResult, error) {
count := 1
if workflow.HasLastCompletionResult(ctx) {
var lastResult CronResult
if err := workflow.GetLastCompletionResult(ctx, &lastResult); err == nil {
count = count + lastResult.Count
}
}
newResult := CronResult {
Count: count,
}
return newResult, nil
}
This will work even if one of the cron Workflow Runs fails. The next Workflow Run gets the result of the last successfully Completed Workflow Run.
A synchronous Workflow Execution blocks your client thread until the Workflow Execution completes (or fails) and get the results (or error in case of failure).
The following example is a type-safe approach for getting the results of a synchronous Workflow Execution.
FileProcessingWorkflow workflow = client.newWorkflowStub(
FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build();
// start sync and wait for results (or failure)
String result = workflow.processfile(new Argument());
An asynchronous Workflow Execution immediately returns a value to the caller.
The following examples show how to get the results of a Workflow Execution through typed and untyped WorkflowStub
.
Typed WorkflowStub Example
// create typed Workflow stub
FileProcessingWorkflow workflow = client.newWorkflowStub(FileProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(taskQueue)
.setWorkflowId(workflowId)
.build());
// use WorkflowClient.execute (if your Workflow takes in arguments) or WorkflowClient.start (for zero arguments)
WorkflowClient.start(workflow::greetCustomer);Untyped WorkflowStub Example
WorkflowStub untyped = client.newUntypedWorkflowStub("FileProcessingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
// blocks until Workflow Execution has been started (not until it completes)
untyped.start(argument);
If you need to wait for a Workflow Execution to complete after an asynchronous start, the most straightforward way is to call the blocking Workflow instance again.
Note that if WorkflowOptions.WorkflowIdReusePolicy
is not set to AllowDuplicate
, then instead of throwing DuplicateWorkflowException
, it reconnects to an existing Workflow and waits for its completion.
The following example shows how to do this from a different process than the one that started the Workflow Execution.
YourWorkflow workflow = client.newWorkflowStub(YourWorkflow.class, workflowId);
// Returns the result after waiting for the Workflow to complete.
String result = workflow.yourMethod();
Another way to connect to an existing Workflow and wait for its completion from another process, is to use UntypedWorkflowStub
. For example:
WorkflowStub workflowStub = client.newUntypedWorkflowStub(workflowType, workflowOptions);
// Returns the result after waiting for the Workflow to complete.
String result = untyped.getResult(String.class);
Get last (successful) completion result
For a Temporal Cron Job, get the result of previous successful runs using GetLastCompletionResult()
.
The method returns null
if there is no previous completion.
The following example shows how to implement this in a Workflow.
public String cronWorkflow() {
String lastProcessedFileName = Workflow.getLastCompletionResult(String.class);
// Process work starting from the lastProcessedFileName.
// Business logic implementation goes here.
// Updates lastProcessedFileName to the new value.
return lastProcessedFileName;
}
Note that this works even if one of the Cron schedule runs failed. The next schedule will still get the last successful result if it ever successfully completed at least once. For example, for a daily cron Workflow, if the run succeeds on the first day and fails on the second day, then the third day run will get the result from first day's run using these APIs.
Content is not available
To return the results of a Workflow Execution:
return (
"Completed " +
wf.workflowInfo().workflowId +
", Total Charged: " +
totalCharged
);
totalCharged
is just a function declared in your code. For a full example, see subscription-workflow-project-template-typescript/src/workflows.ts.
A Workflow function may return a result. If it doesn’t (in which case the return type is Promise<void>
), the result will be undefined
.
If you started a Workflow with handle.start()
, you can choose to wait for the result anytime with handle.result()
.
const handle = client.getHandle(workflowId);
const result = await handle.result();
Using a Workflow Handle isn't necessary with client.execute()
.
Workflows that prematurely end will throw a WorkflowFailedError
if you call result()
.
If you call result()
on a Workflow that prematurely ended for some reason, it throws a WorkflowFailedError
error that reflects the reason. For that reason, it is recommended to catch that error.
const handle = client.getHandle(workflowId);
try {
const result = await handle.result();
} catch (err) {
if (err instanceof WorkflowFailedError) {
throw new Error("Temporal workflow failed: " + workflowId, {
cause: err,
});
} else {
throw new Error("error from Temporal workflow " + workflowId, {
cause: err,
});
}
}
Features
This section covers many of the features that are available to use in your Temporal Application.
Signals
A Signal is a message sent to a running Workflow Execution.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
Define Signal
A Signal has a name and can have arguments. The arguments must be serializable.
- Go
- Java
- PHP
- TypeScript
Structs should be used to define Signals and carry data, as long as the struct is serializable via the Data Converter.
The Receive()
method on the Data Converter decodes the data into the Struct within the Workflow.
Only public fields are serializable.
MySignal struct {
Message string // serializable
message string // not serializable
}
The @SignalMethod
annotation indicates that the method is used to handle and react to external Signals.
@SignalMethod
void mySignal(String signalName);
The method can have parameters that contain the Signal payload and must be serializable by the default Jackson JSON Payload Converter.
void mySignal(String signalName, Object... args);
This method does not return a value and must have a void
return type.
Things to consider when defining Signals:
- Use Workflow object constructors and initialization blocks to initialize the internal data structures if possible.
- Signals might be received by a Workflow before the Workflow method is executed. When implementing Signals in scenarios where this can occur, assume that no parts of Workflow code ran. In some cases, Signal method implementation might require some initialization to be performed by the Workflow method code first—for example, when the Signal processing depends on, and is defined by the Workflow input. In this case, you can use a flag to determine whether the Workflow method is already triggered; if not, persist the Signal data into a collection for delayed processing by the Workflow method.
Content is not available
- TypeScript
- JavaScript
import {defineSignal} from "@temporalio/workflow";
interface JoinInput {
userId: string;
groupId: string;
}
const joinSignal = defineSignal<JoinInput>("join");
import { defineSignal } from "@temporalio/workflow";
const joinSignal = defineSignal("join");
Handle Signal
Workflows listen for Signals by the Signal's name.
- Go
- Java
- PHP
- TypeScript
Use the GetSignalChannel()
API from the go.temporal.io/sdk/workflow
package to get the Signal Channel.
Get a new Selector
and pass it the Signal Channel and a callback function to handle the payload.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(channel workflow.ReceiveChannel, more bool) {
channel.Receive(ctx, &signal)
// ...
})
selector.Select(ctx)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}
In the example above, the Workflow code uses workflow.GetSignalChannel
to open a workflow.Channel
for the Signal type (identified by the Signal name).
We then use a workflow.Selector
and the AddReceive()
to wait on a Signal from this channel.
The more
bool in the callback function indicates that channel is not closed and more deliveries are possible.
Use the @SignalMethod
annotation to handle Signals in the Workflow interface.
The Signal type defaults to the name of the method. In the following example, the Signal type defaults to retryNow
.
@WorkflowInterface
public interface FileProcessingWorkflow {
@WorkflowMethod
String processFile(Arguments args);
@SignalMethod
void retryNow();
}
To overwrite this default naming and assign a custom Signal type, use the @SignalMethod
annotation with the name
parameter.
In the following example, the Signal type is set to "retrysignal".
@WorkflowInterface
public interface FileProcessingWorkflow {
@WorkflowMethod
String processFile(Arguments args);
@SignalMethod(name = "retrysignal")
void retryNow();
}
A Workflow interface can define any number of methods annotated with @SignalMethod
, but the method names or the name
parameters for each must be unique.
In the following example, we define a Signal method "updateGreeting" to update the greeting in the Workflow.
We set a Workflow.await
in the Workflow implementation to block the current Workflow Execution until the provided unblock condition is evaluated to true
.
In this case, the unblocking condition is evaluated to true
when the Signal to update the greeting is received.
@WorkflowInterface
public interface HelloWorld {
@WorkflowMethod
void sayHello(String name);
@SignalMethod
void updateGreeting(String greeting);
}
public class HelloWorldImpl implements HelloWorld {
private final Logger workflowLogger = Workflow.getLogger(HelloWorldImpl.class);
private String greeting;
@Override
public void sayHello(String name) {
int count = 0;
while (!"Bye".equals(greeting)) {
String oldGreeting = greeting;
Workflow.await(() -> !Objects.equals(greeting, oldGreeting));
}
workflowLogger.info(++count + ": " + greeting + " " + name + "!");
}
@Override
public void updateGreeting(String greeting) {
this.greeting = greeting;
}
}
This Workflow completes when the Signal updates the greeting to "Bye".
Dynamic Signal Handler You can also implement Signal handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object)
to register an implementation of the DynamicSignalListener
in the Workflow implementation code.
Workflow.registerListener(
(DynamicSignalHandler)
(signalName, encodedArgs) -> name = encodedArgs.get(0, String.class));
When registered, any Signals sent to the Workflow without a defined handler will be delivered to the DynamicSignalHandler
.
Note that you can only register one Workflow.registerListener(Object)
per Workflow Execution.
DynamicSignalHandler
can be implemented in both regular and dynamic Workflow implementations.
Content is not available
- TypeScript
- JavaScript
import {setHandler} from "@temporalio/workflow";
export async function myWorkflow() {
const groups = new Map<string, Set<string>>();
setHandler(joinSignal, ({userId, groupId}: JoinInput) => {
const group = groups.get(groupId);
if (group) {
group.add(userId);
} else {
groups.set(groupId, new Set([userId]));
}
});
}
import { setHandler } from "@temporalio/workflow";
export async function myWorkflow() {
const groups = new Map();
setHandler(joinSignal, ({ userId, groupId }) => {
const group = groups.get(groupId);
if (group) {
group.add(userId);
}
else {
groups.set(groupId, new Set([userId]));
}
});
}
Send Signal from Client
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
- Go
- Java
- PHP
- TypeScript
Use the SignalWorkflow()
method on an instance of the Go SDK Temporal Client to send a Signal to a Workflow Execution.
Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is Running receives the Signal.
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
// ...
Possible errors:
serviceerror.NotFound
serviceerror.Internal
serviceerror.Unavailable
To send a Signal to a Workflow Execution from a Client, call the Signal method, annotated with @SignalMethod
in the Workflow interface, from the Client code.
In the following Client code example, we start the Workflow "greetCustomer" and call the Signal method "addCustomer" that is handled in the Workflow.
// create a typed Workflow stub for GreetingsWorkflow
GreetingsWorkflow workflow = client.newWorkflowStub(GreetingsWorkflow.class,
WorkflowOptions.newBuilder()
// set the Task Queue
.setTaskQueue(taskQueue)
// Workflow Id is recommended but not required
.setWorkflowId(workflowId)
.build());
// start the Workflow
WorkflowClient.start(workflow::greetCustomer);
// send a Signal to the Workflow
Customer customer = new Customer("John", "Spanish", "john@john.com");
workflow.addCustomer(customer); //addCustomer is the Signal method defined in the greetCustomer Workflow.
See Handle Signals for details on how to handle Signals in a Workflow.
Content is not available
import {WorkflowClient} from "@temporalio/client";
import {joinSignal} from "./workflows";
const client = new WorkflowClient();
const handle = await client.getHandle("workflow-id-123");
await handle.signal(joinSignal, {userId: "user-1", groupId: "group-1"});
Send Signal from Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
- Go
- Java
- PHP
- TypeScript
A Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow
API from the go.temporal.io/sdk/workflow
package.
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signalData).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}
To send a Signal from within a Workflow to a different Workflow Execution, initiate an ExternalWorkflowStub
in the implementation of the current Workflow and call the Signal method defined in the other Workflow.
The following example shows how to use an untyped ExternalWorkflowStub
in the Workflow implementation to send a Signal to another Workflow.
public String sendGreeting(String name) {
// initiate ExternalWorkflowStub to call another Workflow by its Id "ReplyWF"
ExternalWorkflowStub callRespondWorkflow = Workflow.newUntypedExternalWorkflowStub("ReplyWF");
String responseTrigger = activity.greeting("Hello", name);
// send a Signal from this sendGreeting Workflow to the other Workflow
// by calling the Signal method name "getGreetCall" defined in that Workflow.
callRespondWorkflow.signal("getGreetCall", responseTrigger);
return responseTrigger;
Content is not available
import {getExternalWorkflowHandle} from "@temporalio/workflow";
import {joinSignal} from "./other-workflow";
export async function myWorkflowThatSignals() {
const handle = getExternalWorkflowHandle("workflow-id-123");
await handle.signal(joinSignal, {userId: "user-1", groupId: "group-1"});
}
Send Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
- Go
- Java
- PHP
- TypeScript
Use the SignalWithStartWorkflow()
API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.
Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
To send Signals to a Workflow Execution whose status is unknown, use SignalWithStart
with a WorkflowStub
in the Client code.
This method ensures that if the Workflow Execution is in a closed state, a new Workflow Execution is spawned and the Signal is delivered to the running Workflow Execution.
Note that when the SignalwithStart
spawns a new Workflow Execution, the Signal is delivered before the call to your @WorkflowMethod
.
This means that the Signal handler in your Workflow interface code will execute before the @WorkfowMethod
.
You must ensure that your code logic can deal with this.
In the following example, the Client code uses SignalwithStart
to send the Signal "setCustomer" to the UntypedWorkflowStub
named "GreetingWorkflow".
If the "GreetingWorkflow" Workflow Execution is not running, the SignalwithStart
starts the Workflow Execution.
...
public static void signalWithStart() {
// WorkflowStub is a client-side stub to a single Workflow instance
WorkflowStub untypedWorkflowStub = client.newUntypedWorkflowStub("GreetingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
untypedWorkflowStub.signalWithStart("setCustomer", new Object[] {customer2}, new Object[] {customer1});
printWorkflowStatus();
try {
String greeting = untypedWorkflowStub.getResult(String.class);
printWorkflowStatus();
System.out.println("Greeting: " + greeting);
} catch(WorkflowFailedException e) {
System.out.println("Workflow failed: " + e.getCause().getMessage());
printWorkflowStatus();
}
}
...
The following example shows the Workflow interface for the "GreetingWorkflow" called in the previous example.
...
@WorkflowInterface
public interface GreetingWorkflow {
@WorkflowMethod
String greet(Customer customer);
@SignalMethod
void setCustomer(Customer customer);
@QueryMethod
Customer getCustomer();
...
}
Note that the Signal handler "setCustomer" is executed before the @WorkflowMethod
"greet" is called.
Content is not available
WorkflowClient.signalWithStart
import {WorkflowClient} from "@temporalio/client";
import {myWorkflow, joinSignal} from "./workflows";
const client = new WorkflowClient();
await client.signalWithStart(myWorkflow, {
workflowId: "workflow-id-123",
args: [{foo: 1}],
signal: joinSignal,
signalArgs: [{userId: "user-1", groupId: "group-1"}],
});
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
Query name
A Query name (also called Query type) is simply a string name.
- Go
- Java
- PHP
- TypeScript
In Go, a Query type, also called a Query name, is a string
value.
queryType := "your_query_name"
To define a Query, define the method name and the result type of the Query.
query(String queryType, Class<R> resultClass, Type resultType, Object... args);
/* @param queryType name of the Query handler. Usually it is a method name.
* @param resultClass class of the Query result type
* @param args optional Query arguments
* @param <R> type of the Query result
*/
Query methods can take in any number of input parameters which can be used to limit the data that is returned.
Use the Query method names to send and receive Queries.
Query methods must never change any Workflow state including starting Activities or blocking threads in any way.
Content is not available
Content is not available
Send Query
Queries are sent from a Temporal Client.
- Go
- Java
- PHP
- TypeScript
Use the QueryWorkflow()
API or the QueryWorkflowWithOptions
API on the Temporal Client to send a Query to a Workflow Execution.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...
You can pass an arbitrary number of arguments to the QueryWorkflow()
function.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...
The QueryWorkflowWithOptions()
API provides similar functionality, but with the ability to set additional configurations through QueryWorkflowWithOptionsRequest.
When using this API, you will also receive a structured response of type QueryWorkflowWithOptionsResponse.
// ...
response, err := temporalClient.QueryWorkflowWithOptions(context.Background(), &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryType: queryType,
Args: args,
})
if err != nil {
// ...
}
To send a Query to a Workflow Execution from an external process, call the Query method (defined in the Workflow) from a WorkflowStub
within the Client code.
For example, the following Client code calls a Query method queryGreeting()
defined in the GreetingWorkflow
Workflow interface.
// Create our workflow options
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE).build();
// Create the Temporal client stub. It is used to start our workflow execution.
GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
// Start our workflow asynchronously to not use another thread to query.
WorkflowClient.start(workflow::createGreeting, "World");
// Query the Workflow to get the current value of greeting and print it.
System.out.println(workflow.queryGreeting());
Content is not available
Content is not available
Handle Query
Queries are handled by your Workflow.
Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.
- Go
- Java
- PHP
- TypeScript
Use the SetQueryHandler
API from the go.temporal.io/sdk/workflow
package to set a Query Handler that listens for a Query by name.
The handler must be a function that returns two values:
- A serializable result
- An error
The handler function can receive any number of input parameters, but all input parameters must be serializable.
The following sample code sets up a Query Handler that handles the current_state
Query type:
func MyWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = WithActivityOptions(ctx, myActivityOptions)
err = ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}
For example, suppose your query handler function takes two parameters:
err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})
To handle a Query in the Workflow, create a Query handler using the @QueryMethod
annotation in the Workflow interface and define it in the Workflow implementation.
The @QueryMethod
annotation indicates that the method is used to handle a Query that is sent to the Workflow Execution.
The method can have parameters that can be used to filter data that the Query returns.
Because the method returns a value, it must have a return type that is not void
.
The Query name defaults to the name of the method.
In the following example, the Query name defaults to getStatus
.
@WorkflowInterface
public interface FileProcessingWorkflow {
@QueryMethod
String getStatus();
}
To overwrite this default naming and assign a custom Query name, use the @QueryMethod
annotation with the name
parameter. In the following example, the Query name is set to "history".
@WorkflowInterface
public interface FileProcessingWorkflow {
@QueryMethod(name = "history")
String getStatus();
}
A Workflow Definition interface can define multiple methods annotated with @QueryMethod
, but the method names or the name
parameters for each must be unique.
The following Workflow interface has a Query method getCount()
to handle Queries to this Workflow.
@WorkflowInterface
public interface HelloWorld {
@WorkflowMethod
void sayHello(String name);
@QueryMethod
int getCount();
}
The following example is the Workflow implementation with the Query method defined in the HelloWorld
Workflow interface from the previous exmaple.
public static class HelloWorldImpl implements HelloWorld {
private String greeting = "Hello";
private int count = 0;
@Override
public void sayHello(String name) {
while (!"Bye".equals(greeting)) {
logger.info(++count + ": " + greeting + " " + name + "!");
String oldGreeting = greeting;
Workflow.await(() -> !Objects.equals(greeting, oldGreeting));
}
logger.info(++count + ": " + greeting + " " + name + "!");
}
@Override
public int getCount() {
return count;
}
}
Dynamic Query Handler You can also implement Query handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object)
to register an implementation of the DynamicQueryListener
in the Workflow implementation code.
Workflow.registerListener(
(DynamicQueryHandler)
(queryName, encodedArgs) -> name = encodedArgs.get(0, String.class));
When registered, any Queries sent to the Workflow without a defined handler will be delivered to the DynamicQueryHandler
.
Note that you can only register one Workflow.registerListener(Object)
per Workflow Execution.
DynamicQueryHandler
can be implemented in both regular and dynamic Workflow implementations.
Content is not available
Query Handlers can return values inside a Workflow in TypeScript.
You make a Query with handle.query(query, ...args)
. A Query needs a return value, but can also take arguments.
import * as wf from "@temporalio/workflow";
export const unblockSignal = wf.defineSignal("unblock");
export const isBlockedQuery = wf.defineQuery<boolean>("isBlocked");
export async function unblockOrCancel(): Promise<void> {
let isBlocked = true;
wf.setHandler(unblockSignal, () => void (isBlocked = false));
wf.setHandler(isBlockedQuery, () => isBlocked);
console.log("Blocked");
try {
await wf.condition(() => !isBlocked);
console.log("Unblocked");
} catch (err) {
if (err instanceof wf.CancelledFailure) {
console.log("Cancelled");
}
throw err;
}
}
Workflow timeouts & retries
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution. A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Workflow Execution Timeout
Use the Workflow Execution Timeout to limit the maximum time that a Workflow Execution can be executing (have an Open status) including retries and any usage of Continue As New.
- Go
- Java
- PHP
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the WorkflowExecutionTimeout
field, and pass the instance to the ExecuteWorkflow
call.
- Type:
time.Duration
- Default: Unlimited
workflowOptions := client.StartWorkflowOptions{
// ...
WorkflowExecutionTimeout: time.Hours * 24 * 365 * 10,
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Execution Timeout with the WorkflowStub
instance in the Client code using WorkflowOptions.Builder.setWorkflowExecutionTimeout
.
- Type:
time.Duration
- Default: Unlimited
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Execution Timeout duration
.setWorkflowExecutionTimeout(Duration.ofSeconds(10))
.build());
Content is not available
Content is not available
Workflow Run Timeout
Use the Workflow Run Timeout to restrict the maximum amount of time that a single Workflow Run can last.
- Go
- Java
- PHP
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the WorkflowRunTimeout
field, and pass the instance to the ExecuteWorkflow
call.
- Type:
time.Duration
- Default: Same as
WorkflowExecutionTimeout
workflowOptions := client.StartWorkflowOptions{
WorkflowRunTimeout: time.Hours * 24 * 365 * 10,
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Run Timeout with the WorkflowStub
instance in the Client code using WorkflowOptions.Builder.setWorkflowRunTimeout
.
- Type:
time.Duration
- Default: Same as WorkflowExecutionTimeout.
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Run Timeout duration
.setWorkflowRunTimeout(Duration.ofSeconds(10))
.build());
Content is not available
Content is not available
Workflow Task Timeout
Use the Workflow Task Timeout to restrict the maximum amount of time that a Worker can execute a Workflow Task.
- Go
- Java
- PHP
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the WorkflowTaskTimeout
field, and pass the instance to the ExecuteWorkflow
call.
- Type:
time.Duration
- Default:
time.Seconds * 10
workflowOptions := client.StartWorkflowOptions{
WorkflowTaskTimeout: time.Second * 10,
//...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set the Workflow Task Timeout with the WorkflowStub
instance in the Client code using WorkflowOptions.Builder.setWorkflowTaskTimeout
.
- Type:
time.Duration
- Default: 10 seconds.
- Values: Maximum accepted value is 60 seconds.
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Task Timeout duration
.setWorkflowTaskTimeout(Duration.ofSeconds(10))
.build());
Content is not available
Content is not available
Workflow Retry Policy
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
- Go
- Java
- PHP
- TypeScript
Create an instance of a RetryPolicy
from the go.temporal.io/sdk/temporal
package and provide it as the value to the RetryPolicy
field of the instance of StartWorkflowOptions
.
- Type:
RetryPolicy
- Default: None
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
workflowOptions := client.StartWorkflowOptions{
RetryPolicy: retrypolicy,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Set Workflow Retry Options in the WorkflowStub
instance using WorkflowOptions.Builder.setWorkflowRetryOptions
.
- Type:
RetryOptions
- Default:
Null
which means no retries will be attempted.
//create Workflow stub for GreetWorkflowInterface
GreetWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("GreetWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Retry Options
.setRetryOptions(RetryOptions.newBuilder()
.build());
Content is not available
Content is not available
Activity timeouts & retries
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution. A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Schedule-To-Close Timeout
Use the Schedule-To-Close Timeout to limit the maximum duration of an Activity Execution.
- Go
- Java
- PHP
- TypeScript
To set a Schedule-To-Close Timeout, create an instance of ActivityOptions
from the go.temporal.io/sdk/workflow
package, set the ScheduleToCloseTimeout
field, and then use the WithActivityOptions()
API to apply the options to the instance of workflow.Context
.
This or StartToCloseTimeout
must be set.
- Type:
time.Duration
- Default: ∞ (infinity - no limit)
activityoptions := workflow.ActivityOptions{
ScheduleToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}