Workflow message passing - Go SDK
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming Queries and Updates, and via Signal channels. Temporal Clients use messages to read Workflow state and control its execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Go SDK.
Handle messages
The code that follows is part of a working message passing sample.
Follow these guidelines when writing message handlers:
- Values sent in messages, and the return values of message handlers and the main Workflow function, must be serializable.
- Prefer using a single struct over multiple input parameters. This allows you to add fields without changing the calling signature.
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution:
type Language string
const Chinese Language = "chinese"
const English Language = "english"
const French Language = "french"
const Spanish Language = "spanish"
const Portuguese Language = "portuguese"
const GetLanguagesQuery = "GetLanguages"
type GetLanguagesInput struct {
IncludeUnsupported bool
}
func GreetingWorkflow(ctx workflow.Context) (string, error) {
...
greeting := map[Language]string{English: "Hello", Chinese: "你好,世界"}
err := workflow.SetQueryHandler(ctx, GetLanguagesQuery, func(input GetLanguagesInput) ([]Language, error) {
// 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
if input.IncludeUnsupported {
return []Language{Chinese, English, French, Spanish, Portuguese}, nil
} else {
// Range over map is a nondeterministic operation.
// It is OK to have a non-deterministic operation in a query function.
//workflowcheck:ignore
return maps.Keys(greeting), nil
}
})
...
}
- Use
SetQueryHandler
to set a Query Handler that listens for a Query by name. - The handler must be a function that returns two values, a serializable result and an error.
- You can't perform async operations such as executing an Activity in a Query handler.
Signal Channels
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow. Handle Signal messages by receiving them from their channel:
const ApproveSignal = "approve"
type ApproveInput struct {
Name string
}
func GreetingWorkflow(ctx workflow.Context) error {
logger := workflow.GetLogger(ctx)
approverName := ""
...
// Block until the language is approved
var approveInput ApproveInput
workflow.GetSignalChannel(ctx, ApproveSignal).Receive(ctx, &approveInput)
approverName = approveInput.Name
logger.Info("Received approval", "Approver", approverName)
...
}
- Pass the Signal's name to
GetSignalChannel
to get the Signal Channel that listen for Signals of that type.
Alternatively, you might want the Workflow to proceed and still be capable of handling external Signals.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
workflow.Go(ctx, func(ctx workflow.Context) {
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &signal)
})
selector.Select(ctx)
}
})
// You could now submit an activity; any signals will still be received while the activity is pending.
}
In the example above, the Workflow code uses workflow.GetSignalChannel
to open a workflow.Channel
for the Signal type (identified by the Signal name).
- Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost. The batch sliding window sample contains an example:
reportCompletionChannel := workflow.GetSignalChannel(ctx, "ReportCompletion")
// Drain signals async
for {
var recordId int
ok := reportCompletionChannel.ReceiveAsync(&recordId)
if !ok {
break
}
s.recordCompletion(ctx, recordId)
}
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
type Language string
const SetLanguageUpdate = "set-language"
func GreetingWorkflow(ctx workflow.Context) error {
language := English
err = workflow.SetUpdateHandlerWithOptions(ctx, SetLanguageUpdate, func(ctx workflow.Context, newLanguage Language) (Language, error) {
// 👉 An Update handler can mutate the Workflow state and return a value.
var previousLanguage Language
previousLanguage, language = language, newLanguage
return previousLanguage, nil
}, workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, newLanguage Language) error {
if _, ok := greeting[newLanguage]; !ok {
// 👉 In an Update validator you return any error to reject the Update.
return fmt.Errorf("%s unsupported language", newLanguage)
}
return nil
},
})
...
}
-
Register an Update handler for a given name using either workflow.SetUpdateHandler or workflow.SetUpdateHandlerWithOptions.
-
The handler must be a function that accepts a
workflow.Context
as its first parameter. -
The function can return either a serializable value with an error or just an error.
-
About validators:
- Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you don't need a validator.
- To set a validator, pass the validator function in the workflow.UpdateHandlerOptions when calling workflow.SetUpdateHandlerWithOptions. The validator must be a function that accepts the same argument types as the handler and returns a single value of type error.
-
Accepting and rejecting Updates with validators:
- To reject an Update you must return an error or panic in the validator.
The Workflow's
WorkflowPanicPolicy
determines how panics are handled inside the Handler function. - Without a validator, Updates are always accepted.
- To reject an Update you must return an error or panic in the validator.
The Workflow's
-
Validators and Event History:
- The
WorkflowExecutionUpdateAccepted
event is written into History whether the acceptance was automatic or due to a validator function not throwing an error or panicking. - When a validator throws an error, the Update is rejected and
WorkflowExecutionUpdateAccepted
won't be added to the Event History. The caller receives an "Update failed" error.
- The
-
Use
workflow.GetCurrentUpdateInfo
to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once. -
Update handlers can use Activities, Child Workflows, durable workflow.Sleep Timers,
workflow.Await
conditions, and more. See Blocking handlers and Workflow message passing for safe usage guidelines.
Send messages
To send Queries, Signals, or Updates, you call methods on a Temporal Client. To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
Send a Query
Queries are sent from a Temporal Client.
Use Client.QueryWorkflow
or Client.QueryWorkflowWithOptions
.
// ...
supportedLangResult, err := temporalClient.QueryWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.GetLanguagesQuery, message.GetLanguagesInput{IncludeUnsupported: false})
if err != nil {
log.Fatalf("Unable to query workflow: %v", err)
}
var supportedLang []message.Language
err = supportedLangResult.Get(&supportedLang)
if err != nil {
log.Fatalf("Unable to get query result: %v", err)
}
log.Println("Supported languages:", supportedLang)
// ...
-
Sending a Query doesn’t add events to a Workflow's Event History.
-
You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not supported.
-
A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven’t closed.
Send a Signal from a Client
Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is running receives the Signal.
// ...
err = temporalClient.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), message.ApproveSignal, message.ApproveInput{Name: ""})
if err != nil {
log.Fatalf("Unable to signal workflow: %v", err)
}
// ...
-
The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
-
The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
Sending a Signal from a Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
Use the Client.SignalWithStartWorkflow
API to start a Workflow Execution (if not already running) and pass it the Signal at the same time.
Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.
WorkflowExecutionUpdateAccepted
is added to the Event History when the Worker confirms that the Update passed validation.WorkflowExecutionUpdateCompleted
is added to the Event History when the Worker confirms that the Update has finished.
Use the Client.UpdateWorkflow
API to send an Update to a Workflow Execution.
You must provide the Workflow Id, but specifying a Run Id is optional. If you supply only the Workflow Id (and provide an empty string as the Run Id param), the running Workflow Execution receives the Update.
You must provide a WaitForStage
when calling UpdateWorkflow()
.
This parameter controls what stage the update must reach before a handle is returned to the caller. If WaitForStage
is set to WorkflowUpdateStageCompleted
the handle is returned after the update completes; if WaitForStage
is set to WorkflowUpdateStageAccepted
the handle is returned after the Update is accepted (i.e. after the validator has run, if there is a validator).
updateHandle, err := temporalClient.UpdateWorkflow(context.Background(), client.UpdateWorkflowOptions{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateName: message.SetLanguageUpdate,
WaitForStage: client.WorkflowUpdateStageAccepted,
Args: []interface{}{message.Chinese},
})
if err != nil {
log.Fatalf("Unable to update workflow: %v", err)
}
var previousLang message.Language
err = updateHandle.Get(context.Background(), &previousLang)
if err != nil {
log.Fatalf("Unable to get update result: %v", err)
}
Update-With-Start
Sending an Update-With-Start, if there is a running Workflow with the given Workflow Id, it will process the Update. Otherwise, a new Workflow starts and immediately processes the Update.
Use the client.NewUpdateWithStartWorkflowOperation
API to create a new Update Operation. The WorkflowID
option is optional; and it is invalid to set the RunID
option.
Then, use the WithStartOperation
option to attach the Update Operation to the Client.ExecuteWorkflow
API call. Note that not all ExecuteWorkflow
options are allowed for Update-With-Start, for example specifying a CronSchedule
with result in an error. Refer to the API documentation for details.
The ExecuteWorkflow
call will return once an Update result is available; or when the provided context times out. To obtain the Update result, first obtain a WorkflowUpdateHandle
by calling Get
on the Update operation.
Note that an UpdateWithStartWorkflowOperation
can only be executed once. Re-using a previously executed operation returns an error from ExecuteWorkflow
.
updateOperation := client.NewUpdateWithStartWorkflowOperation(
client.UpdateWorkflowOptions{
UpdateName: message.SetLanguageUpdate,
WaitForStage: client.WorkflowUpdateStageCompleted,
})
workflowOptions := client.StartWorkflowOptions{
ID: "some-workflow-id",
TaskQueue: "some-task-queue",
WithStartOperation: updateOperation, // attaches the Update to the Workflow Start
}
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow)
if err != nil {
log.Fatalf("Unable to execute workflow: %v", err)
}
updateHandle, err := updateOperation.Get(context.Background())
if err != nil {
log.Fatalf("Unable to obtain update handle: %v", err)
}
var previousLang message.Language
err = updateHandle.Get(context.Background(), &previousLang)
if err != nil {
log.Fatalf("Unable to obtain update result: %v", err)
}
Message handler patterns
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow, Ensuring your messages are processed exactly once, and this sample demonstrating safe blocking message handling.
Blocking handlers
Signal and Update handlers can block.
This allows you to use Activities, Child Workflows, durable workflow.Sleep Timers, workflow.Await
conditions, etc.
This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.
It's essential to understand the things that could go wrong in order to use blocking handlers safely. See Workflow message passing for guidance on safe usage of blocking Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code modifies the Update handler from earlier on in this page. The Update handler now makes a blocking call to execute an Activity:
func GreetingWorkflow(ctx workflow.Context) error {
language := English
err = workflow.SetUpdateHandler(ctx, SetLanguageUpdate, func(ctx workflow.Context, newLanguage Language) (Language, error) {
if _, ok := greeting[newLanguage]; !ok {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var greeting string
err := workflow.ExecuteActivity(ctx, CallGreetingService, newLanguage).Get(ctx, &greeting)
if err != nil {
return nil, err
}
greeting[newLanguage] = greeting
}
var previousLanguage Language
previousLanguage, language = language, newLanguage
return previousLanguage, nil
})
...
}
Add blocking wait conditions
Sometimes, blocking Signal or Update handlers need to meet certain conditions before they should continue.
You can use workflow.Await
to prevent the code from proceeding until a condition is true.
You specify the condition by passing a function that returns true
or false
.
This is an important feature that helps you control your handler logic.
Here are three important use cases for Workflow.await
:
- Waiting until a specific Update has arrived.
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
err = workflow.SetUpdateHandler(ctx, "UpdateHandler", func(ctx workflow.Context, input UpdateInput) error {
workflow.Await(ctx, updateUnblockedFunc)
...
})
This is necessary if your Update handlers require something in the main Workflow function to be done first, since an Update handler can execute concurrently with the main Workflow function.
You can also use Workflow.await
anywhere else in the handler to wait for a specific condition to become true.
This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.
Ensure your handlers finish before the Workflow completes
Workflow.await
can ensure your handler completes before a Workflow finishes.
When your Workflow uses blocking Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity.
The Workflow completing may interrupt the handler before it finishes crucial work and cause client errors when trying to retrieve Update results.
Use workflow.Await
to wait for AllHandlersFinished
to return true
to address this problem and allow your Workflow to end smoothly:
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
...
err = workflow.Await(ctx, func() bool {
return workflow.AllHandlersFinished(ctx)
})
return nil
}
By default, your Worker will log a warning if you allow your Workflow Execution to finish with unfinished Update handler executions.
You can silence these warnings on a per-handler basis by setting UnfinishedPolicy
field on workflow.UpdateHandlerOptions
struct:
err = workflow.SetUpdateHandlerWithOptions(ctx, UpdateHandlerName, UpdateFunc, workflow.UpdateHandlerOptions{
UnfinishedPolicy: workflow.HandlerUnfinishedPolicyAbandon,
})
See Finishing handlers before the Workflow completes for more information.
Use workflow.Mutex
to prevent concurrent handler execution
See Message handler concurrency.
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
...
err := workflow.SetUpdateHandler(ctx, "BadUpdateHandler", func(ctx workflow.Context) error {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result Data
err := workflow.ExecuteActivity(ctx, FetchData, name).Get(ctx, &result)
x = result.x
// 🐛🐛 Bug!! If multiple instances of this handler are executing concurrently, then
// there may be times when the Workflow has self.x from one Activity execution and self.y from another.
err = workflow.Sleep(ctx, time.Second)
if err != nil {
return err
}
y = result.y
})
...
}
Coordinating access with workflow.Mutex
corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
...
err := workflow.SetUpdateHandler(ctx, "SafeUpdateHandler", func(ctx workflow.Context) error {
err := mutex.Lock(ctx)
if err != nil {
return err
}
defer mutex.Unlock()
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result Data
err := workflow.ExecuteActivity(ctx, FetchData, name).Get(ctx, &result)
x = data.x
// ✅ OK: the scheduler may switch now to a different handler execution, or to the main workflow
// method, but no other execution of this handler can run until this execution finishes.
err = workflow.Sleep(ctx, time.Second)
if err != nil {
return err
}
self.y = data.y
})
...
}
Troubleshooting
See Exceptions in message handlers for a non–Go-specific discussion of this topic.
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
-
The Client can't contact the server
-
The Workflow does not exist
Unlike Signals, for Queries and Updates, the Client waits for a response from the Worker. If an issue occurs during the handler execution by the Worker, the Client may receive an exception.
Problems when sending an Update
-
There is no Workflow Worker polling the Task Queue
Your request will be retried by the SDK Client until the calling context is cancelled.
-
Update failed.
Update failures are like Workflow failures. Issues that cause a Workflow failure in the main method also cause Update failures in the Update handler. These might include:
- A failed Child Workflow
- A failed Activity if the activity retries have been set to a finite number
- The Workflow author returning an
error
- A panic in the handler, depending on the
WorkflowPanicPolicy
-
The handler caused the Workflow Task to fail A Workflow Task Failure causes the server to retry Workflow Tasks indefinitely. What happens to your Update request depends on its stage:
- If the request hasn't been accepted by the server, you receive a
FAILED_PRECONDITION
error. - If the request has been accepted, it is durable.
Once the Workflow is healthy again after a code deploy, use a
WorkflowUpdateHandle
to fetch the Update result.
- If the request hasn't been accepted by the server, you receive a
-
The Workflow finished while the Update handler execution was in progress: You'll receive a
ServiceError
"workflow execution already completed"`.This will happen if the Workflow finished while the Update handler execution was in progress, for example because
-
The Workflow was canceled or failed.
-
The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
-
Problems when sending a Query
-
There is no Workflow Worker polling the Task Queue
You'll receive a
ServiceError
on which thestatus
isFAILED_PRECONDITION
. -
Query failed. You'll receive a
QueryFailed
error. Any panic in a Query handler will trigger this error. This differs from Signal and Update, where panics can lead to Workflow Task Failure instead. -
The handler caused the Workflow Task to fail. This would happen, for example, if the Query handler blocks the thread for too long without yielding.