workflow

package
v1.27.0
Published: Jun 13, 2024 License: MIT Imports: 8 Imported by: 946

Documentation

Overview

Package workflow contains functions and types used to implement Temporal workflows.

A workflow is an implementation of coordination logic. The Temporal programming framework (aka SDK) allows you to write the workflow coordination logic as simple procedural code that uses standard Go data modeling. The client library takes care of the communication between the worker service and the Temporal service, and ensures state persistence between events even in case of worker failures. Any particular execution is not tied to a particular worker machine. Different steps of the coordination logic can end up executing on different worker instances, with the framework ensuring that necessary state is recreated on the worker executing the step.

In order to facilitate this operational model both the Temporal programming framework and the managed service impose some requirements and restrictions on the implementation of the coordination logic. The details of these requirements and restrictions are described in the "Implementation" section below.

Overview

The sample code below shows a simple implementation of a workflow that executes one activity. The workflow also passes the sole parameter it receives as part of its initialization as a parameter to the activity.

package sample

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func SimpleWorkflow(ctx workflow.Context, value string) error {
	ao := workflow.ActivityOptions{
		TaskQueue:               "sampleTaskQueue",
		ScheduleToCloseTimeout: time.Second * 60,
		ScheduleToStartTimeout: time.Second * 60,
		StartToCloseTimeout:    time.Second * 60,
		HeartbeatTimeout:       time.Second * 10,
		WaitForCancellation:    false,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	future := workflow.ExecuteActivity(ctx, SimpleActivity, value)
	var result string
	if err := future.Get(ctx, &result); err != nil {
		return err
	}
	workflow.GetLogger(ctx).Info("Done", "result", result)
	return nil
}

The following sections describe what is going on in the above code.

Declaration

In the Temporal programming model a workflow is implemented with a function. The function declaration specifies the parameters the workflow accepts as well as any values it might return.

func SimpleWorkflow(ctx workflow.Context, value string) error

The first parameter to the function is ctx workflow.Context. This is a required parameter for all workflow functions and is used by the Temporal client library to pass execution context. Virtually all the client library functions that are callable from workflow functions require this ctx parameter. This context parameter is the same concept as the standard context.Context provided by Go. The only difference between workflow.Context and context.Context is that the Done() function in workflow.Context returns a workflow.Channel instead of the native Go chan.

The second parameter, value string, is a custom workflow parameter that can be used to pass data into the workflow on start. A workflow can have one or more such parameters. All parameters to a workflow function must be serializable, which essentially means that parameters can't be channels, functions, variadic, or unsafe pointers.

Because the function declares only error as its return type, the workflow does not return a value. The error return value indicates that an error was encountered during execution and that the workflow should be failed.
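
The serializability requirement on workflow parameters can be checked with a small standalone sketch. It assumes JSON encoding, which is what the SDK's default data converter uses for ordinary Go values; the converter is configurable, so treat this as an approximation. OrderInput, BadInput and canSerialize are hypothetical names introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderInput is a hypothetical workflow parameter made only of plain data.
type OrderInput struct {
	OrderID  string   `json:"order_id"`
	Items    []string `json:"items"`
	Priority int      `json:"priority"`
}

// BadInput is a hypothetical parameter that carries a channel, which has
// no serialized form.
type BadInput struct {
	Updates chan string `json:"updates"`
}

// canSerialize reports whether v survives JSON encoding.
func canSerialize(v interface{}) bool {
	_, err := json.Marshal(v)
	return err == nil
}

func main() {
	fmt.Println(canSerialize(OrderInput{OrderID: "A-1", Items: []string{"x"}})) // true
	fmt.Println(canSerialize(BadInput{Updates: make(chan string)}))             // false
}
```

A check like this can run in an ordinary unit test, so an unserializable parameter type is caught before a workflow is ever started with it.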

Implementation

In order to support the synchronous and sequential programming model for the workflow implementation there are certain restrictions and requirements on how the workflow implementation must behave in order to guarantee correctness. The requirements are that:

  • Execution must be deterministic
  • Execution must be idempotent

A simplistic way to think about these requirements is that the workflow code:

  • Can only read and manipulate local state or state received as return values from Temporal client library functions
  • Should not effect changes in external systems other than through the invocation of activities
  • Should interact with time only through the functions provided by the Temporal client library (e.g. workflow.Now(), workflow.Sleep())
  • Should not create and interact with goroutines directly; it should instead use the functions provided by the Temporal client library (e.g. workflow.Go() instead of go, workflow.Channel instead of chan, workflow.Selector instead of select)
  • Should do all logging via the logger provided by the Temporal client library (i.e. workflow.GetLogger())
  • Should not iterate over maps using range as order of map iteration is randomized
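
The map-iteration rule above can be satisfied by collecting the keys, sorting them, and iterating the sorted slice. The sketch below uses only the standard library; sortedKeys and the stock map are hypothetical. This package also documents workflow.DeterministicKeys for the same purpose.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the keys of m in ascending order so that callers can
// visit the map entries in the same order on every run.
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m { // random order here is harmless: we only collect keys
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	stock := map[string]int{"pear": 3, "apple": 5, "fig": 1}
	for _, k := range sortedKeys(stock) {
		fmt.Println(k, stock[k])
	}
}
```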

Now that we have laid out the ground rules, we can take a look at how to implement some common patterns inside workflows.

Special Temporal client library functions and types

The Temporal client library provides a number of functions and types as alternatives to some native Go functions and types. Usage of these replacement functions/types is necessary in order to ensure that the workflow code execution is deterministic and repeatable within an execution context.

Coroutine related constructs:

  • workflow.Go : This is a replacement for the go statement
  • workflow.Channel : This is a replacement for the native chan type. Temporal provides support for both buffered and unbuffered channels
  • workflow.Selector : This is a replacement for the select statement

Time related functions:

  • workflow.Now : This is a replacement for time.Now()
  • workflow.Sleep : This is a replacement for time.Sleep()

Failing a Workflow

To mark a workflow as failed all that needs to happen is for the workflow function to return an error via the err return value. Returning an error and a result from a workflow are mutually exclusive. If an error is returned from a workflow then any results returned are ignored.

Execute Activity

The primary responsibility of the workflow implementation is to schedule activities for execution. The most straightforward way to do that is via the library method workflow.ExecuteActivity:

ao := workflow.ActivityOptions{
	TaskQueue:               "sampleTaskQueue",
	ScheduleToCloseTimeout: time.Second * 60,
	ScheduleToStartTimeout: time.Second * 60,
	StartToCloseTimeout:    time.Second * 60,
	HeartbeatTimeout:       time.Second * 10,
	WaitForCancellation:    false,
}
ctx = workflow.WithActivityOptions(ctx, ao)

future := workflow.ExecuteActivity(ctx, SimpleActivity, value)
var result string
if err := future.Get(ctx, &result); err != nil {
	return err
}

Before calling workflow.ExecuteActivity(), ActivityOptions must be configured for the invocation. These are for the most part options to customize various execution timeouts. These options are passed in by creating a child context from the initial context and overwriting the desired values. The child context is then passed into the workflow.ExecuteActivity() call. If multiple activities are sharing the same exact option values then the same context instance can be used when calling workflow.ExecuteActivity().

The first parameter to the call is the required workflow.Context object. This type is an exact copy of context.Context with the Done() method returning workflow.Channel instead of native go chan.

The second parameter is the function that we registered as an activity function. This parameter can also be a string representing the fully qualified name of the activity function. The benefit of passing in the actual function object is that the framework can then validate the activity parameters.

The remaining parameters are the parameters to pass to the activity as part of the call. In our example we have a single parameter: value. This list of parameters must match the list of parameters declared by the activity function. As mentioned above, the Temporal client library will validate that this is indeed the case.

The method call returns immediately and returns a workflow.Future. This allows for more code to be executed without having to wait for the scheduled activity to complete.

When we are ready to process the results of the activity we call the Get() method on the future object returned. The parameters to this method are the ctx object we passed to the workflow.ExecuteActivity() call and an output parameter that will receive the output of the activity. The type of the output parameter must match the type of the return value declared by the activity function. The Get() method will block until the activity completes and results are available.

The result value returned by workflow.ExecuteActivity() can be retrieved from the future and used like any normal result from a synchronous function call. If the result above is a string value we could use it as follows:

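var result string
if err := future.Get(ctx, &result); err != nil {
	return err
}

switch result {
case "apple":
	// do something
case "banana":
	// do something
default:
	// err from the if statement is out of scope here, so report the unexpected value
	return fmt.Errorf("unexpected activity result: %q", result)
}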

In the example above we called the Get() method on the returned future immediately after workflow.ExecuteActivity(). However, this is not necessary. If we wish to execute multiple activities in parallel, we can repeatedly call workflow.ExecuteActivity(), store the futures returned, and then wait for all activities to complete by calling the Get() method of each future at a later time.

To implement more complex wait conditions on the returned future objects, use the workflow.Selector type. Take a look at our Pickfirst sample for an example of how to use workflow.Selector.

Child Workflow

workflow.ExecuteChildWorkflow enables the scheduling of other workflows from within a workflow's implementation. The parent workflow has the ability to "monitor" and impact the life-cycle of the child workflow in much the same way as it can for an activity it invoked.

cwo := workflow.ChildWorkflowOptions{
	// Do not specify WorkflowID if you want Temporal to generate a unique ID for the child execution
	WorkflowID:               "BID-SIMPLE-CHILD-WORKFLOW",
	WorkflowExecutionTimeout: time.Minute * 30,
}
ctx = workflow.WithChildOptions(ctx, cwo)

var result string
future := workflow.ExecuteChildWorkflow(ctx, SimpleChildWorkflow, value)
if err := future.Get(ctx, &result); err != nil {
	workflow.GetLogger(ctx).Error("SimpleChildWorkflow failed.", "Error", err)
	return err
}

Before calling workflow.ExecuteChildWorkflow(), ChildWorkflowOptions must be configured for the invocation. These are for the most part options to customize various execution timeouts. These options are passed in by creating a child context from the initial context and overwriting the desired values. The child context is then passed into the workflow.ExecuteChildWorkflow() call. If multiple child workflows share exactly the same option values, then the same context instance can be used when calling workflow.ExecuteChildWorkflow().

The first parameter to the call is the required workflow.Context object. This type is an exact copy of context.Context with the Done() method returning workflow.Channel instead of the native go chan.

The second parameter is the function that we registered as a workflow function. This parameter can also be a string representing the fully qualified name of the workflow function. What's the benefit? When you pass in the actual function object, the framework can validate workflow parameters.

The remaining parameters are the parameters to pass to the workflow as part of the call. In our example we have a single parameter: value. This list of parameters must match the list of parameters declared by the workflow function.

The method call returns immediately and returns a workflow.Future. This allows for more code to be executed without having to wait for the scheduled workflow to complete.

When we are ready to process the results of the workflow we call the Get() method on the future object returned. The parameters to this method are the ctx object we passed to the workflow.ExecuteChildWorkflow() call and an output parameter that will receive the output of the workflow. The type of the output parameter must match the type of the return value declared by the workflow function. The Get() method will block until the workflow completes and results are available.

The workflow.ExecuteChildWorkflow() function is very similar to the workflow.ExecuteActivity() function. All the patterns described for using the workflow.ExecuteActivity() apply to the workflow.ExecuteChildWorkflow() function as well.

Child workflows can also be configured to continue to exist once their parent workflow is closed. When using this pattern, extra care needs to be taken to ensure the child workflow is started before the parent workflow finishes.

cwo := workflow.ChildWorkflowOptions{
	// Do not terminate when parent closes.
	// assumes import enumspb "go.temporal.io/api/enums/v1"
	ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, cwo)

future := workflow.ExecuteChildWorkflow(ctx, SimpleChildWorkflow, value)

// Wait for the child workflow to start
if err := future.GetChildWorkflowExecution().Get(ctx, nil); err != nil {
	// Problem starting workflow.
	return err
}

Error Handling

Activities and child workflows can fail. Activity errors are *temporal.ActivityError and errors during child workflow execution are *temporal.ChildWorkflowExecutionError. The cause of the errors may be types like *temporal.ApplicationError, *temporal.TimeoutError, *temporal.CanceledError, and *temporal.PanicError.

See ExecuteActivity() and ExecuteChildWorkflow() for details.
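
Because the error you receive wraps its cause, the usual Go approach is to walk the chain with errors.As rather than type-assert the outer error. The sketch below is standard-library only. activityError and applicationError are hypothetical stand-ins that mimic the wrapper-and-cause relationship described above; they are not the SDK's types.

```go
package main

import (
	"errors"
	"fmt"
)

// applicationError is a hypothetical stand-in for a root-cause error type.
type applicationError struct{ msg string }

func (e *applicationError) Error() string { return e.msg }

// activityError is a hypothetical stand-in for a wrapper that records which
// activity failed and exposes the cause through Unwrap.
type activityError struct {
	activity string
	cause    error
}

func (e *activityError) Error() string {
	return fmt.Sprintf("activity %s failed: %v", e.activity, e.cause)
}

func (e *activityError) Unwrap() error { return e.cause }

// causeMessage finds an applicationError anywhere in the chain and returns
// its message; ok is false when the chain contains none.
func causeMessage(err error) (msg string, ok bool) {
	var appErr *applicationError
	if errors.As(err, &appErr) {
		return appErr.msg, true
	}
	return "", false
}

func main() {
	err := &activityError{activity: "SimpleActivity", cause: &applicationError{msg: "card declined"}}
	msg, ok := causeMessage(err)
	fmt.Println(msg, ok) // card declined true
}
```

In workflow code the same pattern would target the *temporal error types listed above, assuming they expose their cause through the standard unwrapping mechanism.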

Signals

Signals provide a mechanism to send data directly to a running workflow. Previously, you had two options for passing data to the workflow implementation:

  • Via start parameters
  • As return values from activities

With start parameters, we could only pass in values before workflow execution begins.

Return values from activities allowed us to pass information to a running workflow, but this approach comes with its own complications. One major drawback is reliance on polling. This means that the data needs to be stored in a third-party location until it's ready to be picked up by the activity. Further, the lifecycle of this activity requires management, and the activity requires manual restart if it fails before acquiring the data.

Signals, on the other hand, provide a fully asynchronous and durable mechanism for providing data to a running workflow. When a signal is received for a running workflow, Temporal persists the event and the payload in the workflow history. The workflow can then process the signal at any time afterwards without the risk of losing the information. The workflow also has the option to pause execution by blocking on a signal channel.

var signalVal string
signalChan := workflow.GetSignalChannel(ctx, signalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
	c.Receive(ctx, &signalVal)
	workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
})
s.Select(ctx)

if len(signalVal) > 0 && signalVal != "SOME_VALUE" {
	return errors.New("signalVal")
}

In the example above, the workflow code uses workflow.GetSignalChannel to open a workflow.Channel for the named signal. We then use a workflow.Selector to wait on this channel and process the payload received with the signal.

Updates

Handle Update

Updates provide a fully asynchronous and durable mechanism to send data directly to a running workflow and receive a response back. Unlike a Query handler, an update handler has no restrictions compared with normal workflow code, so you can modify workflow state, schedule activities, launch child workflows, etc.

counter := param.StartCount
err := workflow.SetUpdateHandler(ctx, YourUpdateName, func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
    counter += arg.Add
    result := YourUpdateResult{
        Total: counter,
    }
    return result, nil
})

For more information see our docs on handling updates

Validate Updates

Note: This is a feature for advanced users for pre-persistence, read-only validation. Other more advanced validation can and should be done in the handler.

Update validators provide a mechanism to perform read-only validation (i.e. not modify workflow state or schedule any commands). If the update validator returns any error the update will fail and not be written into history.

if err := workflow.SetUpdateHandlerWithOptions(
	ctx,
	FetchAndAdd,
	func(ctx workflow.Context, i int) (int, error) {
		tmp := counter
		counter += i
		return tmp, nil
	},
	workflow.UpdateHandlerOptions{Validator: nonNegative},
); err != nil {
	return 0, err
}

func nonNegative(ctx workflow.Context, i int) error {
	if i < 0 {
		return fmt.Errorf("addend must be non-negative (%v)", i)
	}
	return nil
}

For more information see our docs on validator functions

ContinueAsNew Workflow Completion

Workflows that need to rerun periodically could naively be implemented as a big for loop with a sleep where the entire logic of the workflow is inside the body of the for loop. The problem with this approach is that the history for that workflow will keep growing to a point where it reaches the maximum size enforced by the service.

ContinueAsNew is the low-level construct that enables implementing such workflows without the risk of failures down the road. The operation atomically completes the current execution and starts a new execution of the workflow with the same workflow ID. The new execution will not carry over any history from the old execution. To trigger this behavior, the workflow function should terminate by returning the special ContinueAsNewError error:

func SimpleWorkflow(ctx workflow.Context, value string) error {
    ...
    return workflow.NewContinueAsNewError(ctx, SimpleWorkflow, value)
}

For a complete example implementing this pattern please refer to our Cron example.

SideEffect API

workflow.SideEffect executes the provided function once, records its result into the workflow history, and doesn't re-execute upon replay. Instead, it returns the recorded result. Use it only for short, nondeterministic code snippets, like getting a random value or generating a UUID. It can be seen as an "inline" activity. However, one thing to note about workflow.SideEffect is that whereas for activities Temporal guarantees "at-most-once" execution, no such guarantee exists for workflow.SideEffect. Under certain failure conditions, workflow.SideEffect can end up executing the function more than once.

The only way to fail SideEffect is to panic, which causes workflow task failure. The workflow task after timeout is rescheduled and re-executed giving SideEffect another chance to succeed. Be careful to not return any data from the SideEffect function any other way than through its recorded return value.

encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
	return rand.Intn(100)
})

var random int
encodedRandom.Get(&random)
if random < 50 {
	....
} else {
	....
}
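
The record-then-replay behavior described above can be pictured with a toy model. This is only an illustration of the idea and not how the SDK implements it. The history type and its methods are hypothetical, and real workflow history stores encoded payloads rather than ints.

```go
package main

import (
	"fmt"
	"math/rand"
)

// history is a toy stand-in for recorded workflow history: it keeps side
// effect results in the order they were first produced.
type history struct {
	recorded []int
	next     int // position of the next side effect in the current run
}

// sideEffect calls f only when nothing is recorded at the current position;
// otherwise it returns the recorded value without calling f.
func (h *history) sideEffect(f func() int) int {
	if h.next < len(h.recorded) {
		v := h.recorded[h.next]
		h.next++
		return v
	}
	v := f()
	h.recorded = append(h.recorded, v)
	h.next++
	return v
}

// replay rewinds the position, as if the workflow code ran again from the top.
func (h *history) replay() { h.next = 0 }

func main() {
	h := &history{}
	first := h.sideEffect(func() int { return rand.Intn(100) })
	h.replay()
	second := h.sideEffect(func() int { return rand.Intn(100) })
	fmt.Println(first == second) // true: the replay reuses the recorded value
}
```

The model also shows why data must leave the function only through its return value: anything else the function touches is not recorded and will not be reproduced on replay.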

Query API

A workflow execution could be stuck in some state for longer than expected. Temporal provides facilities to query the current call stack of a workflow execution. You can use tctl to run the query, for example:

tctl --namespace samples-namespace workflow query -w my_workflow_id -r my_run_id -qt __stack_trace

The above CLI command uses __stack_trace as the query type. __stack_trace is a built-in query type that is supported by the Temporal client library. You can also add your own custom query types to support things like querying the current state of the workflow, or how many activities the workflow has completed. To do so, you need to set up your own query handler using workflow.SetQueryHandler in your workflow code:

func MyWorkflow(ctx workflow.Context, input string) error {
	currentState := "started" // this could be any serializable struct
	err := workflow.SetQueryHandler(ctx, "state", func() (string, error) {
		return currentState, nil
	})
	if err != nil {
		return err
	}
	// Your normal workflow code begins here; update currentState as the code makes progress.
	currentState = "waiting timer"
	err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
	if err != nil {
		currentState = "timer failed"
		return err
	}
	currentState = "waiting activity"
	ctx = workflow.WithActivityOptions(ctx, myActivityOptions)
	err = workflow.ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
	if err != nil {
		currentState = "activity failed"
		return err
	}
	currentState = "done"
	return nil
}

The above sample code sets up a query handler to handle the query type "state". With that, you should be able to query with the CLI:

tctl --namespace samples-namespace workflow query -w my_workflow_id -r my_run_id -qt state

Besides using tctl, you can also issue a query from code using the QueryWorkflow() API on the Temporal Client object.

Registration

For some client code to be able to invoke a workflow type, the worker process needs to be aware of all the implementations it has access to. A workflow is registered with the following call:

worker.RegisterWorkflow(SimpleWorkflow)

This call essentially creates an in memory mapping inside the worker process between the fully qualified function name and the implementation. If the worker receives tasks for a workflow type it does not know it will fail that task. However, the failure of the task will not cause the entire workflow to fail.
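
The in-memory mapping described above can be sketched with the standard library alone. The registry type below is hypothetical and is not the worker's implementation. Keying by the runtime's fully qualified function name is an assumption taken from the description, and the SDK decides the actual registered name. SimpleWorkflow here is a placeholder with no Temporal dependencies.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// registry is a hypothetical in-memory table from function name to implementation.
type registry map[string]interface{}

// funcName returns the fully qualified name of fn, e.g. "main.SimpleWorkflow".
func funcName(fn interface{}) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// register stores fn under its fully qualified name.
func (r registry) register(fn interface{}) {
	r[funcName(fn)] = fn
}

// lookup returns an error for unknown names, mirroring a worker that fails
// a task for a workflow type it does not know.
func (r registry) lookup(name string) (interface{}, error) {
	fn, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("unknown workflow type %q", name)
	}
	return fn, nil
}

// SimpleWorkflow is a placeholder used only to have something to register.
func SimpleWorkflow(value string) error { return nil }

func main() {
	r := registry{}
	r.register(SimpleWorkflow)

	_, err := r.lookup(funcName(SimpleWorkflow))
	fmt.Println(err) // <nil>

	_, err = r.lookup("no.such.Workflow")
	fmt.Println(err != nil) // true
}
```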

Similarly, we need to have at least one worker that hosts the activity functions:

worker.RegisterActivity(MyActivity)

See the activity package for more details on activity registration.

Testing

The Temporal client library provides a test framework to facilitate testing workflow implementations. The framework is suited for implementing unit tests as well as functional tests of the workflow logic.

The code below implements the unit tests for the SimpleWorkflow sample.

package sample

import (
	"context"
	"errors"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"go.temporal.io/sdk/testsuite"
)

type UnitTestSuite struct {
	suite.Suite
	testsuite.WorkflowTestSuite

	env *testsuite.TestWorkflowEnvironment
}

func (s *UnitTestSuite) SetupTest() {
	s.env = s.NewTestWorkflowEnvironment()
}

func (s *UnitTestSuite) AfterTest(suiteName, testName string) {
	s.env.AssertExpectations(s.T())
}

func (s *UnitTestSuite) Test_SimpleWorkflow_Success() {
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_success")

	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

func (s *UnitTestSuite) Test_SimpleWorkflow_ActivityParamCorrect() {
	s.env.OnActivity(SimpleActivity, mock.Anything, mock.Anything).Return(func(ctx context.Context, value string) (string, error) {
		s.Equal("test_success", value)
		return value, nil
	})
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_success")

	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

func (s *UnitTestSuite) Test_SimpleWorkflow_ActivityFails() {
	s.env.OnActivity(SimpleActivity, mock.Anything, mock.Anything).Return("", errors.New("SimpleActivityFailure"))
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_failure")

	s.True(s.env.IsWorkflowCompleted())

	err := s.env.GetWorkflowError()
	s.Error(err)
	// assumes import "go.temporal.io/sdk/temporal"
	var applicationErr *temporal.ApplicationError
	s.True(errors.As(err, &applicationErr))
	s.Contains(err.Error(), "SimpleActivityFailure")
}

func TestUnitTestSuite(t *testing.T) {
	suite.Run(t, new(UnitTestSuite))
}

Setup

First, we define a "test suite" struct that embeds both the basic suite functionality from testify via suite.Suite and the suite functionality from the Temporal test framework via testsuite.WorkflowTestSuite. Since every test in this suite will test our workflow, we add a field to our struct to hold an instance of the test environment. This allows us to initialize the test environment in a setup method. For testing workflows we use a testsuite.TestWorkflowEnvironment.

We then implement a SetupTest method to set up a new test environment before each test. Doing so ensures that each test runs in its own isolated sandbox. We also implement an AfterTest function where we assert that all the mocks we set up were indeed called by invoking s.env.AssertExpectations(s.T()).

Finally, we create a regular test function recognized by "go test" and pass the struct to suite.Run.

A Simple Test

The simplest test case we can write is to have the test environment execute the workflow and then evaluate the results.

func (s *UnitTestSuite) Test_SimpleWorkflow_Success() {
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_success")

	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

Calling s.env.ExecuteWorkflow(...) will execute the workflow logic and any invoked activities inside the test process. The first parameter to s.env.ExecuteWorkflow(...) is the workflow function, and any subsequent parameters are values for the custom input parameters declared by the workflow function. An important thing to note is that unless the activity invocations are mocked or the activity implementation is replaced (see next section), the test environment will execute the actual activity code, including any calls to outside services.

In the example above, after executing the workflow we assert that the workflow ran through to completion via the call to s.env.IsWorkflowCompleted(). We also assert that no errors were returned by asserting on the return value of s.env.GetWorkflowError(). If our workflow returned a value, we can retrieve that value via a call to s.env.GetWorkflowResult(&value) and add asserts on that value.

Activity Mocking and Overriding

When testing workflows, especially unit testing workflows, we want to test the workflow logic in isolation. Additionally, we want to inject activity errors during our tests runs. The test framework provides two mechanisms that support these scenarios: activity mocking and activity overriding. Both these mechanisms allow you to change the behavior of activities invoked by your workflow without having to modify the actual workflow code.

Let's first take a look at a test that simulates an activity failing via the "activity mocking" mechanism.

func (s *UnitTestSuite) Test_SimpleWorkflow_ActivityFails() {
	s.env.OnActivity(SimpleActivity, mock.Anything, mock.Anything).Return("", errors.New("SimpleActivityFailure"))
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_failure")

	s.True(s.env.IsWorkflowCompleted())

	err := s.env.GetWorkflowError()
	s.Error(err)
	// assumes import "go.temporal.io/sdk/temporal"
	var applicationErr *temporal.ApplicationError
	s.True(errors.As(err, &applicationErr))
	s.Contains(err.Error(), "SimpleActivityFailure")
}

In this test we want to simulate the execution of the activity SimpleActivity invoked by our workflow SimpleWorkflow returning an error. We do that by setting up a mock on the test environment for the SimpleActivity that returns an error.

s.env.OnActivity(SimpleActivity, mock.Anything, mock.Anything).Return("", errors.New("SimpleActivityFailure"))

With the mock set up, we can now execute the workflow via the s.env.ExecuteWorkflow(...) method and assert that the workflow completed and returned the expected error.

Simply mocking the execution to return a desired value or error is a pretty powerful mechanism to isolate workflow logic. However, sometimes we want to replace the activity with an alternate implementation to support a more complex test scenario. For our simple workflow, let's assume we wanted to validate that the activity gets called with the correct parameters.

func (s *UnitTestSuite) Test_SimpleWorkflow_ActivityParamCorrect() {
	s.env.OnActivity(SimpleActivity, mock.Anything, mock.Anything).Return(func(ctx context.Context, value string) (string, error) {
		s.Equal("test_success", value)
		return value, nil
	})
	s.env.ExecuteWorkflow(SimpleWorkflow, "test_success")

	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

In this example, we provide a function implementation as the parameter to Return. This allows us to provide an alternate implementation for the activity SimpleActivity. The framework will execute this function whenever the activity is invoked and pass on the return value from the function as the result of the activity invocation. Additionally, the framework will validate that the signature of the "mock" function matches the signature of the original activity function.

Since this can be an entire function, there really is no limitation as to what we can do in here. In this example, we assert that the "value" param has the same content as the value param we passed to the workflow.

NOTE: The default MaximumAttempts for retry policy set by server is 0 which means unlimited retries. However, during a unit test the default MaximumAttempts is 10 to avoid a test getting stuck.

Constants

This section is empty.

Variables

var (
	// ErrSessionFailed is the error returned when user tries to execute an activity but the
	// session it belongs to has already failed
	ErrSessionFailed = internal.ErrSessionFailed

	// SessionStateOpen means the session worker is heartbeating and new activities will be scheduled on the session host.
	SessionStateOpen = internal.SessionStateOpen

	// SessionStateClosed means the session was closed by the workflow and new activities will not be scheduled on the session host.
	SessionStateClosed = internal.SessionStateClosed

	// SessionStateFailed means the session worker was detected to be down and the session cannot be used to schedule new activities.
	SessionStateFailed = internal.SessionStateFailed
)

var ErrCanceled = internal.ErrCanceled

ErrCanceled is the error returned by Context.Err when the context is canceled.

var ErrDeadlineExceeded = internal.ErrDeadlineExceeded

ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.

Functions

func Await

func Await(ctx Context, condition func() bool) error

Await blocks the calling thread until condition() returns true. Do not mutate values or trigger side effects inside condition. Returns CanceledError if the ctx is canceled. The following code will block until the captured count variable is set to 5:

workflow.Await(ctx, func() bool {
    return count == 5
})

The condition is evaluated on every workflow state transition. Note that conditions that wait for time can be error-prone, as nothing might trigger evaluation. For example:

workflow.Await(ctx, func() bool {
    return workflow.Now(ctx).After(someTime)
})

might never return true unless some other event, like a Signal or activity completion, forces the condition evaluation. For a time-based wait use the workflow.AwaitWithTimeout function.

func AwaitWithTimeout

func AwaitWithTimeout(ctx Context, timeout time.Duration, condition func() bool) (ok bool, err error)

AwaitWithTimeout blocks the calling thread until condition() returns true or blocking time exceeds the passed timeout value. Returns ok=false if timed out, and err CanceledError if the ctx is canceled. The following code will block until the captured count variable is set to 5, or one hour passes.

workflow.AwaitWithTimeout(ctx, time.Hour, func() bool {
  return count == 5
})

func CompleteSession

func CompleteSession(ctx Context)

CompleteSession completes a session. It releases worker resources, so other sessions can be created. CompleteSession won't do anything if the context passed in doesn't contain any session information or the session has already completed or failed.

After a session has completed, user can continue to use the context, but the activities will be scheduled on the normal taskQueue (as user specified in ActivityOptions) and may be picked up by another worker since it's not in a session.

Due to internal logic, this call must be made in the same coroutine CreateSession/RecreateSession were called in.

func DataConverterWithoutDeadlockDetection added in v1.15.0

func DataConverterWithoutDeadlockDetection(c converter.DataConverter) converter.DataConverter

DataConverterWithoutDeadlockDetection returns a data converter that disables workflow deadlock detection for each call on the data converter. This should be used for advanced data converters that may perform remote calls or otherwise intentionally execute longer than the default deadlock detection timeout.

func DeterministicKeys added in v1.26.0

func DeterministicKeys[K constraints.Ordered, V any](m map[K]V) []K

DeterministicKeys returns the keys of a map in deterministic (sorted) order. To be used in for loops in workflows for deterministic iteration.
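Go randomizes map iteration order, so ranging over a map directly can visit entries in a different order on each run. The sketch below illustrates the idea with the standard library only. sortedKeys is a hypothetical stand-in written for this illustration and is not the SDK implementation; it uses cmp.Ordered from Go 1.21+ in place of constraints.Ordered.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// sortedKeys is a hypothetical stand-in for workflow.DeterministicKeys:
// it collects the keys of m and returns them in ascending order.
func sortedKeys[K cmp.Ordered, V any](m map[K]V) []K {
	keys := make([]K, 0, len(m))
	for k := range m { // iteration order here is randomized by the Go runtime
		keys = append(keys, k)
	}
	slices.Sort(keys)
	return keys
}

func main() {
	pending := map[string]int{"charlie": 3, "alpha": 1, "bravo": 2}
	// Ranging over the sorted key slice visits entries in the same order on every run.
	for _, k := range sortedKeys(pending) {
		fmt.Println(k, pending[k])
	}
}
```

In workflow code the same loop shape would range over the slice returned by workflow.DeterministicKeys(pending) instead of over the map itself.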

func DeterministicKeysFunc added in v1.26.0

func DeterministicKeysFunc[K comparable, V any](m map[K]V, cmp func(K, K) int) []K

DeterministicKeysFunc returns the keys of a map in a deterministic (sorted) order. cmp(a, b) should return a negative number when a < b, a positive number when a > b and zero when a == b. Keys are sorted by cmp. To be used in for loops in workflows for deterministic iteration.
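A comparison function is needed when the key type is comparable but has no natural ordering, such as a struct. The following sketch shows a cmp function that follows the contract above. orderKey, compareOrderKeys, and sortedKeysFunc are hypothetical names introduced for this illustration; sortedKeysFunc only mimics the documented behavior using the standard library and is not the SDK code.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// orderKey is a hypothetical struct key: comparable, but not ordered.
type orderKey struct {
	Region string
	ID     int
}

// compareOrderKeys returns a negative number when a < b, a positive number
// when a > b, and zero when a == b. It orders by Region first, then by ID.
func compareOrderKeys(a, b orderKey) int {
	if c := cmp.Compare(a.Region, b.Region); c != 0 {
		return c
	}
	return cmp.Compare(a.ID, b.ID)
}

// sortedKeysFunc is a hypothetical stand-in for workflow.DeterministicKeysFunc.
func sortedKeysFunc[K comparable, V any](m map[K]V, compare func(K, K) int) []K {
	keys := make([]K, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	slices.SortFunc(keys, compare)
	return keys
}

func main() {
	totals := map[orderKey]float64{
		{Region: "us", ID: 2}: 10.5,
		{Region: "eu", ID: 9}: 3.0,
		{Region: "us", ID: 1}: 7.25,
	}
	for _, k := range sortedKeysFunc(totals, compareOrderKeys) {
		fmt.Println(k.Region, k.ID, totals[k])
	}
}
```

The comparison should define a total order over the keys; if two distinct keys compare as equal, their relative order in the result is not guaranteed by this sketch.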

func GetLastCompletionResult

func GetLastCompletionResult(ctx Context, d ...interface{}) error

GetLastCompletionResult extracts the last completion result from the last successful run of this cron or schedule workflow. It is used in combination with a cron schedule or a schedule workflow. A workflow can be started with an optional cron schedule. If a cron workflow wants to pass data to the next scheduled run, it can return that data, which becomes available when the next run starts. The result is that of the last successful run, even if the most recent run failed. GetLastCompletionResult() extracts the data into the expected data structure. See TestWorkflowEnvironment.SetLastCompletionResult() for unit test support.

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.
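The json.Unmarshal behavior this note alludes to can be seen with the standard library alone. The sketch below uses encoding/json directly rather than the SDK's data converter, so it only illustrates the kind of merge to watch for; cursorState and decodeInto are hypothetical names introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cursorState is a hypothetical result type used only for this illustration.
type cursorState struct {
	Cursor string
	Count  int
}

// decodeInto unmarshals a payload that carries only Count into dst.
func decodeInto(dst *cursorState) error {
	return json.Unmarshal([]byte(`{"Count": 1}`), dst)
}

func main() {
	// Reused value: Cursor is absent from the payload, so the old value survives.
	reused := cursorState{Cursor: "stale", Count: 7}
	if err := decodeInto(&reused); err != nil {
		panic(err)
	}

	// Fresh value: fields absent from the payload stay at their zero values.
	var fresh cursorState
	if err := decodeInto(&fresh); err != nil {
		panic(err)
	}

	fmt.Printf("reused=%+v fresh=%+v\n", reused, fresh)
}
```

Declaring a new zero-valued variable for each extraction avoids carrying over fields from an earlier decode.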

func GetLastError added in v1.3.0

func GetLastError(ctx Context) error

GetLastError extracts the error from the last run of this workflow. If the last run of this workflow did not fail or this is the first run, this will be nil. This is used in combination with cron schedule or schedule workflow.

See TestWorkflowEnvironment.SetLastError() for unit test support.

func GetLogger

func GetLogger(ctx Context) log.Logger

GetLogger returns a logger to be used in the workflow's context.

func GetMetricsHandler added in v1.12.0

func GetMetricsHandler(ctx Context) metrics.Handler

GetMetricsHandler returns a metrics handler to be used in workflow's context. This handler does not record metrics during replay.

func GetTypedSearchAttributes added in v1.26.0

func GetTypedSearchAttributes(ctx Context) temporal.SearchAttributes

GetTypedSearchAttributes returns a collection of the search attributes currently set for this workflow.

func GetUnhandledSignalNames added in v1.20.0

func GetUnhandledSignalNames(ctx Context) []string

GetUnhandledSignalNames returns signal names that have unconsumed signals.

func Go

func Go(ctx Context, f func(ctx Context))

Go creates a new coroutine. It has similar semantics to a goroutine, but in the context of the workflow.

func GoNamed

func GoNamed(ctx Context, name string, f func(ctx Context))

GoNamed creates a new coroutine with a given human-readable name. It has similar semantics to a goroutine, but in the context of the workflow. The name appears in stack traces that include this coroutine.

func HasLastCompletionResult

func HasLastCompletionResult(ctx Context) bool

HasLastCompletionResult checks whether there is a completion result from previous runs. It is used in combination with a cron schedule. A workflow can be started with an optional cron schedule. If a cron workflow wants to pass data to the next scheduled run, it can return that data, which becomes available when the next run starts. HasLastCompletionResult() checks whether such data has been passed down from a previous successful run.

func IsContinueAsNewError added in v1.4.0

func IsContinueAsNewError(err error) bool

IsContinueAsNewError returns whether err is a ContinueAsNewError.

func IsReplaying

func IsReplaying(ctx Context) bool

IsReplaying returns whether the current workflow code is replaying.

Warning! Never issue commands, such as scheduling an activity, child workflow, or timer, or sending or waiting on a future or channel, based on this flag, as doing so breaks the workflow determinism requirement. The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or metric reporting. Note that Temporal already provides standard logging and metrics via workflow.GetLogger and workflow.GetMetricsHandler; those standard mechanisms are replay-aware and automatically suppress output during replay. Only use this flag if you need custom logging or metrics reporting, for example if you want to log to Kafka.

Warning! Any action protected by this flag should not fail; if it does fail, the workflow should either ignore that failure or panic on it. If the workflow should not be blocked by such failures, it should ignore them; if the workflow should proceed only when the action succeeds, it should panic on the failure. A panic raised from a workflow causes the workflow task to fail, and the Temporal server will reschedule it later to retry.

func MutableSideEffect

func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue

MutableSideEffect executes the provided function once, then it looks up the history for the value with the given id. If there is no existing value, then it records the function result as a value with the given id on history; otherwise, it compares whether the existing value from history has changed from the new function result by calling the provided equals function. If they are equal, it returns the value without recording a new one in history; otherwise, it records the new value with the same id on history.

Caution: do not use MutableSideEffect to modify closures. Always retrieve result from MutableSideEffect's encoded return value.

The difference between MutableSideEffect() and SideEffect() is that every new SideEffect() call in non-replay will result in a new marker being recorded on history. However, MutableSideEffect() only records a new marker if the value changed. During replay, MutableSideEffect() will not execute the function again, but it will return the exact same value as it was returning during the non-replay run.

One good use case of MutableSideEffect() is to access dynamically changing config without breaking determinism.
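The recording rule described above (write a new marker only when the value changes) can be modeled in a few lines. This is a toy model under stated assumptions, not SDK internals: the history type and its mutableSideEffect method are hypothetical, values are plain ints rather than encoded payloads, and replay is not modeled at all.

```go
package main

import "fmt"

// history is a toy stand-in for workflow history: it keeps the latest
// recorded value per id and counts how many markers were written.
type history struct {
	values  map[string]int
	markers int
}

// mutableSideEffect models the non-replay behavior described above: run f,
// then record a marker only if no value exists for id or the value changed.
func (h *history) mutableSideEffect(id string, f func() int, equals func(a, b int) bool) int {
	next := f()
	prev, ok := h.values[id]
	if ok && equals(prev, next) {
		return prev // unchanged: nothing new is recorded
	}
	h.values[id] = next
	h.markers++
	return next
}

func main() {
	h := &history{values: map[string]int{}}
	limit := 10 // pretend this comes from dynamically changing config
	read := func() int { return limit }
	eq := func(a, b int) bool { return a == b }

	h.mutableSideEffect("limit", read, eq) // first call: marker recorded
	h.mutableSideEffect("limit", read, eq) // same value: no new marker
	limit = 20
	v := h.mutableSideEffect("limit", read, eq) // changed value: marker recorded
	fmt.Println(v, h.markers)                   // prints "20 2"
}
```

In this model a plain SideEffect would have written three markers for the same three calls, which is the difference the paragraph above describes.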

func NewContinueAsNewError

func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) error

NewContinueAsNewError creates a ContinueAsNewError instance. If the workflow main function returns this error, the current execution ends and a new execution with the same workflow ID is started automatically with the options provided to this function.

 ctx - use context to override any options for the new workflow, such as execution timeout, workflow task timeout, or task queue.
       If an option is not set, the new execution uses the value the current workflow is using.
         ctx = WithWorkflowExecutionTimeout(ctx, 30 * time.Minute)
         ctx = WithWorkflowTaskTimeout(ctx, time.Minute)
         ctx = WithWorkflowTaskQueue(ctx, "example-group")
 wfn - workflow function. For the new execution it can be different from the currently running one.
 args - arguments for the new workflow.

func NewContinueAsNewErrorWithOptions added in v1.26.0

func NewContinueAsNewErrorWithOptions(ctx Context, options ContinueAsNewErrorOptions, wfn interface{}, args ...interface{}) error

NewContinueAsNewErrorWithOptions creates a ContinueAsNewError instance with additional options.

func NewDisconnectedContext

func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc)

NewDisconnectedContext returns a new context that won't propagate parent's cancellation to the new child context. One common use case is to do cleanup work after workflow is canceled.

err := workflow.ExecuteActivity(ctx, ActivityFoo).Get(ctx, &activityFooResult)
if err != nil && temporal.IsCanceledError(ctx.Err()) {
  // activity failed, and workflow context is canceled
  disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
  workflow.ExecuteActivity(disconnectedCtx, handleCancellationActivity).Get(disconnectedCtx, nil)
  return err // workflow returns CanceledError
}

func NewFuture

func NewFuture(ctx Context) (Future, Settable)

NewFuture creates a new future as well as an associated Settable that is used to set its value.

func Now

func Now(ctx Context) time.Time

Now returns the time when the workflow task was first started, even during replay. Workflows must use this Now() to get the wall clock time, instead of Go's time.Now().

func SetQueryHandler

func SetQueryHandler(ctx Context, queryType string, handler interface{}) error

SetQueryHandler sets the query handler to handle workflow queries. The queryType specifies which query type this handler should handle. The handler must be a function that returns two values: the first must be a serializable result and the second must be an error. The handler function can receive any number of input parameters, all of which must be serializable. You should call workflow.SetQueryHandler() at the beginning of the workflow code.

When a client calls Client.QueryWorkflow() on the Temporal server, the server generates a task that is dispatched to a workflow worker, which replays the history events and then executes the query handler for that query type. The query handler is invoked outside the context of the workflow, meaning that the handler code must not use the workflow context to do things like workflow.NewChannel() or workflow.Go(), or call any blocking workflow functions like Channel.Receive() or Future.Get(). Trying to do so in query handler code will fail the query, and the client will receive a QueryFailedError.

Example of workflow code that supports the query type "current_state":

func MyWorkflow(ctx workflow.Context, input string) error {
  currentState := "started" // this could be any serializable struct
  err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) {
    return currentState, nil
  })
  if err != nil {
    currentState = "failed to register query handler"
    return err
  }
  // your normal workflow code begins here, and you update the currentState as the code makes progress.
  currentState = "waiting timer"
  err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
  if err != nil {
    currentState = "timer failed"
    return err
  }

  currentState = "waiting activity"
  ctx = workflow.WithActivityOptions(ctx, myActivityOptions)
  err = workflow.ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
  if err != nil {
    currentState = "activity failed"
    return err
  }
  currentState = "done"
  return nil
}

func SetUpdateHandler added in v1.20.0

func SetUpdateHandler(ctx Context, updateName string, handler interface{}) error

SetUpdateHandler forwards to SetUpdateHandlerWithOptions with a zero-initialized UpdateHandlerOptions struct. See SetUpdateHandlerWithOptions for more details.

NOTE: Experimental

func SetUpdateHandlerWithOptions added in v1.20.0

func SetUpdateHandlerWithOptions(ctx Context, updateName string, handler interface{}, opts UpdateHandlerOptions) error

SetUpdateHandlerWithOptions binds an update handler function to the specified name such that update invocations specifying that name will invoke the handler. The handler function can take as input any number of parameters so long as they can be serialized/deserialized by the system. The handler can take a workflow.Context as its first parameter but this is not required. The update handler must return either a single error or a single serializable object along with a single error. The update handler function is invoked in the context of the workflow and thus is subject to the same restrictions as workflow code, namely, the update handler must be deterministic. As with other workflow code, update code is free to invoke and wait on the results of activities. Update handler code is free to mutate workflow state.

This registration can optionally specify (through UpdateHandlerOptions) an update validation function. If provided, this function will be invoked before the update handler itself is invoked and if this function returns an error, the update request will be considered to have been rejected and as such will not occupy any space in the workflow history. Validation functions must take as inputs the same parameters as the associated update handler but may vary from said handler by the presence/absence of a workflow.Context as the first parameter. Validation handlers must only return a single error. Validation handlers must be deterministic and can observe workflow state but must not mutate workflow state in any way.

Example of workflow code that supports a monotonic counter:

func MyWorkflow(ctx workflow.Context) (int, error) {
	counter := 0
	err := workflow.SetUpdateHandlerWithOptions(
		ctx,
		"add",
		func(val int) (int, error) { // update handler
			counter += val // note that this mutates workflow state
			return counter, nil
		},
		workflow.UpdateHandlerOptions{
			Validator: func(val int) error {
				if val < 0 { // reject attempts to add negative values
					return fmt.Errorf("invalid addend: %v", val)
				}
				return nil
			},
		})
	if err != nil {
		return 0, err
	}
	_ = ctx.Done().Receive(ctx, nil)
	return counter, nil
}

NOTE: Experimental

func SideEffect

func SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue

SideEffect executes the provided function once and records its result in the workflow history. During replay, the recorded result is returned without executing the provided function. This satisfies the determinism requirement for workflows, as exactly the same result is returned in replay. A common use case is to run some short non-deterministic code in a workflow, like getting a random number or a new UUID. The only way to fail SideEffect is to panic, which causes a workflow task failure. After the workflow task times out, it is rescheduled and re-executed, giving SideEffect another chance to succeed.

Caution: do not use SideEffect to modify closures. Always retrieve result from SideEffect's encoded return value. For example this code is BROKEN:

// Bad example:
var random int
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
       random = rand.Intn(100)
       return nil
})
// random will always be 0 in replay, thus this code is non-deterministic
if random < 50 {
       ....
} else {
       ....
}

On replay the provided function is not executed, random will always be 0, and the workflow could take a different path, breaking determinism.

Here is the correct way to use SideEffect:

// Good example:
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
      return rand.Intn(100)
})
var random int
encodedRandom.Get(&random)
if random < 50 {
       ....
} else {
       ....
}

func Sleep

func Sleep(ctx Context, d time.Duration) (err error)

Sleep pauses the current workflow for at least the duration d. A negative or zero duration causes Sleep to return immediately. Workflow code must use this Sleep() to sleep, instead of Go's time.Sleep(). You can cancel the pending sleep by canceling the Context (using the context from workflow.WithCancel(ctx)). Sleep() returns nil if the duration d has elapsed, or *CanceledError if the ctx is canceled. There are two reasons the ctx might be canceled: 1) your workflow code canceled the ctx (with workflow.WithCancel(ctx)); 2) your workflow itself was canceled by an external request.

func UpsertMemo added in v1.17.0

func UpsertMemo(ctx Context, memo map[string]interface{}) error

UpsertMemo is used to add or update the workflow memo. UpsertMemo merges keys into the existing map in the workflow. For example:

func MyWorkflow(ctx workflow.Context, input string) error {
	memo1 := map[string]interface{}{
		"Key1": 1,
		"Key2": true,
	}
	workflow.UpsertMemo(ctx, memo1)

	memo2 := map[string]interface{}{
		"Key1": 2,
		"Key3": "seattle",
	}
	workflow.UpsertMemo(ctx, memo2)
	return nil
}

The workflow memo will eventually be:

map[string]interface{}{
	"Key1": 2,
	"Key2": true,
	"Key3": "seattle",
}

This is only supported with Temporal Server 1.18+

func UpsertSearchAttributes deprecated

func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error

UpsertSearchAttributes is used to add or update workflow search attributes. The search attributes can be used in queries of the List/Scan/Count workflow APIs. The key and value type must be registered on the Temporal server side, and the value has to be JSON serializable. UpsertSearchAttributes merges attributes into the existing map in the workflow. For example, this workflow code:

func MyWorkflow(ctx workflow.Context, input string) error {
	attr1 := map[string]interface{}{
		"CustomIntField": 1,
		"CustomBoolField": true,
	}
	workflow.UpsertSearchAttributes(ctx, attr1)

	attr2 := map[string]interface{}{
		"CustomIntField": 2,
		"CustomKeywordField": "seattle",
	}
	workflow.UpsertSearchAttributes(ctx, attr2)
	return nil
}

will eventually have search attributes:

map[string]interface{}{
	"CustomIntField": 2,
	"CustomBoolField": true,
	"CustomKeywordField": "seattle",
}

For supported operations on different server versions see Visibility.

Deprecated: use UpsertTypedSearchAttributes instead.

func UpsertTypedSearchAttributes added in v1.26.0

func UpsertTypedSearchAttributes(ctx Context, searchAttributeUpdate ...temporal.SearchAttributeUpdate) error

UpsertTypedSearchAttributes is used to add, update, or remove workflow search attributes. The search attributes can be used in queries of the List/Scan/Count workflow APIs. The key and value type must be registered on the Temporal server side. UpsertTypedSearchAttributes merges attributes into the existing map in the workflow. For example:

var intKey = temporal.NewSearchAttributeKeyInt64("CustomIntField")
var boolKey = temporal.NewSearchAttributeKeyBool("CustomBoolField")
var keywordKey = temporal.NewSearchAttributeKeyKeyword("CustomKeywordField")

func MyWorkflow(ctx workflow.Context, input string) error {
	err := workflow.UpsertTypedSearchAttributes(ctx, intKey.ValueSet(1), boolKey.ValueSet(true))
	// ...

	err = workflow.UpsertTypedSearchAttributes(ctx, intKey.ValueSet(2), keywordKey.ValueUnset())
	// ...
}

For supported operations on different server versions see Visibility.

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

Types

type ActivityOptions

type ActivityOptions = internal.ActivityOptions

ActivityOptions stores all activity-specific invocation parameters that will be stored inside of a context.

func GetActivityOptions added in v1.6.0

func GetActivityOptions(ctx Context) ActivityOptions

GetActivityOptions returns all activity options present on the context.

type CancelFunc

type CancelFunc = internal.CancelFunc

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.

type Channel

type Channel = internal.Channel

Channel must be used instead of a native go channel by workflow code. Use workflow.NewChannel to create a Channel instance. Channel extends both ReceiveChannel and SendChannel. Prefer using one of these interfaces to share a Channel with consumers or producers.

func NewBufferedChannel

func NewBufferedChannel(ctx Context, size int) Channel

NewBufferedChannel creates a new buffered Channel instance.

func NewChannel

func NewChannel(ctx Context) Channel

NewChannel creates a new Channel instance.

func NewNamedBufferedChannel

func NewNamedBufferedChannel(ctx Context, name string, size int) Channel

NewNamedBufferedChannel creates a new buffered Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this Channel.

func NewNamedChannel

func NewNamedChannel(ctx Context, name string) Channel

NewNamedChannel creates a new Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this channel.

type ChildWorkflowFuture

type ChildWorkflowFuture = internal.ChildWorkflowFuture

ChildWorkflowFuture represents the result of a child workflow execution.

func ExecuteChildWorkflow

func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interface{}) ChildWorkflowFuture

ExecuteChildWorkflow requests child workflow execution in the context of a workflow. The context can be used to pass settings for the child workflow, for example the task queue this child workflow should be routed to and the timeouts that need to be configured. Use ChildWorkflowOptions to pass down the options.

cwo := ChildWorkflowOptions{
	WorkflowExecutionTimeout: 10 * time.Minute,
	WorkflowTaskTimeout:      time.Minute,
}
ctx = WithChildOptions(ctx, cwo)

Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. Input args are the arguments that need to be passed to the child workflow function represented by childWorkflow.

If the child workflow fails to complete, the error returned by the future's Get indicates the failure. The error will be of type *ChildWorkflowExecutionError. It carries important child workflow information and the actual error that caused the child workflow failure. Use errors.Unwrap to get this error, or errors.As to check its type, which can be one of *ApplicationError, *TimeoutError, or *CanceledError.

You can cancel the pending child workflow using a context from workflow.WithCancel(ctx); that will fail the child workflow with *CanceledError set as the cause of the *ChildWorkflowExecutionError.

ExecuteChildWorkflow returns ChildWorkflowFuture.

type ChildWorkflowOptions

type ChildWorkflowOptions = internal.ChildWorkflowOptions

ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context.

func GetChildWorkflowOptions added in v1.6.0

func GetChildWorkflowOptions(ctx Context) ChildWorkflowOptions

GetChildWorkflowOptions returns all child workflow options present on the context.

type Context

type Context = internal.Context

Context is a clone of context.Context with Done() returning Channel instead of native channel. A Context carries a deadline, a cancellation signal, and other values across API boundaries.

Context's methods may be called by multiple goroutines simultaneously.

func CreateSession

func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error)

CreateSession creates a session and returns a new context which contains information about the created session. The session will be created on the task queue specified in ActivityOptions. If none is specified, the default one will be used.

CreateSession will fail in the following situations:

  1. The context passed in already contains a session which is still open (neither closed nor failed).
  2. All the workers are busy (the number of sessions currently running on all the workers has reached MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and the session cannot be created within the specified timeout.

If an activity is executed using the returned context, it is regarded as part of the session. All activities within the same session will be executed by the same worker. The caller still needs to handle the error returned when executing an activity. A session is not marked as failed if an activity within it returns an error; only when the worker executing the session goes down is the session marked as failed. Executing an activity within a failed session will return ErrSessionFailed immediately without scheduling that activity.

The returned session Context will be canceled if the session fails (worker died) or CompleteSession() is called. This means that in these two cases, all user activities scheduled using the returned session Context will also be canceled.

If the user wants to end a session because an activity returned an error, use the CompleteSession API. A new session can be created if necessary to retry the whole session.

Example:

so := &SessionOptions{
	ExecutionTimeout: time.Minute,
	CreationTimeout:  time.Minute,
}
sessionCtx, err := CreateSession(ctx, so)
if err != nil {
	// Creation failed. Wrong ctx or too many outstanding sessions.
	return err
}
defer CompleteSession(sessionCtx)
err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil)
if err == ErrSessionFailed {
	// Session has failed
} else if err != nil {
	// Handle activity error
}
... // execute more activities using sessionCtx

NOTE: Session recreation via RecreateSession may not work properly across worker fail/crash before Temporal server version v1.15.1.

func RecreateSession

func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionOptions) (Context, error)

RecreateSession recreates a session based on the sessionInfo passed in. Activities executed within the recreated session will be executed by the same worker as the previous session. RecreateSession() returns an error in the same situations as CreateSession(), or when the token passed in is invalid. It is otherwise used in the same way as CreateSession().

The main use of RecreateSession is for long sessions that are split into multiple runs. At the end of one run, complete the current session, get the recreateToken from sessionInfo by calling SessionInfo.GetRecreateToken(), and pass the token to the next run. In the new run, the session can be recreated using that token.

NOTE: Session recreation via RecreateSession may not work properly across worker fail/crash before Temporal server version v1.15.1.

func WithActivityOptions

func WithActivityOptions(ctx Context, options ActivityOptions) Context

WithActivityOptions makes a copy of the context and adds the passed-in options to the context. If activity options already exist on the context, they are overwritten by the passed-in value as a whole. Specify all necessary values in the options, as values in the existing context options are not carried over.

func WithChildOptions

func WithChildOptions(ctx Context, cwo ChildWorkflowOptions) Context

WithChildOptions adds all workflow options to the context.

func WithDataConverter

func WithDataConverter(ctx Context, dc converter.DataConverter) Context

WithDataConverter adds DataConverter to the context.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(ctx Context, d time.Duration) Context

WithHeartbeatTimeout makes a copy of the current context and updates the HeartbeatTimeout field in its activity options. Empty activity options are created if none exist in the original context.

Temporal time resolution is in seconds and the library uses math.Ceil(d.Seconds()) to calculate the final value. This is subject to change in the future.
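The rounding rule quoted above means that sub-second timeouts are rounded up to whole seconds. A minimal sketch of that arithmetic follows; ceilToSeconds is a hypothetical helper written for this illustration, not a function exported by the SDK.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// ceilToSeconds is a hypothetical helper that applies the rule quoted above:
// math.Ceil(d.Seconds()), expressed again as a time.Duration.
func ceilToSeconds(d time.Duration) time.Duration {
	return time.Duration(math.Ceil(d.Seconds())) * time.Second
}

func main() {
	inputs := []time.Duration{
		500 * time.Millisecond,  // rounds up to 1s
		1500 * time.Millisecond, // rounds up to 2s
		2 * time.Second,         // already whole seconds, stays 2s
	}
	for _, d := range inputs {
		fmt.Println(d, "->", ceilToSeconds(d))
	}
}
```

Choosing timeouts in whole seconds avoids any surprise from this rounding.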

func WithLocalActivityOptions

func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context

WithLocalActivityOptions makes a copy of the context and adds the passed-in options to the context. If local activity options already exist on the context, they are overwritten by the passed-in value.

func WithRetryPolicy

func WithRetryPolicy(ctx Context, retryPolicy temporal.RetryPolicy) Context

WithRetryPolicy makes a copy of the current context and updates the RetryPolicy field in its activity options. Empty activity options are created if none exist in the original context.

func WithScheduleToCloseTimeout

func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context

WithScheduleToCloseTimeout makes a copy of the current context and updates the ScheduleToCloseTimeout field in its activity options. Empty activity options are created if none exist in the original context.

Temporal time resolution is in seconds and the library uses math.Ceil(d.Seconds()) to calculate the final value. This is subject to change in the future.

func WithScheduleToStartTimeout

func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context

WithScheduleToStartTimeout makes a copy of the current context and updates the ScheduleToStartTimeout field in its activity options. Empty activity options are created if none exist in the original context.

Temporal time resolution is in seconds and the library uses math.Ceil(d.Seconds()) to calculate the final value. This is subject to change in the future.

func WithStartToCloseTimeout

func WithStartToCloseTimeout(ctx Context, d time.Duration) Context

WithStartToCloseTimeout makes a copy of the current context and updates the StartToCloseTimeout field in its activity options. Empty activity options are created if none exist in the original context.

Temporal time resolution is in seconds and the library uses math.Ceil(d.Seconds()) to calculate the final value. This is subject to change in the future.

func WithTaskQueue

func WithTaskQueue(ctx Context, name string) Context

WithTaskQueue makes a copy of the current context and updates the TaskQueue field in its activity options. Empty activity options are created if none exist in the original context.

func WithValue

func WithValue(parent Context, key interface{}, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

func WithWaitForCancellation

func WithWaitForCancellation(ctx Context, wait bool) Context

WithWaitForCancellation makes a copy of the current context and updates the WaitForCancellation field in its activity options. Empty activity options are created if none exist in the original context.

func WithWorkflowID

func WithWorkflowID(ctx Context, workflowID string) Context

WithWorkflowID adds a workflowID to the context.

func WithWorkflowNamespace

func WithWorkflowNamespace(ctx Context, name string) Context

WithWorkflowNamespace adds a namespace to the context.

func WithWorkflowRunTimeout

func WithWorkflowRunTimeout(ctx Context, d time.Duration) Context

WithWorkflowRunTimeout adds a run timeout to the context. The current timeout resolution is in seconds, and the implementation uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

func WithWorkflowTaskQueue

func WithWorkflowTaskQueue(ctx Context, name string) Context

WithWorkflowTaskQueue adds a task queue to the context.

func WithWorkflowTaskTimeout

func WithWorkflowTaskTimeout(ctx Context, d time.Duration) Context

WithWorkflowTaskTimeout adds a workflow task timeout to the context. The current timeout resolution is in seconds, and the implementation uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

func WithWorkflowVersioningIntent added in v1.23.0

func WithWorkflowVersioningIntent(ctx Context, intent temporal.VersioningIntent) Context

WithWorkflowVersioningIntent is used to set the VersioningIntent before constructing a ContinueAsNewError with NewContinueAsNewError.

type ContextAware added in v1.6.0

type ContextAware = internal.ContextAware

ContextAware is an optional interface that can be implemented alongside DataConverter. This interface allows Temporal to pass Workflow/Activity contexts to the DataConverter so that it may tailor its behaviour.

Note that data converters may be called in non-context-aware situations to convert payloads that may not be customized per context. Data converter implementers should not expect or require contextual data be present.

type ContextPropagator

type ContextPropagator = internal.ContextPropagator

ContextPropagator is an interface that determines what information from the context to pass along.

type ContinueAsNewError

type ContinueAsNewError = internal.ContinueAsNewError

ContinueAsNewError can be returned by a workflow implementation function and indicates that the workflow should continue as new with the same WorkflowID, but new RunID and new history.

type ContinueAsNewErrorOptions added in v1.26.0

type ContinueAsNewErrorOptions = internal.ContinueAsNewErrorOptions

ContinueAsNewErrorOptions specifies optional attributes to be carried over to the next run.

type Execution

type Execution = internal.WorkflowExecution

Execution Details.

type Future

type Future = internal.Future

Future represents the result of an asynchronous computation.

func ExecuteActivity

func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteActivity requests activity execution in the context of a workflow. The context can be used to pass settings for this activity, for example the task queue it should be routed to and the timeouts that should apply. Use ActivityOptions to pass down the options:

ao := ActivityOptions{
	TaskQueue:              "exampleTaskQueue",
	ScheduleToStartTimeout: 10 * time.Second,
	StartToCloseTimeout:    5 * time.Second,
	ScheduleToCloseTimeout: 10 * time.Second,
	HeartbeatTimeout:       0,
}
ctx = WithActivityOptions(ctx, ao)

Or, to override a single option:

ctx = WithTaskQueue(ctx, "exampleTaskQueue")

The activity input is either an activity name (string) or a function representing the activity being scheduled. Note that the function implementation is ignored by this call; the function is used only to extract the activity type string. The args input holds the arguments to pass to the scheduled activity. To call an activity that is a method of a struct, use the method reference with a nil receiver. For example, if an activity is defined as:

type Activities struct {
  ... // members
}

func (a *Activities) Activity1() (string, error) {
   ...
}

Then a workflow can invoke it as:

var a *Activities
workflow.ExecuteActivity(ctx, a.Activity1)

If the activity fails to complete, the error returned by the Future's Get method indicates the failure. The error will be of type *ActivityError. It carries important activity information and the actual error that caused the activity failure. Use errors.Unwrap to get this error, or errors.As to check its type, which can be one of *ApplicationError, *TimeoutError, *CanceledError, or *PanicError.

You can cancel the pending activity by canceling its context (one obtained from workflow.WithCancel(ctx)); this fails the activity with *CanceledError set as the cause of the *ActivityError. The context in the activity only becomes aware of the cancellation when a heartbeat is sent to the server. Since heartbeats may be batched internally, this could take up to the HeartbeatTimeout to appear, or several minutes by default if that value is not set.

ExecuteActivity immediately returns a Future that can be used to block waiting for activity result or failure.

func ExecuteLocalActivity

func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteLocalActivity requests to run a local activity. A local activity is like a regular activity with some key differences:

• Local activity is scheduled and run by the workflow worker locally.

• Local activity does not need Temporal server to schedule activity task and does not rely on activity worker.

• No need to register local activity.

• Local activity is for short-lived activities (usually finishing within seconds).

• Local activity cannot heartbeat.

Context can be used to pass the settings for this local activity. For now there is only one setting for timeout to be set:

lao := LocalActivityOptions{
	ScheduleToCloseTimeout: 5 * time.Second,
}
ctx = WithLocalActivityOptions(ctx, lao)

The timeout here should be relatively shorter than the WorkflowTaskTimeout of the workflow. If you need a longer timeout, you probably should not use a local activity and should use a regular activity instead. Local activities are designed for short-lived work (usually finishing within seconds).

Input args are the arguments that will be passed to the local activity. The input args are handed over directly to the local activity function without serialization/deserialization because the input does not need to cross a process boundary. However, the result still goes through serialization/deserialization because it must be recorded in history on the Temporal server, so that if the workflow crashes, a different worker can replay the history without running the local activity again.

If the activity fails to complete, the error returned by the Future's Get method indicates the failure. The error will be of type *ActivityError. It carries important activity information and the actual error that caused the activity failure. Use errors.Unwrap to get this error, or errors.As to check its type, which can be one of *ApplicationError, *TimeoutError, *CanceledError, or *PanicError.

You can cancel the pending activity by canceling its context (one obtained from workflow.WithCancel(ctx)); this fails the activity with *CanceledError set as the cause of the *ActivityError.

ExecuteLocalActivity returns Future with local activity result or failure.

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future

NewTimer returns immediately, and the returned Future becomes ready after the specified duration d. Workflows must use this NewTimer() to get a timer instead of Go's time.NewTimer(). You can cancel the pending timer by canceling the Context (using a context obtained from workflow.WithCancel(ctx)). After the timer is canceled, the returned Future becomes ready, and Future.Get() returns *CanceledError.

func RequestCancelExternalWorkflow

func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future

RequestCancelExternalWorkflow can be used to request cancellation of an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace of the target workflow using the context like:

ctx = WithWorkflowNamespace(ctx, "namespace")

RequestCancelExternalWorkflow returns a Future with a failure or an empty success result.

func SignalExternalWorkflow

func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future

SignalExternalWorkflow can be used to send signal info to an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace of the target workflow using the context like:

ctx = WithWorkflowNamespace(ctx, "namespace")

SignalExternalWorkflow returns a Future with a failure or an empty success result.

type HeaderReader

type HeaderReader = internal.HeaderReader

HeaderReader is an interface to read information from Temporal headers.

type HeaderWriter

type HeaderWriter = internal.HeaderWriter

HeaderWriter is an interface to write information to Temporal headers.

type Info

type Info = internal.WorkflowInfo

Info contains information about the currently executing workflow.

func GetInfo

func GetInfo(ctx Context) *Info

GetInfo extracts info of a current workflow from a context.

type LocalActivityOptions

type LocalActivityOptions = internal.LocalActivityOptions

LocalActivityOptions holds the options for executing a local activity.

func GetLocalActivityOptions added in v1.6.0

func GetLocalActivityOptions(ctx Context) LocalActivityOptions

GetLocalActivityOptions returns all local activity options present on the context.

type ReceiveChannel

type ReceiveChannel = internal.ReceiveChannel

ReceiveChannel is a read-only view of the Channel

func GetSignalChannel

func GetSignalChannel(ctx Context, signalName string) ReceiveChannel

GetSignalChannel returns channel corresponding to the signal name.

type RegisterOptions

type RegisterOptions = internal.RegisterWorkflowOptions

RegisterOptions consists of options for registering a workflow

type Selector

type Selector = internal.Selector

Selector must be used instead of the native Go select statement by workflow code. Use the workflow.NewSelector function to create a Selector instance.

func NewNamedSelector

func NewNamedSelector(ctx Context, name string) Selector

NewNamedSelector creates a new Selector instance with a given human-readable name. The name appears in stack traces that are blocked on this Selector.

func NewSelector

func NewSelector(ctx Context) Selector

NewSelector creates a new Selector instance.

type SendChannel

type SendChannel = internal.SendChannel

SendChannel is a write-only view of the Channel

type SessionInfo

type SessionInfo = internal.SessionInfo

SessionInfo contains information of a created session. For now, exported fields are SessionID and HostName.

SessionID is a uuid generated when CreateSession() or RecreateSession() is called and can be used to uniquely identify a session.

HostName specifies which host is executing the session.

SessionState specifies the current known state of the session.

Note: Sessions have an inherently stale view of the worker they are running on. Session state may be stale up to the SessionOptions.HeartbeatTimeout. SessionOptions.HeartbeatTimeout should be less than half the activity timeout for the state to be accurate when checking after activity failure.

func GetSessionInfo

func GetSessionInfo(ctx Context) *SessionInfo

GetSessionInfo returns the sessionInfo stored in the context. If the context has held multiple sessions (for example, the same context is used to create a session, complete it, and create another; the user then finds that this session has failed and creates a new one on the same context), the most recent sessionInfo is returned.

This API will return nil if there's no sessionInfo in the context.

type SessionOptions

type SessionOptions = internal.SessionOptions

SessionOptions specifies metadata for a session.

ExecutionTimeout: required, no default

Specifies the maximum amount of time the session can run.

CreationTimeout: required, no default

Specifies how long session creation can take before returning an error.

HeartbeatTimeout: optional, default 20s

Specifies the heartbeat timeout. If a heartbeat is not received by the server within the timeout, the session will be declared as failed.

type SessionState added in v1.21.0

type SessionState = internal.SessionState

SessionState specifies the state of the session.

type Settable

type Settable = internal.Settable

Settable is used to set value or error on a future. See more: workflow.NewFuture.

type Type

type Type = internal.WorkflowType

Type identifies a workflow type.

type UpdateHandlerOptions added in v1.20.0

type UpdateHandlerOptions = internal.UpdateHandlerOptions

type UpdateInfo added in v1.24.0

type UpdateInfo = internal.UpdateInfo

UpdateInfo contains information about a currently running update.

NOTE: Experimental

func GetCurrentUpdateInfo added in v1.27.0

func GetCurrentUpdateInfo(ctx Context) *UpdateInfo

GetCurrentUpdateInfo returns information about the currently running update, if any, from the context.

NOTE: Experimental

type Version

type Version = internal.Version

Version represents a change version. See GetVersion call.

const DefaultVersion Version = internal.DefaultVersion

DefaultVersion is the version returned by GetVersion for code that wasn't versioned before.

func GetVersion

func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

GetVersion is used to safely perform backwards-incompatible changes to workflow definitions. Workflow code must not be updated while workflows are running, as doing so breaks determinism. The solution is to keep both the old code, which is used to replay existing workflows, and the new code, which is used when the workflow executes for the first time. GetVersion returns the maxSupported version when it is executed for the first time. This version is recorded in the workflow history as a marker event. Even if the maxSupported version is later changed, the version that was recorded is returned on replay. The DefaultVersion constant represents the version of code that wasn't versioned before. For example, suppose a workflow initially has the following code:

err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)

it should be updated to

err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)

The backwards compatible way to execute the update is

v := GetVersion(ctx, "fooChange", DefaultVersion, 0)
if v == DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
}

Then bar has to be changed to baz:

v := GetVersion(ctx, "fooChange", DefaultVersion, 1)
if v == DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else if v == 0 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

Later, when there are no workflow executions still running DefaultVersion, the corresponding branch can be removed:

v := GetVersion(ctx, "fooChange", 0, 1)
if v == 0 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

It is recommended to keep the GetVersion() call even if only a single branch is left:

GetVersion(ctx, "fooChange", 1, 1)
err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)

The reasons to keep it are: 1) it ensures that if an execution of an older version is still running, it will fail here and not proceed; 2) if you ever need to make more changes for "fooChange", for example changing the activity from baz to qux, you only need to update maxSupported from 1 to 2.

Note that you only need to preserve the first call to GetVersion() for each changeID. All subsequent calls to GetVersion() with the same changeID are safe to remove. However, if you really want to get rid of the first GetVersion() call as well, you can do so, but you need to make sure that: 1) all executions of older versions are completed; 2) you no longer use "fooChange" as a changeID. If you ever need to make changes to that same part, such as changing from baz to qux, you would need to use a different changeID like "fooChange-fix2" and start minSupported from DefaultVersion again. The code would look like:

v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 0)
if v == workflow.DefaultVersion {
  err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil)
} else {
  err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil)
}
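The recording and replay behavior described in this section can be pictured with a small stand-alone model. This is only a simplified sketch, not the SDK's implementation: the history map, the replaying flag, the lowercase getVersion function, and the value -1 for defaultVersion are all assumptions made for illustration.

```go
package main

import "fmt"

// version and defaultVersion are local stand-ins for Version and
// DefaultVersion; the value -1 is an assumption of this sketch.
type version int

const defaultVersion version = -1

// history is a toy stand-in for the version markers kept in workflow history.
type history map[string]version

// getVersion models the documented behavior: on first execution it records
// maxSupported; on replay it returns the recorded value, or defaultVersion
// when the history predates the GetVersion call. A version outside
// [minSupported, maxSupported] is reported as an error.
func getVersion(h history, replaying bool, changeID string, minSupported, maxSupported version) (version, error) {
	v, recorded := h[changeID]
	if !recorded {
		if replaying {
			v = defaultVersion
		} else {
			v = maxSupported
			h[changeID] = v
		}
	}
	if v < minSupported || v > maxSupported {
		return v, fmt.Errorf("change %q: version %d outside supported range [%d, %d]",
			changeID, v, minSupported, maxSupported)
	}
	return v, nil
}

func main() {
	h := history{}
	// First execution records maxSupported (0).
	first, _ := getVersion(h, false, "fooChange", defaultVersion, 0)
	// Replay after maxSupported was raised still yields the recorded version.
	replayed, _ := getVersion(h, true, "fooChange", defaultVersion, 1)
	fmt.Println(first, replayed)

	// An old, unversioned history fails once DefaultVersion is unsupported.
	_, err := getVersion(history{}, true, "fooChange", 1, 1)
	fmt.Println(err)
}
```

The last call shows why keeping a single-branch GetVersion call is useful: an execution that predates the change is rejected instead of silently running the new code.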

type WaitGroup

type WaitGroup = internal.WaitGroup

WaitGroup is used to wait for a collection of coroutines to finish.

func NewWaitGroup

func NewWaitGroup(ctx Context) WaitGroup

NewWaitGroup creates a new WaitGroup instance.
