internal

package
v0.7.1
Published: Jul 3, 2018 License: MIT Imports: 40 Imported by: 0

Documentation

Constants

const FeatureVersion = "1.0.0"

FeatureVersion is a semver that represents the feature set this Cadence client library supports. The Cadence server can use it for client capability checks, for backward compatibility. Format: MAJOR.MINOR.PATCH
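As a rough illustration of what a capability check on a MAJOR.MINOR.PATCH string can look like, here is a standalone sketch. The helper names parseSemver and supportsFeature, and the acceptance policy (same major version, minor/patch at least the required ones), are assumptions made for this example; the source does not describe how the Cadence server actually compares versions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSemver splits a MAJOR.MINOR.PATCH string into its three numeric parts.
func parseSemver(v string) ([3]int, error) {
	var out [3]int
	parts := strings.Split(v, ".")
	if len(parts) != 3 {
		return out, fmt.Errorf("want MAJOR.MINOR.PATCH, got %q", v)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil || n < 0 {
			return out, fmt.Errorf("invalid component %q in %q", p, v)
		}
		out[i] = n
	}
	return out, nil
}

// supportsFeature applies a hypothetical policy: the client is accepted when
// its major version matches and its minor/patch are at least the required ones.
func supportsFeature(client, required string) (bool, error) {
	c, err := parseSemver(client)
	if err != nil {
		return false, err
	}
	r, err := parseSemver(required)
	if err != nil {
		return false, err
	}
	if c[0] != r[0] {
		return false, nil
	}
	if c[1] != r[1] {
		return c[1] > r[1], nil
	}
	return c[2] >= r[2], nil
}

func main() {
	ok, err := supportsFeature("1.0.0", "1.0.0")
	fmt.Println(ok, err) // true <nil>
}
```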

const LibraryVersion = "0.7.0"

LibraryVersion is a semver that represents the version of this Cadence client library. It represents API changes visible to consumers of the Cadence client-side library, i.e. developers who write workflows. Every time we change an API in a way that can affect them, we have to change this number. Format: MAJOR.MINOR.PATCH

const QueryTypeStackTrace string = "__stack_trace"

QueryTypeStackTrace is the built-in query type for the Client.QueryWorkflow() call. Use this query type to get the call stack of the workflow. The result will be a string encoded in the EncodedValue.

Variables

var ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")

ErrActivityResultPending is returned from an activity's implementation to indicate that the activity is not completed when the activity method returns. The activity needs to be completed separately by Client.CompleteActivity(). For example, if an activity requires human interaction (like approving an expense report), the activity could return activity.ErrResultPending, which indicates the activity is not done yet. Then, when the awaited human action happens, it needs to trigger something that reports the activity completed event to the Cadence server via the Client.CompleteActivity() API.

var ErrCanceled = NewCanceledError()

ErrCanceled is the error returned by Context.Err when the context is canceled.

ErrDeadlineExceeded is the error returned by Context.Err when the context's deadline passes.

var ErrMockStartChildWorkflowFailed = fmt.Errorf("start child workflow failed: %v", shared.ChildWorkflowExecutionFailedCauseWorkflowAlreadyRunning)

ErrMockStartChildWorkflowFailed is a special error used to indicate that the mocked child workflow should fail to start. This error is also exposed publicly as testsuite.ErrMockStartChildWorkflowFailed.

var ErrNoData = errors.New("no data available")

ErrNoData is returned when trying to extract strongly typed data while there is no data available.

var ErrTooManyArg = errors.New("too many arguments")

ErrTooManyArg is returned when trying to extract strongly typed data with more arguments than available data.

Functions

func EnableVerboseLogging

func EnableVerboseLogging(enable bool)

EnableVerboseLogging enables or disables verbose logging. This is for internal use only.

func GetActivityLogger

func GetActivityLogger(ctx context.Context) *zap.Logger

GetActivityLogger returns a logger that can be used in activity

func GetActivityMetricsScope

func GetActivityMetricsScope(ctx context.Context) tally.Scope

GetActivityMetricsScope returns a metrics scope that can be used in activity

func GetLogger

func GetLogger(ctx Context) *zap.Logger

GetLogger returns a logger to be used in workflow's context

func GetMetricsScope

func GetMetricsScope(ctx Context) tally.Scope

GetMetricsScope returns a metrics scope to be used in workflow's context

func Go

func Go(ctx Context, f func(ctx Context))

Go creates a new coroutine. Its semantics are similar to those of a goroutine, within the context of the workflow.

func GoNamed

func GoNamed(ctx Context, name string, f func(ctx Context))

GoNamed creates a new coroutine with a given human-readable name. Its semantics are similar to those of a goroutine, within the context of the workflow. The name appears in stack traces of this coroutine.

func IsReplaying added in v0.5.1

func IsReplaying(ctx Context) bool

IsReplaying returns whether the current workflow code is replaying.

Warning! Never make decisions, like scheduling an activity/childWorkflow/timer or sending/waiting on a future/channel, based on this flag, as doing so breaks the workflow determinism requirement. The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or metric reporting. Please note that Cadence already provides standard logging/metrics via workflow.GetLogger(ctx) and workflow.GetMetricsScope(ctx); those standard mechanisms are replay-aware and automatically suppress output during replay. Only use this flag if you need custom logging/metrics reporting, for example if you want to log to Kafka.

Warning! Any action protected by this flag should not fail; if it does fail, the workflow should either ignore that failure or panic on it. If the workflow doesn't want to be blocked on such failures, it should ignore them; if the workflow wants to proceed only when the action succeeds, it should panic on the failure. A panic raised from a workflow causes the decision task to fail, and the Cadence server will reschedule it later to retry.

func MutableSideEffect added in v0.6.1

func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) encoded.Value

MutableSideEffect executes the provided function once, then looks up the history for the value with the given id. If there is no existing value, it records the function result as a value with the given id on history; otherwise, it uses the provided equals function to compare the existing value from history with the new function result. If they are equal, it returns the value without recording a new one in history; otherwise, it records the new value with the same id on history.

Caution: do not use MutableSideEffect to modify closures. Always retrieve result from MutableSideEffect's encoded return value.

The difference between MutableSideEffect() and SideEffect() is that every new SideEffect() call in non-replay will result in a new marker being recorded on history. However, MutableSideEffect() only records a new marker if the value changed. During replay, MutableSideEffect() will not execute the function again, but it will return the exact same value as it was returning during the non-replay run.

One good use case of MutableSideEffect() is to access dynamically changing config without breaking determinism.
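The recording rule described above can be modeled outside Cadence with a few lines of plain Go. This is only a toy model for a non-replay run: the history type and its mutableSideEffect method are hypothetical stand-ins invented for this sketch, not the library's implementation, and values are plain ints rather than encoded.Value.

```go
package main

import "fmt"

// history is a toy stand-in for workflow history: it keeps the markers
// recorded for a single MutableSideEffect id, in order.
type history struct {
	markers []int
}

// mutableSideEffect models the recording rule for a non-replay run: call f,
// and append a marker only when there is no previous value or equals reports
// a change. It returns the value the workflow would observe.
func (h *history) mutableSideEffect(f func() int, equals func(a, b int) bool) int {
	v := f()
	if n := len(h.markers); n > 0 && equals(h.markers[n-1], v) {
		return h.markers[n-1]
	}
	h.markers = append(h.markers, v)
	return v
}

func main() {
	config := 10 // imagine a dynamically changing config value
	read := func() int { return config }
	eq := func(a, b int) bool { return a == b }

	h := &history{}
	h.mutableSideEffect(read, eq) // first call: records 10
	h.mutableSideEffect(read, eq) // unchanged: nothing recorded
	config = 20
	h.mutableSideEffect(read, eq) // changed: records 20
	fmt.Println(h.markers)        // [10 20]
}
```

A SideEffect-style rule would instead append a marker on every call, which is why MutableSideEffect suits values that are read often but change rarely.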

func NewDisconnectedContext added in v0.5.1

func NewDisconnectedContext(parent Context) (ctx Context, cancel CancelFunc)

NewDisconnectedContext returns a new context that won't propagate parent's cancellation to the new child context. One common use case is to do cleanup work after workflow is cancelled.

err := workflow.ExecuteActivity(ctx, ActivityFoo).Get(ctx, &activityFooResult)
if err != nil && cadence.IsCanceledError(ctx.Err()) {
  // the activity failed, and the workflow context is canceled
  disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
  workflow.ExecuteActivity(disconnectedCtx, handleCancellationActivity).Get(disconnectedCtx, nil)
  return err // the workflow returns CanceledError
}

func NewFuture

func NewFuture(ctx Context) (Future, Settable)

NewFuture creates a new future as well as associated Settable that is used to set its value.

func NewValue added in v0.6.1

func NewValue(data []byte) encoded.Value

NewValue creates a new encoded.Value which can be used to decode binary data returned by Cadence. For example: suppose the user called Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got a response from calling Client.DescribeWorkflowExecution. The response contains the binary field PendingActivityInfo.HeartbeatDetails, which can be decoded by using:

var result string // This needs to be the same type as the one passed to RecordHeartbeat
NewValue(data).Get(&result)

func NewValues added in v0.6.1

func NewValues(data []byte) encoded.Values

NewValues creates a new encoded.Values which can be used to decode binary data returned by Cadence. For example: suppose the user called Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got a response from calling Client.DescribeWorkflowExecution. The response contains the binary field PendingActivityInfo.HeartbeatDetails, which can be decoded by using:

var result1 string
var result2 int // These need to be the same types as the arguments passed to RecordHeartbeat
NewValues(data).Get(&result1, &result2)

func Now

func Now(ctx Context) time.Time

Now returns the current time when the decision is started or replayed. The workflow needs to use this Now() to get the wall clock time instead of the Go standard library's time.Now().

func RecordActivityHeartbeat

func RecordActivityHeartbeat(ctx context.Context, details ...interface{})

RecordActivityHeartbeat sends a heartbeat for the currently executing activity. If the activity is cancelled, or the workflow/activity doesn't exist, the context is cancelled with the error context.Canceled.

TODO: we don't have a way to distinguish between the two cases when context is cancelled because
context doesn't support overriding value of ctx.Error.
TODO: Implement automatic heartbeating with cancellation through ctx.

details - the details you provide here can be seen in the workflow when it receives a TimeoutError; you can check the error's TimeoutType()/Details().

func RegisterActivity

func RegisterActivity(activityFunc interface{})

RegisterActivity - registers an activity function with the framework. An activity takes a context and input and returns a (result, error) or just error. Examples:

func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
func sampleActivity(ctx context.Context) (err error)
func sampleActivity() (result string, err error)
func sampleActivity(arg1 bool) (result int, err error)
func sampleActivity(arg1 bool) (err error)

Serialization of all primitive types and structures is supported, except channels, functions, variadic arguments, and unsafe pointers. This method panics if activityFunc doesn't comply with the expected format.
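To make the expected format concrete, the following standalone sketch checks a function value against the rules stated above using the reflect package. The helper name validateFnFormat and its error messages are assumptions for illustration; this is not the library's own validation code, and it returns an error where the real registration call panics.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// validateFnFormat checks the shape described above: fn must be a
// non-variadic function whose arguments are not channels, functions, or
// unsafe pointers, and which returns either (error) or (result, error).
func validateFnFormat(fn interface{}) error {
	if fn == nil {
		return errors.New("expected a func, got nil")
	}
	t := reflect.TypeOf(fn)
	if t.Kind() != reflect.Func {
		return fmt.Errorf("expected a func, got %s", t.Kind())
	}
	if t.IsVariadic() {
		return errors.New("variadic functions are not supported")
	}
	for i := 0; i < t.NumIn(); i++ {
		switch t.In(i).Kind() {
		case reflect.Chan, reflect.Func, reflect.UnsafePointer:
			return fmt.Errorf("argument %d has unsupported kind %s", i, t.In(i).Kind())
		}
	}
	if n := t.NumOut(); n < 1 || n > 2 {
		return fmt.Errorf("expected 1 or 2 return values, got %d", n)
	}
	if last := t.Out(t.NumOut() - 1); !last.Implements(errorType) {
		return fmt.Errorf("last return value must be error, got %s", last)
	}
	return nil
}

func main() {
	good := func(ctx context.Context, arg1 int, arg2 string) (string, error) { return "", nil }
	bad := func(arg1 bool) int { return 0 }
	fmt.Println(validateFnFormat(good) == nil, validateFnFormat(bad) == nil) // true false
}
```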

func RegisterActivityWithOptions

func RegisterActivityWithOptions(activityFunc interface{}, opts RegisterActivityOptions)

RegisterActivityWithOptions registers the activity function with options. The user can use the options to provide an external name for the activity, or leave it empty if no external name is required. This can be used as

RegisterActivityWithOptions(barActivity, RegisterActivityOptions{})
RegisterActivityWithOptions(barActivity, RegisterActivityOptions{Name: "barExternal"})

An activity takes a context and input and returns a (result, error) or just error. Examples:

func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
func sampleActivity(ctx context.Context) (err error)
func sampleActivity() (result string, err error)
func sampleActivity(arg1 bool) (result int, err error)
func sampleActivity(arg1 bool) (err error)

Serialization of all primitive types and structures is supported, except channels, functions, variadic arguments, and unsafe pointers. This method panics if activityFunc doesn't comply with the expected format.

func RegisterWorkflow

func RegisterWorkflow(workflowFunc interface{})

RegisterWorkflow - registers a workflow function with the framework. A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)

Serialization of all primitive types and structures is supported, except channels, functions, variadic arguments, and unsafe pointers. This method panics if workflowFunc doesn't comply with the expected format.

func RegisterWorkflowWithOptions

func RegisterWorkflowWithOptions(workflowFunc interface{}, opts RegisterWorkflowOptions)

RegisterWorkflowWithOptions registers the workflow function with options. The user can use the options to provide an external name for the workflow, or leave it empty if no external name is required. This can be used as

RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{})
RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{Name: "foo"})

A workflow takes a cadence context and input and returns a (result, error) or just error. Examples:

func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context) (result []byte, err error)
func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)

Serialization of all primitive types and structures is supported, except channels, functions, variadic arguments, and unsafe pointers. This method panics if workflowFunc doesn't comply with the expected format.

func ReplayWorkflowExecution added in v0.7.0

func ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, logger *zap.Logger, domain string, execution WorkflowExecution) error

ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is the only optional parameter. Defaults to the noop logger.

func ReplayWorkflowHistory added in v0.7.0

func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error

ReplayWorkflowHistory executes a single decision task for the given history. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.

func ReplayWorkflowHistoryFromJSONFile added in v0.7.1

func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error

ReplayWorkflowHistoryFromJSONFile executes a single decision task for the given json history file. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.

func SetQueryHandler

func SetQueryHandler(ctx Context, queryType string, handler interface{}) error

SetQueryHandler sets the query handler that handles a workflow query. The queryType specifies which query type this handler should handle. The handler must be a function that returns 2 values: the first must be a serializable result and the second must be an error. The handler function can receive any number of input parameters, all of which must be serializable. You should call workflow.SetQueryHandler() at the beginning of the workflow code.

When a client calls Client.QueryWorkflow() on the Cadence server, the server generates a task and dispatches it to a workflow worker, which replays the history events and then executes a query handler based on the query type. The query handler is invoked outside the context of the workflow, meaning that the handler code must not use the Cadence context to do things like workflow.NewChannel() or workflow.Go(), or to call any blocking workflow functions like Channel.Receive() or Future.Get(). Trying to do so in query handler code will fail the query, and the client will receive a QueryFailedError.

Example of workflow code that supports query type "current_state":

func MyWorkflow(ctx workflow.Context, input string) error {
  currentState := "started" // this could be any serializable struct
  err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) {
    return currentState, nil
  })
  if err != nil {
    currentState = "failed to register query handler"
    return err
  }
  // your normal workflow code begins here, and you update currentState as the code makes progress.
  currentState = "waiting timer"
  err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
  if err != nil {
    currentState = "timer failed"
    return err
  }

  currentState = "waiting activity"
  ctx = workflow.WithActivityOptions(ctx, myActivityOptions)
  err = workflow.ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil)
  if err != nil {
    currentState = "activity failed"
    return err
  }
  currentState = "done"
  return nil
}

func SetStickyWorkflowCacheSize added in v0.7.0

func SetStickyWorkflowCacheSize(cacheSize int)

SetStickyWorkflowCacheSize sets the cache size for the sticky workflow cache. Sticky workflow execution is the affinity between decision tasks of a specific workflow execution and a specific worker. The affinity is set if sticky execution is enabled via Worker.Options (it is enabled by default unless disabled explicitly). The benefit of sticky execution is that the workflow does not have to reconstruct its state by replaying from the beginning of the history events. The cost is that it consumes more memory, as it relies on caching the workflow execution's running state on the worker. The cache is shared between workers running within the same process. This must be called before any worker is started. If not called, the default size of 10K (might change in the future) will be used.

func SideEffect

func SideEffect(ctx Context, f func(ctx Context) interface{}) encoded.Value

SideEffect executes the provided function once and records its result into the workflow history. During replay, the recorded result is returned without executing the provided function. This guarantees determinism, as the exact same result is returned in replay. A common use case is to run some short non-deterministic code in a workflow, like getting a random number or a new UUID. The only way to fail SideEffect is to panic, which causes a decision task failure. The decision task is rescheduled after a timeout and re-executed, giving SideEffect another chance to succeed.

Caution: do not use SideEffect to modify closures. Always retrieve result from SideEffect's encoded return value. For example this code is BROKEN:

// Bad example:
var random int
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
       random = rand.Intn(100)
       return nil
})
// random will always be 0 in replay, thus this code is non-deterministic
if random < 50 {
       ....
} else {
       ....
}

On replay the provided function is not executed, so random will always be 0 and the workflow could take a different path, breaking determinism.

Here is the correct way to use SideEffect:

// Good example:
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
      return rand.Intn(100)
})
var random int
encodedRandom.Get(&random)
if random < 50 {
       ....
} else {
       ....
}
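The record-then-replay behavior can also be sketched without Cadence. The recorder type below is a hypothetical toy model of workflow history written for this illustration; it only shows why reading the returned value is safe on replay, and it is not how the library stores markers.

```go
package main

import (
	"fmt"
	"math/rand"
)

// recorder is a toy stand-in for workflow history.
type recorder struct {
	results   []int // results recorded during the original execution
	next      int   // index of the next result to hand out during replay
	replaying bool
}

// sideEffect runs f and records its result on first execution; during replay
// it returns the recorded result without calling f.
func (r *recorder) sideEffect(f func() int) int {
	if r.replaying {
		v := r.results[r.next]
		r.next++
		return v
	}
	v := f()
	r.results = append(r.results, v)
	return v
}

func main() {
	r := &recorder{}
	first := r.sideEffect(func() int { return rand.Intn(100) })

	r.replaying = true // replay the same history from the start
	replayed := r.sideEffect(func() int { return rand.Intn(100) })
	fmt.Println(first == replayed) // true
}
```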

func Sleep

func Sleep(ctx Context, d time.Duration) (err error)

Sleep pauses the current workflow for at least the duration d. A negative or zero duration causes Sleep to return immediately. Workflow code needs to use this Sleep() instead of the Go standard library's time.Sleep(). You can cancel a pending sleep by canceling the Context (using a context from workflow.WithCancel(ctx)). Sleep() returns nil if the duration d has passed, or *CanceledError if the ctx is canceled. There are 2 reasons the ctx could be canceled: 1) your workflow code cancels the ctx (with workflow.WithCancel(ctx)); 2) your workflow itself is canceled by an external request. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

func WithActivityTask

func WithActivityTask(
	ctx context.Context,
	task *shared.PollForActivityTaskResponse,
	taskList string,
	invoker ServiceInvoker,
	logger *zap.Logger,
	scope tally.Scope,
	dataConverter encoded.DataConverter,
) context.Context

WithActivityTask adds activity-specific information into the context. Use this method to unit test activity implementations that use context extractor methods.

func WithCancel

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

Types

type ActivityInfo

type ActivityInfo struct {
	TaskToken          []byte
	WorkflowExecution  WorkflowExecution
	ActivityID         string
	ActivityType       ActivityType
	TaskList           string
	HeartbeatTimeout   time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed.
	ScheduledTimestamp time.Time     // Time of activity scheduled by a workflow
	StartedTimestamp   time.Time     // Time of activity start
	Deadline           time.Time     // Time of activity timeout
}

ActivityInfo contains information about currently executing activity.

func GetActivityInfo

func GetActivityInfo(ctx context.Context) ActivityInfo

GetActivityInfo returns information about currently executing activity.

type ActivityOptions

type ActivityOptions struct {
	// TaskList that the activity needs to be scheduled on.
	// Optional: defaults to the task list with the same name as the workflow task list.
	TaskList string

	// ScheduleToCloseTimeout - The end-to-end timeout for the activity.
	// The zero value of this uses default value.
	// Optional: The default value is the sum of ScheduleToStartTimeout and StartToCloseTimeout
	ScheduleToCloseTimeout time.Duration

	// ScheduleToStartTimeout - The queue timeout before the activity starts executing.
	// Mandatory: No default.
	ScheduleToStartTimeout time.Duration

	// StartToCloseTimeout - The time out from the start of execution to end of it.
	// Mandatory: No default.
	StartToCloseTimeout time.Duration

	// HeartbeatTimeout - The periodic timeout while the activity is in execution. This is
	// the maximum interval within which the server needs to hear at least one ping from the activity.
	// Optional: default zero, which means no heartbeating is needed.
	HeartbeatTimeout time.Duration

	// WaitForCancellation - Whether to wait for a cancelled activity to complete
	// (the activity can be failed, completed, or cancel accepted)
	// Optional: default false
	WaitForCancellation bool

	// ActivityID - Business-level activity ID. This is not needed in most cases; if you have
	// to specify it, talk to the Cadence team. This is something that will be done in the future.
	// Optional: default empty string
	ActivityID string
}

ActivityOptions stores all activity-specific parameters that will be stored inside of a context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

type ActivityTaskHandler

type ActivityTaskHandler interface {
	// Executes the activity task
	// The response is one of the types:
	// - RespondActivityTaskCompletedRequest
	// - RespondActivityTaskFailedRequest
	// - RespondActivityTaskCanceledRequest
	Execute(taskList string, task *s.PollForActivityTaskResponse) (interface{}, error)
}

ActivityTaskHandler represents activity task handlers.

type ActivityType

type ActivityType struct {
	Name string
}

ActivityType identifies an activity type.

type CancelFunc

type CancelFunc func()

A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.

type CanceledError

type CanceledError struct {
	// contains filtered or unexported fields
}

CanceledError is returned when an operation was canceled.

func NewCanceledError

func NewCanceledError(details ...interface{}) *CanceledError

NewCanceledError creates CanceledError instance

func (*CanceledError) Details

func (e *CanceledError) Details(d ...interface{}) error

Details extracts the strongly typed detail data of this error.

func (*CanceledError) Error

func (e *CanceledError) Error() string

Error from error interface

func (*CanceledError) HasDetails added in v0.5.1

func (e *CanceledError) HasDetails() bool

HasDetails returns whether this error has strongly typed detail data.

type Channel

type Channel interface {
	// Receive blocks until it receives a value, and then assigns the received value to the provided pointer.
	// Returns false when Channel is closed.
	// Parameter valuePtr is a pointer to the expected data structure to be received. For example:
	//  var v string
	//  c.Receive(ctx, &v)
	Receive(ctx Context, valuePtr interface{}) (more bool)

	// ReceiveAsync tries to receive from the Channel without blocking. If there is data available from the Channel, it
	// assigns the data to valuePtr and returns true. Otherwise, it returns false immediately.
	ReceiveAsync(valuePtr interface{}) (ok bool)

	// ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return value, more, that indicates whether
	// there could be more values from the Channel. more is false when the Channel is closed.
	ReceiveAsyncWithMoreFlag(valuePtr interface{}) (ok bool, more bool)

	// Send blocks until the data is sent.
	Send(ctx Context, v interface{})

	// SendAsync tries to send without blocking. It returns true if the data was sent; otherwise it returns false.
	SendAsync(v interface{}) (ok bool)

	// Close closes the Channel and prohibits subsequent sends.
	Close()
}

Channel must be used by workflow code instead of native Go channels. Use the workflow.NewChannel(ctx) method to create a Channel instance.

func GetSignalChannel

func GetSignalChannel(ctx Context, signalName string) Channel

GetSignalChannel returns the channel corresponding to the signal name.

func NewBufferedChannel

func NewBufferedChannel(ctx Context, size int) Channel

NewBufferedChannel creates a new buffered Channel instance.

func NewChannel

func NewChannel(ctx Context) Channel

NewChannel creates a new Channel instance.

func NewNamedBufferedChannel

func NewNamedBufferedChannel(ctx Context, name string, size int) Channel

NewNamedBufferedChannel creates a new buffered Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this Channel.

func NewNamedChannel

func NewNamedChannel(ctx Context, name string) Channel

NewNamedChannel creates a new Channel instance with a given human-readable name. The name appears in stack traces that are blocked on this Channel.

type ChildWorkflowFuture

type ChildWorkflowFuture interface {
	Future
	// GetChildWorkflowExecution returns a future that will be ready when child workflow execution started. You can
	// get the WorkflowExecution of the child workflow from the future. Then you can use Workflow ID and RunID of
	// child workflow to cancel or send signal to child workflow.
	//  childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, child, ...)
	//  var childWE WorkflowExecution
	//  if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err == nil {
	//      // child workflow started, you can use childWE to get the WorkflowID and RunID of child workflow
	//  }
	GetChildWorkflowExecution() Future

	// SignalChildWorkflow sends a signal to the child workflow. This call will block until child workflow is started.
	SignalChildWorkflow(ctx Context, signalName string, data interface{}) Future
}

ChildWorkflowFuture represents the result of a child workflow execution

func ExecuteChildWorkflow

func ExecuteChildWorkflow(ctx Context, childWorkflow interface{}, args ...interface{}) ChildWorkflowFuture

ExecuteChildWorkflow requests a child workflow execution in the context of a workflow. The Context can be used to pass settings for the child workflow, for example the task list that this child workflow should be routed to and the timeouts that need to be configured. Use ChildWorkflowOptions to pass down the options.

cwo := ChildWorkflowOptions{
	ExecutionStartToCloseTimeout: 10 * time.Minute,
	TaskStartToCloseTimeout:      time.Minute,
}
ctx = WithChildWorkflowOptions(ctx, cwo)

Input childWorkflow is either a workflow name or the workflow function that is being scheduled. Input args are the arguments that need to be passed to the child workflow function represented by childWorkflow. If the child workflow fails to complete, the error returned by the future's Get indicates the failure, and it can be one of CustomError, TimeoutError, CanceledError, GenericError. You can cancel the pending child workflow using a context (workflow.WithCancel(ctx)), and that will fail the workflow with error CanceledError. ExecuteChildWorkflow returns ChildWorkflowFuture.

type ChildWorkflowOptions

type ChildWorkflowOptions struct {
	// Domain of the child workflow.
	// Optional: the current workflow (parent)'s domain will be used if this is not provided.
	Domain string

	// WorkflowID of the child workflow to be scheduled.
	// Optional: an auto generated workflowID will be used if this is not provided.
	WorkflowID string

	// TaskList that the child workflow needs to be scheduled on.
	// Optional: the parent workflow task list will be used if this is not provided.
	TaskList string

	// ExecutionStartToCloseTimeout - The end to end timeout for the child workflow execution.
	// Mandatory: no default
	ExecutionStartToCloseTimeout time.Duration

	// TaskStartToCloseTimeout - The decision task timeout for the child workflow.
	// Optional: default is 10s if this is not provided (or if 0 is provided).
	TaskStartToCloseTimeout time.Duration

	// ChildPolicy defines the behavior of child workflow when parent workflow is terminated.
	// Optional: default to use ChildWorkflowPolicyTerminate if this is not provided
	ChildPolicy ChildWorkflowPolicy

	// WaitForCancellation - Whether to wait for cancelled child workflow to be ended (child workflow can be ended
	// as: completed/failed/timedout/terminated/canceled)
	// Optional: default false
	WaitForCancellation bool

	// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
	// for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate
	WorkflowIDReusePolicy WorkflowIDReusePolicy
}

ChildWorkflowOptions stores all child-workflow-specific parameters that will be stored inside of a Context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

type ChildWorkflowPolicy

type ChildWorkflowPolicy int32

ChildWorkflowPolicy defines child workflow behavior when parent workflow is terminated.

const (
	// ChildWorkflowPolicyTerminate is policy that will terminate all child workflows when parent workflow is terminated.
	ChildWorkflowPolicyTerminate ChildWorkflowPolicy = 0
	// ChildWorkflowPolicyRequestCancel is policy that will send cancel request to all open child workflows when parent
	// workflow is terminated.
	ChildWorkflowPolicyRequestCancel ChildWorkflowPolicy = 1
	// ChildWorkflowPolicyAbandon is policy that will have no impact to child workflow execution when parent workflow is
	// terminated.
	ChildWorkflowPolicyAbandon ChildWorkflowPolicy = 2
)

type Client

type Client interface {
	// StartWorkflow starts a workflow execution
	// The user can use this to start using a function or workflow type name.
	// Either by
	//     StartWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3)
	//     or
	//     StartWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3)
	// The errors it can return:
	//	- EntityNotExistsError, if domain does not exist
	//	- BadRequestError
	//	- WorkflowExecutionAlreadyStartedError
	//	- InternalServiceError
	// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. This is
	// subject to change in the future.
	StartWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (*WorkflowExecution, error)

	// ExecuteWorkflow starts a workflow execution and returns a WorkflowRun instance and error
	// The user can use this to start using a function or workflow type name.
	// Either by
	//     ExecuteWorkflow(ctx, options, "workflowTypeName", arg1, arg2, arg3)
	//     or
	//     ExecuteWorkflow(ctx, options, workflowExecuteFn, arg1, arg2, arg3)
	// The errors it can return:
	//	- EntityNotExistsError, if domain does not exist
	//	- BadRequestError
	//	- WorkflowExecutionAlreadyStartedError
	//	- InternalServiceError
	//
	// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but
	// this is subject to change in the future.
	//
	// WorkflowRun has three methods:
	//  - GetID() string: returns the workflow ID (the same as StartWorkflowOptions.ID, if provided)
	//  - GetRunID() string: returns the run ID of the first started workflow run (see below)
	//  - Get(ctx context.Context, valuePtr interface{}) error: fills valuePtr with the workflow
	//    execution result if the workflow execution succeeds, or returns the corresponding
	//    error. This is a blocking API.
	// NOTE: if the started workflow returns ContinueAsNewError during the workflow execution,
	// GetRunID() still returns the run ID of the started workflow, not the new run ID caused by ContinueAsNewError.
	// However, Get(ctx context.Context, valuePtr interface{}) returns the result from the run that did not return ContinueAsNewError.
	// Say ExecuteWorkflow started a workflow whose first run has run ID "run ID 1" and returned ContinueAsNewError,
	// and whose second run has run ID "run ID 2" and returned some result other than ContinueAsNewError:
	// GetRunID() will always return "run ID 1" and Get(ctx context.Context, valuePtr interface{}) will return the result of the second run.
	// NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
	ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error)

	// SignalWorkflow sends a signal to a workflow in execution
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// - signalName name to identify the signal.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- InternalServiceError
	SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error

	// SignalWithStartWorkflow sends a signal to a running workflow.
	// If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction.
	// - workflowID, signalName, signalArg are same as SignalWorkflow's parameters
	// - workflow, workflowArgs are same as StartWorkflow's parameters
	// - options.WorkflowIDReusePolicy will be ignored, and WorkflowIDReusePolicyAllowDuplicate will be used as the policy
	// The errors it can return:
	//  - EntityNotExistsError, if domain does not exist
	//  - BadRequestError
	//	- InternalServiceError
	SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{},
		options StartWorkflowOptions, workflow interface{}, workflowArgs ...interface{}) (*WorkflowExecution, error)

	// CancelWorkflow cancels a workflow in execution
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	CancelWorkflow(ctx context.Context, workflowID string, runID string) error

	// TerminateWorkflow terminates a workflow execution.
	// workflowID is required, other parameters are optional.
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error

	// GetWorkflowHistory gets history events of a particular workflow
	// - workflow ID of the workflow.
	// - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID.
	// - whether use long poll for tracking new events: when the workflow is running, there can be new events generated during iteration
	// 	 of HistoryEventIterator, if isLongPoll == true, then iterator will do long poll, tracking new history event, i.e. the iteration
	//   will not be finished until workflow is finished; if isLongPoll == false, then iterator will only return current history events.
	// - whether return all history events or just the last event, which contains the workflow execution end result
	// Example:-
	//	To iterate all events,
	//		iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
	//		events := []*shared.HistoryEvent{}
	//		for iter.HasNext() {
	//			event, err := iter.Next()
	//			if err != nil {
	//				return err
	//			}
	//			events = append(events, event)
	//		}
	GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType s.HistoryEventFilterType) HistoryEventIterator

	// CompleteActivity reports activity completed.
	// The activity's Execute method can return activity.ErrResultPending to
	// indicate the activity is not completed when its Execute method returns. In that case, this CompleteActivity() method
	// should be called when that activity is completed with the actual result and error. If err is nil, activity task
	// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
	// activity task failed event will be reported.
	// An activity implementation should use GetActivityInfo(ctx).TaskToken function to get task token to use for completion.
	// Example:-
	//	To complete with a result.
	//  	CompleteActivity(ctx, token, "Done", nil)
	//	To fail the activity with an error.
	//      CompleteActivity(ctx, token, nil, cadence.NewCustomError("reason", details))
	// The activity can fail with below errors ErrorWithDetails, TimeoutError, CanceledError.
	CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error

	// CompleteActivityByID reports activity completed.
	// Similar to CompleteActivity, but may save cadence user from keeping taskToken info.
	// The activity's Execute method can return activity.ErrResultPending to
	// indicate the activity is not completed when its Execute method returns. In that case, this CompleteActivityByID() method
	// should be called when that activity is completed with the actual result and error. If err is nil, activity task
	// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
	// activity task failed event will be reported.
	// An activity implementation should use activityID provided in ActivityOption to use for completion.
	// domain name, workflowID, activityID are required, runID is optional.
	// The errors it can return:
	//  - ErrorWithDetails
	//  - TimeoutError
	//  - CanceledError
	CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string, result interface{}, err error) error

	// RecordActivityHeartbeat records heartbeat for an activity.
	// details - is the progress you want to record along with heart beat for this activity.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- InternalServiceError
	RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error

	// RecordActivityHeartbeatByID records heartbeat for an activity.
	// details - is the progress you want to record along with heart beat for this activity.
	// The errors it can return:
	//	- EntityNotExistsError
	//	- InternalServiceError
	RecordActivityHeartbeatByID(ctx context.Context, domain, workflowID, runID, activityID string, details ...interface{}) error

	// ListClosedWorkflow gets closed workflow executions based on request filters
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error)

	// ListOpenWorkflow gets open workflow executions based on request filters
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error)

	// QueryWorkflow queries a given workflow execution and returns the query result synchronously. Parameter workflowID
	// and queryType are required, other parameters are optional. The workflowID and runID (optional) identify the
	// target workflow execution that this query will be sent to. If runID is not specified (empty string), server will
	// use the currently running execution of that workflowID. The queryType specifies the type of query you want to
	// run. By default, cadence supports "__stack_trace" as a standard query type, which will return string value
	// representing the call stack of the target workflow. The target workflow could also set up different query handlers
	// to handle custom query types.
	// See comments at workflow.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details
	// on how to set up a query handler within the target workflow.
	// - workflowID is required.
	// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
	// - queryType is the type of the query.
	// - args... are the optional query parameters.
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	//  - QueryFailError
	QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (encoded.Value, error)

	// DescribeWorkflowExecution returns information about the specified workflow execution.
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error)

	// DescribeTaskList returns information about the target tasklist, right now this API returns the
	// pollers which polled this tasklist in last few minutes.
	// The errors it can return:
	//  - BadRequestError
	//  - InternalServiceError
	//  - EntityNotExistsError
	DescribeTaskList(ctx context.Context, tasklist string, tasklistType s.TaskListType) (*s.DescribeTaskListResponse, error)
}

Client is the client for starting and getting information about workflow executions, as well as completing activities asynchronously.
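The CompleteActivity comments above describe how the err argument selects which event is reported: nil means completed, CanceledError means cancelled, and anything else means failed. The sketch below restates that rule in isolation. canceledError, errExample and reportedEvent are hypothetical stand-ins rather than library identifiers, and the returned strings are labels for this example, not server event names.

```go
package main

import (
	"errors"
	"fmt"
)

// canceledError is a local stand-in for the library's CanceledError type.
type canceledError struct{}

func (*canceledError) Error() string { return "canceled" }

// errExample is an arbitrary non-cancellation error used for illustration.
var errExample = errors.New("disk full")

// reportedEvent is a hypothetical helper that names the kind of event the
// documentation says is reported for a given err argument.
func reportedEvent(err error) string {
	if err == nil {
		return "completed"
	}
	if _, ok := err.(*canceledError); ok {
		return "canceled"
	}
	return "failed"
}

func main() {
	fmt.Println(reportedEvent(nil))
	fmt.Println(reportedEvent(&canceledError{}))
	fmt.Println(reportedEvent(errExample))
}
```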

func NewClient

func NewClient(service workflowserviceclient.Interface, domain string, options *ClientOptions) Client

NewClient creates an instance of a workflow client

type ClientOptions

type ClientOptions struct {
	MetricsScope  tally.Scope
	Identity      string
	DataConverter encoded.DataConverter
}

ClientOptions are optional parameters for Client creation.

type Context

type Context interface {
	// Deadline returns the time when work done on behalf of this context
	// should be canceled.  Deadline returns ok==false when no deadline is
	// set.  Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled.  Done may return nil if this context can
	// never be canceled.  Successive calls to Done return the same value.
	//
	// WithCancel arranges for Done to be closed when cancel is called;
	// WithDeadline arranges for Done to be closed when the deadline
	// expires; WithTimeout arranges for Done to be closed when the timeout
	// elapses.
	//
	// Done is provided for use in select statements:
	//
	//  // Stream generates values with DoSomething and sends them to out
	//  // until DoSomething returns an error or ctx.Done is closed.
	//  func Stream(ctx Context, out Channel) (err error) {
	//	for {
	//		v, err := DoSomething(ctx)
	//		if err != nil {
	//			return err
	//		}
	//		s := NewSelector(ctx)
	//		s.AddReceive(ctx.Done(),  func(v interface{}) { err = ctx.Err() })
	//		s.AddReceive(v, func(v interface{}, more bool) { out.Send(ctx, v) })
	//		s.Select(ctx)
	//		if err != nil {
	//			return err
	//		}
	//	}
	//  }
	//
	// See http://blog.golang.org/pipelines for more examples of how to use
	// a Done channel for cancellation.
	Done() Channel

	// Err returns a non-nil error value after Done is closed.  Err returns
	// Canceled if the context was canceled or DeadlineExceeded if the
	// context's deadline passed.  No other values for Err are defined.
	// After Done is closed, successive calls to Err return the same value.
	Err() error

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key.  Successive calls to Value with
	// the same key return the same result.
	//
	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	//
	// A key identifies a specific value in a Context.  Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value.  A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	//
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	//
	// 	// Package user defines a User type that's stored in Contexts.
	// 	package user
	//
	// 	import "golang.org/x/net/context"
	//
	// 	// User is the type of value stored in the Contexts.
	// 	type User struct {...}
	//
	// 	// key is an unexported type for keys defined in this package.
	// 	// This prevents collisions with keys defined in other packages.
	// 	type key int
	//
	// 	// userKey is the key for user.User values in Contexts.  It is
	// 	// unexported; clients use user.NewContext and user.FromContext
	// 	// instead of using this key directly.
	// 	var userKey key = 0
	//
	// 	// NewContext returns a new Context that carries value u.
	// 	func NewContext(ctx context.Context, u *User) context.Context {
	// 		return context.WithValue(ctx, userKey, u)
	// 	}
	//
	// 	// FromContext returns the User value stored in ctx, if any.
	// 	func FromContext(ctx context.Context) (*User, bool) {
	// 		u, ok := ctx.Value(userKey).(*User)
	// 		return u, ok
	// 	}
	Value(key interface{}) interface{}
}

Context is a clone of context.Context with Done() returning Channel instead of native channel. A Context carries a deadline, a cancellation signal, and other values across API boundaries.

Context's methods may be called by multiple goroutines simultaneously.

func WithActivityOptions

func WithActivityOptions(ctx Context, options ActivityOptions) Context

WithActivityOptions adds all options to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithChildPolicy

func WithChildPolicy(ctx Context, childPolicy ChildWorkflowPolicy) Context

WithChildPolicy adds a ChildWorkflowPolicy to the context.

func WithChildWorkflowOptions

func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context

WithChildWorkflowOptions adds all workflow options to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithDataConverter added in v0.7.0

func WithDataConverter(ctx Context, dc encoded.DataConverter) Context

WithDataConverter adds DataConverter to the context.

func WithExecutionStartToCloseTimeout

func WithExecutionStartToCloseTimeout(ctx Context, d time.Duration) Context

WithExecutionStartToCloseTimeout adds a workflow execution timeout to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithHeartbeatTimeout

func WithHeartbeatTimeout(ctx Context, d time.Duration) Context

WithHeartbeatTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithLocalActivityOptions added in v0.5.1

func WithLocalActivityOptions(ctx Context, options LocalActivityOptions) Context

WithLocalActivityOptions adds local activity options to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithScheduleToCloseTimeout

func WithScheduleToCloseTimeout(ctx Context, d time.Duration) Context

WithScheduleToCloseTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithScheduleToStartTimeout

func WithScheduleToStartTimeout(ctx Context, d time.Duration) Context

WithScheduleToStartTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithStartToCloseTimeout

func WithStartToCloseTimeout(ctx Context, d time.Duration) Context

WithStartToCloseTimeout adds a timeout to the copy of the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func WithTaskList

func WithTaskList(ctx Context, name string) Context

WithTaskList adds a task list to the copy of the context.

func WithValue

func WithValue(parent Context, key interface{}, val interface{}) Context

WithValue returns a copy of parent in which the value associated with key is val.

Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.

func WithWaitForCancellation

func WithWaitForCancellation(ctx Context, wait bool) Context

WithWaitForCancellation adds the wait-for-cancellation setting to the copy of the context.

func WithWorkflowDomain

func WithWorkflowDomain(ctx Context, name string) Context

WithWorkflowDomain adds a domain to the context.

func WithWorkflowID

func WithWorkflowID(ctx Context, workflowID string) Context

WithWorkflowID adds a workflowID to the context.

func WithWorkflowTaskList

func WithWorkflowTaskList(ctx Context, name string) Context

WithWorkflowTaskList adds a task list to the context.

func WithWorkflowTaskStartToCloseTimeout

func WithWorkflowTaskStartToCloseTimeout(ctx Context, d time.Duration) Context

WithWorkflowTaskStartToCloseTimeout adds a decision timeout to the context. The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

type ContinueAsNewError

type ContinueAsNewError struct {
	// contains filtered or unexported fields
}

ContinueAsNewError contains information about how to continue the workflow as new.

func NewContinueAsNewError

func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) *ContinueAsNewError

NewContinueAsNewError creates a ContinueAsNewError instance. If the workflow main function returns this error, then the current execution is ended and a new execution with the same workflow ID is started automatically with the options provided to this function.

 ctx - use context to override any options for the new workflow, like execution timeout, decision task timeout, task list.
       If not specified, it uses the defaults that the current workflow is using.
         ctx = WithExecutionStartToCloseTimeout(ctx, 30 * time.Minute)
         ctx = WithWorkflowTaskStartToCloseTimeout(ctx, time.Minute)
         ctx = WithWorkflowTaskList(ctx, "example-group")
 wfn - workflow function. For the new execution it can be different from the currently running one.
 args - arguments for the new workflow.

func (*ContinueAsNewError) Error

func (e *ContinueAsNewError) Error() string

Error from error interface

type CustomError

type CustomError struct {
	// contains filtered or unexported fields
}

CustomError returned from workflow and activity implementations with reason and optional details.

func NewCustomError

func NewCustomError(reason string, details ...interface{}) *CustomError

NewCustomError creates a new instance of *CustomError with reason and optional details.

func (*CustomError) Details

func (e *CustomError) Details(d ...interface{}) error

Details extracts the strongly typed detail data of this custom error. If there are no details, it will return ErrNoData.

func (*CustomError) Error

func (e *CustomError) Error() string

Error from error interface

func (*CustomError) HasDetails added in v0.5.1

func (e *CustomError) HasDetails() bool

HasDetails returns whether this error has strongly typed detail data.

func (*CustomError) Reason

func (e *CustomError) Reason() string

Reason gets the reason of this custom error

type DomainClient

type DomainClient interface {
	// Register a domain with cadence server
	// The errors it can throw:
	//	- DomainAlreadyExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Register(ctx context.Context, request *s.RegisterDomainRequest) error

	// Describe a domain. The domain has 3 part of information
	// DomainInfo - Which has Name, Status, Description, Owner Email
	// DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics.
	// ReplicationConfiguration - replication config like clusters and active cluster name
	// The errors it can throw:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Describe(ctx context.Context, name string) (*s.DescribeDomainResponse, error)

	// Update a domain.
	// The errors it can throw:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Update(ctx context.Context, request *s.UpdateDomainRequest) error
}

DomainClient is the client for managing operations on the domain. CLI, tools, ... can use this layer to manage operations on domain.

func NewDomainClient

func NewDomainClient(service workflowserviceclient.Interface, options *ClientOptions) DomainClient

NewDomainClient creates an instance of a domain client, to manage the lifecycle of domains.

type EncodedValue

type EncodedValue struct {
	// contains filtered or unexported fields
}

EncodedValue is a type alias used to encapsulate/extract encoded result from workflow/activity.

func (EncodedValue) Get

func (b EncodedValue) Get(valuePtr interface{}) error

Get extracts data from encoded data to the desired value type. valuePtr is a pointer to the actual value type.

func (EncodedValue) HasValue added in v0.5.1

func (b EncodedValue) HasValue() bool

HasValue returns whether there is a value encoded.

type EncodedValues

type EncodedValues struct {
	// contains filtered or unexported fields
}

EncodedValues is a type alias used to encapsulate/extract encoded arguments from workflow/activity.

func (EncodedValues) Get

func (b EncodedValues) Get(valuePtr ...interface{}) error

Get extracts data from encoded data to the desired value type. valuePtr is a pointer to the actual value type.

func (EncodedValues) HasValues added in v0.5.1

func (b EncodedValues) HasValues() bool

HasValues returns whether there are values encoded.

type ErrorDetailsValues added in v0.7.0

type ErrorDetailsValues []interface{}

ErrorDetailsValues is a type alias used to hold error details objects.

func (ErrorDetailsValues) Get added in v0.7.0

func (b ErrorDetailsValues) Get(valuePtr ...interface{}) error

Get extracts data from encoded data to the desired value type. valuePtr is a pointer to the actual value type.

func (ErrorDetailsValues) HasValues added in v0.7.0

func (b ErrorDetailsValues) HasValues() bool

HasValues returns whether there are values.
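Get accepts a variadic list of pointers, one per stored value. The following is a minimal sketch of what positional extraction could look like, assuming values are copied in order using reflection. detailsValues is a local stand-in for ErrorDetailsValues; the library's actual implementation and the errors it returns may differ.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// detailsValues is a local stand-in for ErrorDetailsValues.
type detailsValues []interface{}

// HasValues reports whether any values are stored.
func (b detailsValues) HasValues() bool { return len(b) > 0 }

// Get copies the stored values, in order, into the supplied pointers.
// The error messages here are assumptions made for this sketch.
func (b detailsValues) Get(valuePtr ...interface{}) error {
	if len(valuePtr) > len(b) {
		return errors.New("more pointers than stored values")
	}
	for i, ptr := range valuePtr {
		target := reflect.ValueOf(ptr)
		if target.Kind() != reflect.Ptr || target.IsNil() {
			return fmt.Errorf("argument %d is not a non-nil pointer", i)
		}
		elem := target.Elem()
		src := reflect.ValueOf(b[i])
		if !src.IsValid() {
			// A stored nil becomes the zero value of the target type.
			elem.Set(reflect.Zero(elem.Type()))
			continue
		}
		if !src.Type().AssignableTo(elem.Type()) {
			return fmt.Errorf("value %d has type %v, want %v", i, src.Type(), elem.Type())
		}
		elem.Set(src)
	}
	return nil
}

func main() {
	details := detailsValues{"quota exceeded", 429}
	var reason string
	var code int
	if err := details.Get(&reason, &code); err != nil {
		fmt.Println("extract failed:", err)
		return
	}
	fmt.Println(reason, code)
}
```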

type Future

type Future interface {
	// Get blocks until the future is ready. When ready it either returns non nil error or assigns result value to
	// the provided pointer.
	// Example:
	//  var v string
	//  if err := f.Get(ctx, &v); err != nil {
	//      return err
	//  }
	Get(ctx Context, valuePtr interface{}) error

	// When true Get is guaranteed to not block
	IsReady() bool
}

Future represents the result of an asynchronous computation.

func ExecuteActivity

func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteActivity requests activity execution in the context of a workflow. Context can be used to pass the settings for this activity, for example the task list that it needs to be routed to and the timeouts that need to be configured. Use ActivityOptions to pass down the options.

	ao := ActivityOptions{
		TaskList:               "exampleTaskList",
		ScheduleToStartTimeout: 10 * time.Second,
		StartToCloseTimeout:    5 * time.Second,
		ScheduleToCloseTimeout: 10 * time.Second,
		HeartbeatTimeout:       0,
	}
	ctx := WithActivityOptions(ctx, ao)

Or to override a single option

ctx := WithTaskList(ctx, "exampleTaskList")

Input activity is either an activity name (string) or a function representing an activity that is getting scheduled. Input args are the arguments that need to be passed to the scheduled activity.

If the activity fails to complete, the error returned by the Future's Get indicates the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity by cancelling the context (workflow.WithCancel(ctx)), which fails the activity with CanceledError.

ExecuteActivity returns Future with activity result or failure.

func ExecuteLocalActivity added in v0.5.1

func ExecuteLocalActivity(ctx Context, activity interface{}, args ...interface{}) Future

ExecuteLocalActivity requests to run a local activity. A local activity is like a regular activity with some key differences:

* Local activity is scheduled and run by the workflow worker locally.
* Local activity does not need Cadence server to schedule activity task and does not rely on activity worker.
* No need to register local activity.
* The parameter activity to ExecuteLocalActivity() must be a function.
* Local activity is for short living activities (usually finishes within seconds).
* Local activity cannot heartbeat.

Context can be used to pass the settings for this local activity. For now there is only one setting for timeout to be set:

	lao := LocalActivityOptions{
		ScheduleToCloseTimeout: 5 * time.Second,
	}
	ctx := WithLocalActivityOptions(ctx, lao)

The timeout here should be relatively shorter than the DecisionTaskStartToCloseTimeout of the workflow. If you need a longer timeout, you probably should not use a local activity and should use a regular activity instead. Local activity is designed to be used for short living activities (usually finishes within seconds).

Input args are the arguments that will be passed to the local activity. The input args will be handed over directly to the local activity function without serialization/deserialization because we don't need to pass the input across a process boundary. However, the result will still go through serialization/deserialization because we need to record the result as history to cadence server, so if the workflow crashes, a different worker can replay the history without running the local activity again.

If the activity fails to complete, the error returned by the Future's Get indicates the failure, and it can be one of CustomError, TimeoutError, CanceledError, PanicError, GenericError. You can cancel the pending activity by cancelling the context (workflow.WithCancel(ctx)), which fails the activity with CanceledError.

ExecuteLocalActivity returns Future with local activity result or failure.

func NewTimer

func NewTimer(ctx Context, d time.Duration) Future

NewTimer returns immediately and the future becomes ready after the specified duration d. The workflow needs to use this NewTimer() to get the timer instead of the Go standard library one (time.NewTimer()). You can cancel the pending timer by cancelling the Context (using a context from workflow.WithCancel(ctx)). After the timer is canceled, the returned Future becomes ready, and Future.Get() will return *CanceledError. The current timer resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

func RequestCancelExternalWorkflow added in v0.5.1

func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future

RequestCancelExternalWorkflow can be used to request cancellation of an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:

ctx := WithWorkflowDomain(ctx, "domain-name")

RequestCancelExternalWorkflow returns a Future with failure or empty success result.

func SignalExternalWorkflow added in v0.5.1

func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future

SignalExternalWorkflow can be used to send signal info to an external workflow. Input workflowID is the workflow ID of target workflow. Input runID indicates the instance of a workflow. Input runID is optional (default is ""). When runID is not specified, then the currently running instance of that workflowID will be used. By default, the current workflow's domain will be used as target domain. However, you can specify a different domain of the target workflow using the context like:

ctx := WithWorkflowDomain(ctx, "domain-name")

SignalExternalWorkflow returns a Future with failure or empty success result.

type GenericError

type GenericError struct {
	// contains filtered or unexported fields
}

GenericError is returned from workflow/activity when the implementations return errors other than those created by the NewCustomError() API.

func (*GenericError) Error

func (e *GenericError) Error() string

Error from error interface

type HistoryEventIterator

type HistoryEventIterator interface {
	// HasNext returns whether this iterator has a next value
	HasNext() bool
	// Next returns the next history event and error
	// The errors it can return:
	//	- EntityNotExistsError
	//	- BadRequestError
	//	- InternalServiceError
	Next() (*s.HistoryEvent, error)
}

HistoryEventIterator represents the interface for history event iterator
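The HasNext/Next loop shown in the GetWorkflowHistory comment can be sketched in a self-contained form as follows. historyEvent, eventIterator, sliceIterator and collect are hypothetical names introduced for this example; a real iterator comes from Client.GetWorkflowHistory and yields *s.HistoryEvent values.

```go
package main

import (
	"errors"
	"fmt"
)

// historyEvent is a local stand-in for s.HistoryEvent.
type historyEvent struct {
	EventID int64
}

// eventIterator mirrors the documented HistoryEventIterator method set.
type eventIterator interface {
	HasNext() bool
	Next() (*historyEvent, error)
}

// sliceIterator is a hypothetical in-memory iterator used only for this sketch.
type sliceIterator struct {
	events []*historyEvent
	pos    int
}

func (it *sliceIterator) HasNext() bool { return it.pos < len(it.events) }

func (it *sliceIterator) Next() (*historyEvent, error) {
	if !it.HasNext() {
		return nil, errors.New("no more events")
	}
	ev := it.events[it.pos]
	it.pos++
	return ev, nil
}

// collect drains the iterator and stops at the first error.
func collect(it eventIterator) ([]*historyEvent, error) {
	var events []*historyEvent
	for it.HasNext() {
		ev, err := it.Next()
		if err != nil {
			return nil, err
		}
		events = append(events, ev)
	}
	return events, nil
}

func main() {
	it := &sliceIterator{events: []*historyEvent{{EventID: 1}, {EventID: 2}, {EventID: 3}}}
	events, err := collect(it)
	if err != nil {
		fmt.Println("iteration failed:", err)
		return
	}
	fmt.Println("collected events:", len(events))
}
```

With isLongPoll set to true, the documentation states that iteration does not finish until the workflow finishes, so a loop like collect would block for the lifetime of the workflow.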

type HistoryIterator

type HistoryIterator interface {
	// GetNextPage returns next page of history events
	GetNextPage() (*s.History, error)
	// Reset resets the internal state so next GetNextPage() call will return first page of events from beginning.
	Reset()
	// HasNextPage returns whether there are more pages of events
	HasNextPage() bool
}

HistoryIterator iterates through history events

type LocalActivityOptions added in v0.5.1

type LocalActivityOptions struct {
	// ScheduleToCloseTimeout - The end to end timeout for the local activity.
	// This field is required.
	ScheduleToCloseTimeout time.Duration
}

LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.

type MockCallWrapper

type MockCallWrapper struct {
	// contains filtered or unexported fields
}

MockCallWrapper is a wrapper to mock.Call. It offers the ability to wait on workflow's clock instead of wall clock.

func (*MockCallWrapper) After

After sets how long to wait on workflow's clock before the mock call returns.

func (*MockCallWrapper) Once

func (c *MockCallWrapper) Once() *MockCallWrapper

Once indicates that the mock should only return the value once.

func (*MockCallWrapper) Return

func (c *MockCallWrapper) Return(returnArguments ...interface{}) *MockCallWrapper

Return specifies the return arguments for the expectation.

func (*MockCallWrapper) Run

func (c *MockCallWrapper) Run(fn func(args mock.Arguments)) *MockCallWrapper

Run sets a handler to be called before returning. It can be used when mocking a method, such as an unmarshaler, that takes a pointer to a struct and sets properties in that struct.

func (*MockCallWrapper) Times

func (c *MockCallWrapper) Times(i int) *MockCallWrapper

Times indicates that the mock should only return the indicated number of times.

func (*MockCallWrapper) Twice

func (c *MockCallWrapper) Twice() *MockCallWrapper

Twice indicates that the mock should only return the value twice.

type NonDeterministicWorkflowPolicy added in v0.7.0

type NonDeterministicWorkflowPolicy int

NonDeterministicWorkflowPolicy is an enum for configuring how client's decision task handler deals with mismatched history events (presumably arising from non-deterministic workflow definitions).

const (
	// NonDeterministicWorkflowPolicyBlockWorkflow is the default policy for handling detected non-determinism.
	// This option simply logs to console with an error message that non-determinism is detected, but
	// does *NOT* reply anything back to the server.
	// It is chosen as default for backward compatibility reasons because it preserves the old behavior
	// for handling non-determinism that we had before NonDeterministicWorkflowPolicy type was added to
	// allow more configurability.
	NonDeterministicWorkflowPolicyBlockWorkflow NonDeterministicWorkflowPolicy = iota
	// NonDeterministicWorkflowPolicyFailWorkflow behaves exactly the same as BlockWorkflow, up until the very
	// end of processing a decision task.
	// Whereas the default does *NOT* reply anything back to the server, fail workflow replies back with a request
	// to fail the workflow execution.
	NonDeterministicWorkflowPolicyFailWorkflow
)

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

PanicError contains information about panicked workflow/activity.

func (*PanicError) Error

func (e *PanicError) Error() string

Error from error interface

func (*PanicError) StackTrace

func (e *PanicError) StackTrace() string

StackTrace returns the stack trace of the panic.

type RegisterActivityOptions

type RegisterActivityOptions struct {
	Name string
}

RegisterActivityOptions consists of options for registering an activity

type RegisterWorkflowOptions

type RegisterWorkflowOptions struct {
	Name string
}

RegisterWorkflowOptions consists of options for registering a workflow

type Selector

type Selector interface {
	AddReceive(c Channel, f func(c Channel, more bool)) Selector
	AddSend(c Channel, v interface{}, f func()) Selector
	AddFuture(future Future, f func(f Future)) Selector
	AddDefault(f func())
	Select(ctx Context)
}

Selector must be used instead of native go select by workflow code. Use workflow.NewSelector(ctx) method to create a Selector instance.

func NewNamedSelector

func NewNamedSelector(ctx Context, name string) Selector

NewNamedSelector creates a new Selector instance with a given human readable name. Name appears in stack traces that are blocked on this Selector.

func NewSelector

func NewSelector(ctx Context) Selector

NewSelector creates a new Selector instance.

type ServiceInvoker

type ServiceInvoker interface {
	// Returns ActivityTaskCanceledError if activity is cancelled
	Heartbeat(details []byte) error
	Close()
}

ServiceInvoker abstracts calls to the Cadence service from an activity implementation. Implement to unit test activities.

type Settable

type Settable interface {
	Set(value interface{}, err error)
	SetValue(value interface{})
	SetError(err error)
	Chain(future Future) // Value (or error) of the future become the same of the chained one.
}

Settable is used to set value or error on a future. See more: workflow.NewFuture(ctx).

type StartWorkflowOptions

type StartWorkflowOptions struct {
	// ID - The business identifier of the workflow execution.
	// Optional: defaulted to a uuid.
	ID string

	// TaskList - The decisions of the workflow are scheduled on this queue.
	// This is also the default task list on which activities are scheduled. The workflow author can choose
	// to override this using activity options.
	// Mandatory: No default.
	TaskList string

	// ExecutionStartToCloseTimeout - The time out for duration of workflow execution.
	// The resolution is seconds.
	// Mandatory: No default.
	ExecutionStartToCloseTimeout time.Duration

	// DecisionTaskStartToCloseTimeout - The time out for processing decision task from the time the worker
	// pulled this task.
	// The resolution is seconds.
	// Optional: defaulted to 20 secs.
	DecisionTaskStartToCloseTimeout time.Duration

	// WorkflowIDReusePolicy - Whether the server allows reuse of the workflow ID. It can be useful
	// for dedup logic if set to WorkflowIDReusePolicyRejectDuplicate.
	// Optional: defaulted to WorkflowIDReusePolicyAllowDuplicateFailedOnly.
	WorkflowIDReusePolicy WorkflowIDReusePolicy
}

StartWorkflowOptions holds the configuration parameters for starting a workflow execution. The current timeout resolution is in seconds and uses math.Ceil(d.Seconds()) as the duration, but this is subject to change in the future.

type TerminatedError

type TerminatedError struct {
}

TerminatedError is returned when a workflow was terminated.

func (*TerminatedError) Error

func (e *TerminatedError) Error() string

Error from error interface

type TestActivityEnvironment

type TestActivityEnvironment struct {
	// contains filtered or unexported fields
}

TestActivityEnvironment is the environment that you use to test activity

func (*TestActivityEnvironment) ExecuteActivity

func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args ...interface{}) (encoded.Value, error)

ExecuteActivity executes an activity. The tested activity will be executed synchronously in the calling goroutine. The caller should use encoded.Value.Get() to extract the strongly typed result value.

func (*TestActivityEnvironment) ExecuteLocalActivity added in v0.5.1

func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (encoded.Value, error)

ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutine. The caller should use encoded.Value.Get() to extract the strongly typed result value.

func (*TestActivityEnvironment) SetTestTimeout added in v0.7.0

func (t *TestActivityEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestActivityEnvironment

SetTestTimeout sets the wall clock timeout for this activity test run. When the test timeout fires, it means the activity is taking too long.

func (*TestActivityEnvironment) SetWorkerOptions

func (t *TestActivityEnvironment) SetWorkerOptions(options WorkerOptions) *TestActivityEnvironment

SetWorkerOptions sets the WorkerOptions that will be used by TestActivityEnvironment. TestActivityEnvironment will use the Identity, MetricsScope and BackgroundActivityContext options on the WorkerOptions. Other options are ignored. Note: WorkerOptions is defined in internal package, use public type worker.Options instead.

type TestWorkflowEnvironment

type TestWorkflowEnvironment struct {
	mock.Mock
	// contains filtered or unexported fields
}

TestWorkflowEnvironment is the environment that you use to test workflow

func (*TestWorkflowEnvironment) CancelWorkflow

func (t *TestWorkflowEnvironment) CancelWorkflow()

CancelWorkflow requests cancellation (through workflow Context) to the currently running test workflow.

func (*TestWorkflowEnvironment) CompleteActivity

func (t *TestWorkflowEnvironment) CompleteActivity(taskToken []byte, result interface{}, err error) error

CompleteActivity completes an activity that had returned the activity.ErrResultPending error.

func (*TestWorkflowEnvironment) ExecuteWorkflow

func (t *TestWorkflowEnvironment) ExecuteWorkflow(workflowFn interface{}, args ...interface{})

ExecuteWorkflow executes a workflow and waits until the workflow completes. It will fail the test if the workflow is blocked and cannot complete within TestTimeout (set by SetTestTimeout()).

func (*TestWorkflowEnvironment) GetWorkflowError

func (t *TestWorkflowEnvironment) GetWorkflowError() error

GetWorkflowError returns the error from the test workflow.

func (*TestWorkflowEnvironment) GetWorkflowResult

func (t *TestWorkflowEnvironment) GetWorkflowResult(valuePtr interface{}) error

GetWorkflowResult extracts the encoded result from the test workflow. It returns an error if the extraction fails.

func (*TestWorkflowEnvironment) IsWorkflowCompleted

func (t *TestWorkflowEnvironment) IsWorkflowCompleted() bool

IsWorkflowCompleted checks whether the test workflow has completed.

func (*TestWorkflowEnvironment) Now

Now returns the current workflow time (a.k.a workflow.Now() time) of this TestWorkflowEnvironment.

func (*TestWorkflowEnvironment) OnActivity

func (t *TestWorkflowEnvironment) OnActivity(activity interface{}, args ...interface{}) *MockCallWrapper

OnActivity sets up a mock call for an activity. Parameter activity must be an activity function (func) or an activity name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The parameters supplied to the Return() call should either be a function that has exactly the same signature as the mocked activity, or mock values with the same types as the mocked activity function returns. Example: assume the activity you want to mock has the function signature:

func MyActivity(ctx context.Context, msg string) (string, error)

You can mock it by returning a function with exactly the same signature:

t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return(func(ctx context.Context, msg string) (string, error) {
   // your mock function implementation
   return "", nil
})

OR return mock values with same types as activity function's return types:

t.OnActivity(MyActivity, mock.Anything, mock.Anything).Return("mock_result", nil)

func (*TestWorkflowEnvironment) OnRequestCancelExternalWorkflow added in v0.5.1

func (t *TestWorkflowEnvironment) OnRequestCancelExternalWorkflow(domainName, workflowID, runID string) *MockCallWrapper

OnRequestCancelExternalWorkflow sets up a mock for cancellation of an external workflow. This TestWorkflowEnvironment handles cancellation of workflows that are started from the root workflow, for example cancellation sent from parent to child workflows, or cancellation between 2 child workflows. However, it does not know what to do if your tested workflow code is sending cancellation to external unknown workflows. In that case, you will need to set up mocks for those cancel calls. Some examples of how to set up a mock:

  • mock for a specific target workflow that matches a specific workflow ID and run ID:
    env.OnRequestCancelExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1").Return(nil).Once()
  • mock for anything and succeed the cancellation:
    env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
  • mock for anything and fail the cancellation:
    env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once()
  • mock function for RequestCancelExternalWorkflow:
    env.OnRequestCancelExternalWorkflow(mock.Anything, mock.Anything, mock.Anything).Return(
      func(domainName, workflowID, runID string) error {
        // you can do differently based on the parameters
        return nil
      })

func (*TestWorkflowEnvironment) OnSignalExternalWorkflow added in v0.5.1

func (t *TestWorkflowEnvironment) OnSignalExternalWorkflow(domainName, workflowID, runID, signalName, arg interface{}) *MockCallWrapper

OnSignalExternalWorkflow sets up a mock for sending a signal to an external workflow. This TestWorkflowEnvironment handles sending signals between the workflows that are started from the root workflow, for example sending signals between parent and child workflows, or between 2 child workflows. However, it does not know what to do if your tested workflow code is sending a signal to external unknown workflows. In that case, you will need to set up mocks for those signal calls. Some examples of how to set up a mock:

  • mock for a specific target workflow that matches a specific signal name and signal data:
    env.OnSignalExternalWorkflow("test-domain", "test-workflow-id1", "test-runid1", "test-signal", "test-data").Return(nil).Once()
  • mock for anything and succeed the send:
    env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
  • mock for anything and fail the send:
    env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("unknown external workflow")).Once()
  • mock function for SignalExternalWorkflow:
    env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
      func(domainName, workflowID, runID, signalName string, arg interface{}) error {
        // you can do differently based on the parameters
        return nil
      })

func (*TestWorkflowEnvironment) OnWorkflow

func (t *TestWorkflowEnvironment) OnWorkflow(workflow interface{}, args ...interface{}) *MockCallWrapper

OnWorkflow sets up a mock call for a workflow. Parameter workflow must be a workflow function (func) or a workflow name (string). You must call Return() with appropriate parameters on the returned *MockCallWrapper instance. The parameters supplied to the Return() call should either be a function that has exactly the same signature as the mocked workflow, or mock values with the same types as the mocked workflow function returns. Example: assume the workflow you want to mock has the function signature:

func MyChildWorkflow(ctx workflow.Context, msg string) (string, error)

You can mock it by returning a function with exactly the same signature:

t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return(func(ctx workflow.Context, msg string) (string, error) {
   // your mock function implementation
   return "", nil
})

OR return mock values with same types as workflow function's return types:

t.OnWorkflow(MyChildWorkflow, mock.Anything, mock.Anything).Return("mock_result", nil)

You can also set up a mock to simulate a failure to start the child workflow by returning ErrMockStartChildWorkflowFailed as the error.

func (*TestWorkflowEnvironment) QueryWorkflow

func (t *TestWorkflowEnvironment) QueryWorkflow(queryType string, args ...interface{}) (encoded.Value, error)

QueryWorkflow queries the currently running test workflow and returns the result synchronously.

func (*TestWorkflowEnvironment) RegisterDelayedCallback

func (t *TestWorkflowEnvironment) RegisterDelayedCallback(callback func(), delayDuration time.Duration)

RegisterDelayedCallback creates a new timer with the specified delayDuration using the workflow clock (not the wall clock). When the timer fires, the callback will be called. By default, this test suite uses a mock clock which automatically moves forward to fire the next timer when the workflow is blocked. Use this API to trigger an event (like activity completion, a signal or workflow cancellation) at the desired time.

func (*TestWorkflowEnvironment) SetActivityTaskList

func (t *TestWorkflowEnvironment) SetActivityTaskList(tasklist string, activityFn ...interface{})

SetActivityTaskList sets the affinity between an activity and a tasklist. By default, an activity can be invoked by any tasklist in this test environment. Use SetActivityTaskList() to set affinity between an activity and a tasklist. Once an activity is set to a particular tasklist, that activity will only be available to that tasklist.

func (*TestWorkflowEnvironment) SetOnActivityCanceledListener

func (t *TestWorkflowEnvironment) SetOnActivityCanceledListener(
	listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment

SetOnActivityCanceledListener sets a listener that will be called after an activity is canceled. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnActivityCompletedListener

func (t *TestWorkflowEnvironment) SetOnActivityCompletedListener(
	listener func(activityInfo *ActivityInfo, result encoded.Value, err error)) *TestWorkflowEnvironment

SetOnActivityCompletedListener sets a listener that will be called after an activity is completed. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnActivityHeartbeatListener

func (t *TestWorkflowEnvironment) SetOnActivityHeartbeatListener(
	listener func(activityInfo *ActivityInfo, details encoded.Values)) *TestWorkflowEnvironment

SetOnActivityHeartbeatListener sets a listener that will be called when an activity heartbeats. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnActivityStartedListener

func (t *TestWorkflowEnvironment) SetOnActivityStartedListener(
	listener func(activityInfo *ActivityInfo, ctx context.Context, args encoded.Values)) *TestWorkflowEnvironment

SetOnActivityStartedListener sets a listener that will be called before activity starts execution. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowCanceledListener(
	listener func(workflowInfo *WorkflowInfo)) *TestWorkflowEnvironment

SetOnChildWorkflowCanceledListener sets a listener that will be called when a child workflow is canceled. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.

func (*TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowCompletedListener(
	listener func(workflowInfo *WorkflowInfo, result encoded.Value, err error)) *TestWorkflowEnvironment

SetOnChildWorkflowCompletedListener sets a listener that will be called after a child workflow is completed. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.

func (*TestWorkflowEnvironment) SetOnChildWorkflowStartedListener

func (t *TestWorkflowEnvironment) SetOnChildWorkflowStartedListener(
	listener func(workflowInfo *WorkflowInfo, ctx Context, args encoded.Values)) *TestWorkflowEnvironment

SetOnChildWorkflowStartedListener sets a listener that will be called before a child workflow starts execution. Note: WorkflowInfo is defined in internal package, use public type workflow.Info instead.

func (*TestWorkflowEnvironment) SetOnLocalActivityCanceledListener added in v0.5.1

func (t *TestWorkflowEnvironment) SetOnLocalActivityCanceledListener(
	listener func(activityInfo *ActivityInfo)) *TestWorkflowEnvironment

SetOnLocalActivityCanceledListener sets a listener that will be called after local activity is canceled. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnLocalActivityCompletedListener added in v0.5.1

func (t *TestWorkflowEnvironment) SetOnLocalActivityCompletedListener(
	listener func(activityInfo *ActivityInfo, result encoded.Value, err error)) *TestWorkflowEnvironment

SetOnLocalActivityCompletedListener sets a listener that will be called after local activity is completed. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnLocalActivityStartedListener added in v0.5.1

func (t *TestWorkflowEnvironment) SetOnLocalActivityStartedListener(
	listener func(activityInfo *ActivityInfo, ctx context.Context, args []interface{})) *TestWorkflowEnvironment

SetOnLocalActivityStartedListener sets a listener that will be called before local activity starts execution. Note: ActivityInfo is defined in internal package, use public type activity.Info instead.

func (*TestWorkflowEnvironment) SetOnTimerCancelledListener

func (t *TestWorkflowEnvironment) SetOnTimerCancelledListener(listener func(timerID string)) *TestWorkflowEnvironment

SetOnTimerCancelledListener sets a listener that will be called after a timer is cancelled.

func (*TestWorkflowEnvironment) SetOnTimerFiredListener

func (t *TestWorkflowEnvironment) SetOnTimerFiredListener(listener func(timerID string)) *TestWorkflowEnvironment

SetOnTimerFiredListener sets a listener that will be called after a timer is fired.

func (*TestWorkflowEnvironment) SetOnTimerScheduledListener

func (t *TestWorkflowEnvironment) SetOnTimerScheduledListener(
	listener func(timerID string, duration time.Duration)) *TestWorkflowEnvironment

SetOnTimerScheduledListener sets a listener that will be called before a timer is scheduled.

func (*TestWorkflowEnvironment) SetStartTime added in v0.6.1

func (t *TestWorkflowEnvironment) SetStartTime(startTime time.Time)

SetStartTime sets the start time of the workflow. This is optional; the default start time is the wall clock time when the workflow starts. Start time is the workflow.Now(ctx) time at the beginning of the workflow.

func (*TestWorkflowEnvironment) SetTestTimeout

func (t *TestWorkflowEnvironment) SetTestTimeout(idleTimeout time.Duration) *TestWorkflowEnvironment

SetTestTimeout sets the wall clock timeout for this workflow test run. When the test timeout fires, it means the workflow is blocked and cannot make progress. This could happen if the workflow is waiting for an activity result for too long. This is real wall clock time, not the workflow time (a.k.a workflow.Now() time).

func (*TestWorkflowEnvironment) SetWorkerOptions

func (t *TestWorkflowEnvironment) SetWorkerOptions(options WorkerOptions) *TestWorkflowEnvironment

SetWorkerOptions sets the WorkerOptions for TestWorkflowEnvironment. TestWorkflowEnvironment will use the Identity, MetricsScope and BackgroundActivityContext options on the WorkerOptions. Other options are ignored. Note: WorkerOptions is defined in internal package, use public type worker.Options instead.

func (*TestWorkflowEnvironment) SignalWorkflow

func (t *TestWorkflowEnvironment) SignalWorkflow(name string, input interface{})

SignalWorkflow sends signal to the currently running test workflow.

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

TimeoutError is returned when an activity or child workflow timed out.

func NewHeartbeatTimeoutError

func NewHeartbeatTimeoutError(details ...interface{}) *TimeoutError

NewHeartbeatTimeoutError creates TimeoutError instance

func NewTimeoutError

func NewTimeoutError(timeoutType shared.TimeoutType) *TimeoutError

NewTimeoutError creates TimeoutError instance. Use NewHeartbeatTimeoutError to create heartbeat TimeoutError

func (*TimeoutError) Details

func (e *TimeoutError) Details(d ...interface{}) error

Details extracts the strongly typed detail data of this error. If there are no details, it returns ErrNoData.

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

Error from error interface

func (*TimeoutError) HasDetails added in v0.5.1

func (e *TimeoutError) HasDetails() bool

HasDetails returns whether this error has strongly typed detail data.

func (*TimeoutError) TimeoutType

func (e *TimeoutError) TimeoutType() shared.TimeoutType

TimeoutType returns the timeout type of this error.

type Version

type Version int

Version represents a change version. See GetVersion call.

const DefaultVersion Version = -1

DefaultVersion is a version returned by GetVersion for code that wasn't versioned before

func GetVersion

func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version

GetVersion is used to safely perform backwards incompatible changes to workflow definitions. It is not allowed to update workflow code while there are workflows running, as doing so breaks determinism. The solution is to keep both the old code, which is used to replay existing workflows, and the new code, which is used when the workflow is executed for the first time. GetVersion returns the maxSupported version when it is executed for the first time. This version is recorded in the workflow history as a marker event. Even if the maxSupported version is later changed, the version that was recorded is returned on replay. The DefaultVersion constant represents the version of code that wasn't versioned before. For example, initially the workflow has the following code:

err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)

it should be updated to

err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)

The backwards compatible way to execute the update is

v := workflow.GetVersion(ctx, "fooChange", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
}

Then bar has to be changed to baz:

v := workflow.GetVersion(ctx, "fooChange", workflow.DefaultVersion, 2)
if v == workflow.DefaultVersion {
    err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil)
} else if v == 1 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

Later, when there are no workflow executions still running DefaultVersion, the corresponding branch can be removed:

v := workflow.GetVersion(ctx, "fooChange", 1, 2)
if v == 1 {
    err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil)
} else {
    err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)
}

It is recommended to keep the GetVersion() call even if only a single branch is left:

workflow.GetVersion(ctx, "fooChange", 2, 2)
err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil)

The reasons to keep it are: 1) it ensures that if an older version execution is still running, it will fail here and not proceed; 2) if you ever need to make more changes for "fooChange", for example changing the activity from baz to qux, you just need to update maxSupported from 2 to 3.

Note that you only need to preserve the first call to GetVersion() for each changeID. All subsequent calls to GetVersion() with the same changeID are safe to remove. However, if you really want to get rid of the first GetVersion() call as well, you can do so, but you need to make sure that: 1) all older version executions are completed; 2) you no longer use "fooChange" as a changeID. If you ever need to make changes to that same part, like changing from baz to qux, you would need to use a different changeID like "fooChange-fix2", and start minSupported from DefaultVersion again. The code would look like:

v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 1)
if v == workflow.DefaultVersion {
  err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil)
} else {
  err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil)
}
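The recording behaviour described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the library's implementation: the history map, the replaying flag and the getVersion helper are all hypothetical, and the out-of-range case is reported as an error here purely for illustration.

```go
package main

import "fmt"

type version int

// defaultVersion stands in for DefaultVersion (-1).
const defaultVersion version = -1

// history is a toy stand-in for the workflow history: it only stores
// version markers keyed by changeID.
type history map[string]version

// getVersion models the documented behaviour:
//   - first execution: record maxSupported as a marker and return it;
//   - replay with a marker: return the recorded version, even if
//     maxSupported has changed since;
//   - replay without a marker: the code predates versioning, so the
//     version is defaultVersion.
// A version outside [minSupported, maxSupported] is reported as an error.
func getVersion(h history, replaying bool, changeID string, minSupported, maxSupported version) (version, error) {
	v, recorded := h[changeID]
	if !recorded {
		if replaying {
			v = defaultVersion
		} else {
			v = maxSupported
			h[changeID] = v
		}
	}
	if v < minSupported || v > maxSupported {
		return v, fmt.Errorf("version %d for %q is outside supported range [%d, %d]",
			v, changeID, minSupported, maxSupported)
	}
	return v, nil
}

func main() {
	h := history{}
	v, err := getVersion(h, false, "fooChange", defaultVersion, 1) // first execution
	fmt.Println(v, err)
	v, err = getVersion(h, true, "fooChange", defaultVersion, 2) // replay after maxSupported was raised
	fmt.Println(v, err)
	v, err = getVersion(history{}, true, "fooChange", 2, 2) // old execution, unsupported
	fmt.Println(v, err)
}
```

The last call shows why keeping GetVersion(ctx, "fooChange", 2, 2) is useful: an execution that predates the change is detected instead of silently running the wrong branch.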

type Worker

type Worker interface {
	// Start starts the worker in a non-blocking fashion
	Start() error
	// Run is a blocking start and cleans up resources when killed
	// returns error only if it fails to start the worker
	Run() error
	// Stop cleans up any resources opened by worker
	Stop()
}

Worker represents objects that can be started and stopped.

func NewWorker

func NewWorker(
	service workflowserviceclient.Interface,
	domain string,
	taskList string,
	options WorkerOptions,
) Worker

NewWorker creates an instance of worker for managing workflow and activity executions.

  • service - thrift connection to the cadence server.
  • domain - the name of the cadence domain.
  • taskList - the task list name you use to identify your client worker; it also identifies the group of workflow and activity implementations that are hosted by a single worker process.
  • options - configure any worker specific options like logger, metrics, identity.

type WorkerOptions

type WorkerOptions struct {
	// Optional: To set the maximum concurrent activity executions this worker can have.
	// The zero value of this uses the default value.
	// default: defaultMaxConcurrentActivityExecutionSize(1k)
	MaxConcurrentActivityExecutionSize int

	// Optional: Sets the rate limiting on number of activities that can be executed per second per
	// worker. This can be used to limit resources used by the worker.
	// Notice that the number is represented in float, so that you can set it to less than
	// 1 if needed. For example, set the number to 0.1 means you want your activity to be executed
	// once for every 10 seconds. This can be used to protect down stream services from flooding.
	// The zero value of this uses the default value. Default: 100k
	WorkerActivitiesPerSecond float64

	// Optional: To set the maximum concurrent local activity executions this worker can have.
	// The zero value of this uses the default value.
	// default: 1k
	MaxConcurrentLocalActivityExecutionSize int

	// Optional: Sets the rate limiting on number of local activities that can be executed per second per
	// worker. This can be used to limit resources used by the worker.
	// Notice that the number is represented in float, so that you can set it to less than
	// 1 if needed. For example, set the number to 0.1 means you want your local activity to be executed
	// once for every 10 seconds. This can be used to protect down stream services from flooding.
	// The zero value of this uses the default value. Default: 100k
	WorkerLocalActivitiesPerSecond float64

	// Optional: Sets the rate limiting on number of activities that can be executed per second.
	// This is managed by the server and controls activities per second for your entire tasklist
	// whereas WorkerActivitiesPerSecond controls activities only per worker.
	// Notice that the number is represented in float, so that you can set it to less than
	// 1 if needed. For example, set the number to 0.1 means you want your activity to be executed
	// once for every 10 seconds. This can be used to protect down stream services from flooding.
	// The zero value of this uses the default value. Default: 100k
	TaskListActivitiesPerSecond float64

	// Optional: Whether the framework should automatically heartbeat on behalf of activities.
	// default: false, the framework does not heartbeat.
	AutoHeartBeat bool

	// Optional: Sets an identity that can be used to track this host for debugging.
	// default: default identity that includes hostname, groupName and process ID.
	Identity string

	// Optional: Metrics to be reported.
	// default: no metrics.
	MetricsScope tally.Scope

	// Optional: Logger framework can use to log.
	// default: default logger provided.
	Logger *zap.Logger

	// Optional: Enable logging in replay.
	// In the workflow code you can use workflow.GetLogger(ctx) to write logs. By default, the logger will skip log
	// entry during replay mode so you won't see duplicate logs. This option will enable the logging in replay mode.
	// This is only useful for debugging purpose.
	// default: false
	EnableLoggingInReplay bool

	// Optional: Disable running workflow workers.
	// default: false
	DisableWorkflowWorker bool

	// Optional: Disable running activity workers.
	// default: false
	DisableActivityWorker bool

	// Optional: Disable sticky execution.
	// default: false
	// Sticky execution runs the decision tasks for one workflow execution on the same worker host. This is an
	// optimization for workflow execution. When sticky execution is enabled, the worker keeps the workflow state in
	// memory. A new decision task that contains the new history events is dispatched to the same worker. If this
	// worker crashes, the sticky decision task times out after StickyScheduleToStartTimeout, and the cadence server
	// clears the stickiness for that workflow execution and automatically reschedules a new decision task that
	// any worker can pick up to resume progress.
	DisableStickyExecution bool

	// Optional: Sticky schedule to start timeout.
	// default: 5s
	// The resolution is in seconds. For details about sticky execution, see the comments on DisableStickyExecution.
	StickyScheduleToStartTimeout time.Duration

	// Optional: Sets the context for activities. The context can be used to pass any configuration to activities,
	// such as a common logger for all activities.
	BackgroundActivityContext context.Context

	// Optional: Sets how the decision worker deals with non-deterministic history events
	// (presumably arising from non-deterministic workflow definitions or non-backward-compatible workflow definition changes).
	// default: NonDeterministicWorkflowPolicyBlockWorkflow, which just logs an error but replies nothing back to the server
	NonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy

	// Optional: Sets DataConverter to customize serialization/deserialization of arguments in Cadence
	// default: defaultDataConverter, a combination of thriftEncoder and jsonEncoder
	DataConverter encoded.DataConverter
}

WorkerOptions is used to configure a worker instance. The current timeout resolution is in seconds: the implementation uses math.Ceil(d.Seconds()) as the duration. This is subject to change in the future.

type WorkflowExecution

type WorkflowExecution struct {
	ID    string
	RunID string
}

WorkflowExecution identifies a workflow execution by its workflow ID and run ID.

type WorkflowExecutionContext added in v0.7.1

type WorkflowExecutionContext interface {
	sync.Locker
	ProcessWorkflowTask(task *s.PollForDecisionTaskResponse, historyIterator HistoryIterator) (completeRequest interface{}, err error)
	ProcessLocalActivityResult(lar *localActivityResult) (interface{}, error)
	// CompleteDecisionTask tries to complete the current decision task and returns the response that needs to be
	// sent back to the server. The waitLocalActivity argument controls whether to wait for outstanding local activities.
	// If there are no outstanding local activities, or if waitLocalActivity is false, the call returns a response
	// which is one of the following:
	// - RespondDecisionTaskCompletedRequest
	// - RespondDecisionTaskFailedRequest
	// - RespondQueryTaskCompletedRequest
	// If waitLocalActivity is true and there are outstanding local activities, this call returns nil.
	CompleteDecisionTask(waitLocalActivity bool) interface{}
	// GetDecisionTimeout returns the TaskStartToCloseTimeout
	GetDecisionTimeout() time.Duration
	GetCurrentDecisionTask() *s.PollForDecisionTaskResponse
	StackTrace() string
}

WorkflowExecutionContext represents one instance of workflow execution state in memory. The lock must be obtained before calling any of its methods.
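
The locking requirement can be sketched with a trimmed-down stand-in. The stackTracer interface and fakeContext type below are hypothetical and exist only for illustration; they keep the embedded sync.Locker and one method of the real interface, and they are not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// stackTracer is a hypothetical, reduced stand-in for
// WorkflowExecutionContext: it embeds sync.Locker and keeps one method.
type stackTracer interface {
	sync.Locker
	StackTrace() string
}

// fakeContext is an illustrative implementation. Embedding sync.Mutex
// gives *fakeContext the Lock and Unlock methods required by sync.Locker.
type fakeContext struct {
	sync.Mutex
	trace string
}

func (c *fakeContext) StackTrace() string { return c.trace }

// readTrace acquires the lock before calling into the context and
// releases it when done, as the documentation requires of callers.
func readTrace(c stackTracer) string {
	c.Lock()
	defer c.Unlock()
	return c.StackTrace()
}

func main() {
	ctx := &fakeContext{trace: "example stack trace"}
	fmt.Println(readTrace(ctx))
}
```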

type WorkflowIDReusePolicy

type WorkflowIDReusePolicy int

WorkflowIDReusePolicy defines workflow ID reuse behavior.

const (
	// WorkflowIDReusePolicyAllowDuplicateFailedOnly allows starting a workflow execution
	// when the workflow is not running and the last execution's close state is one of
	// [terminated, cancelled, timed out, failed].
	WorkflowIDReusePolicyAllowDuplicateFailedOnly WorkflowIDReusePolicy = iota

	// WorkflowIDReusePolicyAllowDuplicate allows starting a workflow execution using
	// the same workflow ID when the workflow is not running.
	WorkflowIDReusePolicyAllowDuplicate

	// WorkflowIDReusePolicyRejectDuplicate does not allow starting a workflow execution using the same workflow ID at all.
	WorkflowIDReusePolicyRejectDuplicate
)
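
The three policies can be read as a small decision table. The sketch below restates the comments above as code, assuming a previous execution with the same workflow ID exists. The reusePolicy type, the canStart function, and the close-state strings are hypothetical stand-ins; the real decision is not made by client code like this.

```go
package main

import "fmt"

// reusePolicy is a local stand-in for WorkflowIDReusePolicy.
type reusePolicy int

const (
	allowDuplicateFailedOnly reusePolicy = iota
	allowDuplicate
	rejectDuplicate
)

// canStart reports whether a new execution may reuse a workflow ID,
// given that a previous execution with that ID exists. The close-state
// strings are illustrative, not the library's constants.
func canStart(p reusePolicy, running bool, lastCloseState string) bool {
	if running {
		// No policy allows a duplicate while the workflow is still running.
		return false
	}
	switch p {
	case allowDuplicate:
		return true
	case allowDuplicateFailedOnly:
		switch lastCloseState {
		case "terminated", "cancelled", "timed out", "failed":
			return true
		}
		return false
	default:
		// rejectDuplicate: never reuse the workflow ID.
		return false
	}
}

func main() {
	fmt.Println(canStart(allowDuplicateFailedOnly, false, "failed"))
	fmt.Println(canStart(allowDuplicateFailedOnly, false, "completed"))
	fmt.Println(canStart(allowDuplicate, false, "completed"))
	fmt.Println(canStart(rejectDuplicate, false, "failed"))
}
```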

type WorkflowInfo

type WorkflowInfo struct {
	WorkflowExecution                   WorkflowExecution
	WorkflowType                        WorkflowType
	TaskListName                        string
	ExecutionStartToCloseTimeoutSeconds int32
	TaskStartToCloseTimeoutSeconds      int32
	Domain                              string
}

WorkflowInfo contains information about the currently executing workflow.

func GetWorkflowInfo

func GetWorkflowInfo(ctx Context) *WorkflowInfo

GetWorkflowInfo extracts information about the current workflow from a context.

type WorkflowRun

type WorkflowRun interface {
	// GetID returns the workflow ID, which is the same as StartWorkflowOptions.ID if provided.
	GetID() string

	// GetRunID returns the first started workflow run ID (please see below)
	GetRunID() string

	// Get fills valuePtr with the workflow execution result if the
	// workflow execution succeeded, or returns the corresponding
	// error. This is a blocking API.
	Get(ctx context.Context, valuePtr interface{}) error
}

WorkflowRun represents a started non-child workflow.

type WorkflowTaskHandler

type WorkflowTaskHandler interface {
	// Processes the workflow task
	// The response could be:
	// - RespondDecisionTaskCompletedRequest
	// - RespondDecisionTaskFailedRequest
	// - RespondQueryTaskCompletedRequest
	ProcessWorkflowTask(
		task *s.PollForDecisionTaskResponse,
		historyIterator HistoryIterator) (response interface{}, w WorkflowExecutionContext, err error)
}

WorkflowTaskHandler represents decision task handlers.

type WorkflowTestSuite

type WorkflowTestSuite struct {
	// contains filtered or unexported fields
}

WorkflowTestSuite is the test suite to run unit tests for workflow/activity.

func (*WorkflowTestSuite) GetLogger

func (s *WorkflowTestSuite) GetLogger() *zap.Logger

GetLogger gets the logger for this WorkflowTestSuite.

func (*WorkflowTestSuite) NewTestActivityEnvironment

func (s *WorkflowTestSuite) NewTestActivityEnvironment() *TestActivityEnvironment

NewTestActivityEnvironment creates a new instance of TestActivityEnvironment. Use the returned TestActivityEnvironment to run your activity in the test environment.

func (*WorkflowTestSuite) NewTestWorkflowEnvironment

func (s *WorkflowTestSuite) NewTestWorkflowEnvironment() *TestWorkflowEnvironment

NewTestWorkflowEnvironment creates a new instance of TestWorkflowEnvironment. Use the returned TestWorkflowEnvironment to run your workflow in the test environment.

func (*WorkflowTestSuite) SetLogger

func (s *WorkflowTestSuite) SetLogger(logger *zap.Logger)

SetLogger sets the logger for this WorkflowTestSuite. If you don't set a logger, the test suite creates a default logger with Debug level logging enabled.

func (*WorkflowTestSuite) SetMetricsScope

func (s *WorkflowTestSuite) SetMetricsScope(scope tally.Scope)

SetMetricsScope sets the metrics scope for this WorkflowTestSuite. If you don't set a scope, the test suite uses tally.NoopScope.

type WorkflowType

type WorkflowType struct {
	Name string
}

WorkflowType identifies a workflow type.

Directories

Path Synopsis
cmd
dummy
This file exists to force compilation of all code that doesn't have unit tests.
