workflow

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2018 License: MIT Imports: 6 Imported by: 510

Documentation

Overview

Package workflow contains functions and types used to implement Cadence workflows.

Index

Constants

This section is empty.

Variables

View Source
var ErrCanceled = internal.ErrCanceled

ErrCanceled is the error returned by Context.Err when the context is canceled.

View Source
var ErrDeadlineExceeded = internal.ErrDeadlineExceeded

ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.

Functions

func GetLogger

func GetLogger(ctx Context) *zap.Logger

GetLogger returns a logger to be used in workflow's context

func GetMetricsScope

func GetMetricsScope(ctx Context) tally.Scope

GetMetricsScope returns a metrics scope to be used in workflow's context

func Go

func Go(ctx Context, f func(ctx Context))

Go creates a new coroutine. It has similar semantic to goroutine in a context of the workflow.

func GoNamed

func GoNamed(ctx Context, name string, f func(ctx Context))

GoNamed creates a new coroutine with a given human readable name. It has similar semantic to goroutine in a context of the workflow. Name appears in stack traces that are blocked on this Channel.

func IsReplaying added in v0.5.1

func IsReplaying(ctx Context) bool

IsReplaying returns whether the current workflow code is replaying.

Warning! Never make decisions, like schedule activity/childWorkflow/timer or send/wait on future/channel, based on this flag as it is going to break workflow determinism requirement. The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or metric reporting. Please note that Cadence already provide standard logging/metric via workflow.GetLogger(ctx) and workflow.GetMetricsScope(ctx), and those standard mechanism are replay-aware and it will automatically suppress during replay. Only use this flag if you need custom logging/metrics reporting, for example if you want to log to kafka.

Warning! Any action protected by this flag should not fail or if it does fail should ignore that failure or panic on the failure. If workflow don't want to be blocked on those failure, it should ignore those failure; if workflow do want to make sure it proceed only when that action succeed then it should panic on that failure. Panic raised from a workflow causes decision task to fail and cadence server will rescheduled later to retry.

func MutableSideEffect added in v0.6.1

func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) encoded.Value

MutableSideEffect executes the provided function once, then it looks up the history for the value with the given id. If there is no existing value, then it records the function result as a value with the given id on history; otherwise, it compares whether the existing value from history has changed from the new function result by calling the provided equals function. If they are equal, it returns the value without recording a new one in history;

otherwise, it records the new value with the same id on history.

Caution: do not use MutableSideEffect to modify closures. Always retrieve result from MutableSideEffect's encoded return value.

The difference between MutableSideEffect() and SideEffect() is that every new SideEffect() call in non-replay will result in a new marker being recorded on history. However, MutableSideEffect() only records a new marker if the value changed. During replay, MutableSideEffect() will not execute the function again, but it will return the exact same value as it was returning during the non-replay run.

One good use case of MutableSideEffect() is to access dynamically changing config without breaking determinism.

func NewDisconnectedContext added in v0.5.1

func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc)

NewDisconnectedContext returns a new context that won't propagate parent's cancellation to the new child context. One common use case is to do cleanup work after workflow is cancelled.

err := workflow.ExecuteActivity(ctx, ActivityFoo).Get(ctx, &activityFooResult)
if err != nil && cadence.IsCanceledError(ctx.Err()) {
  // activity failed, and workflow context is canceled
  disconnectedCtx, _ := workflow.newDisconnectedContext(ctx);
  workflow.ExecuteActivity(disconnectedCtx, handleCancellationActivity).Get(disconnectedCtx, nil)
  return err // workflow return CanceledError
}

func NewFuture

func NewFuture(ctx Context) (Future, Settable)

NewFuture creates a new future as well as associated Settable that is used to set its value.

func Now

func Now(ctx Context) time.Time

Now returns the current time when the decision is started or replayed. The workflow needs to use this Now() to get the wall clock time instead of the Go lang library one.

func Register

func Register(workflowFunc interface{})

Register - registers a workflow function with the framework. A workflow takes a workflow context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format.

func RegisterWithOptions

func RegisterWithOptions(workflowFunc interface{}, opts RegisterOptions)

RegisterWithOptions registers the workflow function with options The user can use options to provide an external name for the workflow or leave it empty if no external name is required. This can be used as

client.RegisterWorkflow(sampleWorkflow, RegisterOptions{})
client.RegisterWorkflow(sampleWorkflow, RegisterOptions{Name: "foo"})

A workflow takes a workflow context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)

Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format.

func SetQueryHandler

func SetQueryHandler(ctx Context, queryType string, handler interface{}) error

SetQueryHandler sets the query handler to handle workflow query. The queryType specify which query type this handler should handle. The handler must be a function that returns 2 values. The first return value must be a serializable result. The second return value must be an error. The handler function could receive any number of input parameters. All the input parameter must be serializable. You should call workflow.SetQueryHandler() at the beginning of the workflow code. When client calls Client.QueryWorkflow() to cadence server, a task will be generated on server that will be dispatched to a workflow worker, which will replay the history events and then execute a query handler based on the query type. The query handler will be invoked out of the context of the workflow, meaning that the handler code must not use workflow context to do things like workflow.NewChannel(), workflow.Go() or to call any workflow blocking functions like Channel.Get() or Future.Get(). Trying to do so in query handler code will fail the query and client will receive QueryFailedError. Example of workflow code that support query type "current_state":

func MyWorkflow(ctx workflow.Context, input string) error {
  currentState := "started" // this could be any serializable struct
  err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) {
    return currentState, nil
  })
  if err != nil {
    currentState = "failed to register query handler"
    return err
  }
  // your normal workflow code begins here, and you update the currentState as the code makes progress.
  currentState = "waiting timer"
  err = NewTimer(ctx, time.Hour).Get(ctx, nil)
  if err != nil {
    currentState = "timer failed"
    return err
  }

  currentState = "waiting activity"
  ctx = WithActivityOptions(ctx, myActivityOptions)
  err = ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
  if err != nil {
    currentState = "activity failed"
    return err
  }
  currentState = "done"
  return nil
}

func SideEffect

func SideEffect(ctx Context, f func(ctx Context) interface{}) encoded.Value

SideEffect executes the provided function once, records its result into the workflow history. The recorded result on history will be returned without executing the provided function during replay. This guarantees the deterministic requirement for workflow as the exact same result will be returned in replay. Common use case is to run some short non-deterministic code in workflow, like getting random number or new UUID. The only way to fail SideEffect is to panic which causes decision task failure. The decision task after timeout is rescheduled and re-executed giving SideEffect another chance to succeed.

Caution: do not use SideEffect to modify closures. Always retrieve result from SideEffect's encoded return value. For example this code is BROKEN:

// Bad example:
var random int
workflow.SideEffect(func(ctx workflow.Context) interface{} {
       random = rand.Intn(100)
       return nil
})
// random will always be 0 in replay, thus this code is non-deterministic
if random < 50 {
       ....
} else {
       ....
}

On replay the provided function is not executed, the random will always be 0, and the workflow could takes a different path breaking the determinism.

Here is the correct way to use SideEffect:

// Good example:
encodedRandom := SideEffect(func(ctx workflow.Context) interface{} {
      return rand.Intn(100)
})
var random int
encodedRandom.Get(&random)
if random < 50 {
       ....
} else {
       ....
}

func Sleep

func Sleep(ctx Context, d time.Duration) (err error)

Sleep pauses the current workflow for at least the duration d. A negative or zero duration causes Sleep to return immediately. Workflow code needs to use this Sleep() to sleep instead of the Go lang library one(timer.Sleep()). You can cancel the pending sleep by cancel the Context (using context from workflow.WithCancel(ctx)). Sleep() returns nil if the duration d is passed, or it returns *CanceledError if the ctx is canceled. There are 2 reasons the ctx could be canceled: 1) your workflow code cancel the ctx (with workflow.WithCancel(ctx)); 2) your workflow itself is canceled by external request. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

Types

type ActivityOptions

type ActivityOptions = internal.ActivityOptions

ActivityOptions stores all activity-specific invocation parameters that will be stored inside of a context.

type CancelFunc

type CancelFunc = internal.CancelFunc

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.

type Channel

type Channel = internal.Channel

Channel must be used instead of native go channel by workflow code. Use workflow.NewChannel(ctx) method to create Channel instance.

func GetSignalChannel

func GetSignalChannel(ctx Context, signalName string) Channel

GetSignalChannel returns channel corresponding to the signal name.

func NewBufferedChannel

func NewBufferedChannel(ctx Context, size int) Channel

NewBufferedChannel create new buffered Channel instance

func NewChannel

func NewChannel(ctx Context) Channel

NewChannel create new Channel instance

func NewNamedBufferedChannel

func NewNamedBufferedChannel(ctx Context, name string, size int) Channel

NewNamedBufferedChannel create new BufferedChannel instance with a given human readable name. Name appears in stack traces that are blocked on this Channel.

func NewNamedChannel

func NewNamedChannel(ctx Context, name string) Channel

NewNamedChannel create new Channel instance with a given human readable name. Name appears in stack traces that are blocked on this channel.

type ChildWorkflowFuture

type ChildWorkflowFuture = internal.ChildWorkflowFuture

ChildWorkflowFuture represents the result of a child workflow execution

func ExecuteChildWorkflow

func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interface{}) ChildWorkflowFuture

ExecuteChildWorkflow requests child workflow execution in the context of a workflow. Context can be used to pass the settings for the child workflow. For example: task list that this child workflow should be routed, timeouts that need to be configured. Use ChildWorkflowOptions to pass down the options.

 cwo := ChildWorkflowOptions{
	    ExecutionStartToCloseTimeout: 10 * time.Minute,
	    TaskStartToCloseTimeout: time.Minute,
	}
 ctx := WithChildOptions(ctx, cwo)

Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. Input args are the arguments that need to be passed to the child workflow function represented by childWorkflow. If the child workflow failed to complete then the future get error would indicate the failure and it can be one of CustomError, TimeoutError, CanceledError, GenericError. You can cancel the pending child workflow using context(workflow.WithCancel(ctx)) and that will fail the workflow with error CanceledError. ExecuteChildWorkflow returns ChildWorkflowFuture.

type ChildWorkflowOptions

type ChildWorkflowOptions = internal.ChildWorkflowOptions

ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context.

type ChildWorkflowPolicy

type ChildWorkflowPolicy = internal.ChildWorkflowPolicy

ChildWorkflowPolicy defines child workflow behavior when parent workflow is terminated.

const (
	// ChildWorkflowPolicyTerminate is policy that will terminate all child workflows when parent workflow is terminated.
	ChildWorkflowPolicyTerminate ChildWorkflowPolicy = internal.ChildWorkflowPolicyTerminate
	// ChildWorkflowPolicyRequestCancel is policy that will send cancel request to all open child workflows when parent
	// workflow is terminated.
	ChildWorkflowPolicyRequestCancel ChildWorkflowPolicy = internal.ChildWorkflowPolicyRequestCancel
	// ChildWorkflowPolicyAbandon is policy that will have no impact to child workflow execution when parent workflow is
	// terminated.
	ChildWorkflowPolicyAbandon ChildWorkflowPolicy = internal.ChildWorkflowPolicyAbandon
)

type Context

type Context = internal.Context

Context is a clone of context.Context with Done() returning Channel instead of native channel. A Context carries a deadline, a cancellation signal, and other values across API boundaries.

Context's methods may be called by multiple goroutines simultaneously.

func WithActivityOptions

func WithActivityOptions(ctx Context, options ActivityOptions) Context

WithActivityOptions adds all options to the copy of the context.

func WithChildOptions

func WithChildOptions(ctx Context, cwo ChildWorkflowOptions) Context

WithChildOptions adds all workflow options to the context.

func WithChildPolicy

func WithChildPolicy(ctx Context, childPolicy ChildWorkflowPolicy) Context

WithChildPolicy adds a ChildWorkflowPolicy to the context.

func WithDataConverter added in v0.7.0

func WithDataConverter(ctx Context, dc encoded.DataConverter) Context

WithDataConverter adds DataConverter to the context.

func WithExecutionStartToCloseTimeout

func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context

WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(ctx Context, d time.Duration) Context

WithHeartbeatTimeout adds a timeout to the copy of the context.

func WithLocalActivityOptions added in v0.5.1

func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context

WithLocalActivityOptions adds options for local activity to context

func WithScheduleToCloseTimeout

func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context

WithScheduleToCloseTimeout adds a timeout to the copy of the context.

func WithScheduleToStartTimeout

func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context

WithScheduleToStartTimeout adds a timeout to the copy of the context.

func WithStartToCloseTimeout

func WithStartToCloseTimeout(ctx Context, d time.Duration) Context

WithStartToCloseTimeout adds a timeout to the copy of the context.

func WithTaskList

func WithTaskList(ctx Context, name string) Context

WithTaskList adds a task list to the copy of the context.

func WithValue

func WithValue(parent Context, key interface{}, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

func WithWaitForCancellation

func WithWaitForCancellation(ctx Context, wait bool) Context

WithWaitForCancellation adds wait for the cacellation to the copy of the context.

func WithWorkflowDomain

func WithWorkflowDomain(ctx Context, name string) Context

WithWorkflowDomain adds a domain to the context.

func WithWorkflowID

func WithWorkflowID(ctx Context, workflowID string) Context

WithWorkflowID adds a workflowID to the context.

func WithWorkflowTaskList

func WithWorkflowTaskList(ctx Context, name string) Context

WithWorkflowTaskList adds a task list to the context.

func WithWorkflowTaskStartToCloseTimeout

func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context

WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context.

type ContinueAsNewError

type ContinueAsNewError = internal.ContinueAsNewError

ContinueAsNewError can be returned by a workflow implementation function and indicates that the workflow should continue as new with the same WorkflowID, but new RunID and new history.

func NewContinueAsNewError

func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError

NewContinueAsNewError creates ContinueAsNewError instance If the workflow main function returns this error then the current execution is ended and the new execution with same workflow ID is started automatically with options provided to this function.

 ctx - use context to override any options for the new workflow like execution time out, decision task time out, task list.
	  if not mentioned it would use the defaults that the current workflow is using.
       ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute)
       ctx := WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute)
	  ctx := WithWorkflowTaskList(ctx, "example-group")
 wfn - workflow function. for new execution it can be different from the currently running.
 args - arguments for the new workflow.

type Execution

type Execution = internal.WorkflowExecution

Execution Details.

type Future

type Future = internal.Future

Future represents the result of an asynchronous computation.

func ExecuteActivity

func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteActivity requests activity execution in the context of a workflow. Context can be used to pass the settings for this activity. For example: task list that this need to be routed, timeouts that need to be configured. Use ActivityOptions to pass down the options.

 ao := ActivityOptions{
	    TaskList: "exampleTaskList",
	    ScheduleToStartTimeout: 10 * time.Second,
	    StartToCloseTimeout: 5 * time.Second,
	    ScheduleToCloseTimeout: 10 * time.Second,
	    HeartbeatTimeout: 0,
	}
	ctx := WithActivityOptions(ctx, ao)

Or to override a single option

ctx := WithTaskList(ctx, "exampleTaskList")

Input activity is either an activity name (string) or a function representing an activity that is getting scheduled. Input args are the arguments that need to be passed to the scheduled activity.

If the activity failed to complete then the future get error would indicate the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity using context(workflow.WithCancel(ctx)) and that will fail the activity with error CanceledError.

ExecuteActivity returns Future with activity result or failure.

func ExecuteLocalActivity added in v0.5.1

func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteLocalActivity requests to run a local activity. A local activity is like a regular activity with some key differences: * Local activity is scheduled and run by the workflow worker locally. * Local activity does not need Cadence server to schedule activity task and does not rely on activity worker. * No need to register local activity. * The parameter activity to ExecuteLocalActivity() must be a function. * Local activity is for short living activities (usually finishes within seconds). * Local activity cannot heartbeat.

Context can be used to pass the settings for this local activity. For now there is only one setting for timeout to be set:

 lao := LocalActivityOptions{
	    ScheduleToCloseTimeout: 5 * time.Second,
	}
	ctx := WithLocalActivityOptions(ctx, lao)

The timeout here should be relative shorter than the DecisionTaskStartToCloseTimeout of the workflow. If you need a longer timeout, you probably should not use local activity and instead should use regular activity. Local activity is designed to be used for short living activities (usually finishes within seconds).

Input args are the arguments that will to be passed to the local activity. The input args will be hand over directly to local activity function without serialization/deserialization because we don't need to pass the input across process boundary. However, the result will still go through serialization/deserialization because we need to record the result as history to cadence server so if the workflow crashes, a different worker can replay the history without running the local activity again.

If the activity failed to complete then the future get error would indicate the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity by cancel the context(workflow.WithCancel(ctx)) and that will fail the activity with error CanceledError.

ExecuteLocalActivity returns Future with local activity result or failure.

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future

NewTimer returns immediately and the future becomes ready after the specified duration d. The workflow needs to use this NewTimer() to get the timer instead of the Go lang library one(timer.NewTimer()). You can cancel the pending timer by cancel the Context (using context from workflow.WithCancel(ctx)) and that will cancel the timer. After timer is canceled, the returned Future become ready, and Future.Get() will return *CanceledError. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.

func RequestCancelExternalWorkflow added in v0.5.1

func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future

RequestCancelExternalWorkflow can be used to request cancellation of an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:

ctx := WithWorkflowDomain(ctx, "domain-name")

RequestCancelExternalWorkflow return Future with failure or empty success result.

func SignalExternalWorkflow added in v0.5.1

func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future

SignalExternalWorkflow can be used to send signal info to an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:

ctx := WithWorkflowDomain(ctx, "domain-name")

SignalExternalWorkflow return Future with failure or empty success result.

type GenericError

type GenericError = internal.GenericError

GenericError is returned from activity or child workflow when an implementations return error other than from workflow.NewCustomError() API.

type Info

type Info = internal.WorkflowInfo

Info information about currently executing workflow

func GetInfo

func GetInfo(ctx Context) *Info

GetInfo extracts info of a current workflow from a context.

type LocalActivityOptions added in v0.5.1

type LocalActivityOptions = internal.LocalActivityOptions

LocalActivityOptions doc

type PanicError

type PanicError = internal.PanicError

PanicError contains information about panicked workflow/activity.

type RegisterOptions

type RegisterOptions = internal.RegisterWorkflowOptions

RegisterOptions consists of options for registering a workflow

type Selector

type Selector = internal.Selector

Selector must be used instead of native go select by workflow code. Use workflow.NewSelector(ctx) method to create a Selector instance.

func NewNamedSelector

func NewNamedSelector(ctx Context, name string) Selector

NewNamedSelector creates a new Selector instance with a given human readable name. Name appears in stack traces that are blocked on this Selector.

func NewSelector

func NewSelector(ctx Context) Selector

NewSelector creates a new Selector instance.

type Settable

type Settable = internal.Settable

Settable is used to set value or error on a future. See more: workflow.NewFuture(ctx).

type TerminatedError

type TerminatedError = internal.TerminatedError

TerminatedError returned when workflow was terminated.

type TimeoutError

type TimeoutError = internal.TimeoutError

TimeoutError returned when activity or child workflow timed out.

func NewHeartbeatTimeoutError

func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError

NewHeartbeatTimeoutError creates TimeoutError instance WARNING: This function is public only to support unit testing of workflows. It shouldn't be used by application level code.

func NewTimeoutError

func NewTimeoutError(timeoutType shared.TimeoutType) *TimeoutError

NewTimeoutError creates TimeoutError instance. Use NewHeartbeatTimeoutError to create heartbeat TimeoutError WARNING: This function is public only to support unit testing of workflows. It shouldn't be used by application level code.

type Type

type Type = internal.WorkflowType

Type identifies a workflow type.

type Version

type Version = internal.Version

Version represents a change version. See GetVersion call.

const DefaultVersion Version = internal.DefaultVersion

DefaultVersion is a version returned by GetVersion for code that wasn't versioned before

func GetVersion

func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

GetVersion is used to safely perform backwards incompatible changes to workflow definitions. It is not allowed to update workflow code while there are workflows running as it is going to break determinism. The solution is to have both old code that is used to replay existing workflows as well as the new one that is used when it is executed for the first time. GetVersion returns maxSupported version when is executed for the first time. This version is recorded into the workflow history as a marker event. Even if maxSupported version is changed the version that was recorded is returned on replay. DefaultVersion constant contains version of code that wasn't versioned before. For example initially workflow has the following code:

err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)

it should be updated to

err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)

The backwards compatible way to execute the update is

v :=  GetVersion(ctx, "fooChange", DefaultVersion, 1)
if v  == DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
}

Then bar has to be changed to baz:

v :=  GetVersion(ctx, "fooChange", DefaultVersion, 2)
if v  == DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else if v == 1 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

Later when there are no workflow executions running DefaultVersion the correspondent branch can be removed:

v :=  GetVersion(ctx, "fooChange", 1, 2)
if v == 1 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

It is recommended to keep the GetVersion() call even if single branch is left:

GetVersion(ctx, "fooChange", 2, 2)
err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)

The reason to keep it is: 1) it ensures that if there is older version execution still running, it will fail here and not proceed; 2) if you ever need to make more changes for “fooChange”, for example change activity from baz to qux, you just need to update the maxVersion from 2 to 3.

Note that, you only need to preserve the first call to GetVersion() for each changeID. All subsequent call to GetVersion() with same changeID are safe to remove. However, if you really want to get rid of the first GetVersion() call as well, you can do so, but you need to make sure: 1) all older version executions are completed; 2) you can no longer use “fooChange” as changeID. If you ever need to make changes to that same part like change from baz to qux, you would need to use a different changeID like “fooChange-fix2”, and start minVersion from DefaultVersion again. The code would looks like:

v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
  err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil)
} else {
  err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL