xc

package
v0.0.0-...-e21cc07 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultNamespace  = "default"
	DefaultWorkerPort = "8803"
	DefaultServerPort = "8801"

	DefaultWorkerUrl = "http://localhost:" + DefaultWorkerPort
	DefaultServerUrl = "http://localhost:" + DefaultServerPort
)
View Source
const (
	ApiPathAsyncStateWaitUntil = "/api/v1/xcherry/worker/async-state/wait-until"
	ApiPathAsyncStateExecute   = "/api/v1/xcherry/worker/async-state/execute"
)

Variables

View Source
var DeadEnd = &StateDecision{
	ThreadCloseType: xcapi.DEAD_END.Ptr(),
}
View Source
var ForceCompletingProcess = &StateDecision{
	ThreadCloseType: xcapi.FORCE_COMPLETE_PROCESS.Ptr(),
}
View Source
var ForceFailProcess = &StateDecision{
	ThreadCloseType: xcapi.FORCE_FAIL_PROCESS.Ptr(),
}
View Source
var GracefulCompletingProcess = &StateDecision{
	ThreadCloseType: xcapi.GRACEFUL_COMPLETE_PROCESS.Ptr(),
}

Functions

func GetFinalProcessType

func GetFinalProcessType(wf Process) string

GetFinalProcessType returns the process type that will be registered if the process is from &myStruct{} or myStruct{} under mywf package, the method returns "mywf.myStruct"

func GetFinalStateId

func GetFinalStateId(asyncState AsyncState) string

GetFinalStateId returns the stateId that will be registered and used if the asyncState is from myStruct{} under mywf package, the method returns "mywf.myStruct"

func GetOpenApiErrorBody

func GetOpenApiErrorBody(err error) string

GetOpenApiErrorBody retrieve the API error body into a string to be human-readable

func IsClientError

func IsClientError(err error) bool

func IsProcessAlreadyStartedError

func IsProcessAlreadyStartedError(err error) bool

func IsProcessNotExistsError

func IsProcessNotExistsError(err error) bool

func IsRPCExecutionError

func IsRPCExecutionError(err error) bool

func IsRPCLockingFailure

func IsRPCLockingFailure(err error) bool

func IsWaitingExceedingTimeoutError

func IsWaitingExceedingTimeoutError(err error) bool

func NewApiError

func NewApiError(
	originalError error, openApiError *xcapi.GenericOpenAPIError, httpResponse *http.Response,
	errResponse *xcapi.ApiErrorResponse,
) error

func NewInternalError

func NewInternalError(format string, args ...interface{}) error

func NewInvalidArgumentError

func NewInvalidArgumentError(tpl string, arg ...interface{}) error

func NewProcessDefinitionError

func NewProcessDefinitionError(tpl string, arg ...interface{}) error

func ShouldSkipWaitUntilAPI

func ShouldSkipWaitUntilAPI(state AsyncState) bool

Types

type ApiError

type ApiError struct {
	StatusCode    int
	OriginalError error
	OpenApiError  *xcapi.GenericOpenAPIError
	HttpResponse  *http.Response
	ErrResponse   *xcapi.ApiErrorResponse
}

ApiError represents error returned from xCherry server Could be client side(4xx) or server side(5xx), see below helpers to check details

func (*ApiError) Error

func (i *ApiError) Error() string

type AsyncState

type AsyncState interface {
	// GetStateOptions defines the optional configuration of this state definition.
	GetStateOptions() *AsyncStateOptions

	// WaitUntil is the method to set up commands set up to wait for, before `Execute` API is invoked.
	//           It's optional -- use xc.AsyncStateNoWaitUntil to skip this( then Execute will be invoked directly instead)
	//
	//  ctx              the context info of this API invocation, like process start time, processId, etc
	//  input            the state input
	//  Communication    TODO
	// @return the requested commands for this state
	///
	WaitUntil(ctx Context, input Object, communication Communication) (*CommandRequest, error)

	// Execute is the method to execute and decide what to do next.
	// It's invoked after commands from WaitUntil are completed, or if WaitUntil is skipped(not implemented).
	//
	//  ctx              the context info of this API invocation, like process start time, processId, etc
	//  input            the state input
	//  CommandResults   the results of the command that executed by WaitUntil
	//  Persistence      TODO
	//  Communication    TODO
	// @return the decision of what to do next(e.g. transition to next states or closing process)
	Execute(ctx Context, input Object, commandResults CommandResults, persistence Persistence, communication Communication) (*StateDecision, error)
}

type AsyncStateDefaults

type AsyncStateDefaults struct {
	// contains filtered or unexported fields
}

AsyncStateDefaults is a convenient struct to put into your state implementation to save the boilerplate code of returning default values Example usage:

type myStateImpl struct{
    AsyncStateDefaults
}

Then myStateImpl doesn't have to implement WaitUntil, Execute or GetStateOptions

func (AsyncStateDefaults) GetStateOptions

func (d AsyncStateDefaults) GetStateOptions() *AsyncStateOptions

type AsyncStateDefaultsSkipWaitUntil

type AsyncStateDefaultsSkipWaitUntil struct {
	// contains filtered or unexported fields
}

AsyncStateDefaultsSkipWaitUntil is required to skip WaitUntil put into your state implementation to save the boilerplate code of returning default values Example usage:

type myStateImpl struct{
    AsyncStateDefaultsSkipWaitUntil
}

Then myStateImpl will skip WaitUntil, and doesn't have to implement GetStateOptions

func (AsyncStateDefaultsSkipWaitUntil) GetStateOptions

func (d AsyncStateDefaultsSkipWaitUntil) GetStateOptions() *AsyncStateOptions

func (AsyncStateDefaultsSkipWaitUntil) WaitUntil

func (d AsyncStateDefaultsSkipWaitUntil) WaitUntil(ctx Context, input Object, communication Communication) (*CommandRequest, error)

type AsyncStateOptions

type AsyncStateOptions struct {
	// StateId is the unique identifier of the state.
	// It is being used for WorkerService to choose the right AsyncState to execute Start/Execute APIs
	// Default: the pkgName.structName of the state struct, see GetFinalStateId() for details when set as empty string
	StateId string
	// WaitUntilTimeoutSeconds is the timeout for the waitUntil API call.
	// Default: 10 seconds(configurable in server) when set as 0
	// It will be capped to 60 seconds by server (configurable in server)
	WaitUntilTimeoutSeconds int32
	// ExecuteTimeoutSeconds is the timeout for the execute API call.
	// Default: 10 seconds(configurable in server) when set as 0
	// It will be capped to 60 seconds by server (configurable in server)
	ExecuteTimeoutSeconds int32
	// WaitUntilRetryPolicy is the retry policy for the waitUntil API call.
	// Default: infinite retry with 1 second initial interval, 120 seconds max interval, and 2 backoff factor,
	// when set as nil
	WaitUntilRetryPolicy *xcapi.RetryPolicy
	// ExecuteRetryPolicy is the retry policy for the execute API call.
	// Default: infinite retry with 1 second initial interval, 120 seconds max interval, and 2 backoff factor,
	// when set as nil
	ExecuteRetryPolicy *xcapi.RetryPolicy
	// FailureRecoveryState is the state to recover after current state execution fails
	// Default: no recovery when set as nil
	FailureRecoveryState AsyncState
	// PersistencePolicyName is the name of loading policy for persistence if not using default policy
	PersistencePolicyName *string
}

func (*AsyncStateOptions) SetFailureRecoveryOption

func (o *AsyncStateOptions) SetFailureRecoveryOption(destState AsyncState) *AsyncStateOptions

type BasicClient

type BasicClient interface {
	// StartProcess starts a process execution
	// processType is the process type
	// startStateId is the stateId of the startingState
	// processId is the required business identifier for the process execution(can be used with ProcessIdReusePolicy
	// input the optional input for the startingState
	// options is optional includes like ProcessIdReusePolicy.
	// return the processExecutionId
	StartProcess(
		ctx context.Context, processType string, startStateId, processId string, input interface{},
		options *BasicClientProcessOptions,
	) (string, error)
	// StopProcess stops a process execution
	// processId is the required business identifier for the process execution
	StopProcess(ctx context.Context, processId string, stopType xcapi.ProcessExecutionStopType) error
	// DescribeCurrentProcessExecution returns a process execution info
	// processId is the required business identifier for the process execution
	DescribeCurrentProcessExecution(
		ctx context.Context, processId string,
	) (*xcapi.ProcessExecutionDescribeResponse, error)

	PublishToLocalQueue(
		ctx context.Context, processId string, messages []xcapi.LocalQueueMessage,
	) error
}

BasicClient is a base client without process registry It's the internal implementation of Client. But it can be used directly if there is good reason -- let you invoke the APIs to xCherry server without much type validation checks(process type, queue names, etc).

func NewBasicClient

func NewBasicClient(options ClientOptions) BasicClient

NewBasicClient returns a BasicClient

type BasicClientProcessOptions

type BasicClientProcessOptions struct {
	ProcessIdReusePolicy *xcapi.ProcessIdReusePolicy
	StartStateOptions    *xcapi.AsyncStateConfig
	// default is 0 which indicate no timeout
	TimeoutSeconds       int32
	LocalAttributeConfig *xcapi.LocalAttributeConfig
}

type Client

type Client interface {
	GetBasicClient() BasicClient
	// StartProcess starts a process execution
	// definition is the definition of the process
	// processId is the required business identifier for the process execution (can be used with ProcessIdReusePolicy)
	// input the optional input for the startingState
	// return the processExecutionId
	StartProcess(ctx context.Context, definition Process, processId string, input interface{}) (string, error)
	// StartProcessWithOptions starts a process execution with options, which will override the options defined in process definition
	StartProcessWithOptions(
		ctx context.Context, definition Process, processId string, input interface{}, options *ProcessStartOptions,
	) (string, error)
	// StopProcess stops a process execution
	// processId is the required business identifier for the process execution
	StopProcess(ctx context.Context, processId string, stopType xcapi.ProcessExecutionStopType) error
	// PublishToLocalQueue publishes a message to a local queue
	// the payload can be empty(nil)
	PublishToLocalQueue(
		ctx context.Context, processId string, queueName string, payload interface{}, options *LocalQueuePublishOptions,
	) error
	BatchPublishToLocalQueue(
		ctx context.Context, processId string, messages ...LocalQueuePublishMessage,
	) error
	// DescribeCurrentProcessExecution returns a process execution info
	// processId is the required business identifier for the process execution
	DescribeCurrentProcessExecution(
		ctx context.Context, processId string,
	) (*xcapi.ProcessExecutionDescribeResponse, error)
}

Client is a full-featured client

func NewClient

func NewClient(registry Registry, options *ClientOptions) Client

NewClient returns a Client

type ClientOptions

type ClientOptions struct {
	Namespace           string
	ServerUrl           string
	WorkerUrl           string
	ObjectEncoder       ObjectEncoder
	EnabledDebugLogging bool
	// DefaultProcessTimeoutSecondsOverride is used when StartProcess is called and
	// 1. no timeout specified in ProcessOptions(default as zero)
	// 2. no timeout specified in ProcessStartOptions(default as nil)
	// currently mainly for testing purpose
	DefaultProcessTimeoutSecondsOverride int32
}

func GetLocalDefaultClientOptions

func GetLocalDefaultClientOptions() *ClientOptions

type Command

type Command struct {
	CommandType       CommandType
	TimerCommand      *TimerCommand
	LocalQueueCommand *LocalQueueCommand
}

func NewLocalQueueCommand

func NewLocalQueueCommand(queueName string, count int) Command

func NewTimerCommand

func NewTimerCommand(duration time.Duration) Command

type CommandRequest

type CommandRequest struct {
	Commands           []Command
	CommandWaitingType xcapi.CommandWaitingType
}

func AllOf

func AllOf(commands ...Command) *CommandRequest

AllOf will wait for all the commands to complete

func AnyOf

func AnyOf(commands ...Command) *CommandRequest

AnyOf will wait for any of the commands to complete

func EmptyCommandRequest

func EmptyCommandRequest() *CommandRequest

EmptyCommandRequest will jump to decide stage immediately.

type CommandResults

type CommandResults struct {
	TimerResults      []TimerResult
	LocalQueueResults []LocalQueueCommandResult
}

func (CommandResults) GetFirstLocalQueueCommand

func (c CommandResults) GetFirstLocalQueueCommand() LocalQueueCommandResult

func (CommandResults) GetFirstTimerStatus

func (c CommandResults) GetFirstTimerStatus() xcapi.CommandStatus

func (CommandResults) GetLocalQueueCommand

func (c CommandResults) GetLocalQueueCommand(index int) LocalQueueCommandResult

func (CommandResults) GetTimerStatus

func (c CommandResults) GetTimerStatus(index int) xcapi.CommandStatus

type CommandType

type CommandType string
const (
	CommandTypeTimer      CommandType = "Timer"
	CommandTypeLocalQueue CommandType = "LocalQueue"
)

type Communication

type Communication interface {
	// PublishToLocalQueue publishes a message to a local queue
	// the payload can be empty(nil)
	PublishToLocalQueue(queueName string, payload interface{})
	// contains filtered or unexported methods
}

func NewCommunication

func NewCommunication(encoder ObjectEncoder) Communication

type Context

type Context interface {
	GetAttempt() int
	GetProcessId() string
	GetRecoverFromStateExecutionId() *string
	GetRecoverFromStateApi() *xcapi.WorkerApiType
}

type DBColumnDef

type DBColumnDef struct {
	GlobalAttributeKey string
	ColumnName         string
	Hint               *DBHint
	// contains filtered or unexported fields
}

func NewDBColumnDef

func NewDBColumnDef(
	key string, dbColumn string, defaultLoading bool,
) DBColumnDef

func NewDBColumnDefWithHint

func NewDBColumnDefWithHint(
	key string, dbColumn string, defaultLoading bool, hint DBHint,
) DBColumnDef

type DBHint

type DBHint string

DBHint is the hint for the DBConverter to convert database column to query value and vice versa

type DBTableSchema

type DBTableSchema struct {
	TableName string
	PK        string
	Columns   []DBColumnDef
	// DefaultTablePolicy is the default loading policy for this table
	DefaultTablePolicy TablePolicy
}

func NewDBTableSchema

func NewDBTableSchema(
	tableName string,
	pk string,
	defaultReadLocking xcapi.LockType,
	columns ...DBColumnDef,
) DBTableSchema

type GlobalAttributesSchema

type GlobalAttributesSchema struct {
	// Tables is table name to the table schema
	Tables map[string]DBTableSchema
}

func NewEmptyGlobalAttributesSchema

func NewEmptyGlobalAttributesSchema() *GlobalAttributesSchema

func NewGlobalAttributesSchema

func NewGlobalAttributesSchema(
	table ...DBTableSchema,
) *GlobalAttributesSchema

type InternalSDKError

type InternalSDKError struct {
	Message string
}

InternalSDKError means something wrong within xCherry SDK

func (InternalSDKError) Error

func (i InternalSDKError) Error() string

type InvalidArgumentError

type InvalidArgumentError struct {
	// contains filtered or unexported fields
}

InvalidArgumentError represents an invalid input argument

func (InvalidArgumentError) Error

func (w InvalidArgumentError) Error() string

type LocalAttributeDef

type LocalAttributeDef struct {
	Key                string
	DefaultLoadingType LocalAttributeLoadingType
}

func NewLocalAttributeDef

func NewLocalAttributeDef(key string, defaultLoadingType LocalAttributeLoadingType) LocalAttributeDef

type LocalAttributeLoadingType

type LocalAttributeLoadingType string
const (
	NotLoad      LocalAttributeLoadingType = "not load"
	LoadWithLock LocalAttributeLoadingType = "load with lock"
	LoadNoLock   LocalAttributeLoadingType = "load no lock"
)

type LocalAttributePolicy

type LocalAttributePolicy struct {
	LocalAttributeKeysNoLock   map[string]bool
	LocalAttributeKeysWithLock map[string]bool
	LockingType                *xcapi.LockType
}

type LocalAttributesSchema

type LocalAttributesSchema struct {
	LocalAttributeKeys          map[string]bool
	DefaultLocalAttributePolicy LocalAttributePolicy
}

func NewEmptyLocalAttributesSchema

func NewEmptyLocalAttributesSchema() *LocalAttributesSchema

func NewLocalAttributesSchema

func NewLocalAttributesSchema(
	LockingType *xcapi.LockType,
	localAttributesDef ...LocalAttributeDef,
) *LocalAttributesSchema

type LocalQueueCommand

type LocalQueueCommand struct {
	QueueName string
	Count     int
}

type LocalQueueCommandResult

type LocalQueueCommandResult struct {
	Result  xcapi.LocalQueueResult
	Encoder ObjectEncoder
}

func (LocalQueueCommandResult) GetFirstMessage

func (lc LocalQueueCommandResult) GetFirstMessage(ptr interface{})

func (LocalQueueCommandResult) GetMessages

func (lc LocalQueueCommandResult) GetMessages() []Object

func (LocalQueueCommandResult) GetQueueName

func (lc LocalQueueCommandResult) GetQueueName() string

func (LocalQueueCommandResult) GetStatus

type LocalQueuePublishMessage

type LocalQueuePublishMessage struct {
	QueueName string
	Payload   interface{}
	// DedupSeed is the seed to generate the DedupUUID
	// by uuid.NewMD5(uuid.NameSpaceOID, []byte(*DedupSeed))
	// only used if DedupUUID is nil
	DedupSeed *string
	// DedupUUID is the deduplication UUID
	DedupUUID *string
}

type LocalQueuePublishOptions

type LocalQueuePublishOptions struct {
	// DedupSeed is the seed to generate the DedupUUID
	// by uuid.NewMD5(uuid.NameSpaceOID, []byte(*DedupSeed))
	// only used if DedupUUID is nil
	DedupSeed *string
	// DedupUUID is the deduplication UUID
	DedupUUID *string
}

type NamedPersistencePolicy

type NamedPersistencePolicy struct {
	Name string
	// LocalAttributePolicy is the policy for local attributes
	LocalAttributePolicy *LocalAttributePolicy
	// GlobalAttributePolicy is the policy for global attributes
	// key is the table name
	GlobalAttributePolicy map[string]TablePolicy
}

func NewNamedPersistencePolicy

func NewNamedPersistencePolicy(
	name string,
	localAttributesPolicy *LocalAttributePolicy,
	globalAttributeTablePolicy ...TablePolicy,
) NamedPersistencePolicy

type Object

type Object interface {
	Get(resultPtr interface{})
}

Object is a representation of EncodedObject

func NewObject

func NewObject(EncodedObject *xcapi.EncodedObject, ObjectEncoder ObjectEncoder) Object

type ObjectEncoder

type ObjectEncoder interface {
	// GetEncodingType returns the encoding info that it can handle
	GetEncodingType() string
	// Encode serialize an object
	Encode(obj interface{}) (*xcapi.EncodedObject, error)
	// Decode deserialize an object
	Decode(encodedObj *xcapi.EncodedObject, resultPtr interface{}) error
}

func GetDefaultObjectEncoder

func GetDefaultObjectEncoder() ObjectEncoder

type Persistence

type Persistence interface {
	// GetLocalAttribute returns the local attribute value
	GetLocalAttribute(key string, resultPtr interface{})
	// SetLocalAttribute sets the local attribute value
	SetLocalAttribute(key string, value interface{})
	// contains filtered or unexported methods
}

func NewPersistenceImpl

func NewPersistenceImpl(
	localAttrKeys map[string]bool,
	currLocalAttrs *xcapi.LoadLocalAttributesResponse,
) Persistence

type PersistenceSchema

type PersistenceSchema struct {
	// LocalAttributeSchema is the schema for local attributes
	// LocalAttributes are attributes that are specific to a process execution
	LocalAttributeSchema *LocalAttributesSchema
}

func NewEmptyPersistenceSchema

func NewEmptyPersistenceSchema() PersistenceSchema

func NewPersistenceSchema

func NewPersistenceSchema(
	localAttrSchema *LocalAttributesSchema,
	globalAttrSchema *GlobalAttributesSchema,
) PersistenceSchema

NewPersistenceSchema creates a new PersistenceSchema globalAttrSchema is the schema for global attributes localAttrSchema is the schema for local attributes

func NewPersistenceSchemaWithOptions

func NewPersistenceSchemaWithOptions(
	localAttrSchema *LocalAttributesSchema,
) PersistenceSchema

func (PersistenceSchema) ValidateLocalAttributeForRegistry

func (s PersistenceSchema) ValidateLocalAttributeForRegistry() (map[string]bool, error)

type PersistenceSchemaOptions

type PersistenceSchemaOptions struct {
	// NameToPolicy is the persistence policy with a name,
	// which can be used as an override to the default policy
	NameToPolicy map[string]NamedPersistencePolicy
}

func NewPersistenceSchemaOptions

func NewPersistenceSchemaOptions(
	namedPolicies ...NamedPersistencePolicy,
) PersistenceSchemaOptions

type Process

type Process interface {
	// GetAsyncStateSchema defines the AsyncStates of the process.
	// If there is no startingState, the process will not start any state execution after process stated.
	// Application can still use RPC to invoke new state execution later.
	GetAsyncStateSchema() StateSchema
	// GetPersistenceSchema defines the persistence schema of the process
	GetPersistenceSchema() PersistenceSchema
	// GetProcessOptions defines the options for the process
	// Note that they can be overridden by the ProcessStartOptions when starting a process
	GetProcessOptions() ProcessOptions
}

Process is the interface to define a process definition. Process is a top level concept in xCherry

type ProcessAbnormalExitError

type ProcessAbnormalExitError struct {
	ProcessExecutionId string
	// TODO ClosedStatus xcapi.ProcessStatus
	// TODO FailureType    *xcapi.ProcessFailureSubType
	ErrorMessage *string
	// StateResults []xcapi.ProcessCloseOutput
	Encoder ObjectEncoder
}

ProcessAbnormalExitError is returned when process execution doesn't complete successfully when waiting on the completion

func AsProcessAbnormalExitError

func AsProcessAbnormalExitError(err error) (*ProcessAbnormalExitError, bool)

AsProcessAbnormalExitError will check if it's a ProcessAbnormalExitError and convert it if so

func (*ProcessAbnormalExitError) Error

func (w *ProcessAbnormalExitError) Error() string

type ProcessDefaults

type ProcessDefaults struct {
}

ProcessDefaults is a convenient struct to put into your process implementation to save the boilerplate code of returning default values Example usage :

type myPcImpl struct{
    ProcessDefaults
}

Then myPcImpl doesn't have to implement GetProcessOptions or GetAsyncStateSchema

func (ProcessDefaults) GetAsyncStateSchema

func (d ProcessDefaults) GetAsyncStateSchema() StateSchema

func (ProcessDefaults) GetPersistenceSchema

func (d ProcessDefaults) GetPersistenceSchema() PersistenceSchema

func (ProcessDefaults) GetProcessOptions

func (d ProcessDefaults) GetProcessOptions() ProcessOptions

type ProcessDefinitionError

type ProcessDefinitionError struct {
	// contains filtered or unexported fields
}

ProcessDefinitionError represents process code(including its elements like AsyncStates/RPCs) is not valid

func (ProcessDefinitionError) Error

func (w ProcessDefinitionError) Error() string

type ProcessOptions

type ProcessOptions struct {
	// TimeoutSeconds is the timeout for the process execution.
	// Default: 0, mean which means infinite timeout
	TimeoutSeconds int32
	// IdReusePolicy is the policy for reusing process id.
	// Default: xcapi.ALLOW_IF_NO_RUNNING when set as nil
	IdReusePolicy *xcapi.ProcessIdReusePolicy
	// GetProcessType defines the processType of this process definition.
	// GetFinalProcessType set the default value when return empty string --
	// It's the packageName.structName of the process instance and ignores the import paths and aliases.
	// e.g. if the process is from myStruct{} under mywf package, the simple name is just "mywf.myStruct". Underneath, it's from reflect.TypeOf(wf).String().
	ProcessType string
}

func NewDefaultProcessOptions

func NewDefaultProcessOptions() ProcessOptions

type ProcessStartOptions

type ProcessStartOptions struct {
	// TimeoutSeconds is the timeout for the process execution.
	// Default: 0, mean which means infinite timeout.
	// This will override the timeout defined in process definition
	TimeoutSeconds *int32
	// IdReusePolicy is the policy for reusing process id.
	// Default: xcapi.ALLOW_IF_NO_RUNNING when set as nil.
	// This will override the IdReusePolicy defined in process definition.
	IdReusePolicy *xcapi.ProcessIdReusePolicy
	// InitialLocalAttribute is the initial local attributes to be set when starting the process execution
	InitialLocalAttribute map[string]interface{}
}

type Registry

type Registry interface {
	// AddProcess registers a process
	AddProcess(processDef Process) error
	// AddProcesses registers multiple processes
	AddProcesses(processDefs ...Process) error
	// GetAllRegisteredProcessTypes returns all the process types that have been registered
	GetAllRegisteredProcessTypes() []string
	// contains filtered or unexported methods
}

func NewRegistry

func NewRegistry() Registry

type StateDecision

type StateDecision struct {
	NextStates      []StateMovement
	ThreadCloseType *xcapi.ThreadCloseType
}

func MultiNextStates

func MultiNextStates(states ...AsyncState) *StateDecision

func MultiNextStatesWithInput

func MultiNextStatesWithInput(movements ...StateMovement) *StateDecision

func SingleNextState

func SingleNextState(state AsyncState, input interface{}) *StateDecision

type StateMovement

type StateMovement struct {
	// NextStateId is required
	NextStateId string
	// NextStateInput is optional
	NextStateInput interface{}
}

func NewStateMovement

func NewStateMovement(st AsyncState, input interface{}) StateMovement

type StateSchema

type StateSchema struct {
	StartingState AsyncState
	AllStates     []AsyncState
}

func NewStateSchema

func NewStateSchema(startingState AsyncState, nonStartingStates ...AsyncState) StateSchema

func NewStateSchemaNoStartingState

func NewStateSchemaNoStartingState(nonStartingStates ...AsyncState) StateSchema

type TablePolicy

type TablePolicy struct {
	TableName string
	// LoadingKeys are the attribute keys that will be loaded from the database
	LoadingKeys []string
	// TableLockingTypeDefault is the locking type for all the loaded attributes
	LockingType xcapi.LockType
}

func NewTablePolicy

func NewTablePolicy(
	tableName string,
	lockingType xcapi.LockType,
	loadingKeys ...string,
) TablePolicy

type TimerCommand

type TimerCommand struct {
	DelayInSeconds int64
}

type TimerResult

type TimerResult struct {
	Status xcapi.CommandStatus
}

type WorkerExecutionError

type WorkerExecutionError struct {
	OriginalError error
	StackTrace    string
}

WorkerExecutionError represents runtime errors on worker execution

func (WorkerExecutionError) Error

func (i WorkerExecutionError) Error() string

type WorkerOptions

type WorkerOptions struct {
	ObjectEncoder ObjectEncoder
}

func GetDefaultWorkerOptions

func GetDefaultWorkerOptions() WorkerOptions

type WorkerService

type WorkerService interface {
	HandleAsyncStateWaitUntil(ctx context.Context, request xcapi.AsyncStateWaitUntilRequest) (*xcapi.AsyncStateWaitUntilResponse, error)
	HandleAsyncStateExecute(ctx context.Context, request xcapi.AsyncStateExecuteRequest) (*xcapi.AsyncStateExecuteResponse, error)
}

WorkerService is for worker to handle task requests from xCherry server Typically put it behind a REST controller, using the above API paths

func NewWorkerService

func NewWorkerService(registry Registry, options *WorkerOptions) WorkerService

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL