Google ADK integration

Temporal's Google ADK integration lets you run Google ADK agents inside Temporal Workflows, so an agent keeps its place across Worker restarts, deploys, and transient failures.

Temporal gives your agent code Durable Execution. The Google ADK gives you the agent itself: model calls, tools, multi-agent handoffs, and MCP. The integration connects the two so that you write an ordinary ADK agent and run it as a Workflow, without managing sessions, a database, or your own retry logic.

The GoogleAdkPlugin is what ties them together. It runs each model call and tool call as a Temporal Activity, configures the payload converter that serializes ADK objects, and makes ADK's runtime deterministic so the Workflow can replay. You add the same plugin to your Client and your Worker, and the rest of your agent code stays standard ADK.
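The idea behind replay can be illustrated with a toy model. The sketch below is not how Temporal or the plugin is implemented; `ToyHistory` and `agent_turns` are hypothetical names invented for this illustration. It only shows why recording the result of each non-deterministic step lets the same code run again and reach the same state.

```python
import random


class ToyHistory:
    """Hypothetical stand-in for an event history: records each step's result."""

    def __init__(self):
        self.events = []  # results recorded in execution order
        self.cursor = 0   # position of the next step while (re)running

    def run_step(self, fn):
        if self.cursor < len(self.events):
            # Replay: reuse the recorded result instead of calling fn again.
            result = self.events[self.cursor]
        else:
            # First execution: run the step and record what it returned.
            result = fn()
            self.events.append(result)
        self.cursor += 1
        return result


def agent_turns(history):
    # Two non-deterministic "model calls", each routed through the history.
    first = history.run_step(random.random)
    second = history.run_step(random.random)
    return first, second


history = ToyHistory()
original = agent_turns(history)

# Simulate a Worker restart: the code starts over, but the history survives.
history.cursor = 0
replayed = agent_turns(history)
```

Because both runs read from the same recorded results, `replayed` equals `original` even though the steps call `random.random`.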

Prerequisites

Install

Install the Temporal Python SDK, version 1.28.0 or later, together with its google-adk extra:

uv add "temporalio[google-adk]>=1.28.0"

If you use pip:

pip install "temporalio[google-adk]>=1.28.0"
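To confirm that the installed SDK meets the minimum version, a small check like the following may help. It is a sketch that uses only the standard library; `parse_version`, `meets_minimum`, and `installed_sdk_ok` are hypothetical helper names, and the parsing deliberately ignores pre-release suffixes.

```python
import re
from importlib import metadata

MINIMUM = (1, 28, 0)  # minimum SDK version stated in this guide


def parse_version(text):
    """Turn '1.28.0' into (1, 28, 0). Pre-release suffixes are ignored."""
    numbers = []
    for piece in text.split(".")[:3]:
        match = re.match(r"\d+", piece)
        if match is None:
            break
        numbers.append(int(match.group()))
    # Pad short versions such as '1.28' so tuple comparison behaves.
    numbers += [0] * (3 - len(numbers))
    return tuple(numbers)


def meets_minimum(version_text, minimum=MINIMUM):
    return parse_version(version_text) >= minimum


def installed_sdk_ok():
    """Return True if the temporalio distribution is installed and new enough."""
    try:
        return meets_minimum(metadata.version("temporalio"))
    except metadata.PackageNotFoundError:
        return False
```

Calling `installed_sdk_ok()` before starting a Worker returns `False` when the package is missing or too old.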

Define an agent in a Workflow

Write your agent the way you normally would with the ADK. The one Temporal-specific piece is TemporalModel, which you pass in place of a model name. It runs each model call as an invoke_model Activity, so every turn is durable and shows up in the Event history.

google_adk_agents/basic/workflows/hello_world_workflow.py

@workflow.defn
class HelloWorldAgentWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # TemporalModel runs each model call as an `invoke_model` activity.
        agent = Agent(
            name="hello_world_agent",
            model=TemporalModel("gemini-2.5-flash"),
            instruction="You only respond in haikus.",
        )

        # The plugin points ADK's session-id generation at workflow.uuid4(), so
        # creating a session here is replay-safe.
        runner = InMemoryRunner(agent=agent, app_name="hello_world_app")
        session = await runner.session_service.create_session(
            app_name="hello_world_app", user_id="user"
        )

        final_text = ""
        async with Aclosing(
            runner.run_async(
                user_id="user",
                session_id=session.id,
                new_message=types.Content(role="user", parts=[types.Part(text=prompt)]),
            )
        ) as agen:
            async for event in agen:
                if event.content and event.content.parts:
                    for part in event.content.parts:
                        if part.text:
                            final_text = part.text

        return final_text


Everything other than TemporalModel is normal ADK. You build an Agent, drive it with a runner, and read the events it produces.

Add the plugin to your Worker and Client

Build one GoogleAdkPlugin and pass the same instance to both the Client and the Worker. The Client side links the code that starts a Workflow to the Workflow itself, and the Worker side runs the agent.

google_adk_agents/basic/run_worker.py

plugin = GoogleAdkPlugin()

client = await Client.connect("localhost:7233", plugins=[plugin])

worker = Worker(
    client,
    task_queue="google-adk-agents-basic",
    workflows=[HelloWorldAgentWorkflow],
    plugins=[plugin],
)
await worker.run()

Model calls run as Activities on the Worker, so the Worker process is the one that needs your model provider credentials. The samples use Gemini, which reads its key from the GOOGLE_API_KEY environment variable.

export GOOGLE_API_KEY="your-api-key"
python run_worker.py
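Because a missing key only surfaces when the first model call fails, it can be useful to check for it when the Worker process starts. The helper below is a sketch using only the standard library; `require_env` is a hypothetical name and is not part of the samples.

```python
import os


def require_env(name, environ=None):
    """Return the variable's value, or raise if it is unset or blank."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"{name} is not set; export it before starting the Worker."
        )
    return value


# Typical use at the top of a worker script (not executed here):
# api_key = require_env("GOOGLE_API_KEY")
```

Failing at startup keeps the error next to its cause instead of inside a retried Activity.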

The ADK supports other model providers as well, for example non-Gemini models through LiteLLM. Change the model name you pass to TemporalModel to use one.

Start the Workflow the way you would start any other Temporal Workflow. Use a Client that has the plugin so the starting code is linked to the run.

google_adk_agents/basic/run_hello_world_workflow.py

client = await Client.connect("localhost:7233", plugins=[GoogleAdkPlugin()])

result = await client.execute_workflow(
    HelloWorldAgentWorkflow.run,
    "Tell me about recursion in programming.",
    id="google-adk-agents-basic-workflow-id",
    task_queue="google-adk-agents-basic",
)
print(f"Result: {result}")

Run tools as Activities

To expose a Temporal Activity as a tool the agent can call, wrap it with activity_tool. When the model calls the tool, it runs as its own Activity, so it gets its own retries and timeouts and appears in the Event history.

google_adk_agents/tools/workflows/weather_workflow.py

@workflow.defn
class WeatherAgentWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # activity_tool runs the tool call as a real Temporal activity, so it's
        # retryable and shows up in history.
        weather_tool = temporalio.contrib.google_adk_agents.workflow.activity_tool(
            get_weather, start_to_close_timeout=timedelta(seconds=60)
        )

        agent = Agent(
            name="weather_agent",
            model=TemporalModel("gemini-2.5-flash"),
            instruction="Use the get_weather tool to answer weather questions.",
            tools=[weather_tool],
        )

get_weather is a plain Temporal Activity. Register it on the Worker alongside the Workflow.

google_adk_agents/tools/run_worker.py

plugin = GoogleAdkPlugin()

client = await Client.connect("localhost:7233", plugins=[plugin])

worker = Worker(
    client,
    task_queue="google-adk-agents-tools",
    workflows=[WeatherAgentWorkflow],
    activities=[get_weather],
    plugins=[plugin],
)
await worker.run()
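To make "its own retries and timeouts" concrete, here is a toy model of what a per-call timeout plus retry policy means for a flaky tool. It is only an illustration built on asyncio: Temporal schedules retries on the service side with backoff, and `run_with_retries` and `flaky_get_weather` are hypothetical names that do not appear in the samples.

```python
import asyncio


async def run_with_retries(fn, *, timeout_seconds, max_attempts):
    """Call fn() until it succeeds, bounding each attempt with a timeout."""
    last_error = None
    for _attempt in range(max_attempts):
        try:
            return await asyncio.wait_for(fn(), timeout=timeout_seconds)
        except Exception as err:  # includes the timeout error from wait_for
            last_error = err
    raise RuntimeError(f"gave up after {max_attempts} attempts") from last_error


calls = {"count": 0}


async def flaky_get_weather():
    # Fails twice with a transient error, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "Sunny, 22 C"


result = asyncio.run(
    run_with_retries(flaky_get_weather, timeout_seconds=1.0, max_attempts=5)
)
```

The agent only sees the final result. The failed attempts are absorbed by the retry loop, which is the property the Activity wrapper gives each tool call.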

Coordinate multiple agents

The ADK's multi-agent patterns work inside a Workflow. Give each agent its own TemporalModel, and pass an ActivityConfig when you want to name its model turns or set per-agent timeouts. A coordinator delegates to its sub_agents using the ADK's built-in handoff.

google_adk_agents/agent_patterns/workflows/multi_agent_workflow.py

@workflow.defn
class MultiAgentWorkflow:
    @workflow.run
    async def run(self, topic: str) -> str:
        session_service = InMemorySessionService()
        session = await session_service.create_session(
            app_name="multi_agent_app", user_id="user"
        )

        # The ActivityConfig summary makes each model turn a named activity in
        # history.
        researcher = LlmAgent(
            name="researcher",
            model=TemporalModel(
                "gemini-2.5-flash",
                activity_config=ActivityConfig(summary="Researcher Agent"),
            ),
            instruction="You are a researcher. Find information about the topic.",
        )

        writer = LlmAgent(
            name="writer",
            model=TemporalModel(
                "gemini-2.5-flash",
                activity_config=ActivityConfig(summary="Writer Agent"),
            ),
            instruction="You are a poet. Write a haiku based on the research.",
        )

        # ADK's transfer_to_agent handoff runs durably here.
        coordinator = LlmAgent(
            name="coordinator",
            model=TemporalModel(
                "gemini-2.5-flash",
                activity_config=ActivityConfig(
                    start_to_close_timeout=timedelta(seconds=30),
                    summary="Coordinator Agent",
                ),
            ),
            instruction="You are a coordinator. Delegate to researcher then writer.",
            sub_agents=[researcher, writer],
        )

The summary you set on each ActivityConfig labels that agent's model Activities in the Event history, which makes it easy to see which agent ran and when.
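The shape of the coordinator pattern can be sketched without any model at all. In the toy below the handoff order is fixed in a `plan` list, whereas in the ADK the coordinator's model chooses when to transfer. `ToyAgent` and `run_pipeline` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyAgent:
    name: str
    handle: Callable[[str], str]
    sub_agents: List["ToyAgent"] = field(default_factory=list)

    def find(self, name):
        """Look up a sub-agent by name, as a handoff target."""
        for agent in self.sub_agents:
            if agent.name == name:
                return agent
        raise KeyError(f"no sub-agent named {name!r}")


def run_pipeline(coordinator, topic, plan):
    """Hand the payload to each named sub-agent in order and record the trace."""
    payload, trace = topic, []
    for name in plan:
        payload = coordinator.find(name).handle(payload)
        trace.append(name)
    return payload, trace


researcher = ToyAgent("researcher", lambda topic: f"notes on {topic}")
writer = ToyAgent("writer", lambda notes: f"haiku from {notes}")
coordinator = ToyAgent("coordinator", lambda text: text, [researcher, writer])

result, trace = run_pipeline(coordinator, "recursion", ["researcher", "writer"])
```

The `trace` list plays the part that the per-agent summaries play in the Event history: it records which agent handled each step.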

Use MCP tool servers

To give an agent tools from an MCP server, use TemporalMcpToolSet. It runs the server's list-tools and call-tool operations as Activities because connecting to an MCP server is external I/O. You define a factory function for the toolset and register it with the plugin through a TemporalMcpToolSetProvider.

google_adk_agents/mcp/run_worker.py

plugin = GoogleAdkPlugin(
    toolset_providers=[TemporalMcpToolSetProvider("echo", echo_toolset)]
)

In the Workflow, give the agent a TemporalMcpToolSet with the same name. The not_in_workflow_toolset factory lets you run the same agent locally, outside Temporal, by connecting to the MCP server directly.

google_adk_agents/mcp/workflows/echo_workflow.py

@workflow.defn
class EchoMcpWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # TemporalMcpToolSet runs the MCP server's list-tools and call-tool
        # calls as activities.
        agent = Agent(
            name="echo_agent",
            model=TemporalModel("gemini-2.5-flash"),
            instruction="Use the echo tool to echo back the user's message.",
            tools=[TemporalMcpToolSet("echo", not_in_workflow_toolset=echo_toolset)],
        )
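The provider and the toolset are matched by name, so a typo on either side means the Workflow asks for a toolset the Worker never registered. The toy registry below illustrates that name-based lookup. It is a conceptual sketch, not the plugin's implementation: `ToyToolsetRegistry` is hypothetical, and this `echo_toolset` is a plain dictionary standing in for a real MCP toolset factory.

```python
class ToyToolsetRegistry:
    """Hypothetical registry mapping toolset names to factory functions."""

    def __init__(self):
        self._factories = {}

    def register(self, name, factory):
        if name in self._factories:
            raise ValueError(f"toolset {name!r} is already registered")
        self._factories[name] = factory

    def create(self, name):
        try:
            factory = self._factories[name]
        except KeyError:
            known = ", ".join(sorted(self._factories)) or "none"
            raise KeyError(f"unknown toolset {name!r}; registered: {known}") from None
        return factory()


def echo_toolset():
    # Stand-in factory: returns a dict of tool name -> callable.
    return {"echo": lambda message: message}


# Worker side: register the factory under a name.
registry = ToyToolsetRegistry()
registry.register("echo", echo_toolset)

# Workflow side: refer to the toolset by the same name.
tools = registry.create("echo")
reply = tools["echo"]("hello")
```

Keeping the name in a shared constant is one way to avoid the Worker and the Workflow drifting apart.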

Stream model output

TemporalModel can stream a model's output as it is generated, using a Workflow stream from temporalio.contrib.workflow_streams. Give the model a streaming_topic, host a WorkflowStream on the Workflow, and the streaming model call publishes each chunk to the topic as it arrives. A Client subscribes to the topic to read chunks while the Workflow is still running.

caution

Streaming is an early, experimental part of this integration and is more likely to change than the rest of the API. Refer to the streaming sample for a complete, working example.

google_adk_agents/streaming/workflows/streaming_workflow.py

@workflow.defn
class StreamingAgentWorkflow:
    @workflow.init
    def __init__(self, prompt: str) -> None:
        # The streaming activity publishes LlmResponse chunks to this stream as
        # they come back from the model.
        self.stream = WorkflowStream()

    @workflow.run
    async def run(self, prompt: str) -> str:
        # streaming_mode=SSE routes the call through the invoke_model_streaming
        # activity.
        model = TemporalModel("gemini-2.5-flash", streaming_topic="responses")
        agent = Agent(
            name="streaming_agent",
            model=model,
            instruction="You are a helpful assistant.",
        )

        runner = InMemoryRunner(agent=agent, app_name="streaming_app")
        session = await runner.session_service.create_session(
            app_name="streaming_app", user_id="user"
        )

        final_text = ""
        async for event in runner.run_async(
            user_id="user",
            session_id=session.id,
            new_message=types.Content(role="user", parts=[types.Part(text=prompt)]),
            run_config=RunConfig(streaming_mode=StreamingMode.SSE),
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        final_text = part.text

        return final_text
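The publish and subscribe relationship described in this section can be modeled with an in-process queue. The sketch below is a toy: it does not use `WorkflowStream` or any Temporal API, and `ToyStream`, `fake_model_call`, and `main` are hypothetical names. It shows a reader consuming chunks from a topic while the producer is still running, and the producer returning the assembled text at the end.

```python
import asyncio


class ToyStream:
    """Hypothetical in-process stand-in for a topic-based stream."""

    def __init__(self):
        self._topics = {}

    def _queue(self, topic):
        return self._topics.setdefault(topic, asyncio.Queue())

    async def publish(self, topic, chunk):
        await self._queue(topic).put(chunk)

    async def close(self, topic):
        await self._queue(topic).put(None)  # sentinel: no more chunks

    async def subscribe(self, topic):
        queue = self._queue(topic)
        while True:
            chunk = await queue.get()
            if chunk is None:
                return
            yield chunk


async def fake_model_call(stream, topic):
    # Publishes each chunk as it is "generated", then returns the full text.
    final = ""
    for chunk in ["Dur", "able ", "agents"]:
        await stream.publish(topic, chunk)
        final += chunk
        await asyncio.sleep(0)  # let the subscriber run between chunks
    await stream.close(topic)
    return final


async def main():
    stream = ToyStream()
    received = []

    async def reader():
        async for chunk in stream.subscribe("responses"):
            received.append(chunk)

    reader_task = asyncio.create_task(reader())
    final = await fake_model_call(stream, "responses")
    await reader_task
    return final, received


final_text, chunks = asyncio.run(main())
```

The subscriber sees the chunks in publication order, and joining them gives the same string the producer returns as its final result.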

Samples

The Google ADK samples cover each pattern in a self-contained, runnable scenario:

  • basic: a single agent with TemporalModel and one model call.
  • tools: a Temporal Activity exposed as a tool with activity_tool.
  • agent_patterns: a coordinator agent that delegates to sub-agents.
  • mcp: MCP tools run as Activities, with a self-contained echo server.
  • streaming: token streaming with WorkflowStream.