Observability - TypeScript SDK feature guide

The observability section of the Temporal Developer's guide covers the ways to view the current state of your Temporal Application: which Workflow Executions the Temporal Platform is tracking, and the state of any given Workflow Execution, either now or at earlier points in its execution.

This section covers features related to viewing the state of the application, including metrics, tracing, logging, and Visibility.

How to emit metrics

Each Temporal SDK can emit an optional set of metrics from either the Client or the Worker process. For a complete list of the metrics that can be emitted, see the SDK metrics reference.

Metrics can be scraped and stored in time series databases such as Prometheus.

Temporal also provides a dashboard that you can integrate with graphing services like Grafana.

Workers can emit metrics and traces. There are a few telemetry options that can be provided to Runtime.install. The common options are:

  • metrics: { otel: { url } }: The URL of a gRPC OpenTelemetry collector.
  • metrics: { prometheus: { bindAddress } }: Address on the Worker host that will have metrics for Prometheus to scrape.

To set up tracing of Workflows and Activities, use the @temporalio/interceptors-opentelemetry package. (For details, see the next section.)

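import { Runtime } from '@temporalio/worker';

Runtime.install({
  telemetryOptions: {
    // Expose metrics on this address for Prometheus to scrape.
    metrics: {
      prometheus: { bindAddress: '0.0.0.0:9464' },
    },
    // Forward SDK core logs at DEBUG level to the Node.js logger.
    logging: { forward: { level: 'DEBUG' } },
  },
});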
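
As a rough sketch of how one of the two metrics options above might be chosen at startup, the helper below builds a telemetry options object from environment-style settings. The variable names `TEMPORAL_OTEL_URL` and `TEMPORAL_PROM_BIND`, the local `TelemetrySketch` type, and the fallback address are assumptions made for illustration and are not part of the SDK; only the object shape mirrors the options listed above.

```typescript
// Hypothetical local types mirroring the two metrics options described above.
type MetricsSketch =
  | { otel: { url: string } }
  | { prometheus: { bindAddress: string } };

interface TelemetrySketch {
  metrics: MetricsSketch;
}

// Pick an exporter from environment-style settings (variable names are assumptions).
function buildTelemetryOptions(
  env: Record<string, string | undefined>,
): TelemetrySketch {
  const otelUrl = env['TEMPORAL_OTEL_URL'];
  if (otelUrl) {
    // Push metrics to a gRPC OpenTelemetry collector.
    return { metrics: { otel: { url: otelUrl } } };
  }
  // Otherwise expose an address for Prometheus to scrape.
  const bindAddress = env['TEMPORAL_PROM_BIND'] ?? '0.0.0.0:9464';
  return { metrics: { prometheus: { bindAddress } } };
}
```

The returned object could then be passed as `telemetryOptions` to `Runtime.install`, for example with `buildTelemetryOptions(process.env)`.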

How to set up tracing

Tracing allows you to view the call graph of a Workflow along with its Activities and any Child Workflows.

Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific to your use case, use context propagation to add your own tracing logic.

For information about how to configure exporters and instrument your code, see Tracing Temporal Services with OTEL.

The interceptors-opentelemetry sample shows how to use the SDK's built-in OpenTelemetry tracing to trace everything from starting a Workflow to Workflow Execution to running an Activity from that Workflow.

The built-in tracing uses protobuf message headers to propagate the tracing information from the Client to the Workflow and from the Workflow to its successors (when Continued-As-New), children, and Activities. All of these executions are linked by a single trace identifier and have the proper parent-to-child span relation.

Tracing is compatible between different Temporal SDKs as long as compatible context propagators are used.
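
To make the idea of header-based propagation concrete, here is a minimal sketch that writes and reads a W3C Trace Context `traceparent` value in a plain header map. It is a conceptual illustration only: the `Headers` and `SpanContextSketch` types and the `inject`/`extract` helpers are hypothetical, and the SDK's own headers carry encoded payloads rather than raw strings.

```typescript
// Hypothetical header map; the SDK's real headers carry encoded payloads.
type Headers = Record<string, string>;

interface SpanContextSketch {
  traceId: string; // 32 lowercase hex characters
  spanId: string; // 16 lowercase hex characters
  sampled: boolean;
}

// Serialize a span context into a W3C `traceparent` header value.
function inject(ctx: SpanContextSketch, headers: Headers): Headers {
  const flags = ctx.sampled ? '01' : '00';
  return {
    ...headers,
    traceparent: `00-${ctx.traceId}-${ctx.spanId}-${flags}`,
  };
}

// Parse the header back into a span context, or return undefined if invalid.
function extract(headers: Headers): SpanContextSketch | undefined {
  const raw = headers['traceparent'] ?? '';
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(raw);
  if (!match) return undefined;
  const traceId = match[1] as string;
  const spanId = match[2] as string;
  const flags = match[3] as string;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}
```

Because both sides agree on the header format, a child execution that extracts the context can create spans under the same trace identifier, which is why compatible propagators are needed across SDKs.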

Context propagation

The TypeScript SDK uses the global OpenTelemetry propagator.

To extend the default (Trace Context and Baggage propagators) to also include the Jaeger propagator, follow these steps:

  • npm i @opentelemetry/propagator-jaeger

  • At the top level of your Workflow code, add the following lines:

    import { propagation } from '@opentelemetry/api';
    import {
      CompositePropagator,
      W3CBaggagePropagator,
      W3CTraceContextPropagator,
    } from '@opentelemetry/core';
    import { JaegerPropagator } from '@opentelemetry/propagator-jaeger';

    propagation.setGlobalPropagator(
      new CompositePropagator({
        propagators: [
          new W3CTraceContextPropagator(),
          new W3CBaggagePropagator(),
          new JaegerPropagator(),
        ],
      }),
    );

Similarly, you can customize the OpenTelemetry NodeSDK propagators by following the instructions in the Initialize the SDK section of the README.md file.

How to log from a Workflow in TypeScript

Send logs and errors to a logging service, so that when things go wrong, you can see what happened.

The SDK core uses WARN for its default logging level.

Sample available

A complete sample for setting up the instrumentation for the different components of the SDK is available in our samples repo: Instrumentation demo.

Logging from Activities

Activities run in the standard Node.js environment and can use any Node.js logger.

Inject Activity context via interceptor and log all Activity Executions

instrumentation/src/activities/interceptors.ts

import { Context } from '@temporalio/activity';
import {
  ActivityExecuteInput,
  ActivityInboundCallsInterceptor,
  Next,
} from '@temporalio/worker';
import { Logger } from 'winston';

/** An Activity Context with an attached logger */
export interface ContextWithLogger extends Context {
  logger: Logger;
}

/** Get the current Activity context with an attached logger */
export function getContext(): ContextWithLogger {
  return Context.current() as ContextWithLogger;
}

/** Logs Activity executions and their duration */
export class ActivityInboundLogInterceptor
  implements ActivityInboundCallsInterceptor
{
  public readonly logger: Logger;

  constructor(ctx: Context, logger: Logger) {
    this.logger = logger.child({
      activity: ctx.info,
    });

    // Set a logger instance on the current Activity Context to provide
    // contextual logging information to each log entry generated by the Activity.
    (ctx as ContextWithLogger).logger = this.logger;
  }

  async execute(
    input: ActivityExecuteInput,
    next: Next<ActivityInboundCallsInterceptor, 'execute'>,
  ): Promise<unknown> {
    let error: any = undefined;
    const startTime = process.hrtime.bigint();
    try {
      return await next(input);
    } catch (err: any) {
      error = err;
      throw err;
    } finally {
      const durationNanos = process.hrtime.bigint() - startTime;
      const durationMs = Number(durationNanos / 1_000_000n);
      if (error) {
        this.logger.error('activity failed', { error, durationMs });
      } else {
        this.logger.debug('activity completed', { durationMs });
      }
    }
  }
}
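
The `next` argument in the interceptor above follows a general chaining pattern: each interceptor receives the input and a function that invokes the rest of the chain. The sketch below models that pattern in plain TypeScript, without the SDK. The `Handler`, `Interceptor`, `compose`, and `timing` names are hypothetical, and the SDK's real interceptor types are richer than this.

```typescript
// Hypothetical, simplified shapes; the SDK's real interceptor types are richer.
type Handler<I, O> = (input: I) => Promise<O>;
type Interceptor<I, O> = (input: I, next: Handler<I, O>) => Promise<O>;

// Compose interceptors so the first one in the list runs outermost.
function compose<I, O>(
  interceptors: Interceptor<I, O>[],
  handler: Handler<I, O>,
): Handler<I, O> {
  return interceptors.reduceRight<Handler<I, O>>(
    (next, interceptor) => (input) => interceptor(input, next),
    handler,
  );
}

interface TimingRecord {
  outcome: 'completed' | 'failed';
  durationMs: number;
}

// Records outcome and duration, in the spirit of the logging interceptor above.
function timing<I, O>(records: TimingRecord[]): Interceptor<I, O> {
  return async (input, next) => {
    const start = Date.now();
    let outcome: TimingRecord['outcome'] = 'completed';
    try {
      return await next(input);
    } catch (err) {
      outcome = 'failed';
      throw err; // rethrow so the caller still sees the failure
    } finally {
      records.push({ outcome, durationMs: Date.now() - start });
    }
  };
}
```

Recording in `finally` means the duration is captured whether the wrapped call succeeds or throws, which mirrors the structure of `execute` above.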

Use the injected logger from an Activity

custom-logger/src/activities/index.ts

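import { Context } from '@temporalio/activity';

export async function greet(name: string): Promise<string> {
  // Read the logger attached to the current Activity Context.
  const { log } = Context.current();
  log.info('Log from activity', { name });
  return `Hello, ${name}!`;
}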

Logging from Workflows with Workflow sinks

Logging from Workflows is tricky for two reasons:

  1. Workflows run in a sandboxed environment and cannot do any I/O.
  2. Workflow code might get replayed at any time, generating duplicate log messages.

To work around these limitations, we recommend using the sinks feature in the TypeScript SDK. Sinks enable one-way export of logs, metrics, and traces from the Workflow isolate to the Node.js environment.

Sinks are written as objects with methods. Similar to Activities, they are declared in the Worker and then proxied in Workflow code, and it helps to share types between both.

Comparing Sinks, Activities, and interceptors

Sinks are similar to Activities in that they are both registered on the Worker and proxied into the Workflow. However, they differ from Activities in important ways:

  • A sink function doesn't return any value back to the Workflow and cannot be awaited.
  • A sink call isn't recorded in the Event History of a Workflow Execution (no timeouts or retries).
  • A sink function always runs on the same Worker that runs the Workflow Execution it's called from.

Declare the sink interface

Explicitly declaring a sink's interface is optional but is useful for ensuring type safety in subsequent steps:

sinks/src/workflows.ts

import { log, proxySinks, Sinks } from '@temporalio/workflow';

export interface AlertSinks extends Sinks {
  alerter: {
    alert(message: string): void;
  };
}

export type MySinks = AlertSinks;

Implement sinks

Implementing sinks is a two-step process.

Implement and inject the Sink function into a Worker

sinks/src/worker.ts

import { InjectedSinks, Worker } from '@temporalio/worker';
import { MySinks } from './workflows';

async function main() {
  const sinks: InjectedSinks<MySinks> = {
    alerter: {
      alert: {
        fn(workflowInfo, message) {
          console.log('sending SMS alert!', {
            workflowId: workflowInfo.workflowId,
            workflowRunId: workflowInfo.runId,
            message,
          });
        },
        callDuringReplay: false, // The default
      },
    },
  };
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    taskQueue: 'sinks',
    sinks,
  });
  await worker.run();
  console.log('Worker gracefully shutdown');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

  • Sink function implementations are passed as an object into WorkerOptions.
  • You can specify whether you want the injected function to be called during Workflow replay by setting the callDuringReplay option.
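
The effect of `callDuringReplay` can be pictured with a small model. The sketch below is not the SDK's implementation; `SinkFunctionSketch` and `dispatchSinkCall` are hypothetical names, and the model assumes only what is stated above, namely that a sink function is skipped during replay unless `callDuringReplay` is set to true.

```typescript
// Hypothetical model of an injected sink function and its replay flag.
interface SinkFunctionSketch {
  fn: (message: string) => void;
  callDuringReplay?: boolean; // treated as false when omitted
}

// Decide whether to invoke the sink for a call made by Workflow code.
// Returns true if the sink function was invoked.
function dispatchSinkCall(
  sink: SinkFunctionSketch,
  isReplaying: boolean,
  message: string,
): boolean {
  if (isReplaying && !sink.callDuringReplay) {
    return false; // skipped, so replay does not repeat the side effect
  }
  sink.fn(message);
  return true;
}
```

With the default setting, an alert emitted on the first execution is not emitted again when the same Workflow code is replayed, which addresses the duplicate-message problem described earlier.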

Proxy and call a sink function from a Workflow

sinks/src/workflows.ts

const { alerter } = proxySinks<MySinks>();

export async function sinkWorkflow(): Promise<string> {
  log.info('Workflow Execution started');
  alerter.alert('alerter: Workflow Execution started');
  return 'Hello, Temporal!';
}

Some important features of the InjectedSinkFunction interface:

  • Injected WorkflowInfo argument: The first argument of a Sink function implementation is a workflowInfo object that contains useful metadata.
  • Limited argument types: The remaining Sink function arguments are copied between the sandbox and the Node.js environment using the structured clone algorithm.
  • No return value: To prevent breaking determinism, Sink functions cannot return values to the Workflow.

Shared logger interface

Instead of explicitly calling proxySinks() to create a logger sink in your Workflow, you can also create a sharedLogger.ts file that handles calling proxySinks() for you.

logger-shared/src/sharedLogger.ts

import { inWorkflowContext, proxySinks } from '@temporalio/workflow';
import logger from './logger';

const sharedLogger = inWorkflowContext()
  ? proxySinks().defaultWorkerLogger
  : logger;
export default sharedLogger;

You can then import sharedLogger.ts from Activities and Workflows.

logger-shared/src/activities/index.ts

import logger from '../sharedLogger';

export async function greet(name: string): Promise<string> {
  logger.info('Log from Activity', { name });
  return `Hello, ${name}!`;
}

logger-shared/src/workflows/index.ts

import { proxyActivities } from '@temporalio/workflow';
import type * as activities from '../activities';
import logger from '../sharedLogger';

const { greet } = proxyActivities<typeof activities>({
  startToCloseTimeout: '5 minutes',
});

export async function logSampleWorkflow(): Promise<void> {
  const greeting = await greet('Temporal');
  logger.info('Log from Workflow', { greeting });
}

Advanced: Performance considerations and non-blocking Sinks

The injected sink function contributes to the overall Workflow Task processing duration.

  • If you have a long-running sink function, such as one that tries to communicate with external services, you might start seeing Workflow Task timeouts.
  • The effect is multiplied when using callDuringReplay: true and replaying long Workflow histories because the Workflow Task timer starts when the first history page is delivered to the Worker.
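
One way to keep a sink function cheap is to have it only enqueue work and let something outside Workflow Task processing do the slow part. The sketch below shows that idea with an in-memory buffer. `BufferedAlerts`, `alertSinkFn`, and the drain-from-a-timer approach are assumptions for illustration, not an SDK feature, and a real setup would also need to handle Worker shutdown and delivery failures.

```typescript
// Hypothetical in-memory buffer that keeps the sink function itself cheap.
class BufferedAlerts {
  private pending: string[] = [];

  // Called from the sink function: only enqueues, never waits on I/O.
  enqueue(message: string): void {
    this.pending.push(message);
  }

  // Called outside Workflow Task processing (for example, from a timer)
  // to take everything queued so far and hand it to a slow sender.
  drain(): string[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}

const alerts = new BufferedAlerts();

// Same argument order as the sink `fn` shown earlier; `workflowInfo` is
// reduced here to the single field this sketch uses.
function alertSinkFn(
  workflowInfo: { workflowId: string },
  message: string,
): void {
  alerts.enqueue(`${workflowInfo.workflowId}: ${message}`);
}
```

A periodic task could then call `alerts.drain()` and send the batch to the external service, so a slow service delays the batch rather than the Workflow Task.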

How to provide a custom logger

Use a custom logger for logging.

Logging in Workers and Clients

The Worker comes with a default logger, which by default logs any messages with level INFO and higher to STDERR using console.error. The log levels, in increasing order of severity, are TRACE, DEBUG, INFO, WARN, and ERROR.
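
Threshold-based filtering of this kind can be sketched in a few lines. The level names below (TRACE, DEBUG, INFO, WARN, ERROR) are the ones used elsewhere in this guide; the `LEVELS` array and `shouldLog` helper are hypothetical and are not how the SDK necessarily implements its filter.

```typescript
// Levels in increasing order of severity.
const LEVELS = ['TRACE', 'DEBUG', 'INFO', 'WARN', 'ERROR'] as const;
type Level = (typeof LEVELS)[number];

// A message is emitted when its level is at or above the configured threshold.
function shouldLog(threshold: Level, level: Level): boolean {
  return LEVELS.indexOf(level) >= LEVELS.indexOf(threshold);
}
```

With a threshold of INFO, `shouldLog('INFO', 'DEBUG')` is false and `shouldLog('INFO', 'ERROR')` is true.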

Customizing the default logger

Temporal uses a DefaultLogger that implements the basic interface:

import { DefaultLogger, Runtime } from '@temporalio/worker';

const logger = new DefaultLogger('WARN', ({ level, message }) => {
  console.log(`Custom logger: [${level}] ${message}`);
});
Runtime.install({ logger });

The previous code example sets the default logger to log only messages with level WARN and higher.

Accumulate logs for testing and reporting

import { DefaultLogger, LogEntry } from '@temporalio/worker';

// Collect every entry at TRACE level and above in memory.
const logs: LogEntry[] = [];
const logger = new DefaultLogger('TRACE', (entry) => logs.push(entry));
logger.debug('hey', { a: 1 });
logger.info('ho');
logger.warn('lets', { a: 1 });
logger.error('go');

A common logging use case is logging to a file to be picked up by a collector like the Datadog Agent.

import { Runtime } from '@temporalio/worker';
import winston from 'winston';

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  // Write JSON log lines to a file that a collector can tail.
  transports: [new winston.transports.File({ filename: '/path/to/worker.log' })],
});
Runtime.install({ logger });

How to use Visibility APIs

The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Cluster.

How to use Search Attributes

The typical method of retrieving a Workflow Execution is by its Workflow Id.

However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.

You can do this with Search Attributes.

  • Default Search Attributes like WorkflowType, StartTime and ExecutionStatus are automatically added to Workflow Executions.
  • Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).

The steps to using custom Search Attributes are:

  • Create a new Search Attribute in your Cluster using tctl search-attribute create or the Cloud UI.
  • Set the value of the Search Attribute for a Workflow Execution:
    • On the Client by including it as an option when starting the Execution.
    • In the Workflow by calling UpsertSearchAttributes.
  • Read the value of the Search Attribute:
    • On the Client by calling DescribeWorkflow.
    • In the Workflow by looking at WorkflowInfo.
  • Query Workflow Executions by the Search Attribute using a List Filter:
    • In tctl.
    • In code by calling ListWorkflowExecutions.

Here is how to query Workflow Executions:

Use WorkflowService.listWorkflowExecutions:

import { Connection } from '@temporalio/client';

const connection = await Connection.connect();
const response = await connection.workflowService.listWorkflowExecutions({
  query: `ExecutionStatus = "Running"`,
});

where query is a List Filter.
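
For the earlier example of finding failed Workflow Executions of one type within a time range, the List Filter string could be assembled as in the sketch below. The `failedWorkflowsFilter` helper and `FailedInRange` type are hypothetical; the sketch assumes the Workflow Type name contains no double quotes, and that the `CloseTime` attribute and the `BETWEEN ... AND` operator are available on your Cluster's Visibility store.

```typescript
interface FailedInRange {
  workflowType: string;
  from: Date;
  to: Date;
}

// Build a List Filter for failed executions of one type closed within a range.
// Assumes `workflowType` contains no double quotes (no escaping is done here).
function failedWorkflowsFilter({ workflowType, from, to }: FailedInRange): string {
  return [
    `WorkflowType = "${workflowType}"`,
    `ExecutionStatus = "Failed"`,
    `CloseTime BETWEEN "${from.toISOString()}" AND "${to.toISOString()}"`,
  ].join(' AND ');
}
```

The resulting string can be passed as the `query` field of the `listWorkflowExecutions` request shown above.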

How to set custom Search Attributes

After you've created custom Search Attributes in your Cluster (using tctl search-attribute create or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.

Use WorkflowOptions.searchAttributes.

search-attributes/src/client.ts

const handle = await client.workflow.start(example, {
  taskQueue: 'search-attributes',
  workflowId: 'search-attributes-example-0',
  searchAttributes: {
    CustomIntField: [2],
    CustomKeywordField: ['keywordA', 'keywordB'],
    CustomBoolField: [true],
    CustomDatetimeField: [new Date()],
    CustomStringField: [
      'String field is for text. When queried, it will be tokenized for partial match. StringTypeField cannot be used in Order By',
    ],
  },
});

const { searchAttributes } = await handle.describe();

The type of searchAttributes is Record<string, string[] | number[] | boolean[] | Date[]>.
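
Because every value is an array whose element type varies by attribute, reading a single value usually involves a runtime type check. The sketch below shows one way to do that; `SearchAttributesSketch` is a local alias that copies the type stated above, and `firstNumber` is a hypothetical helper, not an SDK function.

```typescript
// Local alias copying the type described above.
type SearchAttributesSketch = Record<
  string,
  string[] | number[] | boolean[] | Date[]
>;

// Return the first value of an attribute if it is a number, else undefined.
function firstNumber(
  attrs: SearchAttributesSketch,
  key: string,
): number | undefined {
  const values: ReadonlyArray<string | number | boolean | Date> | undefined =
    attrs[key];
  const value = values?.[0];
  return typeof value === 'number' ? value : undefined;
}
```

A missing attribute, an empty array, and a value of another type all produce `undefined`, so callers handle one fallback case instead of three.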

How to upsert Search Attributes

You can upsert Search Attributes to add or update Search Attributes from within Workflow code.

Inside a Workflow, we can read from WorkflowInfo.searchAttributes and call upsertSearchAttributes:

search-attributes/src/workflows.ts

import {
  SearchAttributes,
  upsertSearchAttributes,
  workflowInfo,
} from '@temporalio/workflow';

export async function example(): Promise<SearchAttributes> {
  const customInt =
    (workflowInfo().searchAttributes.CustomIntField?.[0] as number) || 0;
  upsertSearchAttributes({
    // overwrite the existing CustomIntField: [2]
    CustomIntField: [customInt + 1],

    // delete the existing CustomBoolField: [true]
    CustomBoolField: [],

    // add a new value
    CustomDoubleField: [3.14],
  });
  return workflowInfo().searchAttributes;
}

How to remove a Search Attribute from a Workflow

To remove a Search Attribute that was previously set, set it to an empty array: [].

import { upsertSearchAttributes } from '@temporalio/workflow';

async function yourWorkflow() {
  upsertSearchAttributes({ CustomIntField: [1, 2, 3] });

  // ... later, to remove:
  upsertSearchAttributes({ CustomIntField: [] });
}
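
The overwrite, add, and remove behavior described in the last two sections can be summarized with a small model. This is not the SDK's implementation: `applyUpsert` and the `Attrs` type are hypothetical, and the model simply drops a removed attribute, which may differ from how a removed attribute appears when read back from the SDK.

```typescript
type AttrValues = string[] | number[] | boolean[] | Date[];
type Attrs = Record<string, AttrValues>;

// Model of upsert semantics: non-empty arrays overwrite or add,
// empty arrays remove, and untouched keys are kept.
function applyUpsert(current: Attrs, changes: Attrs): Attrs {
  const next: Attrs = { ...current };
  for (const [key, values] of Object.entries(changes)) {
    if (values.length === 0) {
      delete next[key]; // an empty array removes the attribute
    } else {
      next[key] = values; // otherwise the new values replace the old ones
    }
  }
  return next;
}
```

Applied to the earlier example, starting from `CustomIntField: [2]` and `CustomBoolField: [true]`, the upsert leaves `CustomIntField: [3]`, removes `CustomBoolField`, and adds `CustomDoubleField: [3.14]`.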