Documentation ¶
Index ¶
- Constants
- Variables
- func Await(ctx Context, condition func() bool) error
- func CompleteSession(ctx Context)
- func EnableVerboseLogging(enable bool)
- func GetActivityLogger(ctx context.Context) *zap.Logger
- func GetActivityMetricsScope(ctx context.Context) tally.Scope
- func GetHeartbeatDetails(ctx context.Context, d ...interface{}) error
- func GetLastCompletionResult(ctx Context, d ...interface{}) error
- func GetLogger(ctx Context) *zap.Logger
- func GetMetricsScope(ctx Context) tally.Scope
- func GetRegisteredWorkflowTypes() []string
- func GetUnhandledSignalNames(ctx Context) []string
- func GetWorkerStopChannel(ctx context.Context) <-chan struct{}
- func Go(ctx Context, f func(ctx Context))
- func GoNamed(ctx Context, name string, f func(ctx Context))
- func HasHeartbeatDetails(ctx context.Context) bool
- func HasLastCompletionResult(ctx Context) bool
- func IsCanceledError(err error) bool
- func IsReplayDomain(dn string) bool
- func IsReplaying(ctx Context) bool
- func NewAdminJwtAuthorizationProvider(privateKey []byte) auth.AuthorizationProvider
- func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc)
- func NewFuture(ctx Context) (Future, Settable)
- func NewWorker(service workflowserviceclient.Interface, domain string, taskList string, ...) *aggregatedWorker
- func Now(ctx Context) time.Time
- func RecordActivityHeartbeat(ctx context.Context, details ...interface{})
- func RegisterActivity(activityFunc interface{})
- func RegisterActivityWithOptions(activityFunc interface{}, opts RegisterActivityOptions)
- func RegisterWorkflow(workflowFunc interface{})
- func RegisterWorkflowWithOptions(workflowFunc interface{}, opts RegisterWorkflowOptions)
- func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
- func ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, ...) error
- func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error
- func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error
- func SetBinaryChecksum(checksum string)
- func SetQueryHandler(ctx Context, queryType string, handler interface{}) error
- func SetStickyWorkflowCacheSize(cacheSize int)
- func Sleep(ctx Context, d time.Duration) (err error)
- func StartVersionMetrics(metricsScope tally.Scope)
- func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error
- func WithActivityTask(ctx context.Context, task *shared.PollForActivityTaskResponse, taskList string, ...) context.Context
- func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
- type ActivityInfo
- type ActivityOptions
- type ActivityTaskHandler
- type ActivityType
- type Bugports
- type CancelFunc
- type CancelReason
- type CanceledError
- type Channel
- type ChildWorkflowFuture
- type ChildWorkflowOptions
- type Client
- type ClientOptions
- type Context
- func Background() Context
- func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error)
- func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionOptions) (Context, error)
- func WithActivityOptions(ctx Context, options ActivityOptions) Context
- func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context
- func WithDataConverter(ctx Context, dc DataConverter) Context
- func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context
- func WithHeartbeatTimeout(ctx Context, d time.Duration) Context
- func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context
- func WithRetryPolicy(ctx Context, retryPolicy RetryPolicy) Context
- func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context
- func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context
- func WithStartToCloseTimeout(ctx Context, d time.Duration) Context
- func WithTaskList(ctx Context, name string) Context
- func WithValue(parent Context, key interface{}, val interface{}) Context
- func WithWaitForCancellation(ctx Context, wait bool) Context
- func WithWorkflowDomain(ctx Context, name string) Context
- func WithWorkflowID(ctx Context, workflowID string) Context
- func WithWorkflowTaskList(ctx Context, name string) Context
- func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context
- type ContextPropagator
- type ContinueAsNewError
- func (e *ContinueAsNewError) Args() []interface{}
- func (e *ContinueAsNewError) Error() string
- func (e *ContinueAsNewError) Header() *shared.Header
- func (e *ContinueAsNewError) Input() []byte
- func (e *ContinueAsNewError) WorkflowIDReusePolicy() WorkflowIDReusePolicy
- func (e *ContinueAsNewError) WorkflowType() *WorkflowType
- type CustomError
- type DataConverter
- type DomainClient
- type EncodedValue
- type EncodedValues
- type ErrorDetailsValues
- type FeatureFlags
- type Future
- func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future
- func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}) Future
- func NewTimer(ctx Context, d time.Duration) Future
- func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future
- func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future
- type GenericError
- type HeaderReader
- type HeaderWriter
- type HistoryEventIterator
- type HistoryIterator
- type JWTAuthProvider
- type LocalActivityOptions
- type MockCallWrapper
- func (c *MockCallWrapper) After(d time.Duration) *MockCallWrapper
- func (c *MockCallWrapper) AfterFn(fn func() time.Duration) *MockCallWrapper
- func (c *MockCallWrapper) Once() *MockCallWrapper
- func (c *MockCallWrapper) Return(returnArguments ...interface{}) *MockCallWrapper
- func (c *MockCallWrapper) Run(fn func(args mock.Arguments)) *MockCallWrapper
- func (c *MockCallWrapper) Times(i int) *MockCallWrapper
- func (c *MockCallWrapper) Twice() *MockCallWrapper
- type NonDeterministicWorkflowPolicy
- type Option
- type PanicError
- type ParentClosePolicy
- type QueryBuilder
- type QueryWorkflowWithOptionsRequest
- type QueryWorkflowWithOptionsResponse
- type RegisterActivityOptions
- type RegisterWorkflowOptions
- type ReplayOptions
- type RetryPolicy
- type Selector
- type ServiceInvoker
- type SessionInfo
- type SessionOptions
- type Settable
- type ShadowExitCondition
- type ShadowMode
- type ShadowOptions
- type StartWorkflowOptions
- type TerminatedError
- type TestActivityEnvironment
- func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args ...interface{}) (Value, error)
- func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, err error)
- func (t *TestActivityEnvironment) RegisterActivity(a interface{})
- func (t *TestActivityEnvironment) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
- func (t *TestActivityEnvironment) SetHeartbeatDetails(details interface{})
- func (t *TestActivityEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestActivityEnvironment
- func (t *TestActivityEnvironment) SetWorkerOptions(options WorkerOptions) *TestActivityEnvironment
- func (t *TestActivityEnvironment) SetWorkerStopChannel(c chan struct{})
- type TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) CancelWorkflow()
- func (t *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result interface{}, err error) error
- func (t *TestWorkflowEnvironment) ExecuteWorkflow(workflowFn interface{}, args ...interface{})
- func (t *TestWorkflowEnvironment) GetWorkflowError() error
- func (t *TestWorkflowEnvironment) GetWorkflowResult(valuePtr interface{}) error
- func (t *TestWorkflowEnvironment) IsWorkflowCompleted() bool
- func (t *TestWorkflowEnvironment) Now() time.Time
- func (t *TestWorkflowEnvironment) OnActivity(activity interface{}, args ...interface{}) *MockCallWrapper
- func (t *TestWorkflowEnvironment) OnGetVersion(changeID string, minSupported, maxSupported Version) *MockCallWrapper
- func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(domainName, workflowID, runID string) *MockCallWrapper
- func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(domainName, workflowID, runID, signalName, arg interface{}) *MockCallWrapper
- func (t *TestWorkflowEnvironment) OnUpsertSearchAttributes(attributes map[string]interface{}) *MockCallWrapper
- func (t *TestWorkflowEnvironment) OnWorkflow(workflow interface{}, args ...interface{}) *MockCallWrapper
- func (t *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interface{}) (Value, error)
- func (t *TestWorkflowEnvironment) RegisterActivity(a interface{})
- func (t *TestWorkflowEnvironment) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
- func (t *TestWorkflowEnvironment) RegisterDelayedCallback(callback func(), delayDuration time.Duration)
- func (t *TestWorkflowEnvironment) RegisterWorkflow(w interface{})
- func (t *TestWorkflowEnvironment) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions)
- func (t *TestWorkflowEnvironment) SetActivityTaskList(tasklist string, activityFn ...interface{})
- func (t *TestWorkflowEnvironment) SetLastCompletionResult(result interface{})
- func (t *TestWorkflowEnvironment) SetMemoOnStart(memo map[string]interface{}) error
- func (t *TestWorkflowEnvironment) SetOnActivityCanceledListener(listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnActivityCompletedListener(listener func(activityInfo *ActivityInfo, result Value, err error)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnActivityHeartbeatListener(listener func(activityInfo *ActivityInfo, details Values)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnActivityStartedListener(listener func(activityInfo *ActivityInfo, ctx context.Context, args Values)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener(listener func(workflowInfo *WorkflowInfo)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener(listener func(workflowInfo *WorkflowInfo, result Value, err error)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnChildWorkflowStartedListener(listener func(workflowInfo *WorkflowInfo, ctx Context, args Values)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnLocalActivityCanceledListener(listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnLocalActivityCompletedListener(listener func(activityInfo *ActivityInfo, result Value, err error)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnLocalActivityStartedListener(...) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnTimerCancelledListener(listener func(timerID string)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnTimerFiredListener(listener func(timerID string)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetOnTimerScheduledListener(listener func(timerID string, duration time.Duration)) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes map[string]interface{}) error
- func (t *TestWorkflowEnvironment) SetStartTime(startTime time.Time)
- func (t *TestWorkflowEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetWorkerStopChannel(c chan struct{})
- func (t *TestWorkflowEnvironment) SetWorkflowCronMaxIterations(maxIterations int) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetWorkflowCronSchedule(cron string) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SetWorkflowTimeout(executionTimeout time.Duration) *TestWorkflowEnvironment
- func (t *TestWorkflowEnvironment) SignalWorkflow(name string, input interface{})
- func (t *TestWorkflowEnvironment) SignalWorkflowByID(workflowID, signalName string, input interface{}) error
- func (t *TestWorkflowEnvironment) SignalWorkflowSkippingDecision(name string, input interface{})
- type TimeFilter
- type TimeoutError
- type UnknownExternalWorkflowExecutionError
- type Value
- type Values
- type Version
- type WaitGroup
- type WorkerOptions
- type WorkflowExecution
- type WorkflowExecutionContext
- type WorkflowIDReusePolicy
- type WorkflowInfo
- type WorkflowInterceptor
- type WorkflowInterceptorBase
- func (t *WorkflowInterceptorBase) ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future
- func (t *WorkflowInterceptorBase) ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture
- func (t *WorkflowInterceptorBase) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future
- func (t *WorkflowInterceptorBase) ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{}
- func (t *WorkflowInterceptorBase) GetLastCompletionResult(ctx Context, d ...interface{}) error
- func (t *WorkflowInterceptorBase) GetLogger(ctx Context) *zap.Logger
- func (t *WorkflowInterceptorBase) GetMetricsScope(ctx Context) tally.Scope
- func (t *WorkflowInterceptorBase) GetSignalChannel(ctx Context, signalName string) Channel
- func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
- func (t *WorkflowInterceptorBase) GetWorkflowInfo(ctx Context) *WorkflowInfo
- func (t *WorkflowInterceptorBase) HasLastCompletionResult(ctx Context) bool
- func (t *WorkflowInterceptorBase) IsReplaying(ctx Context) bool
- func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, ...) Value
- func (t *WorkflowInterceptorBase) NewTimer(ctx Context, d time.Duration) Future
- func (t *WorkflowInterceptorBase) Now(ctx Context) time.Time
- func (t *WorkflowInterceptorBase) RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future
- func (t *WorkflowInterceptorBase) SetQueryHandler(ctx Context, queryType string, handler interface{}) error
- func (t *WorkflowInterceptorBase) SideEffect(ctx Context, f func(ctx Context) interface{}) Value
- func (t *WorkflowInterceptorBase) SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future
- func (t *WorkflowInterceptorBase) Sleep(ctx Context, d time.Duration) (err error)
- func (t *WorkflowInterceptorBase) UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error
- type WorkflowInterceptorFactory
- type WorkflowReplayer
- func (r *WorkflowReplayer) RegisterActivity(a interface{})
- func (r *WorkflowReplayer) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
- func (r *WorkflowReplayer) RegisterWorkflow(w interface{})
- func (r *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions)
- func (r *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSON(logger *zap.Logger, reader io.Reader, lastEventID int64) error
- func (r *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
- func (r *WorkflowReplayer) ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, ...) error
- func (r *WorkflowReplayer) ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error
- func (r *WorkflowReplayer) ReplayWorkflowHistoryFromJSON(logger *zap.Logger, reader io.Reader) error
- func (r *WorkflowReplayer) ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error
- type WorkflowRun
- type WorkflowShadower
- type WorkflowStatus
- type WorkflowTaskHandler
- type WorkflowTestSuite
- func (s *WorkflowTestSuite) GetLogger() *zap.Logger
- func (s *WorkflowTestSuite) NewTestActivityEnvironment() *TestActivityEnvironment
- func (s *WorkflowTestSuite) NewTestWorkflowEnvironment() *TestWorkflowEnvironment
- func (s *WorkflowTestSuite) SetContextPropagators(ctxProps []ContextPropagator)
- func (s *WorkflowTestSuite) SetHeader(header *shared.Header)
- func (s *WorkflowTestSuite) SetLogger(logger *zap.Logger)
- func (s *WorkflowTestSuite) SetMetricsScope(scope tally.Scope)
- type WorkflowType
Constants ¶
const ( // QueryTypeStackTrace is the build in query type for Client.QueryWorkflow() call. Use this query type to get the call // stack of the workflow. The result will be a string encoded in the EncodedValue. QueryTypeStackTrace string = "__stack_trace" // QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open // sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue. QueryTypeOpenSessions string = "__open_sessions" )
const CadenceChangeVersion = "CadenceChangeVersion"
CadenceChangeVersion is used as search attributes key to find workflows with specific change version.
const FeatureVersion = "1.7.0"
FeatureVersion is a semver that informs the server of what high-level behaviors this client supports. This is sent in a header on every request.
If you wish to tie new behavior to a client release, rather than a feature flag, increment the major/minor/patch as seems appropriate here.
It can in principle be inferred from the release version in nearly all "normal" scenarios, but release versions are not always available (debug.BuildInfo is not guaranteed) and non-released versions do not have any way to safely infer behavior. So it is a hard-coded string instead.
const LibraryVersion = "1.0.1"
LibraryVersion is a historical way to report the "library release" version, prior to go modules providing a far more consistent way to do so. It is sent in a header on every request.
deprecated: This cannot accurately report pre-release version information, and it is easy for it to drift from the release version (especially if an old commit is tagged, to avoid branching, as this behaves poorly with go modules).
Ideally it would be replaced by runtime/debug.ReadBuildInfo()... but that is not guaranteed to exist, and even if this is a fallback it still needs to be maintained and may be inherently out of date at any time.
Due to all of this unreliability, this should be used as strictly informational metadata, e.g. for caller version monitoring, never behavioral (use FeatureVersion or feature flags instead).
Variables ¶
var ( // WorkflowStatusCompleted is the WorkflowStatus for completed workflow WorkflowStatusCompleted = WorkflowStatus(shared.WorkflowExecutionCloseStatusCompleted.String()) // WorkflowStatusFailed is the WorkflowStatus for failed workflows WorkflowStatusFailed = WorkflowStatus(shared.WorkflowExecutionCloseStatusFailed.String()) // WorkflowStatusCanceled is the WorkflowStatus for canceled workflows WorkflowStatusCanceled = WorkflowStatus(shared.WorkflowExecutionCloseStatusCanceled.String()) // WorkflowStatusTerminated is the WorkflowStatus for terminated workflows WorkflowStatusTerminated = WorkflowStatus(shared.WorkflowExecutionCloseStatusTerminated.String()) // WorkflowStatusContinuedAsNew is the WorkflowStatus for continuedAsNew workflows WorkflowStatusContinuedAsNew = WorkflowStatus(shared.WorkflowExecutionCloseStatusContinuedAsNew.String()) // WorkflowStatusTimedOut is the WorkflowStatus for timedout workflows WorkflowStatusTimedOut = WorkflowStatus(shared.WorkflowExecutionCloseStatusTimedOut.String()) )
var DefaultDataConverter = getDefaultDataConverter()
DefaultDataConverter is default data converter used by Cadence worker
var ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")
ErrActivityResultPending is returned from activity's implementation to indicate the activity is not completed when activity method returns. Activity needs to be completed by Client.CompleteActivity() separately. For example, if an activity require human interaction (like approve an expense report), the activity could return activity.ErrResultPending which indicate the activity is not done yet. Then, when the waited human action happened, it needs to trigger something that could report the activity completed event to cadence server via Client.CompleteActivity() API.
var ErrCanceled = NewCanceledError()
ErrCanceled is the error returned by Context.Err when the context is canceled.
var ErrDeadlineExceeded = NewTimeoutError(shared.TimeoutTypeScheduleToClose)
ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.
var ErrMockStartChildWorkflowFailed = fmt.Errorf("start child workflow failed: %v", shared.ChildWorkflowExecutionFailedCauseWorkflowAlreadyRunning)
ErrMockStartChildWorkflowFailed is special error used to indicate the mocked child workflow should fail to start. This error is also exposed as public as testsuite.ErrMockStartChildWorkflowFailed
var ErrNoData = errors.New("no data available")
ErrNoData is returned when trying to extract strong typed data while there is no data available.
var ( // ErrSessionFailed is the error returned when user tries to execute an activity but the // session it belongs to has already failed ErrSessionFailed = errors.New("session has failed") )
var ErrTooManyArg = errors.New("too many arguments")
ErrTooManyArg is returned when trying to extract strong typed data with more arguments than available data.
var StopMetrics = make(chan struct{})
Functions ¶
func Await ¶ added in v0.12.0
Await blocks the calling thread until condition() returns true Returns CanceledError if the ctx is canceled.
func CompleteSession ¶ added in v0.8.4
func CompleteSession(ctx Context)
CompleteSession completes a session. It releases worker resources, so other sessions can be created. CompleteSession won't do anything if the context passed in doesn't contain any session information or the session has already completed or failed.
After a session has completed, user can continue to use the context, but the activities will be scheduled on the normal taskList (as user specified in ActivityOptions) and may be picked up by another worker since it's not in a session.
func EnableVerboseLogging ¶
func EnableVerboseLogging(enable bool)
EnableVerboseLogging enable or disable verbose logging. This is for internal use only.
func GetActivityLogger ¶
GetActivityLogger returns a logger that can be used in activity
func GetActivityMetricsScope ¶
GetActivityMetricsScope returns a metrics scope that can be used in activity
func GetHeartbeatDetails ¶ added in v0.7.5
GetHeartbeatDetails extract heartbeat details from last failed attempt. This is used in combination with retry policy. An activity could be scheduled with an optional retry policy on ActivityOptions. If the activity failed then server would attempt to dispatch another activity task to retry according to the retry policy. If there was heartbeat details reported by activity from the failed attempt, the details would be delivered along with the activity task for retry attempt. Activity could extract the details by GetHeartbeatDetails() and resume from the progress.
func GetLastCompletionResult ¶ added in v0.8.0
GetLastCompletionResult extract last completion result from previous run for this cron workflow. This is used in combination with cron schedule. A workflow can be started with an optional cron schedule. If a cron workflow wants to pass some data to next schedule, it can return any data and that data will become available when next run starts. This GetLastCompletionResult() extract the data into expected data structure.
func GetMetricsScope ¶
GetMetricsScope returns a metrics scope to be used in workflow's context
func GetRegisteredWorkflowTypes ¶ added in v1.0.0
func GetRegisteredWorkflowTypes() []string
GetRegisteredWorkflowTypes returns the registered workflow function/alias names. The public form is: workflow.GetRegisteredWorkflowTypes(...)
func GetUnhandledSignalNames ¶ added in v1.0.0
GetUnhandledSignalNames returns signal names that have unconsumed signals.
func GetWorkerStopChannel ¶ added in v0.8.2
GetWorkerStopChannel returns a read-only channel. The closure of this channel indicates the activity worker is stopping. When the worker is stopping, it will close this channel and wait until the worker stop timeout finishes. After the timeout hit, the worker will cancel the activity context and then exit. The timeout can be defined by worker option: WorkerStopTimeout. Use this channel to handle activity graceful exit when the activity worker stops.
func Go ¶
Go creates a new coroutine. It has similar semantic to goroutine in a context of the workflow.
func GoNamed ¶
GoNamed creates a new coroutine with a given human readable name. It has similar semantic to goroutine in a context of the workflow. Name appears in stack traces that are blocked on this Channel.
func HasHeartbeatDetails ¶ added in v0.7.5
HasHeartbeatDetails checks if there is heartbeat details from last attempt.
func HasLastCompletionResult ¶ added in v0.8.0
HasLastCompletionResult checks if there is completion result from previous runs. This is used in combination with cron schedule. A workflow can be started with an optional cron schedule. If a cron workflow wants to pass some data to next schedule, it can return any data and that data will become available when next run starts. This HasLastCompletionResult() checks if there is such data available passing down from previous successful run.
func IsCanceledError ¶ added in v0.10.1
IsCanceledError return whether error in CanceledError
func IsReplayDomain ¶ added in v0.8.6
IsReplayDomain checks if the domainName is from replay
func IsReplaying ¶ added in v0.5.1
IsReplaying returns whether the current workflow code is replaying.
Warning! Never make decisions, like schedule activity/childWorkflow/timer or send/wait on future/channel, based on this flag as it is going to break workflow determinism requirement. The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or metric reporting. Please note that Cadence already provide standard logging/metric via workflow.GetLogger(ctx) and workflow.GetMetricsScope(ctx), and those standard mechanism are replay-aware and it will automatically suppress during replay. Only use this flag if you need custom logging/metrics reporting, for example if you want to log to kafka.
Warning! Any action protected by this flag should not fail or if it does fail should ignore that failure or panic on the failure. If workflow don't want to be blocked on those failure, it should ignore those failure; if workflow do want to make sure it proceed only when that action succeed then it should panic on that failure. Panic raised from a workflow causes decision task to fail and cadence server will rescheduled later to retry.
func NewAdminJwtAuthorizationProvider ¶ added in v0.19.0
func NewAdminJwtAuthorizationProvider(privateKey []byte) auth.AuthorizationProvider
func NewDisconnectedContext ¶ added in v0.5.1
func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc)
NewDisconnectedContext returns a new context that won't propagate parent's cancellation to the new child context. One common use case is to do cleanup work after workflow is cancelled.
err := workflow.ExecuteActivity(ctx, ActivityFoo).Get(ctx, &activityFooResult) if err != nil && cadence.IsCanceledError(ctx.Err()) { // activity failed, and workflow context is canceled disconnectedCtx, _ := workflow.newDisconnectedContext(ctx); workflow.ExecuteActivity(disconnectedCtx, handleCancellationActivity).Get(disconnectedCtx, nil) return err // workflow return CanceledError }
func NewFuture ¶
NewFuture creates a new future as well as associated Settable that is used to set its value.
func NewWorker ¶
func NewWorker( service workflowserviceclient.Interface, domain string, taskList string, options WorkerOptions, ) *aggregatedWorker
NewWorker creates an instance of worker for managing workflow and activity executions. service - thrift connection to the cadence server. domain - the name of the cadence domain. taskList - is the task list name you use to identify your client worker, also
identifies group of workflow and activity implementations that are hosted by a single worker process.
options - configure any worker specific options like logger, metrics, identity.
func Now ¶
Now returns the current time in UTC. It corresponds to the time when the decision task is started or replayed. Workflow needs to use this method to get the wall clock time instead of the one from the golang library.
func RecordActivityHeartbeat ¶
RecordActivityHeartbeat sends heartbeat for the currently executing activity If the activity is either cancelled (or) workflow/activity doesn't exist then we would cancel the context with error context.Canceled.
TODO: we don't have a way to distinguish between the two cases when context is cancelled because context doesn't support overriding value of ctx.Error. TODO: Implement automatic heartbeating with cancellation through ctx.
details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you can check error TimeoutType()/Details().
func RegisterActivity ¶
func RegisterActivity(activityFunc interface{})
RegisterActivity - register an activity function or a pointer to a structure with the framework. The public form is: activity.Register(...) An activity function takes a context and input and returns a (result, error) or just error.
And activity struct is a structure with all its exported methods treated as activities. The default name of each activity is the <structure name>_<method name>. Use RegisterActivityWithOptions to override the "<structure name>_" prefix.
Examples:
func sampleActivity(ctx context.Context, input []byte) (result []byte, err error) func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) func sampleActivity(ctx context.Context) (err error) func sampleActivity() (result string, err error) func sampleActivity(arg1 bool) (result int, err error) func sampleActivity(arg1 bool) (err error) type Activities struct { // fields } func (a *Activities) SampleActivity1(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) { ... } func (a *Activities) SampleActivity2(ctx context.Context, arg1 int, arg2 *customerStruct) (result string, err error) { ... }
Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if activityFunc doesn't comply with the expected format. Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods. This method is kept to maintain backward compatibility and should not be used.
func RegisterActivityWithOptions ¶
func RegisterActivityWithOptions(activityFunc interface{}, opts RegisterActivityOptions)
RegisterActivityWithOptions registers the activity function or struct pointer with options. The public form is: activity.RegisterWithOptions(...) The user can use options to provide an external name for the activity or leave it empty if no external name is required. This can be used as
activity.RegisterWithOptions(barActivity, RegisterActivityOptions{}) activity.RegisterWithOptions(barActivity, RegisterActivityOptions{Name: "barExternal"})
When registering the structure that implements activities the name is used as a prefix that is prepended to the activity method name.
activity.RegisterWithOptions(&Activities{ ... }, RegisterActivityOptions{Name: "MyActivities_"})
To override each name of activities defined through a structure register the methods one by one: activities := &Activities{ ... } activity.RegisterWithOptions(activities.SampleActivity1, RegisterActivityOptions{Name: "Sample1"}) activity.RegisterWithOptions(activities.SampleActivity2, RegisterActivityOptions{Name: "Sample2"}) See RegisterActivity function for more info. The other use of options is to disable duplicated activity registration check which might be useful for integration tests. activity.RegisterWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true}) Deprecated: Global activity registration methods are replaced by equivalent Worker instance methods. This method is kept to maintain backward compatibility and should not be used.
func RegisterWorkflow ¶
func RegisterWorkflow(workflowFunc interface{})
RegisterWorkflow - registers a workflow function with the framework. The public form is: workflow.Register(...) A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:
func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error) func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error) func sampleWorkflow(ctx workflow.Context) (result []byte, err error) func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)
Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format. Deprecated: Global workflow registration methods are replaced by equivalent Worker instance methods. This method is kept to maintain backward compatibility and should not be used.
func RegisterWorkflowWithOptions ¶
func RegisterWorkflowWithOptions(workflowFunc interface{}, opts RegisterWorkflowOptions)
RegisterWorkflowWithOptions registers the workflow function with options. The public form is: workflow.RegisterWithOptions(...) The user can use options to provide an external name for the workflow or leave it empty if no external name is required. This can be used as
workflow.RegisterWithOptions(sampleWorkflow, RegisterWorkflowOptions{}) workflow.RegisterWithOptions(sampleWorkflow, RegisterWorkflowOptions{Name: "foo"})
A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:
func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error) func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error) func sampleWorkflow(ctx workflow.Context) (result []byte, err error) func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)
Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. This method calls panic if workflowFunc doesn't comply with the expected format or tries to register the same workflow type name twice. Use workflow.RegisterOptions.DisableAlreadyRegisteredCheck to allow multiple registrations. Deprecated: Global workflow registration methods are replaced by equivalent Worker instance methods. This method is kept to maintain backward compatibility and should not be used.
func ReplayPartialWorkflowHistoryFromJSONFile ¶ added in v0.9.1
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the given json history file upto provided lastEventID(inclusive). Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger. Deprecated: Global workflow replay methods are replaced by equivalent WorkflowReplayer instance methods. This method is kept to maintain backward compatibility and should not be used.
func ReplayWorkflowExecution ¶ added in v0.7.0
func ReplayWorkflowExecution( ctx context.Context, service workflowserviceclient.Interface, logger *zap.Logger, domain string, execution WorkflowExecution, ) error
ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is the only optional parameter. Defaults to the noop logger. Deprecated: Global workflow replay methods are replaced by equivalent WorkflowReplayer instance methods. This method is kept to maintain backward compatibility and should not be used.
func ReplayWorkflowHistory ¶ added in v0.7.0
ReplayWorkflowHistory executes a single decision task for the given history. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger. Deprecated: Global workflow replay methods are replaced by equivalent WorkflowReplayer instance methods. This method is kept to maintain backward compatibility and should not be used.
func ReplayWorkflowHistoryFromJSONFile ¶ added in v0.7.1
ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file. Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger. Deprecated: Global workflow replay methods are replaced by equivalent WorkflowReplayer instance methods. This method is kept to maintain backward compatibility and should not be used.
func SetBinaryChecksum ¶ added in v0.10.0
func SetBinaryChecksum(checksum string)
SetBinaryChecksum set binary checksum
func SetQueryHandler ¶
SetQueryHandler sets the query handler to handle workflow query. The queryType specify which query type this handler should handle. The handler must be a function that returns 2 values. The first return value must be a serializable result. The second return value must be an error. The handler function could receive any number of input parameters. All the input parameter must be serializable. You should call workflow.SetQueryHandler() at the beginning of the workflow code. When client calls Client.QueryWorkflow() to cadence server, a task will be generated on server that will be dispatched to a workflow worker, which will replay the history events and then execute a query handler based on the query type. The query handler will be invoked out of the context of the workflow, meaning that the handler code must not use cadence context to do things like workflow.NewChannel(), workflow.Go() or to call any workflow blocking functions like Channel.Get() or Future.Get(). Trying to do so in query handler code will fail the query and client will receive QueryFailedError. Example of workflow code that support query type "current_state":
func MyWorkflow(ctx workflow.Context, input string) error { currentState := "started" // this could be any serializable struct err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) { return currentState, nil }) if err != nil { currentState = "failed to register query handler" return err } // your normal workflow code begins here, and you update the currentState as the code makes progress. currentState = "waiting timer" err = NewTimer(ctx, time.Hour).Get(ctx, nil) if err != nil { currentState = "timer failed" return err } currentState = "waiting activity" ctx = WithActivityOptions(ctx, myActivityOptions) err = ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil) if err != nil { currentState = "activity failed" return err } currentState = "done" return nil }
func SetStickyWorkflowCacheSize ¶ added in v0.7.0
func SetStickyWorkflowCacheSize(cacheSize int)
SetStickyWorkflowCacheSize sets the cache size for sticky workflow cache. Sticky workflow execution is the affinity between decision tasks of a specific workflow execution to a specific worker. The affinity is set if sticky execution is enabled via Worker.Options (It is enabled by default unless disabled explicitly). The benefit of sticky execution is that workflow does not have to reconstruct the state by replaying from beginning of history events. But the cost is it consumes more memory as it rely on caching workflow execution's running state on the worker. The cache is shared between workers running within same process. This must be called before any worker is started. If not called, the default size of 10K (might change in future) will be used.
func Sleep ¶
Sleep pauses the current workflow for at least the duration d. A negative or zero duration causes Sleep to return immediately. Workflow code needs to use this Sleep() to sleep instead of the Go lang library one(timer.Sleep()). You can cancel the pending sleep by cancel the Context (using context from workflow.WithCancel(ctx)). Sleep() returns nil if the duration d is passed, or it returns *CanceledError if the ctx is canceled. There are 2 reasons the ctx could be canceled: 1) your workflow code cancel the ctx (with workflow.WithCancel(ctx)); 2) your workflow itself is canceled by external request. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func StartVersionMetrics ¶ added in v1.0.0
StartVersionMetrics starts emitting version metrics
func UpsertSearchAttributes ¶ added in v0.9.0
UpsertSearchAttributes is used to add or update workflow search attributes. The search attributes can be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on cadence server side; The value has to deterministic when replay; The value has to be Json serializable. UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code:
func MyWorkflow(ctx workflow.Context, input string) error { attr1 := map[string]interface{}{ "CustomIntField": 1, "CustomBoolField": true, } workflow.UpsertSearchAttributes(ctx, attr1) attr2 := map[string]interface{}{ "CustomIntField": 2, "CustomKeywordField": "seattle", } workflow.UpsertSearchAttributes(ctx, attr2) }
will eventually have search attributes:
map[string]interface{}{ "CustomIntField": 2, "CustomBoolField": true, "CustomKeywordField": "seattle", }
This is only supported when using ElasticSearch.
func WithActivityTask ¶
func WithActivityTask( ctx context.Context, task *shared.PollForActivityTaskResponse, taskList string, invoker ServiceInvoker, logger *zap.Logger, scope tally.Scope, dataConverter DataConverter, workerStopChannel <-chan struct{}, contextPropagators []ContextPropagator, tracer opentracing.Tracer, ) context.Context
WithActivityTask adds activity specific information into context. Use this method to unit test activity implementations that use context extractor methodshared.
func WithCancel ¶
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.
Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.
Types ¶
type ActivityInfo ¶
type ActivityInfo struct { TaskToken []byte WorkflowType *WorkflowType WorkflowDomain string WorkflowExecution WorkflowExecution ActivityID string ActivityType ActivityType TaskList string HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed. ScheduledTimestamp time.Time // Time of activity scheduled by a workflow StartedTimestamp time.Time // Time of activity start Deadline time.Time // Time of activity timeout Attempt int32 // Attempt starts from 0, and increased by 1 for every retry if retry policy is specified. }
ActivityInfo contains information about currently executing activity.
func GetActivityInfo ¶
func GetActivityInfo(ctx context.Context) ActivityInfo
GetActivityInfo returns information about currently executing activity.
type ActivityOptions ¶
type ActivityOptions struct { // TaskList that the activity needs to be scheduled on. // optional: The default task list with the same name as the workflow task list. TaskList string // ScheduleToCloseTimeout - The end to end timeout for the activity needed. // The zero value of this uses default value. // Optional: The default value is the sum of ScheduleToStartTimeout and StartToCloseTimeout ScheduleToCloseTimeout time.Duration // ScheduleToStartTimeout - The queue timeout before the activity starts executed. // Mandatory: No default. ScheduleToStartTimeout time.Duration // StartToCloseTimeout - The timeout from the start of execution to end of it. // Mandatory: No default. StartToCloseTimeout time.Duration // HeartbeatTimeout - The periodic timeout while the activity is in execution. This is // the max interval the server needs to hear at-least one ping from the activity. // Optional: Default zero, means no heart beating is needed. HeartbeatTimeout time.Duration // WaitForCancellation - Whether to wait for cancelled activity to be completed( // activity can be failed, completed, cancel accepted) // Optional: default false WaitForCancellation bool // ActivityID - Business level activity ID, this is not needed for most of the cases if you have // to specify this then talk to cadence team. This is something will be done in future. // Optional: default empty string ActivityID string // RetryPolicy specify how to retry activity if error happens. When RetryPolicy.ExpirationInterval is specified // and it is larger than the activity's ScheduleToStartTimeout, then the ExpirationInterval will override activity's // ScheduleToStartTimeout. This is to avoid retrying on ScheduleToStartTimeout error which only happen when worker // is not picking up the task within the timeout. Retrying ScheduleToStartTimeout does not make sense as it just // mark the task as failed and create a new task and put back in the queue waiting worker to pick again. Cadence // server also make sure the ScheduleToStartTimeout will not be larger than the workflow's timeout. // Same apply to ScheduleToCloseTimeout. See more details about RetryPolicy on the doc for RetryPolicy. // Optional: default is no retry RetryPolicy *RetryPolicy }
ActivityOptions stores all activity-specific parameters that will be stored inside of a context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
type ActivityTaskHandler ¶
type ActivityTaskHandler interface { // Executes the activity task // The response is one of the types: // - RespondActivityTaskCompletedRequest // - RespondActivityTaskFailedRequest // - RespondActivityTaskCanceledRequest Execute(taskList string, task *s.PollForActivityTaskResponse) (interface{}, error) }
ActivityTaskHandler represents activity task handlers.
type ActivityType ¶
type ActivityType struct {
Name string
}
ActivityType identifies a activity type.
type Bugports ¶ added in v0.18.4
type Bugports struct { // StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4. // // Prior to the fix, child workflows would be started and keep running when their context was canceled in two // situations: // 1) when the context was canceled before ExecuteChildWorkflow is called, and // 2) when the context was canceled after ExecuteChildWorkflow but before the child workflow was started. // // 1 is unfortunately easy to trigger, though many workflows will encounter an error earlier and not reach the // child-workflow-executing code. 2 is expected to be very rare in practice. // // To permanently emulate old behavior, use a disconnected context when starting child workflows, and // cancel it only after `childfuture.GetWorkflowExecution().Get(...)` returns. This can be used when this flag // is removed in the future. // // If you have currently-broken workflows and need to repair them, there are two primary options: // // 1: Check the BinaryChecksum value of your new deploy and/or of the decision that is currently failing // workflows. Then set this flag when replaying history on those not-fixed checksums. Concretely, this means // checking both `workflow.GetInfo(ctx).BinaryChecksum` (note that sufficiently old clients may not have // recorded a value, and it may be nil) and `workflow.IsReplaying(ctx)`. // // 2: Reset broken workflows back to either before the buggy behavior was recorded, or before the fixed behavior // was deployed. A "bad binary" reset type can do the latter in bulk, see the CLI's // `cadence workflow reset-batch --reset_type BadBinary --help` for details. For the former, check the failing // histories, identify the point at which the bug occurred, and reset to prior to that decision task. // // Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP. // // deprecated StartChildWorkflowsOnCanceledContext bool }
Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily emulating old behavior until a fix is deployed. By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior. Back-ported buggy behavior *may* be available via these flags.
Fields in here are NOT guaranteed to be stable. They will almost certainly be removed in the next major release, and might be removed earlier if a need arises, e.g. if the historical behavior causes too much of an increase in code complexity.
See each individual field for details.
Bugports are always deprecated and may be removed in future versions. Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to allow cleaning up the additional code complexity that they cause.
deprecated
type CancelFunc ¶
type CancelFunc func()
A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.
type CancelReason ¶ added in v1.0.0
type CancelReason string
type CanceledError ¶
type CanceledError struct {
// contains filtered or unexported fields
}
CanceledError returned when operation was canceled.
func NewCanceledError ¶
func NewCanceledError(details ...interface{}) *CanceledError
NewCanceledError creates CanceledError instance
func (*CanceledError) Details ¶
func (e *CanceledError) Details(d ...interface{}) error
Details extracts strong typed detail data of this error.
func (*CanceledError) HasDetails ¶ added in v0.5.1
func (e *CanceledError) HasDetails() bool
HasDetails return if this error has strong typed detail data.
type Channel ¶
type Channel interface { // Receive blocks until it receives a value, and then assigns the received value to the provided pointer. // It returns false when the Channel is closed and all data has already been consumed from the Channel, in the // same way as Go channel reads work, but the assignment only occurs if there was a value in the Channel. // // This is technically equivalent to: // received, ok := <- aChannel: // if ok { // *valuePtr = received // } // // But if your output values are zero values, this is equivalent to a normal channel read: // value, ok <- aChannel // // valuePtr must be assignable, and will be used to assign (for in-memory data in regular channels) or decode // (for signal channels) the data in the channel. // // If decoding or assigning fails: // - an error will be logged // - the value will be dropped from the channel // - Receive will automatically try again // - This will continue until a successful value is found, or the channel is emptied and it resumes blocking. // Closed channels with no values will always succeed, but they will not change valuePtr. // // Go would normally prevent incorrect-type failures like this at compile time, but the same cannot be done // here. If you need to "try" to assign to multiple things, similar to a Future you can use: // - for signal channels, a []byte pointer. This will give you the raw data that Cadence received, and no // decoding will be attempted, so you can try it yourself. // - for other channels, an interface{} pointer. All values are interfaces, so this will never fail, and you // can inspect the type with reflection or type assertions. Receive(ctx Context, valuePtr interface{}) (ok bool) // ReceiveAsync tries to Receive from Channel without blocking. // If there is data available from the Channel, it assigns the data to valuePtr and returns true. // Otherwise, it returns false immediately. // // This is technically equivalent to: // select { // case received, ok := <- aChannel: // if ok { // *valuePtr = received // } // default: // // no value was read // ok = false // } // // But if your output values are zero values, this is equivalent to a simpler form: // select { // case value, ok := <- aChannel: // default: // // no value was read // ok = false // } // // Decoding or assigning failures are handled like Receive. ReceiveAsync(valuePtr interface{}) (ok bool) // ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return to indicate if there could be // more values from the Channel in the future. // `more` is false only when Channel is closed and the read failed (empty). // // This is technically equivalent to: // select { // case received, ok := <- aChannel: // if ok { // *valuePtr = received // } // more = ok // default: // // no value was read // ok = false // // but the read would have blocked, so the channel is not closed // more = true // } // // But if your output values are zero values, this is equivalent to a simpler form: // select { // case value, ok := <- aChannel: // more = ok // default: // // no value was read // ok = false // // but the read would have blocked, so the channel is not closed // more = true // } // // Decoding or assigning failures are handled like Receive. ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool) // Send blocks until the data is sent. // // This is equivalent to `aChannel <- v`. Send(ctx Context, v interface{}) // SendAsync will try to send without blocking. // It returns true if the data was sent (i.e. there was room in the buffer, or a reader was waiting to receive // it), otherwise it returns false. // // This is equivalent to: // select { // case aChannel <- v: ok = true // default: ok = false // } SendAsync(v interface{}) (ok bool) // Close closes the Channel, and prohibits subsequent sends. // As with a normal Go channel that has been closed, sending to a closed channel will panic. Close() }
Channel must be used in workflows instead of a native Go chan.
Use workflow.NewChannel(ctx) to create an unbuffered Channel instance, workflow.NewBufferedChannel(ctx, size) to create a Channel which has a buffer, or workflow.GetSignalChannel(ctx, "name") to get a Channel that contains data sent to this workflow by a call to SignalWorkflow (e.g. on the Client, or similar methods like SignalExternalWorkflow or SignalChildWorkflow).
Both NewChannel and NewBufferedChannel have "Named" constructors as well. These names will be visible in stack-trace queries, so they can help with debugging, but they do not otherwise impact behavior at all, and are not recorded anywhere (so you can change them without versioning your code).
Also note that channels created by NewChannel and NewBufferedChannel do not do any serialization or deserialization - you will receive whatever value was sent, and non-(de)serializable values like function references and interfaces are fine, the same as using a normal Go channel.
Signal channels, however, contain whatever bytes were sent to your workflow, and the values must be decoded into the output value. By default, this means that Receive(ctx, &out) will use json.Unmarshal(data, &out), but this can be overridden at a worker level (worker.Options) or at a context level (workflow.WithDataConverter(ctx, dc)).
You are able to send values to your own signal channels, and these values will behave the same as they do in normal channels (i.e. they will not be (de)serialized). However, doing so is not generally recommended, as mixing the value types can increase the risk that you fail to read a value, causing values to be lost. See Receive for more details about that behavior.
func GetSignalChannel ¶
GetSignalChannel returns channel corresponding to the signal name.
func NewBufferedChannel ¶
NewBufferedChannel create new buffered Channel instance
func NewNamedBufferedChannel ¶
NewNamedBufferedChannel create new BufferedChannel instance with a given human readable name. Name appears in stack traces that are blocked on this Channel.
func NewNamedChannel ¶
NewNamedChannel create new Channel instance with a given human readable name. Name appears in stack traces that are blocked on this channel.
type ChildWorkflowFuture ¶
type ChildWorkflowFuture interface { Future // GetChildWorkflowExecution returns a future that will be ready when child workflow execution started. You can // get the WorkflowExecution of the child workflow from the future. Then you can use Workflow ID and RunID of // child workflow to cancel or send signal to child workflow. // childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, child, ...) // var childWE WorkflowExecution // if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err == nil { // // child workflow started, you can use childWE to get the WorkflowID and RunID of child workflow // } GetChildWorkflowExecution() Future // SignalWorkflowByID sends a signal to the child workflow. This call will block until child workflow is started. SignalChildWorkflow(ctx Context, signalName string, data interface{}) Future }
ChildWorkflowFuture represents the result of a child workflow execution
func ExecuteChildWorkflow ¶
func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interface{}) ChildWorkflowFuture
ExecuteChildWorkflow requests child workflow execution in the context of a workflow. Context can be used to pass the settings for the child workflow. For example: task list that this child workflow should be routed, timeouts that need to be configured. Use ChildWorkflowOptions to pass down the options.
cwo := ChildWorkflowOptions{ ExecutionStartToCloseTimeout: 10 * time.Minute, TaskStartToCloseTimeout: time.Minute, } ctx := WithChildWorkflowOptions(ctx, cwo)
Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. Input args are the arguments that need to be passed to the child workflow function represented by childWorkflow. If the child workflow failed to complete then the future get error would indicate the failure and it can be one of CustomError, TimeoutError, CanceledError, GenericError. You can cancel the pending child workflow using context(workflow.WithCancel(ctx)) and that will fail the workflow with error CanceledError. ExecuteChildWorkflow returns ChildWorkflowFuture.
type ChildWorkflowOptions ¶
type ChildWorkflowOptions struct { // Domain of the child workflow. // Optional: the current workflow (parent)'s domain will be used if this is not provided. Domain string // WorkflowID of the child workflow to be scheduled. // Optional: an auto generated workflowID will be used if this is not provided. WorkflowID string // TaskList that the child workflow needs to be scheduled on. // Optional: the parent workflow task list will be used if this is not provided. TaskList string // ExecutionStartToCloseTimeout - The end to end timeout for the child workflow execution. // Mandatory: no default ExecutionStartToCloseTimeout time.Duration // TaskStartToCloseTimeout - The decision task timeout for the child workflow. // Optional: default is 10s if this is not provided (or if 0 is provided). TaskStartToCloseTimeout time.Duration // WaitForCancellation - Whether to wait for cancelled child workflow to be ended (child workflow can be ended // as: completed/failed/timedout/terminated/canceled) // Optional: default false WaitForCancellation bool // WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful // for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate WorkflowIDReusePolicy WorkflowIDReusePolicy // RetryPolicy specify how to retry child workflow if error happens. // Optional: default is no retry RetryPolicy *RetryPolicy // CronSchedule - Optional cron schedule for workflow. If a cron schedule is specified, the workflow will run // as a cron based on the schedule. The scheduling will be based on UTC time. Schedule for next run only happen // after the current run is completed/failed/timeout. If a RetryPolicy is also supplied, and the workflow failed // or timeout, the workflow will be retried based on the retry policy. While the workflow is retrying, it won't // schedule its next run. If next schedule is due while workflow is running (or retrying), then it will skip that // schedule. Cron workflow will not stop until it is terminated or cancelled (by returning cadence.CanceledError). // The cron spec is as following: // ┌───────────── minute (0 - 59) // │ ┌───────────── hour (0 - 23) // │ │ ┌───────────── day of the month (1 - 31) // │ │ │ ┌───────────── month (1 - 12) // │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) // │ │ │ │ │ // │ │ │ │ │ // * * * * * CronSchedule string // Memo - Optional non-indexed info that will be shown in list workflow. Memo map[string]interface{} // SearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs (only // supported when Cadence server is using ElasticSearch). The key and value type must be registered on Cadence server side. // Use GetSearchAttributes API to get valid key and corresponding value type. SearchAttributes map[string]interface{} // ParentClosePolicy - Optional policy to decide what to do for the child. // Default is Terminate (if onboarded to this feature) ParentClosePolicy ParentClosePolicy // Bugports allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily // emulating old behavior until a fix is deployed. // // Bugports are always deprecated and may be removed in future versions. // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // // deprecated Bugports Bugports }
ChildWorkflowOptions stores all child workflow specific parameters that will be stored inside of a Context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
type Client ¶
type Client interface { // StartWorkflow starts a workflow execution // The user can use this to start using a function or workflow type name. // Either by // StartWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3) // or // StartWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3) // The errors it can return: // - EntityNotExistsError, if domain does not exists // - BadRequestError // - WorkflowExecutionAlreadyStartedError // - InternalServiceError // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error) // ExecuteWorkflow starts a workflow execution and return a WorkflowRun instance and error // The user can use this to start using a function or workflow type name. // Either by // ExecuteWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3) // or // ExecuteWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3) // The errors it can return: // - EntityNotExistsError, if domain does not exists // - BadRequestError // - InternalServiceError // // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. // // WorkflowRun has three methods: // - GetID() string: which return workflow ID (which is same as StartWorkflowOptions.ID if provided) // - GetRunID() string: which return the first started workflow run ID (please see below) // - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow // execution result to valuePtr, if workflow execution is a success, or return corresponding // error. This is a blocking API. // NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the // return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError, // however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError. // Say ExecuteWorkflow started a workflow, in its first run, has run ID "run ID 1", and returned ContinueAsNewError, // the second run has run ID "run ID 2" and return some result other than ContinueAsNewError: // GetRunID() will always return "run ID 1" and Get(ctx context.Context, valuePtr interface{}) will return the result of second run. // NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error) // GetWorkfow retrieves a workflow execution and return a WorkflowRun instance // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID. // // WorkflowRun has three methods: // - GetID() string: which return workflow ID (which is same as StartWorkflowOptions.ID if provided) // - GetRunID() string: which return the first started workflow run ID (please see below) // - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow // execution result to valuePtr, if workflow execution is a success, or return corresponding // error. This is a blocking API. // NOTE: if the retrieved workflow returned ContinueAsNewError during the workflow execution, the // return result of GetRunID() will be the retrieved workflow run ID, not the new run ID caused by ContinueAsNewError, // however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError. GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun // SignalWorkflow sends a signals to a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. // - signalName name to identify the signal. // The errors it can return: // - EntityNotExistsError // - InternalServiceError // - WorkflowExecutionAlreadyCompletedError SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error // SignalWithStartWorkflow sends a signal to a running workflow. // If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction. // - workflowID, signalName, signalArg are same as SignalWorkflow's parameters // - options, workflow, workflowArgs are same as StartWorkflow's parameters // Note: options.WorkflowIDReusePolicy is default to WorkflowIDReusePolicyAllowDuplicate in this API; // while in StartWorkflow/ExecuteWorkflow APIs it is default to WorkflowIdReusePolicyAllowDuplicateFailedOnly. // The errors it can return: // - EntityNotExistsError, if domain does not exist // - BadRequestError // - InternalServiceError SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error) // CancelWorkflow cancels a workflow in execution // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. // The errors it can return: // - EntityNotExistsError // - BadRequestError // - InternalServiceError // - WorkflowExecutionAlreadyCompletedError CancelWorkflow(ctx context.Context, workflowID string, runID string, opts ...Option) error // TerminateWorkflow terminates a workflow execution. // workflowID is required, other parameters are optional. // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. // The errors it can return: // - EntityNotExistsError // - BadRequestError // - InternalServiceError // - WorkflowExecutionAlreadyCompletedError TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error // GetWorkflowHistory gets history events of a particular workflow // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID. // - whether use long poll for tracking new events: when the workflow is running, there can be new events generated during iteration // of HistoryEventIterator, if isLongPoll == true, then iterator will do long poll, tracking new history event, i.e. the iteration // will not be finished until workflow is finished; if isLongPoll == false, then iterator will only return current history events. // - whether return all history events or just the last event, which contains the workflow execution end result // Example:- // To iterate all events, // iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType) // events := []*shared.HistoryEvent{} // for iter.HasNext() { // event, err := iter.Next() // if err != nil { // return err // } // events = append(events, event) // } GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator // CompleteActivity reports activity completed. // activity Execute method can return acitivity.activity.ErrResultPending to // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method // should be called when that activity is completed with the actual result and error. If err is nil, activity task // completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise, // activity task failed event will be reported. // An activity implementation should use GetActivityInfo(ctx).TaskToken function to get task token to use for completion. // Example:- // To complete with a result. // CompleteActivity(token, "Done", nil) // To fail the activity with an error. // CompleteActivity(token, nil, cadence.NewCustomError("reason", details) // The activity can fail with below errors ErrorWithDetails, TimeoutError, CanceledError. CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error // CompleteActivityById reports activity completed. // Similar to CompleteActivity, but may save user from keeping taskToken info. // activity Execute method can return activity.ErrResultPending to // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivityById() method // should be called when that activity is completed with the actual result and error. If err is nil, activity task // completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise, // activity task failed event will be reported. // An activity implementation should use activityID provided in ActivityOption to use for completion. // domain name, workflowID, activityID are required, runID is optional. // The errors it can return: // - ErrorWithDetails // - TimeoutError // - CanceledError CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string, result interface{}, err error) error // RecordActivityHeartbeat records heartbeat for an activity. // details - is the progress you want to record along with heart beat for this activity. // The errors it can return: // - EntityNotExistsError // - InternalServiceError RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error // RecordActivityHeartbeatByID records heartbeat for an activity. // details - is the progress you want to record along with heart beat for this activity. // The errors it can return: // - EntityNotExistsError // - InternalServiceError RecordActivityHeartbeatByID(ctx context.Context, domain, workflowID, runID, activityID string, details ...interface{}) error // ListClosedWorkflow gets closed workflow executions based on request filters // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) // ListClosedWorkflow gets open workflow executions based on request filters // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) // ListWorkflow gets workflow executions based on query. This API only works with ElasticSearch, // and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause, // examples: // - "(WorkflowID = 'wid1' or (WorkflowType = 'type2' and WorkflowID = 'wid2'))". // - "CloseTime between '2019-08-27T15:04:05+00:00' and '2019-08-28T15:04:05+00:00'". // - to list only open workflow use "CloseTime = missing" // Retrieved workflow executions are sorted by StartTime in descending order when list open workflow, // and sorted by CloseTime in descending order for other queries. // The errors it can return: // - BadRequestError // - InternalServiceError ListWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) // ListArchivedWorkflow gets archived workflow executions based on query. This API will return BadRequest if Cadence // cluster or target domain is not configured for visibility archival or read is not enabled. The query is basically the SQL WHERE clause. // However, different visibility archivers have different limitations on the query. Please check the documentation of the visibility archiver used // by your domain to see what kind of queries are accept and whether retrieved workflow executions are ordered or not. // The errors it can return: // - BadRequestError // - InternalServiceError ListArchivedWorkflow(ctx context.Context, request *s.ListArchivedWorkflowExecutionsRequest) (*s.ListArchivedWorkflowExecutionsResponse, error) // ScanWorkflow gets workflow executions based on query. This API only works with ElasticSearch, // and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause // (see ListWorkflow for query examples). // ScanWorkflow should be used when retrieving large amount of workflows and order is not needed. // It will use more ElasticSearch resources than ListWorkflow, but will be several times faster // when retrieving millions of workflows. // The errors it can return: // - BadRequestError // - InternalServiceError ScanWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) // CountWorkflow gets number of workflow executions based on query. This API only works with ElasticSearch, // and will return BadRequestError when using Cassandra or MySQL. The query is basically the SQL WHERE clause // (see ListWorkflow for query examples). // The errors it can return: // - BadRequestError // - InternalServiceError CountWorkflow(ctx context.Context, request *s.CountWorkflowExecutionsRequest) (*s.CountWorkflowExecutionsResponse, error) // GetSearchAttributes returns valid search attributes keys and value types. // The search attributes can be used in query of List/Scan/Count APIs. Adding new search attributes requires cadence server // to update dynamic config ValidSearchAttributes. GetSearchAttributes(ctx context.Context) (*s.GetSearchAttributesResponse, error) // QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID // and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the // target workflow execution that this query will be send to. If runID is not specified (empty string), server will // use the currently running execution of that workflowID. The queryType specifies the type of query you want to // run. By default, cadence supports "__stack_trace" as a standard query type, which will return string value // representing the call stack of the target workflow. The target workflow could also setup different query handler // to handle custom query types. // See comments at workflow.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details // on how to setup query handler within the target workflow. // - workflowID is required. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. // - queryType is the type of the query. // - args... are the optional query parameters. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError // - QueryFailError QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (Value, error) // QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously. // See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResponse for more information. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError // - QueryFailError QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) // ResetWorkflow reset a given workflow execution and returns a new execution // See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResponse for more information. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError ResetWorkflow(ctx context.Context, request *s.ResetWorkflowExecutionRequest) (*s.ResetWorkflowExecutionResponse, error) // DescribeWorkflowExecution returns information about the specified workflow execution. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) // DescribeTaskList returns information about the target tasklist, right now this API returns the // pollers which polled this tasklist in last few minutes. // The errors it can return: // - BadRequestError // - InternalServiceError // - EntityNotExistError DescribeTaskList(ctx context.Context, tasklist string, tasklistType s.TaskListType) (*s.DescribeTaskListResponse, error) // RefreshWorkflowTasks refreshes all the tasks of a given workflow. // - workflow ID of the workflow. // - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. // The errors it can return: // - BadRequestError // - DomainNotActiveError // - ServiceBusyError // - EntityNotExistError RefreshWorkflowTasks(ctx context.Context, workflowID, runID string) error }
Client is the client for starting and getting information about a workflow executions as well as completing activities asynchronously.
func NewClient ¶
func NewClient(service workflowserviceclient.Interface, domain string, options *ClientOptions) Client
NewClient creates an instance of a workflow client
type ClientOptions ¶
type ClientOptions struct { MetricsScope tally.Scope Identity string IsolationGroup string DataConverter DataConverter Tracer opentracing.Tracer ContextPropagators []ContextPropagator FeatureFlags FeatureFlags Authorization auth.AuthorizationProvider }
ClientOptions are optional parameters for Client creation.
type Context ¶
type Context interface { // Deadline returns the time when work done on behalf of this context // should be canceled. Deadline returns ok==false when no deadline is // set. Successive calls to Deadline return the same results. Deadline() (deadline time.Time, ok bool) // Done returns a channel that's closed when work done on behalf of this // context should be canceled. Done may return nil if this context can // never be canceled. Successive calls to Done return the same value. // // WithCancel arranges for Done to be closed when cancel is called; // WithDeadline arranges for Done to be closed when the deadline // expires; WithTimeout arranges for Done to be closed when the timeout // elapses. // // Done is provided for use in select statements: // // // Stream generates values with DoSomething and sends them to out // // until DoSomething returns an error or ctx.Done is closed. // func Stream(ctx Context, out Channel) (err error) { // for { // v, err := DoSomething(ctx) // if err != nil { // return err // } // s := NewSelector(ctx) // s.AddReceive(ctx.Done(), func(v interface{}) { err = ctx.Err() }) // s.AddReceive(v, func(v interface{}, more bool) { out.Send(ctx, v) }) // s.Select(ctx) // if err != nil { // return err // } // } // } // // See http://blog.golang.org/pipelines for more examples of how to use // a Done channel for cancellation. Done() Channel // Err returns a non-nil error value after Done is closed. Err returns // Canceled if the context was canceled or DeadlineExceeded if the // context's deadline passed. No other values for Err are defined. // After Done is closed, successive calls to Err return the same value. Err() error // Value returns the value associated with this context for key, or nil // if no value is associated with key. Successive calls to Value with // the same key returns the same result. // // Use context values only for request-scoped data that transits // processes and API boundaries, not for passing optional parameters to // functions. // // A key identifies a specific value in a Context. Functions that wish // to store values in Context typically allocate a key in a global // variable then use that key as the argument to context.WithValue and // Context.Value. A key can be any type that supports equality; // packages should define keys as an unexported type to avoid // collisions. // // Packages that define a Context key should provide type-safe accessors // for the values stores using that key: // // // Package user defines a User type that's stored in Contexts. // package user // // import "golang.org/x/net/context" // // // User is the type of value stored in the Contexts. // type User struct {...} // // // key is an unexported type for keys defined in this package. // // This prevents collisions with keys defined in other packages. // type key int // // // userKey is the key for user.User values in Contexts. It is // // unexported; clients use user.NewContext and user.FromContext // // instead of using this key directly. // var userKey key = 0 // // // NewContext returns a new Context that carries value u. // func NewContext(ctx context.Context, u *User) context.Context { // return context.WithValue(ctx, userKey, u) // } // // // FromContext returns the User value stored in ctx, if any. // func FromContext(ctx context.Context) (*User, bool) { // u, ok := ctx.Value(userKey).(*User) // return u, ok // } Value(key interface{}) interface{} }
Context is a clone of context.Context with Done() returning Channel instead of native channel. A Context carries a deadline, a cancellation signal, and other values across API boundaries.
Context's methods may be called by multiple goroutines simultaneously.
func Background ¶ added in v0.8.4
func Background() Context
Background returns a non-nil, empty Context. It is never canceled, has no values, and has no deadline
func CreateSession ¶ added in v0.8.4
func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error)
CreateSession creates a session and returns a new context which contains information of the created session. The session will be created on the tasklist user specified in ActivityOptions. If none is specified, the default one will be used.
CreationSession will fail in the following situations:
- The context passed in already contains a session which is still open (not closed and failed).
- All the workers are busy (number of sessions currently running on all the workers have reached MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and session cannot be created within a specified timeout.
If an activity is executed using the returned context, it's regarded as part of the session. All activities within the same session will be executed by the same worker. User still needs to handle the error returned when executing an activity. Session will not be marked as failed if an activity within it returns an error. Only when the worker executing the session is down, that session will be marked as failed. Executing an activity within a failed session will return ErrSessionFailed immediately without scheduling that activity.
The returned session Context will be cancelled if the session fails (worker died) or CompleteSession() is called. This means that in these two cases, all user activities scheduled using the returned session Context will also be cancelled.
If user wants to end a session since activity returns some error, use CompleteSession API below. New session can be created if necessary to retry the whole session.
Example:
so := &SessionOptions{ ExecutionTimeout: time.Minute, CreationTimeout: time.Minute, } sessionCtx, err := CreateSession(ctx, so) if err != nil { // Creation failed. Wrong ctx or too many outstanding sessions. } defer CompleteSession(sessionCtx) err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil) if err == ErrSessionFailed { // Session has failed } else { // Handle activity error } ... // execute more activities using sessionCtx
func RecreateSession ¶ added in v0.8.4
func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionOptions) (Context, error)
RecreateSession recreate a session based on the sessionInfo passed in. Activities executed within the recreated session will be executed by the same worker as the previous session. RecreateSession() returns an error under the same situation as CreateSession() or the token passed in is invalid. It also has the same usage as CreateSession().
The main usage of RecreateSession is for long sessions that are splited into multiple runs. At the end of one run, complete the current session, get recreateToken from sessionInfo by calling SessionInfo.GetRecreateToken() and pass the token to the next run. In the new run, session can be recreated using that token.
func WithActivityOptions ¶
func WithActivityOptions(ctx Context, options ActivityOptions) Context
WithActivityOptions adds all options to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithChildWorkflowOptions ¶
func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context
WithChildWorkflowOptions adds all workflow options to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithDataConverter ¶ added in v0.7.0
func WithDataConverter(ctx Context, dc DataConverter) Context
WithDataConverter adds DataConverter to the context.
func WithExecutionStartToCloseTimeout ¶
WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithHeartbeatTimeout ¶
WithHeartbeatTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithLocalActivityOptions ¶ added in v0.5.1
func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context
WithLocalActivityOptions adds local activity options to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithRetryPolicy ¶ added in v0.7.5
func WithRetryPolicy(ctx Context, retryPolicy RetryPolicy) Context
WithRetryPolicy adds retry policy to the copy of the context
func WithScheduleToCloseTimeout ¶
WithScheduleToCloseTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithScheduleToStartTimeout ¶
WithScheduleToStartTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithStartToCloseTimeout ¶
WithStartToCloseTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func WithTaskList ¶
WithTaskList adds a task list to the copy of the context.
func WithValue ¶
WithValue returns a copy of parent in which the value associated with key is val.
Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.
func WithWaitForCancellation ¶
WithWaitForCancellation adds wait for the cacellation to the copy of the context.
func WithWorkflowDomain ¶
WithWorkflowDomain adds a domain to the context.
func WithWorkflowID ¶
WithWorkflowID adds a workflowID to the context.
func WithWorkflowTaskList ¶
WithWorkflowTaskList adds a task list to the context.
func WithWorkflowTaskStartToCloseTimeout ¶
WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
type ContextPropagator ¶ added in v0.8.4
type ContextPropagator interface { // Inject injects information from a Go Context into headers Inject(context.Context, HeaderWriter) error // Extract extracts context information from headers and returns a context // object Extract(context.Context, HeaderReader) (context.Context, error) // InjectFromWorkflow injects information from workflow context into headers InjectFromWorkflow(Context, HeaderWriter) error // ExtractToWorkflow extracts context information from headers and returns // a workflow context ExtractToWorkflow(Context, HeaderReader) (Context, error) }
ContextPropagator is an interface that determines what information from context to pass along
func NewTracingContextPropagator ¶ added in v0.8.4
func NewTracingContextPropagator(logger *zap.Logger, tracer opentracing.Tracer) ContextPropagator
NewTracingContextPropagator returns new tracing context propagator object
type ContinueAsNewError ¶
type ContinueAsNewError struct {
// contains filtered or unexported fields
}
ContinueAsNewError contains information about how to continue the workflow as new.
func NewContinueAsNewError ¶
func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError
NewContinueAsNewError creates ContinueAsNewError instance If the workflow main function returns this error then the current execution is ended and the new execution with same workflow ID is started automatically with options provided to this function.
ctx - use context to override any options for the new workflow like execution timeout, decision task timeout, task list. if not mentioned it would use the defaults that the current workflow is using. ctx := WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute) ctx := WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute) ctx := WithWorkflowTaskList(ctx, "example-group") wfn - workflow function. for new execution it can be different from the currently running. args - arguments for the new workflow.
func (*ContinueAsNewError) Args ¶ added in v0.8.4
func (e *ContinueAsNewError) Args() []interface{}
Args return workflow argument of the new run
func (*ContinueAsNewError) Error ¶
func (e *ContinueAsNewError) Error() string
Error from error interface
func (*ContinueAsNewError) Header ¶ added in v1.0.0
func (e *ContinueAsNewError) Header() *shared.Header
Header return the header to start a workflow
func (*ContinueAsNewError) Input ¶ added in v1.0.0
func (e *ContinueAsNewError) Input() []byte
Input return serialized workflow argument
func (*ContinueAsNewError) WorkflowIDReusePolicy ¶ added in v1.0.0
func (e *ContinueAsNewError) WorkflowIDReusePolicy() WorkflowIDReusePolicy
WorkflowIDReusePolicy return workflow id reuse policy in the new run
func (*ContinueAsNewError) WorkflowType ¶ added in v0.8.4
func (e *ContinueAsNewError) WorkflowType() *WorkflowType
WorkflowType return workflowType of the new run
type CustomError ¶
type CustomError struct {
// contains filtered or unexported fields
}
CustomError returned from workflow and activity implementations with reason and optional details.
func NewCustomError ¶
func NewCustomError(reason string, details ...interface{}) *CustomError
NewCustomError create new instance of *CustomError with reason and optional details.
func (*CustomError) Details ¶
func (e *CustomError) Details(d ...interface{}) error
Details extracts strong typed detail data of this custom error. If there is no details, it will return ErrNoData.
func (*CustomError) HasDetails ¶ added in v0.5.1
func (e *CustomError) HasDetails() bool
HasDetails return if this error has strong typed detail data.
func (*CustomError) Reason ¶
func (e *CustomError) Reason() string
Reason gets the reason of this custom error
type DataConverter ¶ added in v0.9.0
type DataConverter interface { // ToData implements conversion of a list of values. ToData(value ...interface{}) ([]byte, error) // FromData implements conversion of an array of values of different types. // Useful for deserializing arguments of function invocations. FromData(input []byte, valuePtr ...interface{}) error }
DataConverter is used by the framework to serialize/deserialize input and output of activity/workflow that need to be sent over the wire. To encode/decode workflow arguments, one should set DataConverter in two places:
- Workflow worker, through worker.Options
- Client, through client.Options
To encode/decode Activity/ChildWorkflow arguments, one should set DataConverter in two places:
- Inside workflow code, use workflow.WithDataConverter to create new Context,
and pass that context to ExecuteActivity/ExecuteChildWorkflow calls. Cadence support using different DataConverters for different activity/childWorkflow in same workflow.
- Activity/Workflow worker that run these activity/childWorkflow, through worker.Options.
type DomainClient ¶
type DomainClient interface { // Register a domain with cadence server // The errors it can throw: // - DomainAlreadyExistsError // - BadRequestError // - InternalServiceError Register(ctx context.Context, request *s.RegisterDomainRequest) error // Describe a domain. The domain has 3 part of information // DomainInfo - Which has Name, Status, Description, Owner Email // DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError Describe(ctx context.Context, name string) (*s.DescribeDomainResponse, error) // Update a domain. // The errors it can throw: // - EntityNotExistsError // - BadRequestError // - InternalServiceError Update(ctx context.Context, request *s.UpdateDomainRequest) error }
DomainClient is the client for managing operations on the domain. CLI, tools, ... can use this layer to manager operations on domain.
func NewDomainClient ¶
func NewDomainClient(service workflowserviceclient.Interface, options *ClientOptions) DomainClient
NewDomainClient creates an instance of a domain client, to manager lifecycle of domains.
type EncodedValue ¶
type EncodedValue struct {
// contains filtered or unexported fields
}
EncodedValue is type alias used to encapsulate/extract encoded result from workflow/activity.
func (EncodedValue) Get ¶
func (b EncodedValue) Get(valuePtr interface{}) error
Get extract data from encoded data to desired value type. valuePtr is pointer to the actual value type.
func (EncodedValue) HasValue ¶ added in v0.5.1
func (b EncodedValue) HasValue() bool
HasValue return whether there is value
type EncodedValues ¶
type EncodedValues struct {
// contains filtered or unexported fields
}
EncodedValues is a type alias used to encapsulate/extract encoded arguments from workflow/activity.
func (EncodedValues) Get ¶
func (b EncodedValues) Get(valuePtr ...interface{}) error
Get extract data from encoded data to desired value type. valuePtr is pointer to the actual value type.
func (EncodedValues) HasValues ¶ added in v0.5.1
func (b EncodedValues) HasValues() bool
HasValues return whether there are values
type ErrorDetailsValues ¶ added in v0.7.0
type ErrorDetailsValues []interface{}
ErrorDetailsValues is a type alias used hold error details objects.
func (ErrorDetailsValues) Get ¶ added in v0.7.0
func (b ErrorDetailsValues) Get(valuePtr ...interface{}) error
Get extract data from encoded data to desired value type. valuePtr is pointer to the actual value type.
func (ErrorDetailsValues) HasValues ¶ added in v0.7.0
func (b ErrorDetailsValues) HasValues() bool
HasValues return whether there are values.
type FeatureFlags ¶ added in v0.18.1
type Future ¶
type Future interface { // Get blocks until the future is ready. // When ready it either returns the Future's contained error, or assigns the contained value to the output var. // Failures to assign or decode the value will panic. // // Two common patterns to retrieve data are: // var out string // // this will assign the string value, which may be "", or an error and leave out as "". // err := f.Get(ctx, &out) // and // var out *string // // this will assign the string value, which may be "" or nil, or an error and leave out as nil. // err := f.Get(ctx, &out) // // The valuePtr parameter can be nil when the encoded result value is not needed: // err := f.Get(ctx, nil) // // Futures with values set in-memory via a call to their Settable's methods can be retrieved without knowing the // type with an interface, i.e. this will not ever panic: // var out interface{} // // this will assign the same value that was set, // // and you can check its type with reflection or type assertions. // err := f.Get(ctx, &out) // // Futures with encoded data from e.g. activities or child workflows can bypass decoding with a byte slice, and // similarly this will not ever panic: // var out []byte // // out will contain the raw bytes given to Cadence's servers, you should decode it however is necessary // err := f.Get(ctx, &out) // err can only be the Future's contained error Get(ctx Context, valuePtr interface{}) error // IsReady will return true Get is guaranteed to not block. IsReady() bool }
Future represents the result of an asynchronous computation.
func ExecuteActivity ¶
ExecuteActivity requests activity execution in the context of a workflow. Context can be used to pass the settings for this activity. For example: task list that this need to be routed, timeouts that need to be configured. Use ActivityOptions to pass down the options.
ao := ActivityOptions{ TaskList: "exampleTaskList", ScheduleToStartTimeout: 10 * time.Second, StartToCloseTimeout: 5 * time.Second, ScheduleToCloseTimeout: 10 * time.Second, HeartbeatTimeout: 0, } ctx := WithActivityOptions(ctx, ao)
Or to override a single option
ctx := WithTaskList(ctx, "exampleTaskList")
Input activity is either an activity name (string) or a function representing an activity that is getting scheduled. Input args are the arguments that need to be passed to the scheduled activity.
If the activity failed to complete then the future get error would indicate the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity using context(workflow.WithCancel(ctx)) and that will fail the activity with error CanceledError.
ExecuteActivity returns Future with activity result or failure.
func ExecuteLocalActivity ¶ added in v0.5.1
ExecuteLocalActivity requests to run a local activity. A local activity is like a regular activity with some key differences: * Local activity is scheduled and run by the workflow worker locally. * Local activity does not need Cadence server to schedule activity task and does not rely on activity worker. * No need to register local activity. * The parameter activity to ExecuteLocalActivity() must be a function. * Local activity is for short living activities (usually finishes within seconds). * Local activity cannot heartbeat.
Context can be used to pass the settings for this local activity. For now there is only one setting for timeout to be set:
lao := LocalActivityOptions{ ScheduleToCloseTimeout: 5 * time.Second, } ctx := WithLocalActivityOptions(ctx, lao)
The timeout here should be relative shorter than the DecisionTaskStartToCloseTimeout of the workflow. If you need a longer timeout, you probably should not use local activity and instead should use regular activity. Local activity is designed to be used for short living activities (usually finishes within seconds).
Input args are the arguments that will to be passed to the local activity. The input args will be hand over directly to local activity function without serialization/deserialization because we don't need to pass the input across process boundary. However, the result will still go through serialization/deserialization because we need to record the result as history to cadence server so if the workflow crashes, a different worker can replay the history without running the local activity again.
If the activity failed to complete then the future get error would indicate the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity by cancel the context(workflow.WithCancel(ctx)) and that will fail the activity with error CanceledError.
ExecuteLocalActivity returns Future with local activity result or failure.
func NewTimer ¶
NewTimer returns immediately and the future becomes ready after the specified duration d. The workflow needs to use this NewTimer() to get the timer instead of the Go lang library one(timer.NewTimer()). You can cancel the pending timer by cancel the Context (using context from workflow.WithCancel(ctx)) and that will cancel the timer. After timer is canceled, the returned Future become ready, and Future.Get() will return *CanceledError. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
func RequestCancelExternalWorkflow ¶ added in v0.5.1
RequestCancelExternalWorkflow can be used to request cancellation of an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:
ctx := WithWorkflowDomain(ctx, "domain-name")
RequestCancelExternalWorkflow return Future with failure or empty success result.
func SignalExternalWorkflow ¶ added in v0.5.1
func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future
SignalExternalWorkflow can be used to send signal info to an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:
ctx := WithWorkflowDomain(ctx, "domain-name")
SignalExternalWorkflow return Future with failure or empty success result.
type GenericError ¶
type GenericError struct {
// contains filtered or unexported fields
}
GenericError returned from workflow/workflow when the implementations return errors other than from NewCustomError() API.
type HeaderReader ¶ added in v0.8.4
HeaderReader is an interface to read information from cadence headers
func NewHeaderReader ¶ added in v0.8.4
func NewHeaderReader(header *shared.Header) HeaderReader
NewHeaderReader returns a header reader interface
type HeaderWriter ¶ added in v0.8.4
HeaderWriter is an interface to write information to cadence headers
func NewHeaderWriter ¶ added in v0.8.4
func NewHeaderWriter(header *shared.Header) HeaderWriter
NewHeaderWriter returns a header writer interface
type HistoryEventIterator ¶
type HistoryEventIterator interface { // HasNext return whether this iterator has next value HasNext() bool // Next returns the next history events and error // The errors it can return: // - EntityNotExistsError // - BadRequestError // - InternalServiceError Next() (*s.HistoryEvent, error) }
HistoryEventIterator represents the interface for history event iterator
type HistoryIterator ¶
type HistoryIterator interface { // GetNextPage returns next page of history events GetNextPage() (*s.History, error) // Reset resets the internal state so next GetNextPage() call will return first page of events from beginning. Reset() // HasNextPage returns if there are more page of events HasNextPage() bool }
HistoryIterator iterator through history events
type JWTAuthProvider ¶ added in v0.19.0
type JWTAuthProvider struct {
PrivateKey []byte
}
func (*JWTAuthProvider) GetAuthToken ¶ added in v0.19.0
func (j *JWTAuthProvider) GetAuthToken() ([]byte, error)
type LocalActivityOptions ¶ added in v0.5.1
type LocalActivityOptions struct { // ScheduleToCloseTimeout - The end to end timeout for the local activity. // This field is required. ScheduleToCloseTimeout time.Duration // RetryPolicy specify how to retry activity if error happens. // Optional: default is no retry RetryPolicy *RetryPolicy }
LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
type MockCallWrapper ¶
type MockCallWrapper struct {
// contains filtered or unexported fields
}
MockCallWrapper is a wrapper to mock.Call. It offers the ability to wait on workflow's clock instead of wall clock.
func (*MockCallWrapper) After ¶
func (c *MockCallWrapper) After(d time.Duration) *MockCallWrapper
After sets how long to wait on workflow's clock before the mock call returns.
func (*MockCallWrapper) AfterFn ¶ added in v0.7.2
func (c *MockCallWrapper) AfterFn(fn func() time.Duration) *MockCallWrapper
AfterFn sets a function which will tell how long to wait on workflow's clock before the mock call returns.
func (*MockCallWrapper) Once ¶
func (c *MockCallWrapper) Once() *MockCallWrapper
Once indicates that the mock should only return the value once.
func (*MockCallWrapper) Return ¶
func (c *MockCallWrapper) Return(returnArguments ...interface{}) *MockCallWrapper
Return specifies the return arguments for the expectation.
func (*MockCallWrapper) Run ¶
func (c *MockCallWrapper) Run(fn func(args mock.Arguments)) *MockCallWrapper
Run sets a handler to be called before returning. It can be used when mocking a method such as unmarshalers that takes a pointer to a struct and sets properties in such struct.
func (*MockCallWrapper) Times ¶
func (c *MockCallWrapper) Times(i int) *MockCallWrapper
Times indicates that the mock should only return the indicated number of times.
func (*MockCallWrapper) Twice ¶
func (c *MockCallWrapper) Twice() *MockCallWrapper
Twice indicates that the mock should only return the value twice.
type NonDeterministicWorkflowPolicy ¶ added in v0.7.0
type NonDeterministicWorkflowPolicy int
NonDeterministicWorkflowPolicy is an enum for configuring how client's decision task handler deals with mismatched history events (presumably arising from non-deterministic workflow definitions).
const ( // NonDeterministicWorkflowPolicyBlockWorkflow is the default policy for handling detected non-determinism. // This option simply logs to console with an error message that non-determinism is detected, but // does *NOT* reply anything back to the server. // It is chosen as default for backward compatibility reasons because it preserves the old behavior // for handling non-determinism that we had before NonDeterministicWorkflowPolicy type was added to // allow more configurability. NonDeterministicWorkflowPolicyBlockWorkflow NonDeterministicWorkflowPolicy = iota // NonDeterministicWorkflowPolicyFailWorkflow behaves exactly the same as Ignore, up until the very // end of processing a decision task. // Whereas default does *NOT* reply anything back to the server, fail workflow replies back with a request // to fail the workflow execution. NonDeterministicWorkflowPolicyFailWorkflow )
type Option ¶ added in v1.0.0
type Option interface {
// contains filtered or unexported methods
}
func WithCancelReason ¶ added in v1.0.0
WithCancelReason can be passed to Client.CancelWorkflow to provide an explicit cancellation reason, which will be recorded in the cancellation event in the workflow's history, similar to termination reasons. This is purely informational, and does not influence Cadence behavior at all.
type PanicError ¶
type PanicError struct {
// contains filtered or unexported fields
}
PanicError contains information about panicked workflow/activity.
func (*PanicError) StackTrace ¶
func (e *PanicError) StackTrace() string
StackTrace return stack trace of the panic
type ParentClosePolicy ¶ added in v0.10.0
type ParentClosePolicy int
ParentClosePolicy defines the action on children when parent is closed
const ( // ParentClosePolicyTerminate means terminating the child workflow ParentClosePolicyTerminate ParentClosePolicy = iota // ParentClosePolicyRequestCancel means requesting cancellation on the child workflow ParentClosePolicyRequestCancel // ParentClosePolicyAbandon means not doing anything on the child workflow ParentClosePolicyAbandon )
type QueryBuilder ¶ added in v0.17.0
type QueryBuilder interface { WorkflowTypes([]string) QueryBuilder WorkflowStatus([]WorkflowStatus) QueryBuilder StartTime(time.Time, time.Time) QueryBuilder Build() string }
QueryBuilder builds visibility query
func NewQueryBuilder ¶ added in v0.17.0
func NewQueryBuilder() QueryBuilder
NewQueryBuilder creates a new visibility QueryBuilder
type QueryWorkflowWithOptionsRequest ¶ added in v0.9.3
type QueryWorkflowWithOptionsRequest struct { // WorkflowID is a required field indicating the workflow which should be queried. WorkflowID string // RunID is an optional field used to identify a specific run of the queried workflow. // If RunID is not provided the latest run will be used. RunID string // QueryType is a required field which specifies the query you want to run. // By default, cadence supports "__stack_trace" as a standard query type, which will return string value // representing the call stack of the target workflow. The target workflow could also setup different query handler to handle custom query types. // See comments at workflow.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details on how to setup query handler within the target workflow. QueryType string // Args is an optional field used to identify the arguments passed to the query. Args []interface{} // QueryRejectCondition is an optional field used to reject queries based on workflow state. // QueryRejectConditionNotOpen will reject queries to workflows which are not open // QueryRejectConditionNotCompletedCleanly will reject queries to workflows which completed in any state other than completed (e.g. terminated, canceled timeout etc...) QueryRejectCondition *s.QueryRejectCondition // QueryConsistencyLevel is an optional field used to control the consistency level. // QueryConsistencyLevelEventual means that query will eventually reflect up to date state of a workflow. // QueryConsistencyLevelStrong means that query will reflect a workflow state of having applied all events which came before the query. QueryConsistencyLevel *s.QueryConsistencyLevel }
QueryWorkflowWithOptionsRequest is the request to QueryWorkflowWithOptions
type QueryWorkflowWithOptionsResponse ¶ added in v0.9.3
type QueryWorkflowWithOptionsResponse struct { // QueryResult contains the result of executing the query. // This will only be set if the query was completed successfully and not rejected. QueryResult Value // QueryRejected contains information about the query rejection. QueryRejected *s.QueryRejected }
QueryWorkflowWithOptionsResponse is the response to QueryWorkflowWithOptions
type RegisterActivityOptions ¶
type RegisterActivityOptions struct { // When an activity is a function the name is an actual activity type name. // When an activity is part of a structure then each member of the structure becomes an activity with // this Name as a prefix + activity function name. Name string // Activity type name is equal to function name instead of fully qualified // name including function package (and struct type if used). // This option has no effect when explicit Name is provided. EnableShortName bool DisableAlreadyRegisteredCheck bool // Automatically send heartbeats for this activity at an interval that is less than the HeartbeatTimeout. // This option has no effect if the activity is executed with a HeartbeatTimeout of 0. // Default: false EnableAutoHeartbeat bool }
RegisterActivityOptions consists of options for registering an activity
type RegisterWorkflowOptions ¶
type RegisterWorkflowOptions struct { Name string // Workflow type name is equal to function name instead of fully qualified name including function package. // This option has no effect when explicit Name is provided. EnableShortName bool DisableAlreadyRegisteredCheck bool }
RegisterWorkflowOptions consists of options for registering a workflow
type ReplayOptions ¶ added in v0.17.0
type ReplayOptions struct { // Optional: Sets DataConverter to customize serialization/deserialization of arguments in Cadence // default: defaultDataConverter, an combination of thriftEncoder and jsonEncoder DataConverter DataConverter // Optional: Specifies factories used to instantiate workflow interceptor chain // The chain is instantiated per each replay of a workflow execution WorkflowInterceptorChainFactories []WorkflowInterceptorFactory // Optional: Sets ContextPropagators that allows users to control the context information passed through a workflow // default: no ContextPropagators ContextPropagators []ContextPropagator // Optional: Sets opentracing Tracer that is to be used to emit tracing information // default: no tracer - opentracing.NoopTracer Tracer opentracing.Tracer // Optional: flags to turn on/off some features on server side // default: all features under the struct is turned off FeatureFlags FeatureFlags }
ReplayOptions is used to configure the replay decision task worker.
type RetryPolicy ¶ added in v0.7.5
type RetryPolicy struct { // Backoff interval for the first retry. If coefficient is 1.0 then it is used for all retries. // Required, no default value. InitialInterval time.Duration // Coefficient used to calculate the next retry backoff interval. // The next retry interval is previous interval multiplied by this coefficient. // Must be 1 or larger. Default is 2.0. BackoffCoefficient float64 // Maximum backoff interval between retries. Exponential backoff leads to interval increase. // This value is the cap of the interval. Default is 100x of initial interval. MaximumInterval time.Duration // Maximum time to retry. Either ExpirationInterval or MaximumAttempts is required. // When exceeded the retries stop even if maximum retries is not reached yet. ExpirationInterval time.Duration // Maximum number of attempts. When exceeded the retries stop even if not expired yet. // If not set or set to 0, it means unlimited, and rely on ExpirationInterval to stop. // Either MaximumAttempts or ExpirationInterval is required. MaximumAttempts int32 // Non-Retriable errors. This is optional. Cadence server will stop retry if error reason matches this list. // Error reason for custom error is specified when your activity/workflow return cadence.NewCustomError(reason). // Error reason for panic error is "cadenceInternal:Panic". // Error reason for any other error is "cadenceInternal:Generic". // Error reason for timeouts is: "cadenceInternal:Timeout TIMEOUT_TYPE". TIMEOUT_TYPE could be START_TO_CLOSE or HEARTBEAT. // Note, cancellation is not a failure, so it won't be retried. NonRetriableErrorReasons []string }
RetryPolicy defines the retry policy. Note that the history of activity with retry policy will be different: the started event will be written down into history only when the activity completes or "finally" timeouts/fails. And the started event only records the last started time. Because of that, to check an activity has started or not, you cannot rely on history events. Instead, you can use CLI to describe the workflow to see the status of the activity:
cadence --do <domain> wf desc -w <wf-id>
type Selector ¶
type Selector interface { // AddReceive waits until a value can be received from a channel. // f is invoked when the channel has data or is closed. // // This is equivalent to `case v, ok := <- aChannel`, and `ok` will only be false when // the channel is both closed and no data was received. // // When f is invoked, the data (or closed state) remains untouched in the channel, so // you need to `c.Receive(ctx, &out)` (or `c.ReceiveAsync(&out)`) to remove and decode the value. // Failure to do this is not an error - the value will simply remain in the channel until a future // Receive retrieves it. // // The `ok` argument will match what a call to c.Receive would return (on a successful read), so it // may be used to check for closed + empty channels without needing to try to read from the channel. // See Channel.Receive for additional details about reading from channels. AddReceive(c Channel, f func(c Channel, ok bool)) Selector // AddSend waits to send a value to a channel. // f is invoked when the value was successfully sent to the channel. // // This is equivalent to `case aChannel <- value`. // // Unlike AddReceive, the value has already been sent on the channel when f is invoked. AddSend(c Channel, v interface{}, f func()) Selector // AddFuture waits until a Future is ready, and then invokes f only once. // If the Future is ready before Select is called, it is eligible to be invoked immediately. // // There is no direct equivalent in a native Go select statement. // It was added because Futures are common in Cadence code, and some patterns are much simpler with it. // // Each call to AddFuture will invoke its f at most one time, regardless of how many times Select is called. // This means, for a Future that is (or will be) ready: // - Adding the Future once, then calling Select twice, will invoke the callback once with the first Select // call, and then wait for other Selector conditions in the second Select call (or block forever if there are // no other eligible conditions). // - Adding the same Future twice, then calling Select twice, will invoke each callback once. // - Adding the same Future to two different Selectors, then calling Select once on each Selector, will invoke // each Selector's callback once. // // Therefore, with a Future "f" that is or will become ready, this is an infinite loop that will consume as much // CPU as possible: // for { // workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){}).Select(ctx) // } // While this will loop once, and then wait idle forever: // s := workflow.NewSelector(ctx).AddFuture(f, func(f workflow.Future){}) // for { // s.Select(ctx) // } AddFuture(future Future, f func(f Future)) Selector // AddDefault adds a default branch to the selector. // f is invoked immediately when none of the other conditions (AddReceive, AddSend, AddFuture) are met for a // Select call. // // This is equivalent to a `default:` case. // // Note that this applies to each Select call. If you create a Selector with only one AddDefault, and then call // Select on it twice, f will be invoked twice. AddDefault(f func()) // Select waits for one of the added conditions to be met and invokes the callback as described above. // If no condition is met, Select will block until one or more are available, then one callback will be invoked. // If no condition is ever met, Select will block forever. // // Note that Select does not return an error, and does not stop waiting if its Context is canceled. // This mimics a native Go select statement, which has no way to be interrupted except for its listed cases. // // If you wish to stop Selecting when the Context is canceled, use AddReceive with the Context's Done() channel, // in the same way as you would use a `case <- ctx.Done():` in a Go select statement. E.g.: // cancelled := false // s := workflow.NewSelector(ctx) // s.AddFuture(f, func(f workflow.Future) {}) // assume this is never ready // s.AddReceive(ctx.Done(), func(c workflow.Channel, more bool) { // // this will be invoked when the Context is cancelled for any reason, // // and more will be false. // cancelled = true // }) // s.Select(ctx) // if cancelled { // // this will be executed // } Select(ctx Context) }
Selector must be used in workflows instead of a native Go select statement.
Use workflow.NewSelector(ctx) to create a Selector instance, and then add cases to it with its methods. The interface is intended to simulate Go's select statement, and any Go select can be fairly trivially rewritten for a Selector with effectively identical behavior.
For example, normal Go code like below (which will receive values forever, until idle for an hour):
chA := make(chan int) chB := make(chan int) counter := 0 for { select { case x := <- chA: counter += i case y := <- chB: counter += i case <- time.After(time.Hour): break } }
can be written as:
chA := workflow.NewChannel(ctx) chB := workflow.NewChannel(ctx) counter := 0 for { timedout := false s := workflow.NewSelector(ctx) s.AddReceive(chA, func(c workflow.Channel, more bool) { var x int c.Receive(ctx, &x) counter += i }) s.AddReceive(chB, func(c workflow.Channel, more bool) { var y int c.Receive(ctx, &y) counter += i }) s.AddFuture(workflow.NewTimer(ctx, time.Hour), func(f workflow.Future) { timedout = true }) s.Select(ctx) if timedout { break } }
You can create a new Selector as needed or mutate one and call Select multiple times, but note that:
1. AddFuture will not behave the same across both patterns. Read AddFuture for more details.
2. There is no way to remove a case from a Selector, so you must make a new Selector to "remove" them.
Finally, note that Select will not return until a condition's needs are met, like a Go selector - canceling the Context used to construct the Selector, or the Context used to Select, will not (directly) unblock a Select call. Read Select for more details.
func NewNamedSelector ¶
NewNamedSelector creates a new Selector instance with a given human readable name. Name appears in stack traces that are blocked on this Selector.
func NewSelector ¶
NewSelector creates a new Selector instance.
type ServiceInvoker ¶
type ServiceInvoker interface { // All the heartbeat methods will return ActivityTaskCanceledError if activity is cancelled. // Heartbeat sends a record heartbeat request to Cadence server directly without buffering. // It should only be used by the sessions framework. Heartbeat(details []byte) error // BatchHeartbeat sends heartbeat on the first attempt, and batches additional requests // to send it later according to heartbeat timeout. BatchHeartbeat(details []byte) error // BackgroundHeartbeat should only be used by Cadence library internally to heartbeat automatically // without detail. BackgroundHeartbeat() error Close(flushBufferedHeartbeat bool) SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error }
ServiceInvoker abstracts calls to the Cadence service from an activity implementation. Implement to unit test activities.
type SessionInfo ¶ added in v0.8.4
type SessionInfo struct { SessionID string HostName string // contains filtered or unexported fields }
SessionInfo contains information of a created session. For now, exported fields are SessionID and HostName. SessionID is a uuid generated when CreateSession() or RecreateSession() is called and can be used to uniquely identify a session. HostName specifies which host is executing the session
func GetSessionInfo ¶ added in v0.8.4
func GetSessionInfo(ctx Context) *SessionInfo
GetSessionInfo returns the sessionInfo stored in the context. If there are multiple sessions in the context, (for example, the same context is used to create, complete, create another session. Then user found that the session has failed, and created a new one on it), the most recent sessionInfo will be returned.
This API will return nil if there's no sessionInfo in the context.
func (*SessionInfo) GetRecreateToken ¶ added in v0.8.4
func (s *SessionInfo) GetRecreateToken() []byte
GetRecreateToken returns the token needed to recreate a session. The returned value should be passed to RecreateSession() API.
type SessionOptions ¶ added in v0.8.4
type SessionOptions struct { ExecutionTimeout time.Duration CreationTimeout time.Duration HeartbeatTimeout time.Duration }
SessionOptions specifies metadata for a session. ExecutionTimeout: required, no default
Specifies the maximum amount of time the session can run
CreationTimeout: required, no default
Specifies how long session creation can take before returning an error
HeartbeatTimeout: optional, default 20s
Specifies the heartbeat timeout. If heartbeat is not received by server within the timeout, the session will be declared as failed
type Settable ¶
type Settable interface { Set(value interface{}, err error) SetValue(value interface{}) SetError(err error) Chain(future Future) // Value (or error) of the future become the same of the chained one. }
Settable is used to set value or error on a future. See more: workflow.NewFuture(ctx).
type ShadowExitCondition ¶ added in v0.17.0
type ShadowExitCondition struct { // Optional: Expiration interval for shadowing. // Shadowing will exit when this interval has passed. // default: no expiration interval ExpirationInterval time.Duration // Optional: Target number of shadowed workflows. // Shadowing will exit after this number is reached. // default: no limit on shadow count ShadowCount int }
ShadowExitCondition configures when the workflow shadower should exit. If not specified shadower will exit after replaying all workflows satisfying the visibility query.
type ShadowMode ¶ added in v0.17.0
type ShadowMode int
ShadowMode is an enum for configuring if shadowing should continue after all workflows matches the WorkflowQuery have been replayed.
const ( // ShadowModeNormal is the default mode for workflow shadowing. // Shadowing will complete after all workflows matches WorkflowQuery have been replayed. ShadowModeNormal ShadowMode = iota // ShadowModeContinuous mode will start a new round of shadowing // after all workflows matches WorkflowQuery have been replayed. // There will be a 5 min wait period between each round, // currently this wait period is not configurable. // Shadowing will complete only when ExitCondition is met. // ExitCondition must be specified when using this mode ShadowModeContinuous )
type ShadowOptions ¶ added in v0.17.0
type ShadowOptions struct { // Optional: Workflow visibility query for getting workflows that should be replayed // if specified, WorkflowTypes, WorkflowStatus, WorkflowStartTimeFilter fields must not be specified. // default: empty query, which matches all workflows WorkflowQuery string // Optional: A list of workflow type names. // The list will be used to construct WorkflowQuery. Only workflows with types listed will be replayed. // default: empty list, which matches all workflow types WorkflowTypes []string // Optional: A list of workflow status. // The list will be used to construct WorkflowQuery. Only workflows with status listed will be replayed. // accepted values (case-insensitive): OPEN, CLOSED, ALL, COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT // default: OPEN, which matches only open workflows WorkflowStatus []string // Optional: Min and Max workflow start timestamp. // Timestamps will be used to construct WorkflowQuery. Only workflows started within the time range will be replayed. // default: no time filter, which matches all workflow start timestamp WorkflowStartTimeFilter TimeFilter // Optional: sampling rate for the workflows matches WorkflowQuery // only sampled workflows will be replayed // default: 1.0 SamplingRate float64 // Optional: sets if shadowing should continue after all workflows matches the WorkflowQuery have been replayed. // If set to ShadowModeContinuous, ExitCondition must be specified. // default: ShadowModeNormal, which means shadowing will complete after all workflows have been replayed Mode ShadowMode // Reqired if Mode is set to ShadowModeContinuous: controls when shadowing should complete ExitCondition ShadowExitCondition // Optional: workflow shadowing concurrency (# of concurrent workflow replay activities) // Note: this field only applies to shadow worker. For the local WorkflowShadower, // the concurrency will always be 1. // An error will be returned if it's set to be larger than 1 when used to NewWorkflowShadower // default: 1 Concurrency int }
ShadowOptions is used to configure workflow shadowing.
type StartWorkflowOptions ¶
type StartWorkflowOptions struct { // ID - The business identifier of the workflow execution. // Optional: defaulted to a uuid. ID string // TaskList - The decisions of the workflow are scheduled on this queue. // This is also the default task list on which activities are scheduled. The workflow author can choose // to override this using activity options. // Mandatory: No default. TaskList string // ExecutionStartToCloseTimeout - The timeout for duration of workflow execution. // The resolution is seconds. // Mandatory: No default. ExecutionStartToCloseTimeout time.Duration // DecisionTaskStartToCloseTimeout - The timeout for processing decision task from the time the worker // pulled this task. If a decision task is lost, it is retried after this timeout. // The resolution is seconds. // Optional: defaulted to 10 secs. DecisionTaskStartToCloseTimeout time.Duration // WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful // for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate. // Optional: defaulted to WorkflowIDReusePolicyAllowDuplicateFailedOnly. WorkflowIDReusePolicy WorkflowIDReusePolicy // RetryPolicy - Optional retry policy for workflow. If a retry policy is specified, in case of workflow failure // server will start new workflow execution if needed based on the retry policy. RetryPolicy *RetryPolicy // CronSchedule - Optional cron schedule for workflow. If a cron schedule is specified, the workflow will run // as a cron based on the schedule. The scheduling will be based on UTC time. Schedule for next run only happen // after the current run is completed/failed/timeout. If a RetryPolicy is also supplied, and the workflow failed // or timeout, the workflow will be retried based on the retry policy. While the workflow is retrying, it won't // schedule its next run. If next schedule is due while workflow is running (or retrying), then it will skip that // schedule. Cron workflow will not stop until it is terminated or cancelled (by returning cadence.CanceledError). // The cron spec is as following: // ┌───────────── minute (0 - 59) // │ ┌───────────── hour (0 - 23) // │ │ ┌───────────── day of the month (1 - 31) // │ │ │ ┌───────────── month (1 - 12) // │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) // │ │ │ │ │ // │ │ │ │ │ // * * * * * CronSchedule string // Memo - Optional non-indexed info that will be shown in list workflow. Memo map[string]interface{} // SearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs (only // supported when Cadence server is using ElasticSearch). The key and value type must be registered on Cadence server side. // Use GetSearchAttributes API to get valid key and corresponding value type. SearchAttributes map[string]interface{} // DelayStartSeconds - Seconds to delay the workflow start // The resolution is seconds. // Optional: defaulted to 0 seconds DelayStart time.Duration // JitterStart - Seconds to jitter the workflow start. For example, if set to 10, the workflow will start some time between 0-10 seconds. // This works with CronSchedule and with DelayStart. // Optional: defaulted to 0 seconds JitterStart time.Duration }
StartWorkflowOptions configuration parameters for starting a workflow execution. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
type TerminatedError ¶
type TerminatedError struct { }
TerminatedError returned when workflow was terminated.
type TestActivityEnvironment ¶
type TestActivityEnvironment struct {
// contains filtered or unexported fields
}
TestActivityEnvironment is the environment that you use to test activity
func (*TestActivityEnvironment) ExecuteActivity ¶
func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args ...interface{}) (Value, error)
ExecuteActivity executes an activity. The tested activity will be executed synchronously in the calling goroutinue. Caller should use Value.Get() to extract strong typed result value.
func (*TestActivityEnvironment) ExecuteLocalActivity ¶ added in v0.5.1
func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, err error)
ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutinue. Caller should use Value.Get() to extract strong typed result value.
func (*TestActivityEnvironment) RegisterActivity ¶ added in v0.12.2
func (t *TestActivityEnvironment) RegisterActivity(a interface{})
RegisterActivity registers activity implementation with TestWorkflowEnvironment
func (*TestActivityEnvironment) RegisterActivityWithOptions ¶ added in v0.12.2
func (t *TestActivityEnvironment) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
RegisterActivityWithOptions registers activity implementation with TestWorkflowEnvironment
func (*TestActivityEnvironment) SetHeartbeatDetails ¶ added in v0.8.1
func (t *TestActivityEnvironment) SetHeartbeatDetails(details interface{})
SetHeartbeatDetails sets the heartbeat details to be returned from activity.GetHeartbeatDetails()
func (*TestActivityEnvironment) SetTestTimeout ¶ added in v0.7.0
func (t *TestActivityEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestActivityEnvironment
SetTestTimeout sets the wall clock timeout for this activity test run. When test timeout happen, it means activity is taking too long.
func (*TestActivityEnvironment) SetWorkerOptions ¶
func (t *TestActivityEnvironment) SetWorkerOptions(options WorkerOptions) *TestActivityEnvironment
SetWorkerOptions sets the WorkerOptions that will be use by TestActivityEnvironment. TestActivityEnvironment will use options of Identity, MetricsScope and BackgroundActivityContext on the WorkerOptions. Other options are ignored. Note: WorkerOptions is defined in internal package, use public type worker.Options instead.
func (*TestActivityEnvironment) SetWorkerStopChannel ¶ added in v0.8.2
func (t *TestActivityEnvironment) SetWorkerStopChannel(c chan struct{})
SetWorkerStopChannel sets the worker stop channel to be returned from activity.GetWorkerStopChannel(context) To test your activity on worker stop, you can provide a go channel with this function and call ExecuteActivity(). Then call close(channel) to test the activity worker stop logic.
type TestWorkflowEnvironment ¶
TestWorkflowEnvironment is the environment that you use to test workflow
func (*TestWorkflowEnvironment) CancelWorkflow ¶
func (t *TestWorkflowEnvironment) CancelWorkflow()
CancelWorkflow requests cancellation (through workflow Context) to the currently running test workflow.
func (*TestWorkflowEnvironment) CompleteActivity ¶
func (t *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result interface{}, err error) error
CompleteActivity complete an activity that had returned activity.ErrResultPending error
func (*TestWorkflowEnvironment) ExecuteWorkflow ¶
func (t *TestWorkflowEnvironment) ExecuteWorkflow(workflowFn interface{}, args ...interface{})
ExecuteWorkflow executes a workflow, wait until workflow complete. It will fail the test if workflow is blocked and cannot complete within TestTimeout (set by SetTestTimeout()).
func (*TestWorkflowEnvironment) GetWorkflowError ¶
func (t *TestWorkflowEnvironment) GetWorkflowError() error
GetWorkflowError return the error from test workflow
func (*TestWorkflowEnvironment) GetWorkflowResult ¶
func (t *TestWorkflowEnvironment) GetWorkflowResult(valuePtr interface{}) error
GetWorkflowResult extracts the encoded result from test workflow, it also returns error from test workflow.
func (*TestWorkflowEnvironment) IsWorkflowCompleted ¶
func (t *TestWorkflowEnvironment) IsWorkflowCompleted() bool
IsWorkflowCompleted check if test is completed or not
func (*TestWorkflowEnvironment) Now ¶
func (t *TestWorkflowEnvironment) Now() time.Time
Now returns the current workflow time (a.k.a workflow.Now() time) of this TestWorkflowEnvironment.
func (*TestWorkflowEnvironment) OnActivity ¶
func (t *TestWorkflowEnvironment) OnActivity(activity interface{}, args ...interface{}) *MockCallWrapper
OnActivity setup a mock call for activity. Parameter activity must be activity function (func) or activity name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to the Return() call should either be a function that has exact same signature as the mocked activity, or it should be mock values with the same types as the mocked activity function returns. Example: assume the activity you want to mock has function signature as:
func MyActivity(ctx context.Context, msg string) (string, error)
You can mock it by return a function with exact same signature:
t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return(func(ctx context.Context, msg string) (string, error) { // your mock function implementation return "", nil })
OR return mock values with same types as activity function's return types:
t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return("mock_result", nil)
func (*TestWorkflowEnvironment) OnGetVersion ¶ added in v0.8.1
func (t *TestWorkflowEnvironment) OnGetVersion(changeID string, minSupported, maxSupported Version) *MockCallWrapper
OnGetVersion setup a mock for workflow.GetVersion() call. By default, if mock is not setup, the GetVersion call from workflow code will always return the maxSupported version. Make it not possible to test old version branch. With this mock support, it is possible to test code branch for different versions.
Note: mock can be setup for a specific changeID. Or if mock.Anything is used as changeID then all calls to GetVersion will be mocked. Mock for a specific changeID has higher priority over mock.Anything.
func (*TestWorkflowEnvironment) OnRequestCancelExternalWorkflow ¶ added in v0.5.1
func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(domainName, workflowID, runID string) *MockCallWrapper
OnRequestCancelExternalWorkflow setup a mock for cancellation of external workflow. This TestWorkflowEnvironment handles cancellation of workflows that are started from the root workflow. For example, cancellation sent from parent to child workflows. Or cancellation between 2 child workflows. However, it does not know what to do if your tested workflow code is sending cancellation to external unknown workflows. In that case, you will need to setup mock for those cancel calls. Some examples of how to setup mock:
- mock for specific target workflow that matches specific workflow ID and run ID env.OnRequestCancelExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1").Return(nil).Once()
- mock for anything and succeed the cancellation env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
- mock for anything and fail the cancellation env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once()
- mock function for RequestCancelExternalWorkflow env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return( func(domainName, workflowID, runID) error { // you can do differently based on the parameters return nil })
func (*TestWorkflowEnvironment) OnSignalExternalWorkflow ¶ added in v0.5.1
func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(domainName, workflowID, runID, signalName, arg interface{}) *MockCallWrapper
OnSignalExternalWorkflow setup a mock for sending signal to external workflow. This TestWorkflowEnvironment handles sending signals between the workflows that are started from the root workflow. For example, sending signals between parent and child workflows. Or sending signals between 2 child workflows. However, it does not know what to do if your tested workflow code is sending signal to external unknown workflows. In that case, you will need to setup mock for those signal calls. Some examples of how to setup mock:
- mock for specific target workflow that matches specific signal name and signal data env.OnSignalExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1", "test-signal", "test-data").Return(nil).Once()
- mock for anything and succeed the send env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
- mock for anything and fail the send env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once()
- mock function for SignalExternalWorkflow env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( func(domainName, workflowID, runID, signalName string, arg interface{}) error { // you can do differently based on the parameters return nil })
func (*TestWorkflowEnvironment) OnUpsertSearchAttributes ¶ added in v0.9.0
func (t *TestWorkflowEnvironment) OnUpsertSearchAttributes(attributes map[string]interface{}) *MockCallWrapper
OnUpsertSearchAttributes setup a mock for workflow.UpsertSearchAttributes call. If mock is not setup, the UpsertSearchAttributes call will only validate input attributes. If mock is setup, all UpsertSearchAttributes calls in workflow have to be mocked.
func (*TestWorkflowEnvironment) OnWorkflow ¶
func (t *TestWorkflowEnvironment) OnWorkflow(workflow interface{}, args ...interface{}) *MockCallWrapper
OnWorkflow setup a mock call for workflow. Parameter workflow must be workflow function (func) or workflow name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The supplied parameters to the Return() call should either be a function that has exact same signature as the mocked workflow, or it should be mock values with the same types as the mocked workflow function returns. Example: assume the workflow you want to mock has function signature as:
func MyChildWorkflow(ctx workflow.Context, msg string) (string, error)
You can mock it by return a function with exact same signature:
t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return(func(ctx workflow.Context, msg string) (string, error) { // your mock function implementation return "", nil })
OR return mock values with same types as workflow function's return types:
t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return("mock_result", nil)
You could also setup mock to simulate start child workflow failure case by returning ErrMockStartChildWorkflowFailed as error.
func (*TestWorkflowEnvironment) QueryWorkflow ¶
func (t *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interface{}) (Value, error)
QueryWorkflow queries to the currently running test workflow and returns result synchronously.
func (*TestWorkflowEnvironment) RegisterActivity ¶ added in v0.12.0
func (t *TestWorkflowEnvironment) RegisterActivity(a interface{})
RegisterActivity registers activity
func (*TestWorkflowEnvironment) RegisterActivityWithOptions ¶ added in v0.12.0
func (t *TestWorkflowEnvironment) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
RegisterActivityWithOptions registers activity
func (*TestWorkflowEnvironment) RegisterDelayedCallback ¶
func (t *TestWorkflowEnvironment) RegisterDelayedCallback(callback func(), delayDuration time.Duration)
RegisterDelayedCallback creates a new timer with specified delayDuration using workflow clock (not wall clock). When the timer fires, the callback will be called. By default, this test suite uses mock clock which automatically move forward to fire next timer when workflow is blocked. Use this API to make some event (like activity completion, signal or workflow cancellation) at desired time. Use 0 delayDuration to send a signal to simulate SignalWithStart.
func (*TestWorkflowEnvironment) RegisterWorkflow ¶ added in v0.12.0
func (t *TestWorkflowEnvironment) RegisterWorkflow(w interface{})
RegisterWorkflow register workflows
func (*TestWorkflowEnvironment) RegisterWorkflowWithOptions ¶ added in v0.12.0
func (t *TestWorkflowEnvironment) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions)
RegisterWorkflowWithOptions register workflows
func (*TestWorkflowEnvironment) SetActivityTaskList ¶
func (t *TestWorkflowEnvironment) SetActivityTaskList(tasklist string, activityFn ...interface{})
SetActivityTaskList set the affinity between activity and tasklist. By default, activity can be invoked by any tasklist in this test environment. Use this SetActivityTaskList() to set affinity between activity and a tasklist. Once activity is set to a particular tasklist, that activity will only be available to that tasklist.
func (*TestWorkflowEnvironment) SetLastCompletionResult ¶ added in v0.8.1
func (t *TestWorkflowEnvironment) SetLastCompletionResult(result interface{})
SetLastCompletionResult sets the result to be returned from workflow.GetLastCompletionResult().
func (*TestWorkflowEnvironment) SetMemoOnStart ¶ added in v0.9.0
func (t *TestWorkflowEnvironment) SetMemoOnStart(memo map[string]interface{}) error
SetMemoOnStart sets the memo when start workflow.
func (*TestWorkflowEnvironment) SetOnActivityCanceledListener ¶
func (t *TestWorkflowEnvironment) SetOnActivityCanceledListener( listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment
SetOnActivityCanceledListener sets a listener that will be called after an activity is canceled. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnActivityCompletedListener ¶
func (t *TestWorkflowEnvironment) SetOnActivityCompletedListener( listener func(activityInfo *ActivityInfo, result Value, err error)) *TestWorkflowEnvironment
SetOnActivityCompletedListener sets a listener that will be called after an activity is completed. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnActivityHeartbeatListener ¶
func (t *TestWorkflowEnvironment) SetOnActivityHeartbeatListener( listener func(activityInfo *ActivityInfo, details Values)) *TestWorkflowEnvironment
SetOnActivityHeartbeatListener sets a listener that will be called when activity heartbeat. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnActivityStartedListener ¶
func (t *TestWorkflowEnvironment) SetOnActivityStartedListener( listener func(activityInfo *ActivityInfo, ctx context.Context, args Values)) *TestWorkflowEnvironment
SetOnActivityStartedListener sets a listener that will be called before activity starts execution. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener ¶
func (t *TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener( listener func(workflowInfo *WorkflowInfo)) *TestWorkflowEnvironment
SetOnChildWorkflowCanceledListener sets a listener that will be called when a child workflow is canceled. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.
func (*TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener ¶
func (t *TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener( listener func(workflowInfo *WorkflowInfo, result Value, err error)) *TestWorkflowEnvironment
SetOnChildWorkflowCompletedListener sets a listener that will be called after a child workflow is completed. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.
func (*TestWorkflowEnvironment) SetOnChildWorkflowStartedListener ¶
func (t *TestWorkflowEnvironment) SetOnChildWorkflowStartedListener( listener func(workflowInfo *WorkflowInfo, ctx Context, args Values)) *TestWorkflowEnvironment
SetOnChildWorkflowStartedListener sets a listener that will be called before a child workflow starts execution. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.
func (*TestWorkflowEnvironment) SetOnLocalActivityCanceledListener ¶ added in v0.5.1
func (t *TestWorkflowEnvironment) SetOnLocalActivityCanceledListener( listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment
SetOnLocalActivityCanceledListener sets a listener that will be called after local activity is canceled. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnLocalActivityCompletedListener ¶ added in v0.5.1
func (t *TestWorkflowEnvironment) SetOnLocalActivityCompletedListener( listener func(activityInfo *ActivityInfo, result Value, err error)) *TestWorkflowEnvironment
SetOnLocalActivityCompletedListener sets a listener that will be called after local activity is completed. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnLocalActivityStartedListener ¶ added in v0.5.1
func (t *TestWorkflowEnvironment) SetOnLocalActivityStartedListener( listener func(activityInfo *ActivityInfo, ctx context.Context, args []interface{})) *TestWorkflowEnvironment
SetOnLocalActivityStartedListener sets a listener that will be called before local activity starts execution. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.
func (*TestWorkflowEnvironment) SetOnTimerCancelledListener ¶
func (t *TestWorkflowEnvironment) SetOnTimerCancelledListener(listener func(timerID string)) *TestWorkflowEnvironment
SetOnTimerCancelledListener sets a listener that will be called after a timer is cancelled
func (*TestWorkflowEnvironment) SetOnTimerFiredListener ¶
func (t *TestWorkflowEnvironment) SetOnTimerFiredListener(listener func(timerID string)) *TestWorkflowEnvironment
SetOnTimerFiredListener sets a listener that will be called after a timer is fired.
func (*TestWorkflowEnvironment) SetOnTimerScheduledListener ¶
func (t *TestWorkflowEnvironment) SetOnTimerScheduledListener( listener func(timerID string, duration time.Duration)) *TestWorkflowEnvironment
SetOnTimerScheduledListener sets a listener that will be called before a timer is scheduled.
func (*TestWorkflowEnvironment) SetSearchAttributesOnStart ¶ added in v0.9.0
func (t *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes map[string]interface{}) error
SetSearchAttributesOnStart sets the search attributes when start workflow.
func (*TestWorkflowEnvironment) SetStartTime ¶ added in v0.6.1
func (t *TestWorkflowEnvironment) SetStartTime(startTime time.Time)
SetStartTime sets the start time of the workflow. This is optional, default start time will be the wall clock time when workflow starts. Start time is the workflow.Now(ctx) time at the beginning of the workflow.
func (*TestWorkflowEnvironment) SetTestTimeout ¶
func (t *TestWorkflowEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestWorkflowEnvironment
SetTestTimeout sets the idle timeout based on wall clock for this tested workflow. Idle is when workflow is blocked waiting on events (including timer, activity, child workflow, signal etc). If there is no event happening longer than this idle timeout, the test framework would stop the workflow and return timeout error. This is based on real wall clock time, not the workflow time (a.k.a workflow.Now() time).
func (*TestWorkflowEnvironment) SetWorkerOptions ¶
func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestWorkflowEnvironment
SetWorkerOptions sets the WorkerOptions for TestWorkflowEnvironment. TestWorkflowEnvironment will use options set by use options of Identity, MetricsScope and BackgroundActivityContext on the WorkerOptions. Other options are ignored. Note: WorkerOptions is defined in internal package, use public type worker.Options instead.
func (*TestWorkflowEnvironment) SetWorkerStopChannel ¶ added in v0.8.2
func (t *TestWorkflowEnvironment) SetWorkerStopChannel(c chan struct{})
SetWorkerStopChannel sets the activity worker stop channel to be returned from activity.GetWorkerStopChannel(context) You can use this function to set the activity worker stop channel and use close(channel) to test your activity execution from workflow execution.
func (*TestWorkflowEnvironment) SetWorkflowCronMaxIterations ¶ added in v0.15.0
func (t *TestWorkflowEnvironment) SetWorkflowCronMaxIterations(maxIterations int) *TestWorkflowEnvironment
SetWorkflowCronMaxIterations sets the a limit on the number of Cron iterations, not including the first one of the tested workflow.
func (*TestWorkflowEnvironment) SetWorkflowCronSchedule ¶ added in v0.15.0
func (t *TestWorkflowEnvironment) SetWorkflowCronSchedule(cron string) *TestWorkflowEnvironment
SetWorkflowCronSchedule sets the Cron schedule for this tested workflow. The first execution of the workflow will not adhere to the Cron schedule and will start executing immediately. Consecutive iterations will follow the specified schedule. Use SetWorkflowCronMaxIterations() to enforce a limit on the number of consecutive iterations after the initial execution.
func (*TestWorkflowEnvironment) SetWorkflowTimeout ¶ added in v0.7.5
func (t *TestWorkflowEnvironment) SetWorkflowTimeout(executionTimeout time.Duration) *TestWorkflowEnvironment
SetWorkflowTimeout sets the execution timeout for this tested workflow. This test framework uses mock clock internally and when workflow is blocked on timer, it will auto forward the mock clock. Use SetWorkflowTimeout() to enforce a workflow execution timeout to return timeout error when the workflow mock clock is moved head of the timeout. This is based on the workflow time (a.k.a workflow.Now() time).
func (*TestWorkflowEnvironment) SignalWorkflow ¶
func (t *TestWorkflowEnvironment) SignalWorkflow(name string, input interface{})
SignalWorkflow sends signal to the currently running test workflow.
func (*TestWorkflowEnvironment) SignalWorkflowByID ¶ added in v0.7.5
func (t *TestWorkflowEnvironment) SignalWorkflowByID(workflowID, signalName string, input interface{}) error
SignalWorkflowByID sends signal to the currently running test workflow.
func (*TestWorkflowEnvironment) SignalWorkflowSkippingDecision ¶ added in v0.8.4
func (t *TestWorkflowEnvironment) SignalWorkflowSkippingDecision(name string, input interface{})
SignalWorkflowSkippingDecision sends signal to the currently running test workflow without invoking workflow code. Used to test processing of multiple buffered signals before completing workflow. It must be followed by SignalWorkflow, CancelWorkflow or CompleteActivity to force a decision.
type TimeFilter ¶ added in v0.17.0
TimeFilter represents a time range through the min and max timestamp
type TimeoutError ¶
type TimeoutError struct {
// contains filtered or unexported fields
}
TimeoutError returned when activity or child workflow timed out.
func NewHeartbeatTimeoutError ¶
func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError
NewHeartbeatTimeoutError creates TimeoutError instance
func NewTimeoutError ¶
func NewTimeoutError(timeoutType shared.TimeoutType, details ...interface{}) *TimeoutError
NewTimeoutError creates TimeoutError instance. Use NewHeartbeatTimeoutError to create heartbeat TimeoutError
func (*TimeoutError) Details ¶
func (e *TimeoutError) Details(d ...interface{}) error
Details extracts strong typed detail data of this error. If there is no details, it will return ErrNoData.
func (*TimeoutError) HasDetails ¶ added in v0.5.1
func (e *TimeoutError) HasDetails() bool
HasDetails return if this error has strong typed detail data.
func (*TimeoutError) TimeoutType ¶
func (e *TimeoutError) TimeoutType() shared.TimeoutType
TimeoutType return timeout type of this error
type UnknownExternalWorkflowExecutionError ¶ added in v0.8.1
type UnknownExternalWorkflowExecutionError struct{}
UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
func (*UnknownExternalWorkflowExecutionError) Error ¶ added in v0.8.1
func (e *UnknownExternalWorkflowExecutionError) Error() string
Error from error interface
type Value ¶ added in v0.9.0
type Value interface { // HasValue return whether there is value encoded. HasValue() bool // Get extract the encoded value into strong typed value pointer. Get(valuePtr interface{}) error }
Value is used to encapsulate/extract encoded value from workflow/activity.
func MutableSideEffect ¶ added in v0.6.1
func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
MutableSideEffect executes the provided function once, then it looks up the history for the value with the given id. If there is no existing value, then it records the function result as a value with the given id on history; otherwise, it compares whether the existing value from history has changed from the new function result by calling the provided equals function. If they are equal, it returns the value without recording a new one in history;
otherwise, it records the new value with the same id on history.
Caution: do not use MutableSideEffect to modify closures. Always retrieve result from MutableSideEffect's encoded return value.
The difference between MutableSideEffect() and SideEffect() is that every new SideEffect() call in non-replay will result in a new marker being recorded on history. However, MutableSideEffect() only records a new marker if the value changed. During replay, MutableSideEffect() will not execute the function again, but it will return the exact same value as it was returning during the non-replay run.
One good use case of MutableSideEffect() is to access dynamically changing config without breaking determinism.
func NewValue ¶ added in v0.6.1
NewValue creates a new encoded.Value which can be used to decode binary data returned by Cadence. For example: User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution. The response contains binary field PendingActivityInfo.HeartbeatDetails, which can be decoded by using:
var result string // This need to be same type as the one passed to RecordHeartbeat NewValue(data).Get(&result)
func SideEffect ¶
SideEffect executes the provided function once, records its result into the workflow history. The recorded result on history will be returned without executing the provided function during replay. This guarantees the deterministic requirement for workflow as the exact same result will be returned in replay. Common use case is to run some short non-deterministic code in workflow, like getting random number or new UUID. The only way to fail SideEffect is to panic which causes decision task failure. The decision task after timeout is rescheduled and re-executed giving SideEffect another chance to succeed.
Caution: do not use SideEffect to modify closures. Always retrieve result from SideEffect's encoded return value. For example this code is BROKEN:
// Bad example: var random int workflow.SideEffect(func(ctx workflow.Context) interface{} { random = rand.Intn(100) return nil }) // random will always be 0 in replay, thus this code is non-deterministic if random < 50 { .... } else { .... }
On replay the provided function is not executed, the random will always be 0, and the workflow could takes a different path breaking the determinism.
Here is the correct way to use SideEffect:
// Good example: encodedRandom := SideEffect(func(ctx workflow.Context) interface{} { return rand.Intn(100) }) var random int encodedRandom.Get(&random) if random < 50 { .... } else { .... }
type Values ¶ added in v0.9.0
type Values interface { // HasValues return whether there are values encoded. HasValues() bool // Get extract the encoded values into strong typed value pointers. Get(valuePtr ...interface{}) error }
Values is used to encapsulate/extract encoded one or more values from workflow/activity.
func NewValues ¶ added in v0.6.1
NewValues creates a new encoded.Values which can be used to decode binary data returned by Cadence. For example: User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution. The response contains binary field PendingActivityInfo.HeartbeatDetails, which can be decoded by using:
var result1 string var result2 int // These need to be same type as those arguments passed to RecordHeartbeat NewValues(data).Get(&result1, &result2)
type Version ¶
type Version int
Version represents a change version. See GetVersion call.
const DefaultVersion Version = -1
DefaultVersion is a version returned by GetVersion for code that wasn't versioned before
func GetVersion ¶
GetVersion is used to safely perform backwards incompatible changes to workflow definitions. It is not allowed to update workflow code while there are workflows running as it is going to break determinism. The solution is to have both old code that is used to replay existing workflows as well as the new one that is used when it is executed for the first time. GetVersion returns maxSupported version when is executed for the first time. This version is recorded into the workflow history as a marker event. Even if maxSupported version is changed the version that was recorded is returned on replay. DefaultVersion constant contains version of code that wasn't versioned before. For example initially workflow has the following code:
err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
it should be updated to
err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
The backwards compatible way to execute the update is
v := GetVersion(ctx, "fooChange", DefaultVersion, 1) if v == DefaultVersion { err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) } else { err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) }
Then bar has to be changed to baz:
v := GetVersion(ctx, "fooChange", DefaultVersion, 2) if v == DefaultVersion { err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) } else if v == 1 { err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) } else { err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) }
Later when there are no workflow executions running DefaultVersion the correspondent branch can be removed:
v := GetVersion(ctx, "fooChange", 1, 2) if v == 1 { err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) } else { err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) }
It is recommended to keep the GetVersion() call even if single branch is left:
GetVersion(ctx, "fooChange", 2, 2) err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
The reason to keep it is: 1) it ensures that if there is older version execution still running, it will fail here and not proceed; 2) if you ever need to make more changes for “fooChange”, for example change activity from baz to qux, you just need to update the maxVersion from 2 to 3.
Note that, you only need to preserve the first call to GetVersion() for each changeID. All subsequent call to GetVersion() with same changeID are safe to remove. However, if you really want to get rid of the first GetVersion() call as well, you can do so, but you need to make sure: 1) all older version executions are completed; 2) you can no longer use “fooChange” as changeID. If you ever need to make changes to that same part like change from baz to qux, you would need to use a different changeID like “fooChange-fix2”, and start minVersion from DefaultVersion again. The code would looks like:
v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 1) if v == workflow.DefaultVersion { err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil) } else { err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil) }
type WaitGroup ¶ added in v0.9.0
WaitGroup must be used instead of native go sync.WaitGroup by workflow code. Use workflow.NewWaitGroup(ctx) method to create a new WaitGroup instance
func NewWaitGroup ¶ added in v0.9.0
NewWaitGroup creates a new WaitGroup instance.
type WorkerOptions ¶
type WorkerOptions struct { // Optional: To set the maximum concurrent activity executions this worker can have. // The zero value of this uses the default value. // default: defaultMaxConcurrentActivityExecutionSize(1k) MaxConcurrentActivityExecutionSize int // Optional: Sets the rate limiting on number of activities that can be executed per second per // worker. This can be used to limit resources used by the worker. // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. // The zero value of this uses the default value. Default: 100k WorkerActivitiesPerSecond float64 // Optional: To set the maximum concurrent local activity executions this worker can have. // The zero value of this uses the default value. // default: 1k MaxConcurrentLocalActivityExecutionSize int // Optional: Sets the rate limiting on number of local activities that can be executed per second per // worker. This can be used to limit resources used by the worker. // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your local activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. // The zero value of this uses the default value. Default: 100k WorkerLocalActivitiesPerSecond float64 // Optional: Sets the rate limiting on number of activities that can be executed per second. // This is managed by the server and controls activities per second for your entire tasklist // whereas WorkerActivityTasksPerSecond controls activities only per worker. // Notice that the number is represented in float, so that you can set it to less than // 1 if needed. For example, set the number to 0.1 means you want your activity to be executed // once for every 10 seconds. This can be used to protect down stream services from flooding. // The zero value of this uses the default value. Default: 100k TaskListActivitiesPerSecond float64 // optional: Sets the maximum number of goroutines that will concurrently poll the // cadence-server to retrieve activity tasks. Changing this value will affect the // rate at which the worker is able to consume tasks from a task list. // Default value is 2 MaxConcurrentActivityTaskPollers int // optional: Sets the minimum number of goroutines that will concurrently poll the // cadence-server to retrieve activity tasks. Changing this value will NOT affect the // rate at which the worker is able to consume tasks from a task list, // unless FeatureFlags.PollerAutoScalerEnabled is set to true. // Default value is 1 MinConcurrentActivityTaskPollers int // Optional: To set the maximum concurrent decision task executions this worker can have. // The zero value of this uses the default value. // default: defaultMaxConcurrentTaskExecutionSize(1k) MaxConcurrentDecisionTaskExecutionSize int // Optional: Sets the rate limiting on number of decision tasks that can be executed per second per // worker. This can be used to limit resources used by the worker. // The zero value of this uses the default value. Default: 100k WorkerDecisionTasksPerSecond float64 // optional: Sets the maximum number of goroutines that will concurrently poll the // cadence-server to retrieve decision tasks. Changing this value will affect the // rate at which the worker is able to consume tasks from a task list. // Default value is 2 MaxConcurrentDecisionTaskPollers int // optional: Sets the minimum number of goroutines that will concurrently poll the // cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true, // changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list. // Default value is 1 MinConcurrentDecisionTaskPollers int // optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count // based on poll result. It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true. // Default value is 1 min PollerAutoScalerCooldown time.Duration // optional: Sets the target utilization rate between [0,1]. // Utilization Rate = pollResultWithTask / (pollResultWithTask + pollResultWithNoTask) // It takes effect if FeatureFlags.PollerAutoScalerEnabled is set to true. // Default value is 0.6 PollerAutoScalerTargetUtilization float64 // optional: Sets whether to start dry run mode of autoscaler. // Default value is false PollerAutoScalerDryRun bool // Optional: Sets an identify that can be used to track this host for debugging. // default: default identity that include hostname, groupName and process ID. Identity string // Optional: Defines the 'zone' or the failure group that the worker belongs to IsolationGroup string // Optional: Metrics to be reported. Metrics emitted by the cadence client are not prometheus compatible by // default. To ensure metrics are compatible with prometheus make sure to create tally scope with sanitizer // options set. // var ( // _safeCharacters = []rune{'_'} // _sanitizeOptions = tally.SanitizeOptions{ // NameCharacters: tally.ValidCharacters{ // Ranges: tally.AlphanumericRange, // Characters: _safeCharacters, // }, // KeyCharacters: tally.ValidCharacters{ // Ranges: tally.AlphanumericRange, // Characters: _safeCharacters, // }, // ValueCharacters: tally.ValidCharacters{ // Ranges: tally.AlphanumericRange, // Characters: _safeCharacters, // }, // ReplacementCharacter: tally.DefaultReplacementCharacter, // } // ) // opts := tally.ScopeOptions{ // Reporter: reporter, // SanitizeOptions: &_sanitizeOptions, // } // scope, _ := tally.NewRootScope(opts, time.Second) // default: no metrics. MetricsScope tally.Scope // Optional: Logger framework can use to log. // default: default logger provided. Logger *zap.Logger // Optional: Enable logging in replay. // In the workflow code you can use workflow.GetLogger(ctx) to write logs. By default, the logger will skip log // entry during replay mode so you won't see duplicate logs. This option will enable the logging in replay mode. // This is only useful for debugging purpose. // default: false EnableLoggingInReplay bool // Optional: Disable running workflow workers. // default: false DisableWorkflowWorker bool // Optional: Disable running activity workers. // default: false DisableActivityWorker bool // Optional: Disable sticky execution. // default: false // Sticky Execution is to run the decision tasks for one workflow execution on same worker host. This is an // optimization for workflow execution. When sticky execution is enabled, worker keeps the workflow state in // memory. New decision task contains the new history events will be dispatched to the same worker. If this // worker crashes, the sticky decision task will timeout after StickyScheduleToStartTimeout, and cadence server // will clear the stickiness for that workflow execution and automatically reschedule a new decision task that // is available for any worker to pick up and resume the progress. DisableStickyExecution bool // Optional: Sticky schedule to start timeout. // default: 5s // The resolution is seconds. See details about StickyExecution on the comments for DisableStickyExecution. StickyScheduleToStartTimeout time.Duration // Optional: sets context for activity. The context can be used to pass any configuration to activity // like common logger for all activities. BackgroundActivityContext context.Context // Optional: Sets how decision worker deals with non-deterministic history events // (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow definition changes). // default: NonDeterministicWorkflowPolicyBlockWorkflow, which just logs error but reply nothing back to server NonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy // Optional: Sets DataConverter to customize serialization/deserialization of arguments in Cadence // default: defaultDataConverter, an combination of thriftEncoder and jsonEncoder DataConverter DataConverter // Optional: worker graceful shutdown timeout // default: 0s WorkerStopTimeout time.Duration // Optional: Enable running session workers. // Session workers is for activities within a session. // Enable this option to allow worker to process sessions. // default: false EnableSessionWorker bool // Optional: Sets the maximum number of concurrently running sessions the resource support. // default: 1000 MaxConcurrentSessionExecutionSize int // Optional: Specifies factories used to instantiate workflow interceptor chain // The chain is instantiated per each replay of a workflow execution WorkflowInterceptorChainFactories []WorkflowInterceptorFactory // Optional: Sets ContextPropagators that allows users to control the context information passed through a workflow // default: no ContextPropagators ContextPropagators []ContextPropagator // Optional: Sets opentracing Tracer that is to be used to emit tracing information // default: no tracer - opentracing.NoopTracer Tracer opentracing.Tracer // Optional: Enable worker for running shadowing workflows to replay existing workflows // If set to true: // 1. Worker will run in shadow mode and all other workers (decision, activity, session) // will be disabled to prevent them from updating existing workflow states. // 2. DataConverter, WorkflowInterceptorChainFactories, ContextPropagators, Tracer will be // used as ReplayOptions and forwarded to the underlying WorkflowReplayer. // The actual shadower activity worker will not use them. // 3. TaskList will become Domain-TaskList, to prevent conflict across domains as there's // only one shadowing domain which is responsible for shadowing workflows for all domains. // default: false EnableShadowWorker bool // Optional: Configures shadowing workflow // default: please check the documentation for ShadowOptions for default options ShadowOptions ShadowOptions // Optional: Flags to turn on/off some server side options // default: all the features in the struct are turned off FeatureFlags FeatureFlags // Optional: Authorization interface to get the Auth Token // default: No provider Authorization auth.AuthorizationProvider // Optional: Host is just string on the machine running the client // default: empty string Host string }
WorkerOptions is used to configure a worker instance. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is subjected to change in the future.
type WorkflowExecution ¶
WorkflowExecution Details.
type WorkflowExecutionContext ¶ added in v0.7.1
type WorkflowExecutionContext interface { Lock() Unlock(err error) ProcessWorkflowTask(workflowTask *workflowTask) (completeRequest interface{}, err error) ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (interface{}, error) // CompleteDecisionTask try to complete current decision task and get response that needs to be sent back to server. // The waitLocalActivity is used to control if we should wait for outstanding local activities. // If there is no outstanding local activities or if waitLocalActivity is false, the complete will return response // which will be one of following: // - RespondDecisionTaskCompletedRequest // - RespondDecisionTaskFailedRequest // - RespondQueryTaskCompletedRequest // If waitLocalActivity is true, and there is outstanding local activities, this call will return nil. CompleteDecisionTask(workflowTask *workflowTask, waitLocalActivity bool) interface{} // GetDecisionTimeout returns the TaskStartToCloseTimeout GetDecisionTimeout() time.Duration GetCurrentDecisionTask() *s.PollForDecisionTaskResponse IsDestroyed() bool StackTrace() string }
WorkflowExecutionContext represents one instance of workflow execution state in memory. Lock must be obtained before calling into any methods.
type WorkflowIDReusePolicy ¶
type WorkflowIDReusePolicy int
WorkflowIDReusePolicy defines workflow ID reuse behavior.
const ( // WorkflowIDReusePolicyAllowDuplicateFailedOnly allow start a workflow execution // when workflow not running, and the last execution close state is in // [terminated, cancelled, timeouted, failed]. WorkflowIDReusePolicyAllowDuplicateFailedOnly WorkflowIDReusePolicy = iota // WorkflowIDReusePolicyAllowDuplicate allow start a workflow execution using // the same workflow ID, when workflow not running. WorkflowIDReusePolicyAllowDuplicate // WorkflowIDReusePolicyRejectDuplicate do not allow start a workflow execution using the same workflow ID at all WorkflowIDReusePolicyRejectDuplicate // WorkflowIDReusePolicyTerminateIfRunning terminate current running workflow using the same workflow ID if exist, // then start a new run in one transaction WorkflowIDReusePolicyTerminateIfRunning )
type WorkflowInfo ¶
type WorkflowInfo struct { WorkflowExecution WorkflowExecution OriginalRunId string // The original runID before resetting. Using it instead of current runID can make workflow decision determinstic after reset WorkflowType WorkflowType TaskListName string ExecutionStartToCloseTimeoutSeconds int32 TaskStartToCloseTimeoutSeconds int32 Domain string Attempt int32 // Attempt starts from 0 and increased by 1 for every retry if retry policy is specified. CronSchedule *string ContinuedExecutionRunID *string ParentWorkflowDomain *string ParentWorkflowExecution *WorkflowExecution Memo *s.Memo // Value can be decoded using data converter (DefaultDataConverter, or custom one if set). SearchAttributes *s.SearchAttributes // Value can be decoded using DefaultDataConverter. BinaryChecksum *string // The identifier(generated by md5sum by default) of worker code that is making the current decision(can be used for auto-reset feature) DecisionStartedEventID int64 // the eventID of DecisionStarted that is making the current decision(can be used for reset API) RetryPolicy *s.RetryPolicy // contains filtered or unexported fields }
WorkflowInfo information about currently executing workflow
func GetWorkflowInfo ¶
func GetWorkflowInfo(ctx Context) *WorkflowInfo
GetWorkflowInfo extracts info of a current workflow from a context.
func (*WorkflowInfo) GetBinaryChecksum ¶ added in v0.12.2
func (wInfo *WorkflowInfo) GetBinaryChecksum() string
GetBinaryChecksum returns the binary checksum(identifier) of this worker It is the identifier(generated by md5sum by default) of worker code that is making the current decision(can be used for auto-reset feature) In replay mode, it's from DecisionTaskCompleted event. In non-replay mode, it's from the currently executing worker.
func (*WorkflowInfo) GetDecisionStartedEventID ¶ added in v0.18.3
func (wInfo *WorkflowInfo) GetDecisionStartedEventID() int64
GetDecisionCompletedEventID returns the eventID of DecisionStartedEvent that is making the current decision(can be used for reset API: decisionFinishEventID = DecisionStartedEventID + 1)
type WorkflowInterceptor ¶ added in v0.12.2
type WorkflowInterceptor interface { // Intercepts workflow function invocation. As calls to other intercepted functions are done from a workflow // function this function is the first to be called and completes workflow as soon as it returns. // workflowType argument is for information purposes only and should not be mutated. ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{} ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture GetWorkflowInfo(ctx Context) *WorkflowInfo GetLogger(ctx Context) *zap.Logger GetMetricsScope(ctx Context) tally.Scope Now(ctx Context) time.Time NewTimer(ctx Context, d time.Duration) Future Sleep(ctx Context, d time.Duration) (err error) RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error GetSignalChannel(ctx Context, signalName string) Channel SideEffect(ctx Context, f func(ctx Context) interface{}) Value MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version SetQueryHandler(ctx Context, queryType string, handler interface{}) error IsReplaying(ctx Context) bool HasLastCompletionResult(ctx Context) bool GetLastCompletionResult(ctx Context, d ...interface{}) error }
WorkflowInterceptor is an interface that can be implemented to intercept calls to the workflow function as well calls done by the workflow code. Use worker.WorkflowInterceptorBase as a base struct for implementations that do not want to implement every method. Interceptor implementation must forward calls to the next in the interceptor chain. All code in the interceptor is executed in the context of a workflow. So all the rules and restrictions that apply to the workflow code should be obeyed by the interceptor implementation. Use workflow.IsReplaying(ctx) to filter out duplicated calls.
type WorkflowInterceptorBase ¶ added in v0.12.2
type WorkflowInterceptorBase struct {
Next WorkflowInterceptor
}
WorkflowInterceptorBase is a helper type that can simplify creation of WorkflowInterceptorChainFactories
func (*WorkflowInterceptorBase) ExecuteActivity ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) ExecuteActivity(ctx Context, activityType string, args ...interface{}) Future
ExecuteActivity forwards to t.Next
func (*WorkflowInterceptorBase) ExecuteChildWorkflow ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) ExecuteChildWorkflow(ctx Context, childWorkflowType string, args ...interface{}) ChildWorkflowFuture
ExecuteChildWorkflow forwards to t.Next
func (*WorkflowInterceptorBase) ExecuteLocalActivity ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) ExecuteLocalActivity(ctx Context, activityType string, args ...interface{}) Future
ExecuteLocalActivity forwards to t.Next
func (*WorkflowInterceptorBase) ExecuteWorkflow ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) ExecuteWorkflow(ctx Context, workflowType string, args ...interface{}) []interface{}
ExecuteWorkflow forwards to t.Next
func (*WorkflowInterceptorBase) GetLastCompletionResult ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetLastCompletionResult(ctx Context, d ...interface{}) error
GetLastCompletionResult forwards to t.Next
func (*WorkflowInterceptorBase) GetLogger ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetLogger(ctx Context) *zap.Logger
GetLogger forwards to t.Next
func (*WorkflowInterceptorBase) GetMetricsScope ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetMetricsScope(ctx Context) tally.Scope
GetMetricsScope forwards to t.Next
func (*WorkflowInterceptorBase) GetSignalChannel ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetSignalChannel(ctx Context, signalName string) Channel
GetSignalChannel forwards to t.Next
func (*WorkflowInterceptorBase) GetVersion ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
GetVersion forwards to t.Next
func (*WorkflowInterceptorBase) GetWorkflowInfo ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) GetWorkflowInfo(ctx Context) *WorkflowInfo
GetWorkflowInfo forwards to t.Next
func (*WorkflowInterceptorBase) HasLastCompletionResult ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) HasLastCompletionResult(ctx Context) bool
HasLastCompletionResult forwards to t.Next
func (*WorkflowInterceptorBase) IsReplaying ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) IsReplaying(ctx Context) bool
IsReplaying forwards to t.Next
func (*WorkflowInterceptorBase) MutableSideEffect ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) Value
MutableSideEffect forwards to t.Next
func (*WorkflowInterceptorBase) NewTimer ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) NewTimer(ctx Context, d time.Duration) Future
NewTimer forwards to t.Next
func (*WorkflowInterceptorBase) Now ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) Now(ctx Context) time.Time
Now forwards to t.Next
func (*WorkflowInterceptorBase) RequestCancelExternalWorkflow ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future
RequestCancelExternalWorkflow forwards to t.Next
func (*WorkflowInterceptorBase) SetQueryHandler ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) SetQueryHandler(ctx Context, queryType string, handler interface{}) error
SetQueryHandler forwards to t.Next
func (*WorkflowInterceptorBase) SideEffect ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) SideEffect(ctx Context, f func(ctx Context) interface{}) Value
SideEffect forwards to t.Next
func (*WorkflowInterceptorBase) SignalExternalWorkflow ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future
SignalExternalWorkflow forwards to t.Next
func (*WorkflowInterceptorBase) Sleep ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) Sleep(ctx Context, d time.Duration) (err error)
Sleep forwards to t.Next
func (*WorkflowInterceptorBase) UpsertSearchAttributes ¶ added in v0.12.2
func (t *WorkflowInterceptorBase) UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error
UpsertSearchAttributes forwards to t.Next
type WorkflowInterceptorFactory ¶ added in v0.12.2
type WorkflowInterceptorFactory interface { // NewInterceptor creates an interceptor instance. The created instance must delegate every call to // the next parameter for workflow code function correctly. NewInterceptor(info *WorkflowInfo, next WorkflowInterceptor) WorkflowInterceptor }
WorkflowInterceptorFactory is used to create a single link in the interceptor chain
type WorkflowReplayer ¶ added in v0.12.2
type WorkflowReplayer struct {
// contains filtered or unexported fields
}
WorkflowReplayer is used to replay workflow code from an event history
func NewWorkflowReplayer ¶ added in v0.12.2
func NewWorkflowReplayer() *WorkflowReplayer
NewWorkflowReplayer creates an instance of the WorkflowReplayer
func NewWorkflowReplayerWithOptions ¶ added in v0.17.0
func NewWorkflowReplayerWithOptions( options ReplayOptions, ) *WorkflowReplayer
NewWorkflowReplayerWithOptions creates an instance of the WorkflowReplayer with provided replay worker options
func (*WorkflowReplayer) RegisterActivity ¶ added in v1.0.0
func (r *WorkflowReplayer) RegisterActivity(a interface{})
RegisterActivity registers an activity function for this replayer
func (*WorkflowReplayer) RegisterActivityWithOptions ¶ added in v1.0.0
func (r *WorkflowReplayer) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions)
RegisterActivityWithOptions registers an activity function for this replayer with custom options, e.g. an explicit name.
func (*WorkflowReplayer) RegisterWorkflow ¶ added in v0.12.2
func (r *WorkflowReplayer) RegisterWorkflow(w interface{})
RegisterWorkflow registers workflow function to replay
func (*WorkflowReplayer) RegisterWorkflowWithOptions ¶ added in v0.12.2
func (r *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions)
RegisterWorkflowWithOptions registers workflow function with custom workflow name to replay
func (*WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSON ¶ added in v1.0.0
func (*WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile ¶ added in v0.12.2
func (r *WorkflowReplayer) ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the given json history file up to provided lastEventID(inclusive). Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
func (*WorkflowReplayer) ReplayWorkflowExecution ¶ added in v0.12.2
func (r *WorkflowReplayer) ReplayWorkflowExecution( ctx context.Context, service workflowserviceclient.Interface, logger *zap.Logger, domain string, execution WorkflowExecution, ) error
ReplayWorkflowExecution replays workflow execution loading it from Cadence service. The logger is an optional parameter. Defaults to the noop logger.
func (*WorkflowReplayer) ReplayWorkflowHistory ¶ added in v0.12.2
ReplayWorkflowHistory executes a single decision task for the given history. Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
func (*WorkflowReplayer) ReplayWorkflowHistoryFromJSON ¶ added in v1.0.0
func (*WorkflowReplayer) ReplayWorkflowHistoryFromJSONFile ¶ added in v0.12.2
func (r *WorkflowReplayer) ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error
ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
type WorkflowRun ¶
type WorkflowRun interface { // GetID return workflow ID, which will be same as StartWorkflowOptions.ID if provided. GetID() string // GetRunID return the first started workflow run ID (please see below) GetRunID() string // Get will fill the workflow execution result to valuePtr, // if workflow execution is a success, or return corresponding, // error. This is a blocking API. Get(ctx context.Context, valuePtr interface{}) error }
WorkflowRun represents a started non child workflow
type WorkflowShadower ¶ added in v0.17.0
type WorkflowShadower struct {
// contains filtered or unexported fields
}
WorkflowShadower retrieves and replays workflow history from Cadence service to determine if there's any nondeterministic changes in the workflow definition
func NewWorkflowShadower ¶ added in v0.17.0
func NewWorkflowShadower( service workflowserviceclient.Interface, domain string, shadowOptions ShadowOptions, replayOptions ReplayOptions, logger *zap.Logger, ) (*WorkflowShadower, error)
NewWorkflowShadower creates an instance of the WorkflowShadower for testing The logger is an optional parameter. Defaults to noop logger if not provided and will override the logger in WorkerOptions
func (*WorkflowShadower) RegisterWorkflow ¶ added in v0.17.0
func (s *WorkflowShadower) RegisterWorkflow(w interface{})
RegisterWorkflow registers workflow function to replay
func (*WorkflowShadower) RegisterWorkflowWithOptions ¶ added in v0.17.0
func (s *WorkflowShadower) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions)
RegisterWorkflowWithOptions registers workflow function with custom workflow name to replay
func (*WorkflowShadower) Run ¶ added in v0.17.0
func (s *WorkflowShadower) Run() error
Run starts WorkflowShadower in a blocking fashion
func (*WorkflowShadower) Stop ¶ added in v0.17.0
func (s *WorkflowShadower) Stop()
Stop stops WorkflowShadower and wait up to one miniute for all goroutines to finish before returning
type WorkflowStatus ¶ added in v0.17.0
type WorkflowStatus string
WorkflowStatus represents the status of a workflow
const ( // WorkflowStatusOpen is the WorkflowStatus for open workflows WorkflowStatusOpen WorkflowStatus = "OPEN" // WorkflowStatusClosed is the WorkflowStatus for closed workflows WorkflowStatusClosed WorkflowStatus = "CLOSED" // WorkflowStatusALL is the WorkflowStatus for all workflows WorkflowStatusALL WorkflowStatus = "ALL" )
func ToWorkflowStatus ¶ added in v0.17.0
func ToWorkflowStatus(statusString string) (WorkflowStatus, error)
ToWorkflowStatus converts workflow status from string type to WorkflowStatus type
type WorkflowTaskHandler ¶
type WorkflowTaskHandler interface { // Processes the workflow task // The response could be: // - RespondDecisionTaskCompletedRequest // - RespondDecisionTaskFailedRequest // - RespondQueryTaskCompletedRequest ProcessWorkflowTask( task *workflowTask, f decisionHeartbeatFunc, ) (response interface{}, err error) }
WorkflowTaskHandler represents decision task handlers.
type WorkflowTestSuite ¶
type WorkflowTestSuite struct {
// contains filtered or unexported fields
}
WorkflowTestSuite is the test suite to run unit tests for workflow/activity.
func (*WorkflowTestSuite) GetLogger ¶
func (s *WorkflowTestSuite) GetLogger() *zap.Logger
GetLogger gets the logger for this WorkflowTestSuite.
func (*WorkflowTestSuite) NewTestActivityEnvironment ¶
func (s *WorkflowTestSuite) NewTestActivityEnvironment() *TestActivityEnvironment
NewTestActivityEnvironment creates a new instance of TestActivityEnvironment. Use the returned TestActivityEnvironment to run your activity in the test environment.
func (*WorkflowTestSuite) NewTestWorkflowEnvironment ¶
func (s *WorkflowTestSuite) NewTestWorkflowEnvironment() *TestWorkflowEnvironment
NewTestWorkflowEnvironment creates a new instance of TestWorkflowEnvironment. Use the returned TestWorkflowEnvironment to run your workflow in the test environment.
func (*WorkflowTestSuite) SetContextPropagators ¶ added in v0.8.4
func (s *WorkflowTestSuite) SetContextPropagators(ctxProps []ContextPropagator)
SetContextPropagators sets the context propagators for this WorkflowTestSuite. If you don't set context propagators, test suite will not use context propagators
func (*WorkflowTestSuite) SetHeader ¶ added in v0.8.4
func (s *WorkflowTestSuite) SetHeader(header *shared.Header)
SetHeader sets the headers for this WorkflowTestSuite. If you don't set header, test suite will not pass headers to the workflow
func (*WorkflowTestSuite) SetLogger ¶
func (s *WorkflowTestSuite) SetLogger(logger *zap.Logger)
SetLogger sets the logger for this WorkflowTestSuite. If you don't set logger, test suite will create a default logger with Debug level logging enabled.
func (*WorkflowTestSuite) SetMetricsScope ¶
func (s *WorkflowTestSuite) SetMetricsScope(scope tally.Scope)
SetMetricsScope sets the metrics scope for this WorkflowTestSuite. If you don't set scope, test suite will use tally.NoopScope
type WorkflowType ¶
type WorkflowType struct {
Name string
}
WorkflowType identifies a workflow type.
Source Files ¶
- activity.go
- client.go
- context.go
- encoded.go
- encoding.go
- error.go
- headers.go
- interceptors.go
- internal_activity.go
- internal_decision_state_machine.go
- internal_event_handlers.go
- internal_logging_tags.go
- internal_poller_autoscaler.go
- internal_pressure_points.go
- internal_public.go
- internal_retry.go
- internal_task_handlers.go
- internal_task_pollers.go
- internal_time.go
- internal_utils.go
- internal_worker.go
- internal_worker_base.go
- internal_workflow.go
- internal_workflow_client.go
- internal_workflow_testsuite.go
- jwt_authorization.go
- propagation.go
- query_builder.go
- registry.go
- session.go
- tracer.go
- version.go
- worker.go
- workflow.go
- workflow_replayer.go
- workflow_replayer_utils.go
- workflow_shadower.go
- workflow_shadower_activities.go
- workflow_shadower_worker.go
- workflow_testsuite.go