interpreter

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DumpWorkflowInternal added in v1.5.0

func DumpWorkflowInternal(ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest) (*iwfidl.WorkflowDumpResponse, error)

func GetSharedConfig added in v1.5.0

func GetSharedConfig() config.Config

func IsDeciderTriggerConditionMet added in v1.5.0

func IsDeciderTriggerConditionMet(
	commandReq iwfidl.CommandRequest,
	completedTimerCmds map[int]service.InternalTimerStatus,
	completedSignalCmds map[int]*iwfidl.EncodedObject,
	completedInterStateChannelCmds map[int]*iwfidl.EncodedObject,
) bool

func LastCaller added in v1.5.0

func LastCaller() string

func LoadInternalsFromPreviousRun added in v1.5.0

func LoadInternalsFromPreviousRun(ctx UnifiedContext, provider WorkflowProvider, previousRunId string, continueAsNewPageSizeInBytes int32) (*service.ContinueAsNewDumpResponse, error)

func RegisterActivityProvider

func RegisterActivityProvider(backendType service.BackendType, provider ActivityProvider)

func SetQueryHandlers added in v1.5.0

func SetQueryHandlers(ctx UnifiedContext, provider WorkflowProvider, persistenceManager *PersistenceManager, continueAsNewer *ContinueAsNewer,
	workflowConfiger *WorkflowConfiger, basicInfo service.BasicInfo) error

func SetSharedConfig added in v1.5.0

func SetSharedConfig(config config.Config)

func StateApiExecute added in v1.5.0

func StateApiWaitUntil added in v1.5.0

func StateDecide

StateDecide is deprecated. Will be removed in next release

func StateStart

StateStart is Deprecated, will be removed in next release

Types

type ActivityInfo added in v1.2.2

type ActivityInfo struct {
	ScheduledTime time.Time // Time of activity scheduled by a workflow
	Attempt       int32     // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
}

type ActivityOptions

type ActivityOptions struct {
	StartToCloseTimeout time.Duration
	RetryPolicy         *iwfidl.RetryPolicy
}

type ActivityProvider

type ActivityProvider interface {
	GetLogger(ctx context.Context) UnifiedLogger
	NewApplicationError(errType string, details interface{}) error
	GetActivityInfo(ctx context.Context) ActivityInfo
}

type ContinueAsNewCounter added in v1.5.0

type ContinueAsNewCounter struct {
	// contains filtered or unexported fields
}

func NewContinueAsCounter added in v1.5.0

func NewContinueAsCounter(configer *WorkflowConfiger, rootCtx UnifiedContext, provider WorkflowProvider) *ContinueAsNewCounter

func (*ContinueAsNewCounter) IncExecutedStateExecution added in v1.5.0

func (c *ContinueAsNewCounter) IncExecutedStateExecution(skipStart bool)

func (*ContinueAsNewCounter) IncSignalsReceived added in v1.5.0

func (c *ContinueAsNewCounter) IncSignalsReceived()

func (*ContinueAsNewCounter) IsThresholdMet added in v1.5.0

func (c *ContinueAsNewCounter) IsThresholdMet() bool

type ContinueAsNewer added in v1.3.0

type ContinueAsNewer struct {
	StateExecutionToResumeMap map[string]service.StateExecutionResumeInfo // stateExeId to StateExecutionResumeInfo
	// contains filtered or unexported fields
}

func NewContinueAsNewer added in v1.3.0

func NewContinueAsNewer(
	provider WorkflowProvider,
	interStateChannel *InterStateChannel, signalReceiver *SignalReceiver, stateExecutionCounter *StateExecutionCounter,
	persistenceManager *PersistenceManager, stateRequestQueue *StateRequestQueue, collector *OutputCollector, timerProcessor *TimerProcessor,
) *ContinueAsNewer

func (*ContinueAsNewer) AddPotentialStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) AddPotentialStateExecutionToResume(
	stateExecutionId string, state iwfidl.StateMovement, stateExecLocals []iwfidl.KeyValue, commandRequest iwfidl.CommandRequest,
	completedTimerCommands map[int]service.InternalTimerStatus, completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
)

func (*ContinueAsNewer) DrainThreads added in v1.5.0

func (c *ContinueAsNewer) DrainThreads(ctx UnifiedContext) error

func (*ContinueAsNewer) HasAnyStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) HasAnyStateExecutionToResume() bool

func (*ContinueAsNewer) RemoveStateExecutionToResume added in v1.5.0

func (c *ContinueAsNewer) RemoveStateExecutionToResume(stateExecutionId string)

func (*ContinueAsNewer) SetQueryHandlersForContinueAsNew added in v1.3.0

func (c *ContinueAsNewer) SetQueryHandlersForContinueAsNew(ctx UnifiedContext) error

type Future

type Future interface {
	Get(ctx UnifiedContext, valuePtr interface{}) error
	IsReady() bool
}

type GlobalVersioner added in v1.5.0

type GlobalVersioner struct {
	// contains filtered or unexported fields
}

GlobalVersioner see https://stackoverflow.com/questions/73941723/what-is-a-good-way-pattern-to-use-temporal-cadence-versioning-api

func NewGlobalVersioner added in v1.5.0

func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) *GlobalVersioner

func (*GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool

func (*GlobalVersioner) IsAfterVersionOfRenamedStateApi added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool

func (*GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning added in v1.5.0

func (p *GlobalVersioner) IsAfterVersionOfUsingGlobalVersioning() bool

func (*GlobalVersioner) UpsertGlobalVersionSearchAttribute added in v1.5.0

func (p *GlobalVersioner) UpsertGlobalVersionSearchAttribute() error

type InterStateChannel

type InterStateChannel struct {
	// contains filtered or unexported fields
}

func NewInterStateChannel

func NewInterStateChannel() *InterStateChannel

func RebuildInterStateChannel added in v1.2.0

func RebuildInterStateChannel(refill map[string][]*iwfidl.EncodedObject) *InterStateChannel

func (*InterStateChannel) HasData

func (i *InterStateChannel) HasData(channelName string) bool

func (*InterStateChannel) ProcessPublishing

func (i *InterStateChannel) ProcessPublishing(publishes []iwfidl.InterStateChannelPublishing)

func (*InterStateChannel) ReadReceived added in v1.3.0

func (i *InterStateChannel) ReadReceived(channelNames []string) map[string][]*iwfidl.EncodedObject

func (*InterStateChannel) Retrieve

func (i *InterStateChannel) Retrieve(channelName string) *iwfidl.EncodedObject

type OutputCollector added in v1.5.0

type OutputCollector struct {
	// contains filtered or unexported fields
}

func NewOutputCollector added in v1.5.0

func NewOutputCollector(initOutputs []iwfidl.StateCompletionOutput) *OutputCollector

func (*OutputCollector) Add added in v1.5.0

func (*OutputCollector) GetAll added in v1.5.0

type PersistenceManager

type PersistenceManager struct {
	// contains filtered or unexported fields
}

func NewPersistenceManager

func NewPersistenceManager(provider WorkflowProvider, initSearchAttributes []iwfidl.SearchAttribute) *PersistenceManager

func RebuildPersistenceManager added in v1.2.0

func RebuildPersistenceManager(provider WorkflowProvider,
	dolist []iwfidl.KeyValue, salist []iwfidl.SearchAttribute,
) *PersistenceManager

func (*PersistenceManager) GetAllDataObjects

func (am *PersistenceManager) GetAllDataObjects() []iwfidl.KeyValue

func (*PersistenceManager) GetAllSearchAttributes

func (am *PersistenceManager) GetAllSearchAttributes() []iwfidl.SearchAttribute

func (*PersistenceManager) GetDataObjectsByKey

func (*PersistenceManager) LoadDataObjects

func (am *PersistenceManager) LoadDataObjects(loadingPolicy *iwfidl.PersistenceLoadingPolicy) []iwfidl.KeyValue

func (*PersistenceManager) LoadSearchAttributes

func (am *PersistenceManager) LoadSearchAttributes(loadingPolicy *iwfidl.PersistenceLoadingPolicy) []iwfidl.SearchAttribute

func (*PersistenceManager) ProcessUpsertDataObject

func (am *PersistenceManager) ProcessUpsertDataObject(attributes []iwfidl.KeyValue) error

func (*PersistenceManager) ProcessUpsertSearchAttribute

func (am *PersistenceManager) ProcessUpsertSearchAttribute(ctx UnifiedContext, attributes []iwfidl.SearchAttribute) error

type ReceiveChannel

type ReceiveChannel interface {
	ReceiveAsync(valuePtr interface{}) (ok bool)
}

type SignalReceiver added in v1.3.0

type SignalReceiver struct {
	// contains filtered or unexported fields
}

func NewSignalReceiver added in v1.3.0

func NewSignalReceiver(ctx UnifiedContext, provider WorkflowProvider, interStateChannel *InterStateChannel, stateRequestQueue *StateRequestQueue,
	persistenceManager *PersistenceManager, tp *TimerProcessor, continueAsNewCounter *ContinueAsNewCounter, workflowConfiger *WorkflowConfiger,
	initReceivedSignals map[string][]*iwfidl.EncodedObject) *SignalReceiver

func (*SignalReceiver) DrainAllUnreceivedSignals added in v1.5.0

func (sr *SignalReceiver) DrainAllUnreceivedSignals(ctx UnifiedContext)

DrainAllUnreceivedSignals will retrieve signals that after signal handler threads are stopped, so that the signals can be carried over to next run by continueAsNew. This includes both regular user signals and system signals

func (*SignalReceiver) DumpReceived added in v1.5.0

func (sr *SignalReceiver) DumpReceived(channelNames []string) map[string][]*iwfidl.EncodedObject

func (*SignalReceiver) HasSignal added in v1.3.0

func (sr *SignalReceiver) HasSignal(channelName string) bool

func (*SignalReceiver) IsFailWorkflowRequested added in v1.5.0

func (sr *SignalReceiver) IsFailWorkflowRequested() (bool, error)

func (*SignalReceiver) Retrieve added in v1.3.0

func (sr *SignalReceiver) Retrieve(channelName string) *iwfidl.EncodedObject

type StateExecutionCounter added in v1.3.0

type StateExecutionCounter struct {
	// contains filtered or unexported fields
}

func NewStateExecutionCounter added in v1.3.0

func NewStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider, configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter) *StateExecutionCounter

func RebuildStateExecutionCounter added in v1.5.0

func RebuildStateExecutionCounter(ctx UnifiedContext, provider WorkflowProvider,
	stateIdStartedCounts map[string]int, stateIdCurrentlyExecutingCounts map[string]int, totalCurrentlyExecutingCount int,
	configer *WorkflowConfiger, continueAsNewCounter *ContinueAsNewCounter,
) *StateExecutionCounter

func (*StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally added in v1.5.0

func (e *StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally()

ClearExecutingStateIdsSearchAttributeFinally should only be called at the end of workflow

func (*StateExecutionCounter) CreateNextExecutionId added in v1.3.0

func (e *StateExecutionCounter) CreateNextExecutionId(stateId string) string

func (*StateExecutionCounter) Dump added in v1.3.0

func (*StateExecutionCounter) GetTotalCurrentlyExecutingCount added in v1.5.0

func (e *StateExecutionCounter) GetTotalCurrentlyExecutingCount() int

func (*StateExecutionCounter) MarkStateExecutionCompleted added in v1.3.0

func (e *StateExecutionCounter) MarkStateExecutionCompleted(state iwfidl.StateMovement) error

func (*StateExecutionCounter) MarkStateIdExecutingIfNotYet added in v1.5.0

func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRequest) error

type StateRequest added in v1.5.0

type StateRequest struct {
	// contains filtered or unexported fields
}

func NewStateResumeRequest added in v1.5.0

func NewStateResumeRequest(resumeRequest service.StateExecutionResumeInfo) StateRequest

func NewStateStartRequest added in v1.5.0

func NewStateStartRequest(movement iwfidl.StateMovement) StateRequest

func (StateRequest) GetStateId added in v1.5.0

func (sq StateRequest) GetStateId() string

func (StateRequest) GetStateResumeRequest added in v1.5.0

func (sq StateRequest) GetStateResumeRequest() service.StateExecutionResumeInfo

func (StateRequest) GetStateStartRequest added in v1.5.0

func (sq StateRequest) GetStateStartRequest() iwfidl.StateMovement

func (StateRequest) IsResumeRequest added in v1.5.0

func (sq StateRequest) IsResumeRequest() bool

type StateRequestQueue added in v1.5.0

type StateRequestQueue struct {
	// contains filtered or unexported fields
}

func NewStateRequestQueue added in v1.5.0

func NewStateRequestQueue() *StateRequestQueue

func NewStateRequestQueueWithResumeRequests added in v1.5.0

func NewStateRequestQueueWithResumeRequests(startReqs []iwfidl.StateMovement, resumeReqs map[string]service.StateExecutionResumeInfo) *StateRequestQueue

func (*StateRequestQueue) AddStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) AddStateStartRequests(reqs []iwfidl.StateMovement)

func (*StateRequestQueue) GetAllStateStartRequests added in v1.5.0

func (srq *StateRequestQueue) GetAllStateStartRequests() []iwfidl.StateMovement

func (*StateRequestQueue) IsEmpty added in v1.5.0

func (srq *StateRequestQueue) IsEmpty() bool

func (*StateRequestQueue) TakeAll added in v1.5.0

func (srq *StateRequestQueue) TakeAll() []StateRequest

type TimerProcessor added in v1.2.0

type TimerProcessor struct {
	// contains filtered or unexported fields
}

func NewTimerProcessor added in v1.2.0

func NewTimerProcessor(ctx UnifiedContext, provider WorkflowProvider, staleSkipTimerSignals []service.StaleSkipTimerSignal) *TimerProcessor

func (*TimerProcessor) AddTimers added in v1.5.0

func (t *TimerProcessor) AddTimers(stateExeId string, commands []iwfidl.TimerCommand, completedTimerCmds map[int]service.InternalTimerStatus)

func (*TimerProcessor) Dump added in v1.5.0

func (*TimerProcessor) GetCurrentTimerInfos added in v1.2.0

func (t *TimerProcessor) GetCurrentTimerInfos() map[string][]*service.TimerInfo

func (*TimerProcessor) RemovePendingTimersOfState added in v1.5.0

func (t *TimerProcessor) RemovePendingTimersOfState(stateExeId string)

RemovePendingTimersOfState is for when a state is completed, remove all its pending timers

func (*TimerProcessor) RetryStaleSkipTimer added in v1.5.0

func (t *TimerProcessor) RetryStaleSkipTimer() bool

func (*TimerProcessor) SkipTimer added in v1.2.0

func (t *TimerProcessor) SkipTimer(stateExeId, timerId string, timerIdx int) bool

SkipTimer will attempt to skip a timer, return false if no valid timer found

func (*TimerProcessor) WaitForTimerFiredOrSkipped added in v1.5.0

func (t *TimerProcessor) WaitForTimerFiredOrSkipped(ctx UnifiedContext, stateExeId string, timerIdx int, cancelWaiting *bool) service.InternalTimerStatus

WaitForTimerFiredOrSkipped waits for timer completed(fired or skipped), return true when the timer is fired or skipped return false if the waitingCommands is canceled by cancelWaiting bool pointer(when the trigger type is completed, or continueAsNew)

type UnifiedContext

type UnifiedContext interface {
	GetContext() interface{}
}

func NewUnifiedContext

func NewUnifiedContext(ctx interface{}) UnifiedContext

type UnifiedLogger added in v1.2.0

type UnifiedLogger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

type WorkflowConfiger added in v1.5.0

type WorkflowConfiger struct {
	// contains filtered or unexported fields
}

func NewWorkflowConfiger added in v1.5.0

func NewWorkflowConfiger(config iwfidl.WorkflowConfig) *WorkflowConfiger

func (*WorkflowConfiger) Get added in v1.5.0

func (*WorkflowConfiger) SetIfPresent added in v1.5.0

func (wc *WorkflowConfiger) SetIfPresent(config iwfidl.WorkflowConfig)

type WorkflowExecution

type WorkflowExecution struct {
	ID    string
	RunID string
}

WorkflowExecution details.

type WorkflowInfo

type WorkflowInfo struct {
	WorkflowExecution WorkflowExecution
	WorkflowStartTime time.Time
}

WorkflowInfo information about currently executing workflow

type WorkflowProvider

type WorkflowProvider interface {
	NewApplicationError(errType string, details interface{}) error
	IsApplicationError(err error) bool
	GetWorkflowInfo(ctx UnifiedContext) WorkflowInfo
	UpsertSearchAttributes(ctx UnifiedContext, attributes map[string]interface{}) error
	SetQueryHandler(ctx UnifiedContext, queryType string, handler interface{}) error
	ExtendContextWithValue(parent UnifiedContext, key string, val interface{}) UnifiedContext
	GoNamed(ctx UnifiedContext, name string, f func(ctx UnifiedContext))
	GetThreadCount() int
	GetPendingThreadNames() map[string]int
	Await(ctx UnifiedContext, condition func() bool) error
	WithActivityOptions(ctx UnifiedContext, options ActivityOptions) UnifiedContext
	ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future)
	Now(ctx UnifiedContext) time.Time
	Sleep(ctx UnifiedContext, d time.Duration) (err error)
	NewTimer(ctx UnifiedContext, d time.Duration) Future
	GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
	GetContextValue(ctx UnifiedContext, key string) interface{}
	GetVersion(ctx UnifiedContext, changeID string, minSupported, maxSupported int) int
	GetUnhandledSignalNames(ctx UnifiedContext) []string
	GetBackendType() service.BackendType
	GetLogger(ctx UnifiedContext) UnifiedLogger
	NewInterpreterContinueAsNewError(ctx UnifiedContext, input service.InterpreterWorkflowInput) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL