Documentation
¶
Index ¶
- Constants
- Variables
- func GetFinalProcessType(wf Process) string
- func GetFinalStateId(asyncState AsyncState) string
- func GetOpenApiErrorBody(err error) string
- func IsClientError(err error) bool
- func IsProcessAlreadyStartedError(err error) bool
- func IsProcessNotExistsError(err error) bool
- func IsRPCExecutionError(err error) bool
- func IsRPCLockingFailure(err error) bool
- func IsWaitingExceedingTimeoutError(err error) bool
- func NewApiError(originalError error, openApiError *xcapi.GenericOpenAPIError, ...) error
- func NewInternalError(format string, args ...interface{}) error
- func NewInvalidArgumentError(tpl string, arg ...interface{}) error
- func NewProcessDefinitionError(tpl string, arg ...interface{}) error
- func ShouldSkipWaitUntilAPI(state AsyncState) bool
- type ApiError
- type AsyncState
- type AsyncStateDefaults
- type AsyncStateDefaultsSkipWaitUntil
- type AsyncStateOptions
- type BasicClient
- type BasicClientProcessOptions
- type Client
- type ClientOptions
- type Command
- type CommandRequest
- type CommandResults
- type CommandType
- type Communication
- type Context
- type DBColumnDef
- type DBHint
- type DBTableSchema
- type GlobalAttributesSchema
- type InternalSDKError
- type InvalidArgumentError
- type LocalAttributeDef
- type LocalAttributeLoadingType
- type LocalAttributePolicy
- type LocalAttributesSchema
- type LocalQueueCommand
- type LocalQueueCommandResult
- type LocalQueuePublishMessage
- type LocalQueuePublishOptions
- type NamedPersistencePolicy
- type Object
- type ObjectEncoder
- type Persistence
- type PersistenceSchema
- type PersistenceSchemaOptions
- type Process
- type ProcessAbnormalExitError
- type ProcessDefaults
- type ProcessDefinitionError
- type ProcessOptions
- type ProcessStartOptions
- type Registry
- type StateDecision
- type StateMovement
- type StateSchema
- type TablePolicy
- type TimerCommand
- type TimerResult
- type WorkerExecutionError
- type WorkerOptions
- type WorkerService
Constants ¶
const ( DefaultNamespace = "default" DefaultWorkerPort = "8803" DefaultServerPort = "8801" DefaultWorkerUrl = "http://localhost:" + DefaultWorkerPort DefaultServerUrl = "http://localhost:" + DefaultServerPort )
const ( ApiPathAsyncStateWaitUntil = "/api/v1/xcherry/worker/async-state/wait-until" ApiPathAsyncStateExecute = "/api/v1/xcherry/worker/async-state/execute" )
Variables ¶
var DeadEnd = &StateDecision{ ThreadCloseType: xcapi.DEAD_END.Ptr(), }
var ForceCompletingProcess = &StateDecision{ ThreadCloseType: xcapi.FORCE_COMPLETE_PROCESS.Ptr(), }
var ForceFailProcess = &StateDecision{ ThreadCloseType: xcapi.FORCE_FAIL_PROCESS.Ptr(), }
var GracefulCompletingProcess = &StateDecision{ ThreadCloseType: xcapi.GRACEFUL_COMPLETE_PROCESS.Ptr(), }
Functions ¶
func GetFinalProcessType ¶
GetFinalProcessType returns the process type that will be registered if the process is from &myStruct{} or myStruct{} under mywf package, the method returns "mywf.myStruct"
func GetFinalStateId ¶
func GetFinalStateId(asyncState AsyncState) string
GetFinalStateId returns the stateId that will be registered and used if the asyncState is from myStruct{} under mywf package, the method returns "mywf.myStruct"
func GetOpenApiErrorBody ¶
GetOpenApiErrorBody retrieve the API error body into a string to be human-readable
func IsClientError ¶
func IsProcessNotExistsError ¶
func IsRPCExecutionError ¶
func IsRPCLockingFailure ¶
func NewApiError ¶
func NewApiError( originalError error, openApiError *xcapi.GenericOpenAPIError, httpResponse *http.Response, errResponse *xcapi.ApiErrorResponse, ) error
func NewInternalError ¶
func NewInvalidArgumentError ¶
func ShouldSkipWaitUntilAPI ¶
func ShouldSkipWaitUntilAPI(state AsyncState) bool
Types ¶
type ApiError ¶
type ApiError struct { StatusCode int OriginalError error OpenApiError *xcapi.GenericOpenAPIError HttpResponse *http.Response ErrResponse *xcapi.ApiErrorResponse }
ApiError represents error returned from xCherry server Could be client side(4xx) or server side(5xx), see below helpers to check details
type AsyncState ¶
type AsyncState interface { // GetStateOptions defines the optional configuration of this state definition. GetStateOptions() *AsyncStateOptions // WaitUntil is the method to set up commands set up to wait for, before `Execute` API is invoked. // It's optional -- use xc.AsyncStateNoWaitUntil to skip this( then Execute will be invoked directly instead) // // ctx the context info of this API invocation, like process start time, processId, etc // input the state input // Communication TODO // @return the requested commands for this state /// WaitUntil(ctx Context, input Object, communication Communication) (*CommandRequest, error) // Execute is the method to execute and decide what to do next. // It's invoked after commands from WaitUntil are completed, or if WaitUntil is skipped(not implemented). // // ctx the context info of this API invocation, like process start time, processId, etc // input the state input // CommandResults the results of the command that executed by WaitUntil // Persistence TODO // Communication TODO // @return the decision of what to do next(e.g. transition to next states or closing process) Execute(ctx Context, input Object, commandResults CommandResults, persistence Persistence, communication Communication) (*StateDecision, error) }
type AsyncStateDefaults ¶
type AsyncStateDefaults struct {
// contains filtered or unexported fields
}
AsyncStateDefaults is a convenient struct to put into your state implementation to save the boilerplate code of returning default values Example usage:
type myStateImpl struct{ AsyncStateDefaults }
Then myStateImpl doesn't have to implement WaitUntil, Execute or GetStateOptions
func (AsyncStateDefaults) GetStateOptions ¶
func (d AsyncStateDefaults) GetStateOptions() *AsyncStateOptions
type AsyncStateDefaultsSkipWaitUntil ¶
type AsyncStateDefaultsSkipWaitUntil struct {
// contains filtered or unexported fields
}
AsyncStateDefaultsSkipWaitUntil is required to skip WaitUntil put into your state implementation to save the boilerplate code of returning default values Example usage:
type myStateImpl struct{ AsyncStateDefaultsSkipWaitUntil }
Then myStateImpl will skip WaitUntil, and doesn't have to implement GetStateOptions
func (AsyncStateDefaultsSkipWaitUntil) GetStateOptions ¶
func (d AsyncStateDefaultsSkipWaitUntil) GetStateOptions() *AsyncStateOptions
func (AsyncStateDefaultsSkipWaitUntil) WaitUntil ¶
func (d AsyncStateDefaultsSkipWaitUntil) WaitUntil(ctx Context, input Object, communication Communication) (*CommandRequest, error)
type AsyncStateOptions ¶
type AsyncStateOptions struct { // StateId is the unique identifier of the state. // It is being used for WorkerService to choose the right AsyncState to execute Start/Execute APIs // Default: the pkgName.structName of the state struct, see GetFinalStateId() for details when set as empty string StateId string // WaitUntilTimeoutSeconds is the timeout for the waitUntil API call. // Default: 10 seconds(configurable in server) when set as 0 // It will be capped to 60 seconds by server (configurable in server) WaitUntilTimeoutSeconds int32 // ExecuteTimeoutSeconds is the timeout for the execute API call. // Default: 10 seconds(configurable in server) when set as 0 // It will be capped to 60 seconds by server (configurable in server) ExecuteTimeoutSeconds int32 // WaitUntilRetryPolicy is the retry policy for the waitUntil API call. // Default: infinite retry with 1 second initial interval, 120 seconds max interval, and 2 backoff factor, // when set as nil WaitUntilRetryPolicy *xcapi.RetryPolicy // ExecuteRetryPolicy is the retry policy for the execute API call. // Default: infinite retry with 1 second initial interval, 120 seconds max interval, and 2 backoff factor, // when set as nil ExecuteRetryPolicy *xcapi.RetryPolicy // FailureRecoveryState is the state to recover after current state execution fails // Default: no recovery when set as nil FailureRecoveryState AsyncState // PersistencePolicyName is the name of loading policy for persistence if not using default policy PersistencePolicyName *string }
func (*AsyncStateOptions) SetFailureRecoveryOption ¶
func (o *AsyncStateOptions) SetFailureRecoveryOption(destState AsyncState) *AsyncStateOptions
type BasicClient ¶
type BasicClient interface { // StartProcess starts a process execution // processType is the process type // startStateId is the stateId of the startingState // processId is the required business identifier for the process execution(can be used with ProcessIdReusePolicy // input the optional input for the startingState // options is optional includes like ProcessIdReusePolicy. // return the processExecutionId StartProcess( ctx context.Context, processType string, startStateId, processId string, input interface{}, options *BasicClientProcessOptions, ) (string, error) // StopProcess stops a process execution // processId is the required business identifier for the process execution StopProcess(ctx context.Context, processId string, stopType xcapi.ProcessExecutionStopType) error // DescribeCurrentProcessExecution returns a process execution info // processId is the required business identifier for the process execution DescribeCurrentProcessExecution( ctx context.Context, processId string, ) (*xcapi.ProcessExecutionDescribeResponse, error) PublishToLocalQueue( ctx context.Context, processId string, messages []xcapi.LocalQueueMessage, ) error }
BasicClient is a base client without process registry It's the internal implementation of Client. But it can be used directly if there is good reason -- let you invoke the APIs to xCherry server without much type validation checks(process type, queue names, etc).
func NewBasicClient ¶
func NewBasicClient(options ClientOptions) BasicClient
NewBasicClient returns a BasicClient
type BasicClientProcessOptions ¶
type BasicClientProcessOptions struct { ProcessIdReusePolicy *xcapi.ProcessIdReusePolicy StartStateOptions *xcapi.AsyncStateConfig // default is 0 which indicate no timeout TimeoutSeconds int32 LocalAttributeConfig *xcapi.LocalAttributeConfig }
type Client ¶
type Client interface { GetBasicClient() BasicClient // StartProcess starts a process execution // definition is the definition of the process // processId is the required business identifier for the process execution (can be used with ProcessIdReusePolicy) // input the optional input for the startingState // return the processExecutionId StartProcess(ctx context.Context, definition Process, processId string, input interface{}) (string, error) // StartProcessWithOptions starts a process execution with options, which will override the options defined in process definition StartProcessWithOptions( ctx context.Context, definition Process, processId string, input interface{}, options *ProcessStartOptions, ) (string, error) // StopProcess stops a process execution // processId is the required business identifier for the process execution StopProcess(ctx context.Context, processId string, stopType xcapi.ProcessExecutionStopType) error // PublishToLocalQueue publishes a message to a local queue // the payload can be empty(nil) PublishToLocalQueue( ctx context.Context, processId string, queueName string, payload interface{}, options *LocalQueuePublishOptions, ) error BatchPublishToLocalQueue( ctx context.Context, processId string, messages ...LocalQueuePublishMessage, ) error // DescribeCurrentProcessExecution returns a process execution info // processId is the required business identifier for the process execution DescribeCurrentProcessExecution( ctx context.Context, processId string, ) (*xcapi.ProcessExecutionDescribeResponse, error) }
Client is a full-featured client
func NewClient ¶
func NewClient(registry Registry, options *ClientOptions) Client
NewClient returns a Client
type ClientOptions ¶
type ClientOptions struct { Namespace string ServerUrl string WorkerUrl string ObjectEncoder ObjectEncoder EnabledDebugLogging bool // DefaultProcessTimeoutSecondsOverride is used when StartProcess is called and // 1. no timeout specified in ProcessOptions(default as zero) // 2. no timeout specified in ProcessStartOptions(default as nil) // currently mainly for testing purpose DefaultProcessTimeoutSecondsOverride int32 }
func GetLocalDefaultClientOptions ¶
func GetLocalDefaultClientOptions() *ClientOptions
type Command ¶
type Command struct { CommandType CommandType TimerCommand *TimerCommand LocalQueueCommand *LocalQueueCommand }
func NewLocalQueueCommand ¶
func NewTimerCommand ¶
type CommandRequest ¶
type CommandRequest struct { Commands []Command CommandWaitingType xcapi.CommandWaitingType }
func AllOf ¶
func AllOf(commands ...Command) *CommandRequest
AllOf will wait for all the commands to complete
func AnyOf ¶
func AnyOf(commands ...Command) *CommandRequest
AnyOf will wait for any of the commands to complete
func EmptyCommandRequest ¶
func EmptyCommandRequest() *CommandRequest
EmptyCommandRequest will jump to decide stage immediately.
type CommandResults ¶
type CommandResults struct { TimerResults []TimerResult LocalQueueResults []LocalQueueCommandResult }
func (CommandResults) GetFirstLocalQueueCommand ¶
func (c CommandResults) GetFirstLocalQueueCommand() LocalQueueCommandResult
func (CommandResults) GetFirstTimerStatus ¶
func (c CommandResults) GetFirstTimerStatus() xcapi.CommandStatus
func (CommandResults) GetLocalQueueCommand ¶
func (c CommandResults) GetLocalQueueCommand(index int) LocalQueueCommandResult
func (CommandResults) GetTimerStatus ¶
func (c CommandResults) GetTimerStatus(index int) xcapi.CommandStatus
type CommandType ¶
type CommandType string
const ( CommandTypeTimer CommandType = "Timer" CommandTypeLocalQueue CommandType = "LocalQueue" )
type Communication ¶
type Communication interface { // PublishToLocalQueue publishes a message to a local queue // the payload can be empty(nil) PublishToLocalQueue(queueName string, payload interface{}) // contains filtered or unexported methods }
func NewCommunication ¶
func NewCommunication(encoder ObjectEncoder) Communication
type Context ¶
type Context interface { GetAttempt() int GetProcessId() string GetRecoverFromStateExecutionId() *string GetRecoverFromStateApi() *xcapi.WorkerApiType }
type DBColumnDef ¶
type DBColumnDef struct { GlobalAttributeKey string ColumnName string Hint *DBHint // contains filtered or unexported fields }
func NewDBColumnDef ¶
func NewDBColumnDef( key string, dbColumn string, defaultLoading bool, ) DBColumnDef
func NewDBColumnDefWithHint ¶
func NewDBColumnDefWithHint( key string, dbColumn string, defaultLoading bool, hint DBHint, ) DBColumnDef
type DBHint ¶
type DBHint string
DBHint is the hint for the DBConverter to convert database column to query value and vice versa
type DBTableSchema ¶
type DBTableSchema struct { TableName string PK string Columns []DBColumnDef // DefaultTablePolicy is the default loading policy for this table DefaultTablePolicy TablePolicy }
func NewDBTableSchema ¶
func NewDBTableSchema( tableName string, pk string, defaultReadLocking xcapi.LockType, columns ...DBColumnDef, ) DBTableSchema
type GlobalAttributesSchema ¶
type GlobalAttributesSchema struct { // Tables is table name to the table schema Tables map[string]DBTableSchema }
func NewEmptyGlobalAttributesSchema ¶
func NewEmptyGlobalAttributesSchema() *GlobalAttributesSchema
func NewGlobalAttributesSchema ¶
func NewGlobalAttributesSchema( table ...DBTableSchema, ) *GlobalAttributesSchema
type InternalSDKError ¶
type InternalSDKError struct {
Message string
}
InternalSDKError means something wrong within xCherry SDK
func (InternalSDKError) Error ¶
func (i InternalSDKError) Error() string
type InvalidArgumentError ¶
type InvalidArgumentError struct {
// contains filtered or unexported fields
}
InvalidArgumentError represents an invalid input argument
func (InvalidArgumentError) Error ¶
func (w InvalidArgumentError) Error() string
type LocalAttributeDef ¶
type LocalAttributeDef struct { Key string DefaultLoadingType LocalAttributeLoadingType }
func NewLocalAttributeDef ¶
func NewLocalAttributeDef(key string, defaultLoadingType LocalAttributeLoadingType) LocalAttributeDef
type LocalAttributeLoadingType ¶
type LocalAttributeLoadingType string
const ( NotLoad LocalAttributeLoadingType = "not load" LoadWithLock LocalAttributeLoadingType = "load with lock" LoadNoLock LocalAttributeLoadingType = "load no lock" )
type LocalAttributePolicy ¶
type LocalAttributesSchema ¶
type LocalAttributesSchema struct { LocalAttributeKeys map[string]bool DefaultLocalAttributePolicy LocalAttributePolicy }
func NewEmptyLocalAttributesSchema ¶
func NewEmptyLocalAttributesSchema() *LocalAttributesSchema
func NewLocalAttributesSchema ¶
func NewLocalAttributesSchema( LockingType *xcapi.LockType, localAttributesDef ...LocalAttributeDef, ) *LocalAttributesSchema
type LocalQueueCommand ¶
type LocalQueueCommandResult ¶
type LocalQueueCommandResult struct { Result xcapi.LocalQueueResult Encoder ObjectEncoder }
func (LocalQueueCommandResult) GetFirstMessage ¶
func (lc LocalQueueCommandResult) GetFirstMessage(ptr interface{})
func (LocalQueueCommandResult) GetMessages ¶
func (lc LocalQueueCommandResult) GetMessages() []Object
func (LocalQueueCommandResult) GetQueueName ¶
func (lc LocalQueueCommandResult) GetQueueName() string
func (LocalQueueCommandResult) GetStatus ¶
func (lc LocalQueueCommandResult) GetStatus() xcapi.CommandStatus
type NamedPersistencePolicy ¶
type NamedPersistencePolicy struct { Name string // LocalAttributePolicy is the policy for local attributes LocalAttributePolicy *LocalAttributePolicy // GlobalAttributePolicy is the policy for global attributes // key is the table name GlobalAttributePolicy map[string]TablePolicy }
func NewNamedPersistencePolicy ¶
func NewNamedPersistencePolicy( name string, localAttributesPolicy *LocalAttributePolicy, globalAttributeTablePolicy ...TablePolicy, ) NamedPersistencePolicy
type Object ¶
type Object interface {
Get(resultPtr interface{})
}
Object is a representation of EncodedObject
func NewObject ¶
func NewObject(EncodedObject *xcapi.EncodedObject, ObjectEncoder ObjectEncoder) Object
type ObjectEncoder ¶
type ObjectEncoder interface { // GetEncodingType returns the encoding info that it can handle GetEncodingType() string // Encode serialize an object Encode(obj interface{}) (*xcapi.EncodedObject, error) // Decode deserialize an object Decode(encodedObj *xcapi.EncodedObject, resultPtr interface{}) error }
func GetDefaultObjectEncoder ¶
func GetDefaultObjectEncoder() ObjectEncoder
type Persistence ¶
type Persistence interface { // GetLocalAttribute returns the local attribute value GetLocalAttribute(key string, resultPtr interface{}) // SetLocalAttribute sets the local attribute value SetLocalAttribute(key string, value interface{}) // contains filtered or unexported methods }
func NewPersistenceImpl ¶
func NewPersistenceImpl( localAttrKeys map[string]bool, currLocalAttrs *xcapi.LoadLocalAttributesResponse, ) Persistence
type PersistenceSchema ¶
type PersistenceSchema struct { // LocalAttributeSchema is the schema for local attributes // LocalAttributes are attributes that are specific to a process execution LocalAttributeSchema *LocalAttributesSchema }
func NewEmptyPersistenceSchema ¶
func NewEmptyPersistenceSchema() PersistenceSchema
func NewPersistenceSchema ¶
func NewPersistenceSchema( localAttrSchema *LocalAttributesSchema, globalAttrSchema *GlobalAttributesSchema, ) PersistenceSchema
NewPersistenceSchema creates a new PersistenceSchema globalAttrSchema is the schema for global attributes localAttrSchema is the schema for local attributes
func NewPersistenceSchemaWithOptions ¶
func NewPersistenceSchemaWithOptions( localAttrSchema *LocalAttributesSchema, ) PersistenceSchema
func (PersistenceSchema) ValidateLocalAttributeForRegistry ¶
func (s PersistenceSchema) ValidateLocalAttributeForRegistry() (map[string]bool, error)
type PersistenceSchemaOptions ¶
type PersistenceSchemaOptions struct { // NameToPolicy is the persistence policy with a name, // which can be used as an override to the default policy NameToPolicy map[string]NamedPersistencePolicy }
func NewPersistenceSchemaOptions ¶
func NewPersistenceSchemaOptions( namedPolicies ...NamedPersistencePolicy, ) PersistenceSchemaOptions
type Process ¶
type Process interface { // GetAsyncStateSchema defines the AsyncStates of the process. // If there is no startingState, the process will not start any state execution after process stated. // Application can still use RPC to invoke new state execution later. GetAsyncStateSchema() StateSchema // GetPersistenceSchema defines the persistence schema of the process GetPersistenceSchema() PersistenceSchema // GetProcessOptions defines the options for the process // Note that they can be overridden by the ProcessStartOptions when starting a process GetProcessOptions() ProcessOptions }
Process is the interface to define a process definition. Process is a top level concept in xCherry
type ProcessAbnormalExitError ¶
type ProcessAbnormalExitError struct { ProcessExecutionId string // TODO ClosedStatus xcapi.ProcessStatus // TODO FailureType *xcapi.ProcessFailureSubType ErrorMessage *string // StateResults []xcapi.ProcessCloseOutput Encoder ObjectEncoder }
ProcessAbnormalExitError is returned when process execution doesn't complete successfully when waiting on the completion
func AsProcessAbnormalExitError ¶
func AsProcessAbnormalExitError(err error) (*ProcessAbnormalExitError, bool)
AsProcessAbnormalExitError will check if it's a ProcessAbnormalExitError and convert it if so
func (*ProcessAbnormalExitError) Error ¶
func (w *ProcessAbnormalExitError) Error() string
type ProcessDefaults ¶
type ProcessDefaults struct { }
ProcessDefaults is a convenient struct to put into your process implementation to save the boilerplate code of returning default values Example usage :
type myPcImpl struct{ ProcessDefaults }
Then myPcImpl doesn't have to implement GetProcessOptions or GetAsyncStateSchema
func (ProcessDefaults) GetAsyncStateSchema ¶
func (d ProcessDefaults) GetAsyncStateSchema() StateSchema
func (ProcessDefaults) GetPersistenceSchema ¶
func (d ProcessDefaults) GetPersistenceSchema() PersistenceSchema
func (ProcessDefaults) GetProcessOptions ¶
func (d ProcessDefaults) GetProcessOptions() ProcessOptions
type ProcessDefinitionError ¶
type ProcessDefinitionError struct {
// contains filtered or unexported fields
}
ProcessDefinitionError represents process code(including its elements like AsyncStates/RPCs) is not valid
func (ProcessDefinitionError) Error ¶
func (w ProcessDefinitionError) Error() string
type ProcessOptions ¶
type ProcessOptions struct { // TimeoutSeconds is the timeout for the process execution. // Default: 0, mean which means infinite timeout TimeoutSeconds int32 // IdReusePolicy is the policy for reusing process id. // Default: xcapi.ALLOW_IF_NO_RUNNING when set as nil IdReusePolicy *xcapi.ProcessIdReusePolicy // GetProcessType defines the processType of this process definition. // GetFinalProcessType set the default value when return empty string -- // It's the packageName.structName of the process instance and ignores the import paths and aliases. // e.g. if the process is from myStruct{} under mywf package, the simple name is just "mywf.myStruct". Underneath, it's from reflect.TypeOf(wf).String(). ProcessType string }
func NewDefaultProcessOptions ¶
func NewDefaultProcessOptions() ProcessOptions
type ProcessStartOptions ¶
type ProcessStartOptions struct { // TimeoutSeconds is the timeout for the process execution. // Default: 0, mean which means infinite timeout. // This will override the timeout defined in process definition TimeoutSeconds *int32 // IdReusePolicy is the policy for reusing process id. // Default: xcapi.ALLOW_IF_NO_RUNNING when set as nil. // This will override the IdReusePolicy defined in process definition. IdReusePolicy *xcapi.ProcessIdReusePolicy // InitialLocalAttribute is the initial local attributes to be set when starting the process execution InitialLocalAttribute map[string]interface{} }
type Registry ¶
type Registry interface { // AddProcess registers a process AddProcess(processDef Process) error // AddProcesses registers multiple processes AddProcesses(processDefs ...Process) error // GetAllRegisteredProcessTypes returns all the process types that have been registered GetAllRegisteredProcessTypes() []string // contains filtered or unexported methods }
func NewRegistry ¶
func NewRegistry() Registry
type StateDecision ¶
type StateDecision struct { NextStates []StateMovement ThreadCloseType *xcapi.ThreadCloseType }
func MultiNextStates ¶
func MultiNextStates(states ...AsyncState) *StateDecision
func MultiNextStatesWithInput ¶
func MultiNextStatesWithInput(movements ...StateMovement) *StateDecision
func SingleNextState ¶
func SingleNextState(state AsyncState, input interface{}) *StateDecision
type StateMovement ¶
type StateMovement struct { // NextStateId is required NextStateId string // NextStateInput is optional NextStateInput interface{} }
func NewStateMovement ¶
func NewStateMovement(st AsyncState, input interface{}) StateMovement
type StateSchema ¶
type StateSchema struct { StartingState AsyncState AllStates []AsyncState }
func NewStateSchema ¶
func NewStateSchema(startingState AsyncState, nonStartingStates ...AsyncState) StateSchema
func NewStateSchemaNoStartingState ¶
func NewStateSchemaNoStartingState(nonStartingStates ...AsyncState) StateSchema
type TablePolicy ¶
type TablePolicy struct { TableName string // LoadingKeys are the attribute keys that will be loaded from the database LoadingKeys []string // TableLockingTypeDefault is the locking type for all the loaded attributes LockingType xcapi.LockType }
func NewTablePolicy ¶
func NewTablePolicy( tableName string, lockingType xcapi.LockType, loadingKeys ...string, ) TablePolicy
type TimerCommand ¶
type TimerCommand struct {
DelayInSeconds int64
}
type TimerResult ¶
type TimerResult struct {
Status xcapi.CommandStatus
}
type WorkerExecutionError ¶
WorkerExecutionError represents runtime errors on worker execution
func (WorkerExecutionError) Error ¶
func (i WorkerExecutionError) Error() string
type WorkerOptions ¶
type WorkerOptions struct {
ObjectEncoder ObjectEncoder
}
func GetDefaultWorkerOptions ¶
func GetDefaultWorkerOptions() WorkerOptions
type WorkerService ¶
type WorkerService interface { HandleAsyncStateWaitUntil(ctx context.Context, request xcapi.AsyncStateWaitUntilRequest) (*xcapi.AsyncStateWaitUntilResponse, error) HandleAsyncStateExecute(ctx context.Context, request xcapi.AsyncStateExecuteRequest) (*xcapi.AsyncStateExecuteResponse, error) }
WorkerService is for worker to handle task requests from xCherry server Typically put it behind a REST controller, using the above API paths
func NewWorkerService ¶
func NewWorkerService(registry Registry, options *WorkerOptions) WorkerService
Source Files
¶
- async_state.go
- async_state_options.go
- basic_client_impl.go
- basic_client_process_options.go
- client.go
- client_impl.go
- client_options.go
- command.go
- command_request.go
- command_results.go
- communication.go
- communication_impl.go
- context.go
- errors_api.go
- errors_worker.go
- internal_mapper.go
- local_queue_publish.go
- object.go
- object_encoder.go
- object_encoder_default.go
- persistence.go
- persistence_impl.go
- persistence_schema.go
- process.go
- process_options.go
- process_start_options.go
- registry.go
- registry_impl.go
- state_decision.go
- state_movement.go
- state_schema.go
- worker_options.go
- worker_service.go
- worker_service_impl.go