Documentation ¶
Overview ¶
Package worker contains functions to manage the lifecycle of a Cadence client-side worker.
Index ¶
- Constants
- func EnableVerboseLogging(enable bool)
- func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
- func ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, ...) error
- func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error
- func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error
- func SetBinaryChecksum(checksum string)
- func SetStickyWorkflowCacheSize(cacheSize int)
- type ActivityRegistry
- type AuthorizationProvider
- type NonDeterministicWorkflowPolicy
- type OAuthConfig
- type Options
- type Registry
- type ReplayOptions
- type ShadowExitCondition
- type ShadowMode
- type ShadowOptions
- type TimeFilter
- type Worker
- type WorkflowRegistry
- type WorkflowReplayer
- type WorkflowShadower
Constants ¶
const (
	// NonDeterministicWorkflowPolicyBlockWorkflow is the default policy for handling detected non-determinism.
	// This option simply logs to console with an error message that non-determinism is detected, but
	// does *NOT* reply anything back to the server.
	// It is chosen as default for backward compatibility reasons because it preserves the old behavior
	// for handling non-determinism that we had before NonDeterministicWorkflowPolicy type was added to
	// allow more configurability.
	NonDeterministicWorkflowPolicyBlockWorkflow = internal.NonDeterministicWorkflowPolicyBlockWorkflow
	// NonDeterministicWorkflowPolicyFailWorkflow behaves exactly the same as the default BlockWorkflow
	// policy, up until the very end of processing a decision task.
	// Whereas default does *NOT* reply anything back to the server, fail workflow replies back with a request
	// to fail the workflow execution.
	NonDeterministicWorkflowPolicyFailWorkflow = internal.NonDeterministicWorkflowPolicyFailWorkflow
)
const (
	// ShadowModeNormal is the default mode for workflow shadowing.
	// Shadowing will complete after all workflows matching WorkflowQuery have been replayed.
	ShadowModeNormal = internal.ShadowModeNormal
	// ShadowModeContinuous mode will start a new round of shadowing
	// after all workflows matching WorkflowQuery have been replayed.
	// There will be a 5 min wait period between each round,
	// currently this wait period is not configurable.
	// Shadowing will complete only when ExitCondition is met.
	// ExitCondition must be specified when using this mode.
	ShadowModeContinuous = internal.ShadowModeContinuous
)
Variables ¶
This section is empty.
Functions ¶
func EnableVerboseLogging ¶
func EnableVerboseLogging(enable bool)
EnableVerboseLogging enables or disables verbose logging of internal Cadence library components. Most customers don't need this feature unless advised by a Cadence team member. There is also no guarantee that this API will not change.
func ReplayPartialWorkflowHistoryFromJSONFile ¶ added in v0.9.1
func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error
ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the JSON history file downloaded from the CLI, up to the provided lastEventID (inclusive). To download the history file:
	cadence workflow showid <workflow_id> -of <output_filename>
See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
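The "up to lastEventID (inclusive)" cut-off can be illustrated with a small, standard-library-only sketch. It assumes the downloaded file is a JSON array of events carrying an `eventId` field; the `historyEvent` type and `parsePartialHistory` helper are hypothetical names for this example and are not part of the worker package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// historyEvent is a simplified, hypothetical stand-in for a Cadence history
// event. Only the event ID and type are modeled here.
type historyEvent struct {
	EventID   int64  `json:"eventId"`
	EventType string `json:"eventType"`
}

// parsePartialHistory decodes a JSON array of events and keeps every event up
// to and including lastEventID.
func parsePartialHistory(data []byte, lastEventID int64) ([]historyEvent, error) {
	var all []historyEvent
	if err := json.Unmarshal(data, &all); err != nil {
		return nil, fmt.Errorf("decode history: %w", err)
	}
	var kept []historyEvent
	for _, ev := range all {
		if ev.EventID > lastEventID {
			break // assumption: events are ordered by ascending ID
		}
		kept = append(kept, ev)
	}
	return kept, nil
}

func main() {
	sample := []byte(`[
		{"eventId": 1, "eventType": "WorkflowExecutionStarted"},
		{"eventId": 2, "eventType": "DecisionTaskScheduled"},
		{"eventId": 3, "eventType": "DecisionTaskStarted"}
	]`)
	events, err := parsePartialHistory(sample, 2)
	if err != nil {
		panic(err)
	}
	for _, ev := range events {
		fmt.Println(ev.EventID, ev.EventType)
	}
}
```

With lastEventID set to 2, the sketch keeps the first two events and drops the third.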
func ReplayWorkflowExecution ¶ added in v0.7.0
func ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, logger *zap.Logger, domain string, execution workflow.Execution) error
ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is the only optional parameter. Defaults to the noop logger.
func ReplayWorkflowHistory ¶ added in v0.7.0
func ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error
ReplayWorkflowHistory executes a single decision task for the given history. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
func ReplayWorkflowHistoryFromJSONFile ¶ added in v0.7.1
func ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error
ReplayWorkflowHistoryFromJSONFile executes a single decision task for the JSON history file downloaded from the CLI. To download the history file:
	cadence workflow showid <workflow_id> -of <output_filename>
See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation. Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger. The logger is an optional parameter. Defaults to the noop logger.
func SetBinaryChecksum ¶ added in v0.10.0
func SetBinaryChecksum(checksum string)
SetBinaryChecksum sets the identifier of the binary (aka BinaryChecksum). The identifier is mainly used for recording reset points when a decision task is completed (respondDecisionTaskCompleted). For each workflow, the very first decision completed by a binary is recorded as an auto-reset point for that binary. When a customer later marks the binary as bad, the workflow is reset to that point, which means the workflow forgets all progress generated by the binary. In addition, once the binary is marked as bad, it can no longer poll for decision tasks or make any progress.
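The bookkeeping described above can be modeled in a few lines. This is only a sketch of the idea (first completed decision per binary becomes that binary's reset point); `workflowRecord`, `resetPoint`, and the checksum strings are hypothetical and do not reflect how the Cadence server stores reset points.

```go
package main

import "fmt"

// resetPoint ties a binary checksum to the first decision it completed.
type resetPoint struct {
	checksum string
	eventID  int64
}

// workflowRecord is a hypothetical, simplified view of one workflow's
// auto-reset points; it is not a Cadence type.
type workflowRecord struct {
	points []resetPoint
}

// decisionCompleted records a reset point only for the first decision that a
// given binary completes for this workflow.
func (w *workflowRecord) decisionCompleted(checksum string, eventID int64) {
	for _, p := range w.points {
		if p.checksum == checksum {
			return // this binary already has a reset point
		}
	}
	w.points = append(w.points, resetPoint{checksum: checksum, eventID: eventID})
}

// resetPointFor returns the event ID the workflow would be reset to if the
// given binary were marked as bad.
func (w *workflowRecord) resetPointFor(checksum string) (int64, bool) {
	for _, p := range w.points {
		if p.checksum == checksum {
			return p.eventID, true
		}
	}
	return 0, false
}

func main() {
	var w workflowRecord
	w.decisionCompleted("build-a", 4)
	w.decisionCompleted("build-a", 10) // ignored: build-a already has a point
	w.decisionCompleted("build-b", 16)

	id, ok := w.resetPointFor("build-b")
	fmt.Println(id, ok)
}
```

Marking "build-b" as bad in this model would reset the workflow to event 16, discarding everything that binary produced afterwards.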
func SetStickyWorkflowCacheSize ¶ added in v0.7.0
func SetStickyWorkflowCacheSize(cacheSize int)
SetStickyWorkflowCacheSize sets the cache size for the sticky workflow cache. Sticky workflow execution is the affinity between decision tasks of a specific workflow execution and a specific worker. The affinity is set if sticky execution is enabled via Worker.Options (it is enabled by default unless disabled explicitly). The benefit of sticky execution is that the workflow does not have to reconstruct its state by replaying from the beginning of the history events. The cost is higher memory consumption, because it relies on caching the workflow execution's running state on the worker. The cache is shared between workers running within the same process. This must be called before any worker is started. If not called, the default size of 10K (which might change in the future) is used.
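The memory-versus-replay trade-off can be pictured with a bounded cache of workflow states. The sketch below assumes least-recently-used eviction, which the text above does not state; `stickyCache` and its methods are hypothetical names, and the cached "state" is just a string.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is one cached workflow execution state, keyed by run ID.
type entry struct {
	runID string
	state string
}

// stickyCache is a hypothetical bounded cache with LRU eviction.
type stickyCache struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // run ID -> element in order
}

func newStickyCache(capacity int) *stickyCache {
	return &stickyCache{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// put stores or updates a state and evicts the least recently used entry
// when the cache grows beyond its capacity.
func (c *stickyCache) put(runID, state string) {
	if el, ok := c.items[runID]; ok {
		el.Value.(*entry).state = state
		c.order.MoveToFront(el)
		return
	}
	c.items[runID] = c.order.PushFront(&entry{runID: runID, state: state})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).runID)
	}
}

// get returns the cached state; a miss means the state would have to be
// rebuilt by replaying the workflow history.
func (c *stickyCache) get(runID string) (string, bool) {
	el, ok := c.items[runID]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).state, true
}

func main() {
	c := newStickyCache(2)
	c.put("run-a", "state-a")
	c.put("run-b", "state-b")
	c.get("run-a")            // touch run-a so run-b becomes the oldest
	c.put("run-c", "state-c") // evicts run-b
	_, ok := c.get("run-b")
	fmt.Println("run-b cached:", ok)
}
```

A larger capacity means fewer misses (and fewer full replays) at the cost of holding more workflow state in memory.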
Types ¶
type ActivityRegistry ¶ added in v0.16.0
type ActivityRegistry interface {
	// RegisterActivity - register an activity function or a pointer to a structure with the worker.
	// An activity function takes a context and input and returns a (result, error) or just error.
	//
	// An activity struct is a structure with all its exported methods treated as activities. The default
	// name of each activity is the method name.
	//
	// Examples:
	//	func sampleActivity(ctx context.Context, input []byte) (result []byte, err error)
	//	func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error)
	//	func sampleActivity(ctx context.Context) (err error)
	//	func sampleActivity() (result string, err error)
	//	func sampleActivity(arg1 bool) (result int, err error)
	//	func sampleActivity(arg1 bool) (err error)
	//
	//	type Activities struct {
	//	   // fields
	//	}
	//	func (a *Activities) SampleActivity1(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) {
	//	   ...
	//	}
	//
	//	func (a *Activities) SampleActivity2(ctx context.Context, arg1 int, arg2 *customerStruct) (result string, err error) {
	//	   ...
	//	}
	//
	// Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer.
	// This method panics if activityFunc doesn't comply with the expected format or an activity with the same
	// type name is registered more than once.
	// For global registration consider activity.Register
	RegisterActivity(a interface{})

	// RegisterActivityWithOptions registers the activity function or struct pointer with options.
	// The user can use options to provide an external name for the activity or leave it empty if no
	// external name is required. This can be used as
	//	worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{})
	//	worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{Name: "barExternal"})
	// When registering the structure that implements activities the name is used as a prefix that is
	// prepended to the activity method name.
	//	worker.RegisterActivityWithOptions(&Activities{ ... }, RegisterActivityOptions{Name: "MyActivities_"})
	// To override each name of activities defined through a structure register the methods one by one:
	//	activities := &Activities{ ... }
	//	worker.RegisterActivityWithOptions(activities.SampleActivity1, RegisterActivityOptions{Name: "Sample1"})
	//	worker.RegisterActivityWithOptions(activities.SampleActivity2, RegisterActivityOptions{Name: "Sample2"})
	// See RegisterActivity function for more info.
	// The other use of options is to disable duplicated activity registration check
	// which might be useful for integration tests.
	//	worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true})
	RegisterActivityWithOptions(a interface{}, options activity.RegisterOptions)
}
ActivityRegistry exposes activity registration functions to consumers.
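The naming rules for struct registration (the option name acts as a prefix on each exported method name, and duplicate names panic unless the check is disabled) can be sketched with reflection. The `registry` and `registerOptions` types below are local stand-ins written for this example, not the library's implementation.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// registerOptions mirrors the two option fields mentioned in the comments
// above; it is a local stand-in, not activity.RegisterOptions.
type registerOptions struct {
	Name                          string
	DisableAlreadyRegisteredCheck bool
}

// registry is a hypothetical in-memory activity registry.
type registry struct {
	activities map[string]reflect.Value
}

func newRegistry() *registry {
	return &registry{activities: make(map[string]reflect.Value)}
}

// registerStruct registers every exported method of a struct pointer, using
// opts.Name as a prefix for the method name.
func (r *registry) registerStruct(a interface{}, opts registerOptions) {
	v := reflect.ValueOf(a)
	t := v.Type()
	for i := 0; i < t.NumMethod(); i++ {
		name := opts.Name + t.Method(i).Name
		if _, exists := r.activities[name]; exists && !opts.DisableAlreadyRegisteredCheck {
			panic(fmt.Sprintf("activity %q is already registered", name))
		}
		r.activities[name] = v.Method(i)
	}
}

// names returns the registered activity names in sorted order.
func (r *registry) names() []string {
	out := make([]string, 0, len(r.activities))
	for name := range r.activities {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

// Activities is a sample activity struct with two exported methods.
type Activities struct{}

func (a *Activities) SampleActivity1() error { return nil }
func (a *Activities) SampleActivity2() error { return nil }

func main() {
	r := newRegistry()
	r.registerStruct(&Activities{}, registerOptions{Name: "MyActivities_"})
	fmt.Println(r.names())
}
```

Registering the same struct a second time with the same prefix panics in this sketch unless DisableAlreadyRegisteredCheck is set.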
type AuthorizationProvider ¶ added in v0.19.0
type AuthorizationProvider = auth.AuthorizationProvider
AuthorizationProvider is the interface that contains the method to get the auth token
func NewAdminJwtAuthorizationProvider ¶ added in v0.19.0
func NewAdminJwtAuthorizationProvider(privateKey []byte) AuthorizationProvider
NewAdminJwtAuthorizationProvider creates a JwtAuthorizationProvider instance.
func NewOAuthAuthorizationProvider ¶ added in v1.2.8
func NewOAuthAuthorizationProvider(config OAuthConfig) AuthorizationProvider
NewOAuthAuthorizationProvider creates an AuthorizationProvider that requests a token from an external OAuth provider.
type NonDeterministicWorkflowPolicy ¶ added in v0.7.0
type NonDeterministicWorkflowPolicy = internal.NonDeterministicWorkflowPolicy
NonDeterministicWorkflowPolicy is an enum for configuring how client's decision task handler deals with mismatched history events (presumably arising from non-deterministic workflow definitions).
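As a rough model of the two policy values documented on this page: both log the problem, and only the fail policy sends a reply. The `policy` type and the "FailWorkflowExecution" reply label are hypothetical names for this sketch; the real decision task handler is internal to the client.

```go
package main

import "fmt"

// policy is a local stand-in for worker.NonDeterministicWorkflowPolicy.
type policy int

const (
	blockWorkflow policy = iota // models NonDeterministicWorkflowPolicyBlockWorkflow
	failWorkflow                // models NonDeterministicWorkflowPolicyFailWorkflow
)

// handleNonDeterminism returns the reply this sketch would send to the server
// after a history mismatch is detected. An empty string means no reply.
func handleNonDeterminism(p policy) string {
	// Both policies log the detected non-determinism first.
	fmt.Println("error: non-determinism detected")
	if p == failWorkflow {
		// Hypothetical label for a request to fail the workflow execution.
		return "FailWorkflowExecution"
	}
	// Block: nothing is sent back, so the workflow makes no progress.
	return ""
}

func main() {
	fmt.Printf("block -> %q\n", handleNonDeterminism(blockWorkflow))
	fmt.Printf("fail  -> %q\n", handleNonDeterminism(failWorkflow))
}
```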
type OAuthConfig ¶ added in v1.2.8
type OAuthConfig = internal.OAuthAuthorizerConfig
OAuthConfig configures an external OAuth token provider.
type Options ¶
type Options = internal.WorkerOptions
Options is used to configure a worker instance.
func AugmentWorkerOptions ¶ added in v1.2.8
AugmentWorkerOptions fills all unset worker Options fields with their default values. Use it as a getter for default worker options.
type Registry ¶ added in v0.16.0
type Registry interface {
	WorkflowRegistry
	ActivityRegistry
}
Registry exposes registration functions to consumers.
type ReplayOptions ¶ added in v0.17.0
type ReplayOptions = internal.ReplayOptions
ReplayOptions is used to configure the replay decision task worker.
type ShadowExitCondition ¶ added in v0.17.0
type ShadowExitCondition = internal.ShadowExitCondition
ShadowExitCondition configures when the workflow shadower should exit. If not specified, the shadower exits after replaying all workflows that satisfy the visibility query.
type ShadowMode ¶ added in v0.17.0
type ShadowMode = internal.ShadowMode
ShadowMode is an enum for configuring whether shadowing should continue after all workflows matching the WorkflowQuery have been replayed.
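The control flow of the two modes can be sketched as a loop: normal mode stops after one pass, while continuous mode waits between rounds and stops only when an exit condition is met. Everything below (`runShadowing`, the `exit` callback, the injected `wait` function) is a hypothetical model for illustration, not the shadower's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// shadowMode is a local stand-in for worker.ShadowMode.
type shadowMode int

const (
	modeNormal shadowMode = iota
	modeContinuous
)

// roundWait models the fixed 5 minute pause between rounds.
const roundWait = 5 * time.Minute

// sleepBetweenRounds is the wait function a real run would pass in.
func sleepBetweenRounds() { time.Sleep(roundWait) }

// runShadowing calls replayAll once per round and returns the number of
// rounds executed. exit is a hypothetical exit-condition callback; wait is
// injected so tests and demos can skip the real pause.
func runShadowing(mode shadowMode, replayAll func(), exit func(rounds int) bool, wait func()) (int, error) {
	if mode == modeContinuous && exit == nil {
		return 0, errors.New("exit condition must be specified in continuous mode")
	}
	rounds := 0
	for {
		replayAll()
		rounds++
		if mode == modeNormal || exit(rounds) {
			return rounds, nil
		}
		wait()
	}
}

func main() {
	replayed := 0
	rounds, err := runShadowing(
		modeContinuous,
		func() { replayed++ },
		func(rounds int) bool { return rounds >= 3 },
		func() {}, // skip the real wait in this demo
	)
	fmt.Println(rounds, replayed, err)
}
```

Passing `sleepBetweenRounds` as the wait function would reproduce the 5 minute pause described for continuous mode.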
type ShadowOptions ¶ added in v0.17.0
type ShadowOptions = internal.ShadowOptions
ShadowOptions is used to configure a WorkflowShadower.
type TimeFilter ¶ added in v0.17.0
type TimeFilter = internal.TimeFilter
TimeFilter represents a time range through the min and max timestamp
type Worker ¶
type Worker interface {
	Registry

	// Start starts the worker in a non-blocking fashion
	Start() error
	// Run is a blocking start and cleans up resources when killed
	// returns error only if it fails to start the worker
	Run() error
	// Stop cleans up any resources opened by worker
	Stop()
}
Worker hosts workflow and activity implementations. Use worker.New(...) to create an instance.
func New ¶
func New( service workflowserviceclient.Interface, domain string, taskList string, options Options, ) Worker
New creates an instance of worker for managing workflow and activity executions.
service  - thrift connection to the cadence server
domain   - the name of the cadence domain
taskList - is the task list name you use to identify your client worker, also identifies group of workflow and activity implementations that are hosted by a single worker process
options  - configure any worker specific options like logger, metrics, identity
type WorkflowRegistry ¶ added in v0.16.0
type WorkflowRegistry interface {
	// RegisterWorkflow - registers a workflow function with the worker.
	// A workflow takes a workflow.Context and input and returns a (result, error) or just error.
	// Examples:
	//	func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error)
	//	func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error)
	//	func sampleWorkflow(ctx workflow.Context) (result []byte, err error)
	//	func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error)
	// Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer.
	// For global registration consider workflow.Register
	// This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow
	RegisterWorkflow(w interface{})

	// RegisterWorkflowWithOptions registers the workflow function with options.
	// The user can use options to provide an external name for the workflow or leave it empty if no
	// external name is required. This can be used as
	//	worker.RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{})
	//	worker.RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{Name: "foo"})
	// This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow
	// type name twice. Use workflow.RegisterOptions.DisableAlreadyRegisteredCheck to allow multiple registrations.
	RegisterWorkflowWithOptions(w interface{}, options workflow.RegisterOptions)
}
WorkflowRegistry exposes workflow registration functions to consumers.
type WorkflowReplayer ¶ added in v0.12.2
type WorkflowReplayer interface {
	WorkflowRegistry
	ActivityRegistry

	// ReplayWorkflowHistory executes a single decision task for the given history.
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is an optional parameter. Defaults to the noop logger.
	ReplayWorkflowHistory(logger *zap.Logger, history *shared.History) error

	// ReplayWorkflowHistoryFromJSONFile executes a single decision task for the json history file downloaded from the cli.
	// To download the history file: cadence workflow showid <workflow_id> -of <output_filename>
	// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is an optional parameter. Defaults to the noop logger.
	//
	// Deprecated: prefer ReplayWorkflowHistoryFromJSON
	ReplayWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string) error

	// ReplayPartialWorkflowHistoryFromJSONFile executes a single decision task for the json history file up to the provided
	// lastEventID (inclusive), downloaded from the cli.
	// To download the history file: cadence workflow showid <workflow_id> -of <output_filename>
	// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is an optional parameter. Defaults to the noop logger.
	//
	// Deprecated: prefer ReplayPartialWorkflowHistoryFromJSON
	ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName string, lastEventID int64) error

	// ReplayWorkflowExecution loads a workflow execution history from the Cadence service and executes a single decision task for it.
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is the only optional parameter. Defaults to the noop logger.
	ReplayWorkflowExecution(ctx context.Context, service workflowserviceclient.Interface, logger *zap.Logger, domain string, execution workflow.Execution) error

	// ReplayWorkflowHistoryFromJSON executes a single decision task for the json history file downloaded from the cli.
	// To download the history file:
	//	cadence workflow showid <workflow_id> -of <output_filename>
	// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is an optional parameter. Defaults to the noop logger.
	ReplayWorkflowHistoryFromJSON(logger *zap.Logger, reader io.Reader) error

	// ReplayPartialWorkflowHistoryFromJSON executes a single decision task for the json history file up to the provided
	// lastEventID (inclusive), downloaded from the cli.
	// To download the history file:
	//	cadence workflow showid <workflow_id> -of <output_filename>
	// See https://github.com/uber/cadence/blob/master/tools/cli/README.md for full documentation
	// Use for testing the backwards compatibility of code changes and troubleshooting workflows in a debugger.
	// The logger is an optional parameter. Defaults to the noop logger.
	ReplayPartialWorkflowHistoryFromJSON(logger *zap.Logger, reader io.Reader, lastEventID int64) error
}
WorkflowReplayer supports replaying a workflow from its event history. Use it for troubleshooting and for backwards compatibility unit tests. For example, if a workflow failed in production, its history can be downloaded through the UI or CLI and replayed in a debugger as many times as necessary. Use this type to create unit tests that check whether workflow changes are backwards compatible. It is important to maintain backwards compatibility through the use of workflow.GetVersion to ensure that new deployments do not break open workflows.
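The idea behind a replay-based compatibility test can be shown with a toy model: the decisions recorded in a history must match, in order, the decisions the current workflow code produces. The `checkReplay` helper and the decision labels below are hypothetical; the actual replayer works on Cadence history events rather than strings.

```go
package main

import "fmt"

// checkReplay compares the decisions recorded in a history with the decisions
// the current workflow code produces. The workflow may produce more decisions
// than were recorded (the history may belong to an open workflow), but every
// recorded decision must match.
func checkReplay(recorded []string, workflow func() []string) error {
	replayed := workflow()
	for i, want := range recorded {
		if i >= len(replayed) {
			return fmt.Errorf("history has decision %d (%s) but the workflow produced none", i, want)
		}
		if replayed[i] != want {
			return fmt.Errorf("decision %d: history has %s, workflow produced %s", i, want, replayed[i])
		}
	}
	return nil
}

func main() {
	// Decisions recorded by the previously deployed workflow code.
	recorded := []string{"ScheduleActivity:charge", "StartTimer"}

	// The new code reorders the two steps, which breaks replay.
	changed := func() []string {
		return []string{"StartTimer", "ScheduleActivity:charge"}
	}

	if err := checkReplay(recorded, changed); err != nil {
		fmt.Println("incompatible change:", err)
	}
}
```

In a real test the comparison is done by the replayer itself; a change like the reordering above is the kind of edit that would need to be guarded with workflow.GetVersion.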
func NewWorkflowReplayer ¶ added in v0.12.2
func NewWorkflowReplayer() WorkflowReplayer
NewWorkflowReplayer creates a WorkflowReplayer instance.
func NewWorkflowReplayerWithOptions ¶ added in v0.17.0
func NewWorkflowReplayerWithOptions( options ReplayOptions, ) WorkflowReplayer
NewWorkflowReplayerWithOptions creates an instance of the WorkflowReplayer with provided replay worker options
type WorkflowShadower ¶ added in v0.17.0
type WorkflowShadower interface {
	WorkflowRegistry

	Run() error
}
WorkflowShadower retrieves and replays workflow histories from the Cadence service to determine whether there are any nondeterministic changes in the workflow definition.
func NewWorkflowShadower ¶ added in v0.17.0
func NewWorkflowShadower( service workflowserviceclient.Interface, domain string, shadowOptions ShadowOptions, replayOptions ReplayOptions, logger *zap.Logger, ) (WorkflowShadower, error)
NewWorkflowShadower creates a WorkflowShadower instance.