Structured Outputs with Temporal and OpenAI
Last updated Oct 3, 2025
The OpenAI Responses API supports Structured Outputs, which lets you request responses that conform to a specific data structure.
In this example, we use structured outputs in a business data cleaning scenario. Structured outputs are also commonly used for tool calling.
OpenAI usually returns the correct type, but because LLM output is non-deterministic, this is not guaranteed. When the response does not match the requested type, the Activity fails and Temporal automatically retries it.
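To make the retry behavior concrete, the following plain-Python sketch imitates it outside Temporal. It is an illustration only, not Temporal's implementation: run_with_retries, InvalidStructuredOutput, and fake_model_call are hypothetical names, and the fake call is scripted to fail exactly once so the outcome is deterministic.

```python
class InvalidStructuredOutput(Exception):
    """Hypothetical error standing in for a response of the wrong type."""


def run_with_retries(call, maximum_attempts):
    """Invoke call(attempt) until it succeeds or the attempts are used up."""
    if maximum_attempts < 1:
        raise ValueError("maximum_attempts must be at least 1")
    last_error = None
    for attempt in range(1, maximum_attempts + 1):
        try:
            return call(attempt)
        except InvalidStructuredOutput as err:
            last_error = err  # remember the failure and try again
    raise last_error


def fake_model_call(attempt):
    """Scripted stand-in for an LLM call: wrong type first, valid afterwards."""
    if attempt == 1:
        raise InvalidStructuredOutput("expected a list of businesses, got a string")
    return {"businesses": [], "attempt": attempt}


result = run_with_retries(fake_model_call, maximum_attempts=3)
print(result["attempt"])
```

In the real example the loop is not written by hand: the Activity simply raises, and the retry policy attached to the Activity call decides whether another attempt is made.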
Invoke Model Activity
We create a model-calling Activity that uses the responses.parse method of the OpenAI client.
Key challenges are related to serialization:
- In InvokeModelRequest, the response_format field is a class reference. We provide custom Pydantic serialization and deserialization logic.
- In InvokeModelResponse, the response_model must be deserialized to the correct type. We serialize the type in one field and the model, represented as a dictionary, in another.
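Both ideas can be tried in isolation with the standard library alone. The following is a minimal sketch, not the example's code: dump_class, load_class, to_envelope, and from_envelope are hypothetical helpers, datetime.date stands in for a Pydantic response model, and the "module:QualName" string form is assumed for the class reference.

```python
import importlib
import json
from datetime import date


def dump_class(cls: type) -> str:
    """Turn a class into an importable "module:QualName" string."""
    return f"{cls.__module__}:{cls.__qualname__}"


def load_class(path: str) -> type:
    """Resolve a "module:QualName" string back to the class object."""
    mod_path, _, qual = path.partition(":")
    obj = importlib.import_module(mod_path)
    for attr in qual.split("."):
        obj = getattr(obj, attr)
    return obj


def to_envelope(value: date) -> str:
    """Serialize the type in one field and the data, as a dict, in another."""
    return json.dumps({
        "response_format": dump_class(type(value)),
        "response_model": {"year": value.year, "month": value.month, "day": value.day},
    })


def from_envelope(payload: str):
    """Rebuild the typed value from the two fields."""
    envelope = json.loads(payload)
    cls = load_class(envelope["response_format"])
    return cls(**envelope["response_model"])


payload = to_envelope(date(2025, 10, 3))
restored = from_envelope(payload)
print(type(restored).__name__, restored.isoformat())
```

One consequence of this design is that the class must be importable by path on the receiving side. A class defined inside a function, for example, has a qualified name that cannot be resolved this way.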
from temporalio import activity
from openai import AsyncOpenAI
from typing import Optional, List, cast, Any, TypeVar, Generic
from typing_extensions import Annotated
from pydantic import BaseModel
from pydantic.functional_validators import BeforeValidator
from pydantic.functional_serializers import PlainSerializer
import importlib
T = TypeVar("T", bound=BaseModel)
def _coerce_class(v: Any) -> type[Any]:
    """Pydantic validator: convert string path to class during deserialization."""
    if isinstance(v, str):
        mod_path, sep, qual = v.partition(":")
        if not sep:  # support "package.module.Class"
            mod_path, _, qual = v.rpartition(".")
        module = importlib.import_module(mod_path)
        obj = module
        for attr in qual.split("."):
            obj = getattr(obj, attr)
        return cast(type[Any], obj)
    elif isinstance(v, type):
        return v
    else:
        raise ValueError(f"Cannot coerce {v} to class")


def _dump_class(t: type[Any]) -> str:
    """Pydantic serializer: convert class to string path during serialization."""
    return f"{t.__module__}:{t.__qualname__}"


# Custom type that automatically handles class <-> string conversion in Pydantic serialization
ClassReference = Annotated[
    type[T],
    BeforeValidator(_coerce_class),
    PlainSerializer(_dump_class, return_type=str),
]


class InvokeModelRequest(BaseModel, Generic[T]):
    model: str
    instructions: str
    input: str
    response_format: Optional[ClassReference[T]] = None
    tools: Optional[List[dict]] = None


class InvokeModelResponse(BaseModel, Generic[T]):
    # response_format records the type of the response model
    response_format: Optional[ClassReference[T]] = None
    response_model: Any

    @property
    def response(self) -> T:
        """Reconstruct the original response type if response_format was provided."""
        if self.response_format:
            model_cls = self.response_format
            return model_cls.model_validate(self.response_model)
        return self.response_model
@activity.defn
async def invoke_model(request: InvokeModelRequest[T]) -> InvokeModelResponse[T]:
    # Disable the OpenAI client's own retries so that retries are left to Temporal.
    client = AsyncOpenAI(max_retries=0)

    # Values are of mixed types (strings, a class, a list), hence dict[str, Any].
    kwargs: dict[str, Any] = {
        "model": request.model,
        "instructions": request.instructions,
        "input": request.input,
    }

    if request.response_format:
        kwargs["text_format"] = request.response_format

    if request.tools:
        kwargs["tools"] = request.tools

    # Use responses API consistently
    resp = await client.responses.parse(**kwargs)

    if request.response_format:
        # Convert structured response to dict for managed serialization.
        # This allows us to reconstruct the original response type while maintaining type safety.
        parsed_model = cast(BaseModel, resp.output_parsed)
        return InvokeModelResponse(
            response_model=parsed_model.model_dump(),
            response_format=request.response_format,
        )
    else:
        return InvokeModelResponse(
            response_model=resp.output_text, response_format=None
        )
Workflow
We define the Business class as a Pydantic model. We use Pydantic's EmailStr type for the email field. For the phone field, we use a custom validator to ensure the phone number is in E.164 format.
The validators should check for obvious structural errors that LLMs will only get wrong sporadically. If the LLM produces invalid responses consistently, Activity retries will fail consistently. To mitigate the cost of such futile retries, we limit the number of retry attempts when using structured outputs.
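The structural check for phone numbers can be tried on its own with the standard library. This is a minimal sketch rather than the validator itself: is_e164 is a hypothetical helper, and the pattern assumes the E.164 limit of 15 digits in total together with a minimum of 10 digits, which is the lower bound this example uses.

```python
import re

# "+", a leading digit 1-9, then 9 to 14 more digits (10 to 15 digits in total).
E164 = re.compile(r"\+[1-9]\d{9,14}")


def is_e164(value: str) -> bool:
    """Return True when the whole string is a structurally valid E.164 number."""
    return E164.fullmatch(value) is not None


samples = [
    "+12025550173",    # well-formed
    "(555) 123-4567",  # national formatting, no "+" prefix
    "+0123456789",     # country codes never start with 0
    "+1202555",        # too short
    "5551234567",      # missing "+"
]
for sample in samples:
    print(f"{sample!r}: {is_e164(sample)}")
```

A check like this only catches obvious structural errors. It does not confirm that the number is assigned or reachable, which matches the intent of keeping validators cheap and deterministic.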
from pydantic import BaseModel, Field, field_validator, EmailStr
from pydantic_core import PydanticCustomError
import re
from temporalio import workflow
from activities import invoke_model
from activities.invoke_model import InvokeModelRequest
from typing import List, Optional
from datetime import timedelta
from temporalio.common import RetryPolicy
class Business(BaseModel):
    name: Optional[str] = Field(
        None,
        description="The business name",
        json_schema_extra={"example": "Acme Corporation"},
    )
    email: Optional[EmailStr] = Field(
        None,
        description="Primary business email address",
        json_schema_extra={"example": "info@acmecorp.com"},
    )
    phone: Optional[str] = Field(
        None,
        description="Primary business phone number in E.164 format",
        json_schema_extra={"example": "+12025550173"},
    )
    address: Optional[str] = Field(
        None,
        description="Business mailing address",
        json_schema_extra={
            "example": "123 Business Park Dr, Suite 100, New York, NY 10001"
        },
    )
    # The example omits "www." to match the cleaning rules in the Workflow prompt.
    website: Optional[str] = Field(
        None,
        description="Business website URL",
        json_schema_extra={"example": "https://acmecorp.com"},
    )
    industry: Optional[str] = Field(
        None,
        description="Business industry or sector",
        json_schema_extra={"example": "Technology"},
    )
    @field_validator("phone", mode="before")
    @classmethod
    def validate_phone(cls, v):
        # Allow None values
        if v is None:
            return None
        if isinstance(v, str):
            v = v.strip()
            # Allow empty strings to be converted to None for optional fields
            if not v:
                return None
            # E.164 format: "+", a leading digit 1-9, then 9-14 more digits
            # (10-15 digits in total; E.164 allows at most 15 digits)
            e164_pattern = r"^\+[1-9]\d{9,14}$"
            if not re.match(e164_pattern, v):
                raise PydanticCustomError(
                    "phone_format",
                    "Phone number must be in E.164 format (e.g., +12025550173)",
                    {"invalid_phone": v},
                )
        return v
    @field_validator("name", mode="before")
    @classmethod
    def validate_name(cls, v):
        # Allow None values
        if v is None:
            return None
        if isinstance(v, str):
            v = v.strip()
            # Convert empty strings to None (this is acceptable)
            if not v:
                return None
        return v


class BusinessList(BaseModel):
    businesses: List[Business]
@workflow.defn
class CleanDataWorkflow:
    @workflow.run
    async def run(self, data: str) -> BusinessList:
        results = await workflow.execute_activity(
            invoke_model.invoke_model,
            InvokeModelRequest(
                model="gpt-4o",
                instructions="""Extract and clean business data with these specific rules:
1. BUSINESS NAME: Extract the main business name, normalize capitalization (Title Case for proper nouns)
2. EMAIL:
- Extract only ONE primary email address
- If multiple emails, choose the one marked as "primary" or the first valid one
- Validate format (must have @ and valid domain with .)
- Set to null if invalid (e.g., "bob@email", "NONE PROVIDED")
3. PHONE:
- Convert to E.164 format (+1 prefix for US numbers, add if not provided)
- Convert letters to numbers where appropriate (e.g., "1-800-FLOWERS" → "+18003569377")
- Set to null if cannot be converted to valid E.164 format
- Examples: "(555) 123-4567" → "+15551234567", "555 234 5678 ext 349i" → null (invalid), "5551234567" → "+15551234567"
4. ADDRESS:
- Provide complete, standardized address
- Set to null if vague/incomplete (e.g., "north end of main st", "unknown", "[PRIVATE]")
5. WEBSITE:
- Standardize to https:// format
- Remove "www." prefix, add https:// if missing
- Set to null if broken/invalid (e.g., "broken-link.com/404", "down for maintenance")
6. INDUSTRY:
- Use clear, professional industry categories
- Normalize similar terms (e.g., "fix cars and trucks" → "Automotive Repair")
Return null for any field that cannot be reliably extracted or validated.""",
                input=data,
                response_format=BusinessList,
            ),
            start_to_close_timeout=timedelta(seconds=300),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
            ),
            summary="Clean data",
        )
        return results.response
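The phone rules in the prompt are deterministic enough to spot-check the model's output in tests. The following is a rough sketch under stated assumptions, not part of the example project: to_e164_us is a hypothetical helper, it handles US numbers only, and it uses the standard telephone keypad mapping for letters.

```python
import re

# Standard telephone keypad: each letter maps to the digit printed on its key.
KEYPAD = {
    letter: digit
    for digit, letters in {
        "2": "ABC", "3": "DEF", "4": "GHI", "5": "JKL",
        "6": "MNO", "7": "PQRS", "8": "TUV", "9": "WXYZ",
    }.items()
    for letter in letters
}


def to_e164_us(raw: str):
    """Return a US number in E.164 form, or None when it cannot be normalized."""
    translated = "".join(KEYPAD.get(ch, ch) for ch in raw.upper())
    digits = re.sub(r"\D", "", translated)  # drop punctuation and spaces
    if len(digits) == 10:
        return "+1" + digits  # add the missing US country code
    if len(digits) == 11 and digits.startswith("1"):
        return "+" + digits
    return None  # wrong length, e.g. because of an extension


for raw in ["(555) 123-4567", "1-800-FLOWERS", "5551234567", "555 234 5678 ext 349i"]:
    print(f"{raw!r} -> {to_e164_us(raw)!r}")
```

Because every letter is translated, free text such as "ext" inflates the digit count and the input is rejected. That is adequate for comparing against the examples in the prompt, but it is far too blunt to replace the model for messy real-world data.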
Running
Start the Temporal Dev Server:
temporal server start-dev
Run the worker:
uv run python -m worker
Start execution:
uv run python -m start_workflow