interpreter

package
v1.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DumpWorkflowInternal added in v1.5.0

func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error)

func IsDeciderTriggerConditionMet added in v1.5.0

func IsDeciderTriggerConditionMet(
	commandReq iwfidl.CommandRequest,
	completedTimerCmds map[int]service.InternalTimerStatus,
	completedSignalCmds map[int]*iwfidl.EncodedObject,
	completedInterStateChannelCmds map[int]*iwfidl.EncodedObject,
) bool

func LastCaller added in v1.5.0

func LastCaller() string

func LoadInternalsFromPreviousRun added in v1.5.0

func LoadInternalsFromPreviousRun(ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32) (*service.ContinueAsNewDumpResponse, error)

func RegisterActivityProvider

func RegisterActivityProvider(backendType service.BackendType, provider ActivityProvider)

func SetQueryHandlers added in v1.5.0

func SetQueryHandlers(ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, continueAsNewer *ContinueAsNewer,
	workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo) error

func StateApiExecute added in v1.5.0

func StateApiWaitUntil added in v1.5.0

func StateDecide

func StateDecide(
	ctx context.Context,
	backendType service.BackendType,
	input service.StateDecideActivityInput,
	shouldSendSignalOnCompletion bool,
	timeout time.Duration,
) (*iwfidl.WorkflowStateDecideResponse, error)

StateDecide is deprecated. Will be removed in next release

func StateStart

StateStart is Deprecated, will be removed in next release

func WaitForStateCompletionWorkflowImpl added in v1.8.0

func WaitForStateCompletionWorkflowImpl(
	ctx UnifiedContext, provider WorkflowProvider,
) (*service.WaitForStateCompletionWorkflowOutput, error)

Types

type ActivityInfo added in v1.2.2

type ActivityInfo struct {
	ScheduledTime time.Time // Time of activity scheduled by a workflow
	Attempt       int32     // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
}

type ActivityOptions

type ActivityOptions struct {
	StartToCloseTimeout time.Duration
	RetryPolicy         *iwfidl.RetryPolicy
}

type ActivityProvider

type ActivityProvider interface {
	GetLogger(ctx context.Context) UnifiedLogger
	NewApplicationError(errType string, details interface{}) error
	GetActivityInfo(ctx context.Context) ActivityInfo
}

type ContinueAsNewCounter added in v1.5.0

type ContinueAsNewCounter struct {
	// contains filtered or unexported fields
}

func NewContinueAsCounter added in v1.5.0

func NewContinueAsCounter(
	configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider,
) *ContinueAsNewCounter

func (*ContinueAsNewCounter) IncExecutedStateExecution added in v1.5.0

func (c *ContinueAsNewCounter) IncExecutedStateExecution(skipStart bool)

func (*ContinueAsNewCounter) IncSignalsReceived added in v1.5.0

func (c *ContinueAsNewCounter) IncSignalsReceived()

func (*ContinueAsNewCounter) IncSyncUpdateReceived added in v1.8.0

func (c *ContinueAsNewCounter) IncSyncUpdateReceived()

func (*ContinueAsNewCounter) IsThresholdMet added in v1.5.0

func (c *ContinueAsNewCounter) IsThresholdMet() bool

func (*ContinueAsNewCounter) TriggerByAPI added in v1.9.0

func (c *ContinueAsNewCounter) TriggerByAPI()

type ContinueAsNewer added in v1.3.0

type ContinueAsNewer struct {
	StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo
	// contains filtered or unexported fields
}

func NewContinueAsNewer added in v1.3.0

func NewContinueAsNewer(
	provider WorkflowProvider,
	interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
	persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, timerProcessor *TimerProcessor,
) *ContinueAsNewer

func (*ContinueAsNewer) AddPotentialStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) AddPotentialStateExecutionToResume(
	stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue, commandRequest iwfidl.CommandRequest,
	completedTimerCommands map[int]service.InternalTimerStatus, completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
)

func (*ContinueAsNewer) DecreaseInflightOperation added in v1.8.0

func (c *ContinueAsNewer) DecreaseInflightOperation()

func (*ContinueAsNewer) DrainThreads added in v1.5.0

func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error

func (*ContinueAsNewer) HasAnyStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) HasAnyStateExecutionToResume() bool

func (*ContinueAsNewer) IncreaseInflightOperation added in v1.8.0

func (c *ContinueAsNewer) IncreaseInflightOperation()

func (*ContinueAsNewer) RemoveStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string)

func (*ContinueAsNewer) SetQueryHandlersForContinueAsNew added in v1.3.0

func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error

type Future

type Future interface {
	Get(ctx UnifiedContext, valuePtr interface{}) error
	IsReady() bool
}

type GlobalVersioner added in v1.5.0

type GlobalVersioner struct {
	OmitVersionMarker bool // indicate the version marker and upsertSearchAttribute is already set at the start of the workflow
	// contains filtered or unexported fields
}

GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api

func NewGlobalVersioner added in v1.5.0

func NewGlobalVersioner(
	workflowProvider WorkflowProvider, omitVersionMarker bool, ctx UnifiedContext,
) (*GlobalVersioner, error)

func (*GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates added in v1.8.0

func (p *GlobalVersioner) IsAfterVersionOfContinueAsNewOnNoStates() bool

func (*GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool

func (*GlobalVersioner) IsAfterVersionOfRenamedStateApi added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool

func (*GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool

func (*GlobalVersioner) UpsertGlobalVersionSearchAttribute added in v1.5.0

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error

type HandlerOutput added in v1.8.0

type HandlerOutput struct {
	RpcOutput   *iwfidl.WorkflowRpcResponse
	StatusError *errors.ErrorAndStatus
}

type InterStateChannel

type InterStateChannel struct {
	// contains filtered or unexported fields
}

func NewInterStateChannel

func NewInterStateChannel() *InterStateChannel

func RebuildInterStateChannel added in v1.2.0

func RebuildInterStateChannel(refill map[string][]*iwfidl.EncodedObject) *InterStateChannel

func (*InterStateChannel) HasData

func (i *InterStateChannel) HasData(channelName string) bool

func (*InterStateChannel) ProcessPublishing

func (i *InterStateChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing)

func (*InterStateChannel) ReadReceived added in v1.3.0

func (i *InterStateChannel) ReadReceived(channelNames []string) map[string][]*iwfidl.EncodedObject

func (*InterStateChannel) Retrieve

func (i *InterStateChannel) Retrieve(channelName string) *iwfidl.EncodedObject

type InvokeRpcActivityOutput added in v1.8.0

type InvokeRpcActivityOutput struct {
	RpcOutput   *iwfidl.WorkflowWorkerRpcResponse
	StatusError *errors.ErrorAndStatus
}

func InvokeWorkerRpc added in v1.8.0

type OutputCollector added in v1.5.0

type OutputCollector struct {
	// contains filtered or unexported fields
}

func NewOutputCollector added in v1.5.0

func NewOutputCollector(initOutputs []iwfidl.StateCompletionOutput) *OutputCollector

func (*OutputCollector) Add added in v1.5.0

func (*OutputCollector) GetAll added in v1.5.0

type PersistenceManager

type PersistenceManager struct {
	// contains filtered or unexported fields
}

func NewPersistenceManager

func NewPersistenceManager(
	provider WorkflowProvider, initSearchAttributes []iwfidl.SearchAttribute, useMemo bool,
) *PersistenceManager

func RebuildPersistenceManager added in v1.2.0

func RebuildPersistenceManager(
	provider WorkflowProvider,
	dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute,
	useMemo bool,
) *PersistenceManager

func (*PersistenceManager) CheckDataAndSearchAttributesKeysAreUnlocked added in v1.8.0

func (am *PersistenceManager) CheckDataAndSearchAttributesKeysAreUnlocked(dataAttrKeysToCheck, searchAttrKeysToCheck []string) bool

func (*PersistenceManager) GetAllDataObjects

func (am *PersistenceManager) GetAllDataObjects() []iwfidl.KeyValue

func (*PersistenceManager) GetAllSearchAttributes

func (am *PersistenceManager) GetAllSearchAttributes() []iwfidl.SearchAttribute

func (*PersistenceManager) GetDataObjectsByKey

func (*PersistenceManager) LoadDataObjects

func (am *PersistenceManager) LoadDataObjects(
	ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy,
) []iwfidl.KeyValue

func (*PersistenceManager) LoadSearchAttributes

func (am *PersistenceManager) LoadSearchAttributes(
	ctx UnifiedContext, loadingPolicy *iwfidl.PersistenceLoadingPolicy,
) []iwfidl.SearchAttribute

func (*PersistenceManager) ProcessUpsertDataObject

func (am *PersistenceManager) ProcessUpsertDataObject(ctx UnifiedContext, attributes []iwfidl.KeyValue) error

func (*PersistenceManager) ProcessUpsertSearchAttribute

func (am *PersistenceManager) ProcessUpsertSearchAttribute(
	ctx UnifiedContext, attributes []iwfidl.SearchAttribute,
) error

func (*PersistenceManager) UnlockPersistence added in v1.5.1

func (am *PersistenceManager) UnlockPersistence(
	saPolicy *iwfidl.PersistenceLoadingPolicy, daPolicy *iwfidl.PersistenceLoadingPolicy,
)

type ReceiveChannel

type ReceiveChannel interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
	ReceiveBlocking(ctx UnifiedContext, valuePtr interface{}) (ok bool)
}

type SignalReceiver added in v1.3.0

type SignalReceiver struct {
	// contains filtered or unexported fields
}

func NewSignalReceiver added in v1.3.0

func NewSignalReceiver(
	ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel,
	stateRequestQueue *StateRequestQueue,
	persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter,
	workflowConfiger *WorkflowConfiger,
	initReceivedSignals map[string][]*iwfidl.EncodedObject,
) *SignalReceiver

func (*SignalReceiver) DrainAllUnreceivedSignals added in v1.5.0

func (sr *SignalReceiver) DrainAllUnreceivedSignals(ctx UnifiedContext)

DrainAllUnreceivedSignals will process all the signals There are two cases this is needed: 1. ContinueAsNew: retrieve signals that after signal handler threads are stopped, so that the signals can be carried over to next run by continueAsNew. This includes both regular user signals and system signals 2. Conditional close/complete workflow on signal/internal channel: retrieve all signal/internal channel messages before checking the signal/internal channels

func (*SignalReceiver) DumpReceived added in v1.5.0

func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfidl.EncodedObject

func (*SignalReceiver) HasSignal added in v1.3.0

func (sr *SignalReceiver) HasSignal(channelName string) bool

func (*SignalReceiver) IsFailWorkflowRequested added in v1.5.0

func (sr *SignalReceiver) IsFailWorkflowRequested() (bool, error)

func (*SignalReceiver) Retrieve added in v1.3.0

func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject

type StateExecutionCounter added in v1.3.0

type StateExecutionCounter struct {
	// contains filtered or unexported fields
}

func NewStateExecutionCounter added in v1.3.0

func NewStateExecutionCounter(
	ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner,
	configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
) *StateExecutionCounter

func RebuildStateExecutionCounter added in v1.5.0

func RebuildStateExecutionCounter(
	ctx UnifiedContext, provider WorkflowProvider, globalVersioner *GlobalVersioner,
	stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int,
	totalCurrentlyExecutingCount int,
	configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
) *StateExecutionCounter

func (*StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally added in v1.5.0

func (e *StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally()

ClearExecutingStateIdsSearchAttributeFinally should only be called at the end of workflow

func (*StateExecutionCounter) CreateNextExecutionId added in v1.3.0

func (e *StateExecutionCounter) CreateNextExecutionId(stateId string) string

func (*StateExecutionCounter) Dump added in v1.3.0

func (*StateExecutionCounter) GetTotalCurrentlyExecutingCount added in v1.5.0

func (e *StateExecutionCounter) GetTotalCurrentlyExecutingCount() int

func (*StateExecutionCounter) MarkStateExecutionCompleted added in v1.3.0

func (e *StateExecutionCounter) MarkStateExecutionCompleted(state iwfidl.StateMovement) error

func (*StateExecutionCounter) MarkStateIdExecutingIfNotYet added in v1.5.0

func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRequest) error

type StateRequest added in v1.5.0

type StateRequest struct {
	// contains filtered or unexported fields
}

func NewStateResumeRequest added in v1.5.0

func NewStateResumeRequest(resumeRequest service.StateExecutionResumeInfo) StateRequest

func NewStateStartRequest added in v1.5.0

func NewStateStartRequest(movement iwfidl.StateMovement) StateRequest

func (StateRequest) GetStateId added in v1.5.0

func (sq StateRequest) GetStateId() string

func (StateRequest) GetStateMovement added in v1.8.0

func (sq StateRequest) GetStateMovement() iwfidl.StateMovement

func (StateRequest) GetStateResumeRequest added in v1.5.0

func (sq StateRequest) GetStateResumeRequest() service.StateExecutionResumeInfo

func (StateRequest) GetStateStartRequest added in v1.5.0

func (sq StateRequest) GetStateStartRequest() iwfidl.StateMovement

func (StateRequest) IsResumeRequest added in v1.5.0

func (sq StateRequest) IsResumeRequest() bool

type StateRequestQueue added in v1.5.0

type StateRequestQueue struct {
	// contains filtered or unexported fields
}

func NewStateRequestQueue added in v1.5.0

func NewStateRequestQueue() *StateRequestQueue

func NewStateRequestQueueWithResumeRequests added in v1.5.0

func NewStateRequestQueueWithResumeRequests(startReqs []iwfidl.StateMovement, resumeReqs map[string]service.StateExecutionResumeInfo) *StateRequestQueue

func (*StateRequestQueue) AddSingleStateStartRequest added in v1.8.0

func (srq *StateRequestQueue) AddSingleStateStartRequest(stateId string, input *iwfidl.EncodedObject, options *iwfidl.WorkflowStateOptions)

func (*StateRequestQueue) AddStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) AddStateStartRequests(reqs []iwfidl.StateMovement)

func (*StateRequestQueue) GetAllStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) GetAllStateStartRequests() []iwfidl.StateMovement

func (*StateRequestQueue) IsEmpty added in v1.5.0

func (srq *StateRequestQueue) IsEmpty() bool

func (*StateRequestQueue) TakeAll added in v1.5.0

func (srq *StateRequestQueue) TakeAll() []StateRequest

type TimerProcessor added in v1.2.0

type TimerProcessor struct {
	// contains filtered or unexported fields
}

func NewTimerProcessor added in v1.2.0

func NewTimerProcessor(ctx UnifiedContext, provider WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal) *TimerProcessor

func (*TimerProcessor) AddTimers added in v1.5.0

func (t *TimerProcessor) AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus)

func (*TimerProcessor) Dump added in v1.5.0

func (*TimerProcessor) GetCurrentTimerInfos added in v1.2.0

func (t *TimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo

func (*TimerProcessor) RemovePendingTimersOfState added in v1.5.0

func (t *TimerProcessor) RemovePendingTimersOfState(stateExeId string)

RemovePendingTimersOfState is for when a state is completed, remove all its pending timers

func (*TimerProcessor) RetryStaleSkipTimer added in v1.5.0

func (t *TimerProcessor) RetryStaleSkipTimer() bool

func (*TimerProcessor) SkipTimer added in v1.2.0

func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool

SkipTimer will attempt to skip a timer, return false if no valid timer found

func (*TimerProcessor) WaitForTimerFiredOrSkipped added in v1.5.0

func (t *TimerProcessor) WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus

WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), return true when the timer is fired or skipped return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew)

type UnifiedContext

type UnifiedContext interface {
	GetContext() interface{}
}

func NewUnifiedContext

func NewUnifiedContext(ctx interface{}) UnifiedContext

type UnifiedLogger added in v1.2.0

type UnifiedLogger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

type UnifiedRpcHandler added in v1.8.0

type UnifiedRpcHandler func(ctx UnifiedContext, input iwfidl.WorkflowRpcRequest) (*HandlerOutput, error)

type UnifiedRpcValidator added in v1.8.0

type UnifiedRpcValidator func(ctx UnifiedContext, input iwfidl.WorkflowRpcRequest) error

type WorkflowConfiger added in v1.5.0

type WorkflowConfiger struct {
	// contains filtered or unexported fields
}

func NewWorkflowConfiger added in v1.5.0

func NewWorkflowConfiger(config iwfidl.WorkflowConfig) *WorkflowConfiger

func (*WorkflowConfiger) Get added in v1.5.0

func (*WorkflowConfiger) SetIfPresent added in v1.5.0

func (wc *WorkflowConfiger) SetIfPresent(config iwfidl.WorkflowConfig)

type WorkflowExecution

type WorkflowExecution struct {
	ID    string
	RunID string
}

WorkflowExecution details.

type WorkflowInfo

type WorkflowInfo struct {
	WorkflowExecution        WorkflowExecution
	WorkflowStartTime        time.Time
	WorkflowExecutionTimeout time.Duration
}

WorkflowInfo information about currently executing workflow

type WorkflowProvider

type WorkflowProvider interface {
	NewApplicationError(errType string, details interface{}) error
	IsApplicationError(err error) bool
	GetWorkflowInfo(ctx UnifiedContext) WorkflowInfo
	GetSearchAttributes(
		ctx UnifiedContext, requestedSearchAttributes []iwfidl.SearchAttributeKeyAndType,
	) (map[string]iwfidl.SearchAttribute, error)
	UpsertSearchAttributes(ctx UnifiedContext, attributes map[string]interface{}) error
	UpsertMemo(ctx UnifiedContext, memo map[string]iwfidl.EncodedObject) error
	SetQueryHandler(ctx UnifiedContext, queryType string, handler interface{}) error
	SetRpcUpdateHandler(
		ctx UnifiedContext, updateType string, validator UnifiedRpcValidator, handler UnifiedRpcHandler,
	) error
	ExtendContextWithValue(parent UnifiedContext, key string, val interface{}) UnifiedContext
	GoNamed(ctx UnifiedContext, name string, f func(ctx UnifiedContext))
	GetThreadCount() int
	GetPendingThreadNames() map[string]int
	Await(ctx UnifiedContext, condition func() bool) error
	WithActivityOptions(ctx UnifiedContext, options ActivityOptions) UnifiedContext
	ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future)
	Now(ctx UnifiedContext) time.Time
	IsReplaying(ctx UnifiedContext) bool
	Sleep(ctx UnifiedContext, d time.Duration) (err error)
	NewTimer(ctx UnifiedContext, d time.Duration) Future
	GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
	GetContextValue(ctx UnifiedContext, key string) interface{}
	GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int
	GetUnhandledSignalNames(ctx UnifiedContext) []string
	GetBackendType() service.BackendType
	GetLogger(ctx UnifiedContext) UnifiedLogger
	NewInterpreterContinueAsNewError(ctx UnifiedContext, input service.InterpreterWorkflowInput) error
}

type WorkflowUpdater added in v1.8.0

type WorkflowUpdater struct {
	// contains filtered or unexported fields
}

func NewWorkflowUpdater added in v1.8.0

func NewWorkflowUpdater(ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue,
	continueAsNewer *ContinueAsNewer, continueAsNewCounter *ContinueAsNewCounter, interStateChannel *InterStateChannel, basicInfo service.BasicInfo,
) (*WorkflowUpdater, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL