history

package
v0.3.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2018 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Timer task status values, numbered consecutively from zero.
const (
	// TimerTaskStatusNone is the zero value of the timer task status.
	TimerTaskStatusNone = 0
	// TimerTaskStatusCreated is the status value following TimerTaskStatusNone.
	TimerTaskStatusCreated = 1
)

Timer task status

View Source
// Activity timer task status flags. Each constant is a distinct single bit.
const (
	TimerTaskStatusCreatedStartToClose    = 1 << 0
	TimerTaskStatusCreatedScheduleToStart = 1 << 1
	TimerTaskStatusCreatedScheduleToClose = 1 << 2
	TimerTaskStatusCreatedHeartbeat       = 1 << 3
)

Activity Timer task status

Variables

View Source
// Package-level sentinel errors returned by the history service, plus the set
// of workflow close states that count as failed.
var (
	// ErrTaskRetry is the error indicating that the timer / transfer task should be retried.
	ErrTaskRetry = errors.New("passive task should retry due to condition in mutable state is not met")
	// ErrDuplicate is exported temporarily for integration tests.
	ErrDuplicate = errors.New("Duplicate task, completing it")
	// ErrConflict is exported temporarily for integration tests.
	ErrConflict = errors.New("Conditional update failed")
	// ErrMaxAttemptsExceeded is exported temporarily for integration tests.
	ErrMaxAttemptsExceeded = errors.New("Maximum attempts exceeded to update history")
	// ErrStaleState is the error returned during a state update indicating that the cached mutable state could be stale.
	ErrStaleState = errors.New("Cache mutable state could potentially be stale")
	// ErrActivityTaskNotFound is the error indicating that the activity task could be a duplicate and the activity already completed.
	ErrActivityTaskNotFound = &workflow.EntityNotExistsError{Message: "Activity task not found."}
	// ErrWorkflowCompleted is the error indicating that the workflow execution already completed.
	ErrWorkflowCompleted = &workflow.EntityNotExistsError{Message: "Workflow execution already completed."}
	// ErrWorkflowParent is the error indicating that a parent execution was given and it does not match.
	ErrWorkflowParent = &workflow.EntityNotExistsError{Message: "Workflow parent does not match."}
	// ErrDeserializingToken is the error indicating that the task token is invalid.
	ErrDeserializingToken = &workflow.BadRequestError{Message: "Error deserializing task token."}
	// ErrCancellationAlreadyRequested is the error indicating that cancellation of the target workflow was already requested.
	ErrCancellationAlreadyRequested = &workflow.CancellationAlreadyRequestedError{Message: "Cancellation already requested for this workflow execution."}
	// ErrBufferedEventsLimitExceeded is the error indicating that the limit on the maximum number of buffered events was reached.
	ErrBufferedEventsLimitExceeded = &workflow.LimitExceededError{Message: "Exceeded workflow execution limit for buffered events"}
	// FailedWorkflowCloseState is the set of failed workflow close states, used by the
	// start workflow policy of the start workflow execution API.
	// Members: failed, canceled, terminated and timed out.
	FailedWorkflowCloseState = map[int]bool{
		persistence.WorkflowCloseStatusFailed:     true,
		persistence.WorkflowCloseStatusCanceled:   true,
		persistence.WorkflowCloseStatusTerminated: true,
		persistence.WorkflowCloseStatusTimedOut:   true,
	}
)
View Source
// Errors returned while applying replication tasks. The ErrRetry* values are
// shared.RetryTaskError instances; the remaining two are shared.BadRequestError
// instances describing malformed replication tasks.
var (
	// ErrRetryEntityNotExists is returned to indicate that the workflow execution is not created yet and the
	// replicator should try this task again after a small delay.
	ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "workflow execution not found"}
	// ErrRetryExistingWorkflow is returned when events are arriving out of order and there is another workflow
	// with the same version running.
	ErrRetryExistingWorkflow = &shared.RetryTaskError{Message: "workflow with same version is running"}
	// ErrRetryBufferEvents is returned when events are arriving out of order; the caller should retry or
	// specify force apply.
	ErrRetryBufferEvents = &shared.RetryTaskError{Message: "retry on applying buffer events"}
	// ErrRetryExecutionAlreadyStarted is returned to indicate that another workflow execution already started.
	// This error can be returned if we encounter a race condition, i.e. terminating the target workflow while
	// the target workflow has done continue as new.
	// Try this task again after a small delay.
	ErrRetryExecutionAlreadyStarted = &shared.RetryTaskError{Message: "another workflow execution is running"}
	// ErrMissingReplicationInfo is returned when a replication task is missing replication information from the source cluster.
	ErrMissingReplicationInfo = &shared.BadRequestError{Message: "replication task is missing cluster replication info"}
	// ErrCorruptedReplicationInfo is returned when a replication task has corrupted replication information from the source cluster.
	// The message previously read "is has corrupted"; the stray "is" was removed.
	ErrCorruptedReplicationInfo = &shared.BadRequestError{Message: "replication task has corrupted cluster replication info"}
)
View Source
var (
	// ErrTryLock is a temporary error that is returned by the API
	// when it loses the race to create the workflow execution context.
	// Its message tells the caller to back off and retry.
	ErrTryLock = &workflow.InternalServiceError{Message: "Failed to acquire lock, backoff and retry"}
)

Functions

func NewService

func NewService(params *service.BootstrapParams) common.Daemon

NewService builds a new cadence-history service

Types

type Config added in v0.3.1

// Config represents configuration for the cadence-history service.
// Most fields are dynamicconfig property functions rather than plain values
// (presumably so they can be re-evaluated at runtime — confirm against the
// dynamicconfig package).
type Config struct {
	NumberOfShards int

	PersistenceMaxQPS dynamicconfig.FloatPropertyFn

	// HistoryCache settings.
	// Changing these settings requires a shard restart.
	HistoryCacheInitialSize dynamicconfig.IntPropertyFn
	HistoryCacheMaxSize     dynamicconfig.IntPropertyFn
	HistoryCacheTTL         dynamicconfig.DurationPropertyFn

	// ShardController settings
	RangeSizeBits        uint
	AcquireShardInterval dynamicconfig.DurationPropertyFn

	// The artificial delay added to the standby cluster's view of the active cluster's time
	StandbyClusterDelay dynamicconfig.DurationPropertyFn

	// TimerQueueProcessor settings
	TimerTaskBatchSize                             dynamicconfig.IntPropertyFn
	TimerTaskWorkerCount                           dynamicconfig.IntPropertyFn
	TimerTaskMaxRetryCount                         dynamicconfig.IntPropertyFn
	TimerProcessorStartDelay                       dynamicconfig.DurationPropertyFn
	TimerProcessorFailoverStartDelay               dynamicconfig.DurationPropertyFn
	TimerProcessorGetFailureRetryCount             dynamicconfig.IntPropertyFn
	TimerProcessorCompleteTimerFailureRetryCount   dynamicconfig.IntPropertyFn
	TimerProcessorUpdateAckInterval                dynamicconfig.DurationPropertyFn
	TimerProcessorCompleteTimerInterval            dynamicconfig.DurationPropertyFn
	TimerProcessorFailoverMaxPollRPS               dynamicconfig.IntPropertyFn
	TimerProcessorMaxPollRPS                       dynamicconfig.IntPropertyFn
	TimerProcessorMaxPollInterval                  dynamicconfig.DurationPropertyFn
	TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn

	// TransferQueueProcessor settings
	TransferTaskBatchSize                              dynamicconfig.IntPropertyFn
	TransferTaskWorkerCount                            dynamicconfig.IntPropertyFn
	TransferTaskMaxRetryCount                          dynamicconfig.IntPropertyFn
	TransferProcessorStartDelay                        dynamicconfig.DurationPropertyFn
	TransferProcessorFailoverStartDelay                dynamicconfig.DurationPropertyFn
	TransferProcessorCompleteTransferFailureRetryCount dynamicconfig.IntPropertyFn
	TransferProcessorFailoverMaxPollRPS                dynamicconfig.IntPropertyFn
	TransferProcessorMaxPollRPS                        dynamicconfig.IntPropertyFn
	TransferProcessorMaxPollInterval                   dynamicconfig.DurationPropertyFn
	TransferProcessorMaxPollIntervalJitterCoefficient  dynamicconfig.FloatPropertyFn
	TransferProcessorUpdateAckInterval                 dynamicconfig.DurationPropertyFn
	TransferProcessorCompleteTransferInterval          dynamicconfig.DurationPropertyFn

	// ReplicatorQueueProcessor settings
	ReplicatorTaskBatchSize                             dynamicconfig.IntPropertyFn
	ReplicatorTaskWorkerCount                           dynamicconfig.IntPropertyFn
	ReplicatorTaskMaxRetryCount                         dynamicconfig.IntPropertyFn
	ReplicatorProcessorStartDelay                       dynamicconfig.DurationPropertyFn
	ReplicatorProcessorMaxPollRPS                       dynamicconfig.IntPropertyFn
	ReplicatorProcessorMaxPollInterval                  dynamicconfig.DurationPropertyFn
	ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
	ReplicatorProcessorUpdateAckInterval                dynamicconfig.DurationPropertyFn

	// Persistence settings
	ExecutionMgrNumConns dynamicconfig.IntPropertyFn
	HistoryMgrNumConns   dynamicconfig.IntPropertyFn

	// System Limits
	MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn

	// ShardUpdateMinInterval is the minimal time interval at which the shard info can be updated
	ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
	// ShardSyncMinInterval is the minimal time interval at which the shard info should be synced to remote
	ShardSyncMinInterval dynamicconfig.DurationPropertyFn

	// Time to hold a poll request before returning an empty response.
	// Right now only used by GetMutableState.
	LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithDomainFilter
}

Config represents configuration for cadence-history service

func NewConfig added in v0.3.1

func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config

NewConfig returns new service config with default values

func (*Config) GetShardID added in v0.3.3

func (config *Config) GetShardID(workflowID string) int

GetShardID returns the corresponding shard ID for a given workflow ID

type Engine

// Engine represents an interface for managing workflow execution history.
// It embeds common.Daemon and exposes one method per history operation.
type Engine interface {
	common.Daemon
	// TODO: Convert workflow.WorkflowExecution to pointer all over the place
	// NOTE: StartWorkflowExecution is the only request method here that does not
	// take a context.Context.
	StartWorkflowExecution(request *h.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse,
		error)
	GetMutableState(ctx context.Context, request *h.GetMutableStateRequest) (*h.GetMutableStateResponse, error)
	DescribeMutableState(ctx context.Context, request *h.DescribeMutableStateRequest) (*h.DescribeMutableStateResponse, error)
	ResetStickyTaskList(ctx context.Context, resetRequest *h.ResetStickyTaskListRequest) (*h.ResetStickyTaskListResponse, error)
	DescribeWorkflowExecution(ctx context.Context,
		request *h.DescribeWorkflowExecutionRequest) (*workflow.DescribeWorkflowExecutionResponse, error)
	RecordDecisionTaskStarted(ctx context.Context, request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error)
	RecordActivityTaskStarted(ctx context.Context, request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error)
	RespondDecisionTaskCompleted(ctx context.Context, request *h.RespondDecisionTaskCompletedRequest) (*h.RespondDecisionTaskCompletedResponse, error)
	RespondDecisionTaskFailed(ctx context.Context, request *h.RespondDecisionTaskFailedRequest) error
	RespondActivityTaskCompleted(ctx context.Context, request *h.RespondActivityTaskCompletedRequest) error
	RespondActivityTaskFailed(ctx context.Context, request *h.RespondActivityTaskFailedRequest) error
	RespondActivityTaskCanceled(ctx context.Context, request *h.RespondActivityTaskCanceledRequest) error
	RecordActivityTaskHeartbeat(ctx context.Context, request *h.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error)
	RequestCancelWorkflowExecution(ctx context.Context, request *h.RequestCancelWorkflowExecutionRequest) error
	SignalWorkflowExecution(ctx context.Context, request *h.SignalWorkflowExecutionRequest) error
	SignalWithStartWorkflowExecution(ctx context.Context, request *h.SignalWithStartWorkflowExecutionRequest) (
		*workflow.StartWorkflowExecutionResponse, error)
	RemoveSignalMutableState(ctx context.Context, request *h.RemoveSignalMutableStateRequest) error
	TerminateWorkflowExecution(ctx context.Context, request *h.TerminateWorkflowExecutionRequest) error
	ScheduleDecisionTask(ctx context.Context, request *h.ScheduleDecisionTaskRequest) error
	RecordChildExecutionCompleted(ctx context.Context, request *h.RecordChildExecutionCompletedRequest) error
	ReplicateEvents(ctx context.Context, request *h.ReplicateEventsRequest) error
	SyncShardStatus(ctx context.Context, request *h.SyncShardStatusRequest) error
}

Engine represents an interface for managing workflow execution history.

func NewEngineWithShardContext

func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.VisibilityManager,
	matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier, publisher messaging.Producer) Engine

NewEngineWithShardContext creates an instance of history engine

type EngineFactory

// EngineFactory is used to create an instance of a sharded history engine.
type EngineFactory interface {
	// CreateEngine returns an Engine for the given shard context.
	CreateEngine(context ShardContext) Engine
}

EngineFactory is used to create an instance of sharded history engine

type Handler

// Handler is the Thrift handler for the history service.
// It embeds service.Service.
type Handler struct {
	service.Service
	// contains filtered or unexported fields
}

Handler - Thrift handler interface for history service

func NewHandler

func NewHandler(sVice service.Service, config *Config, shardManager persistence.ShardManager,
	metadataMgr persistence.MetadataManager, visibilityMgr persistence.VisibilityManager,
	historyMgr persistence.HistoryManager, executionMgrFactory persistence.ExecutionManagerFactory) *Handler

NewHandler creates a thrift handler for the history service

func (*Handler) CreateEngine

func (h *Handler) CreateEngine(context ShardContext) Engine

CreateEngine is the implementation of EngineFactory used for creating the engine instance for a shard

func (*Handler) DescribeHistoryHost added in v0.3.13

func (h *Handler) DescribeHistoryHost(ctx context.Context,
	request *gen.DescribeHistoryHostRequest) (*gen.DescribeHistoryHostResponse, error)

DescribeHistoryHost returns information about the internal states of a history host

func (*Handler) DescribeMutableState added in v0.3.13

DescribeMutableState - returns the internal analysis of workflow execution state

func (*Handler) DescribeWorkflowExecution added in v0.3.3

DescribeWorkflowExecution returns information about the specified workflow execution.

func (*Handler) GetMutableState added in v0.3.5

func (h *Handler) GetMutableState(ctx context.Context,
	getRequest *hist.GetMutableStateRequest) (*hist.GetMutableStateResponse, error)

GetMutableState - returns the id of the next event in the execution's history

func (*Handler) Health added in v0.3.0

func (h *Handler) Health(ctx context.Context) (*health.HealthStatus, error)

Health is for health check

func (*Handler) RecordActivityTaskHeartbeat

func (h *Handler) RecordActivityTaskHeartbeat(ctx context.Context,
	wrappedRequest *hist.RecordActivityTaskHeartbeatRequest) (*gen.RecordActivityTaskHeartbeatResponse, error)

RecordActivityTaskHeartbeat - Record Activity Task Heart beat.

func (*Handler) RecordActivityTaskStarted

func (h *Handler) RecordActivityTaskStarted(ctx context.Context,
	recordRequest *hist.RecordActivityTaskStartedRequest) (*hist.RecordActivityTaskStartedResponse, error)

RecordActivityTaskStarted - Record Activity Task started.

func (*Handler) RecordChildExecutionCompleted

func (h *Handler) RecordChildExecutionCompleted(ctx context.Context, request *hist.RecordChildExecutionCompletedRequest) error

RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent. This is mainly called by transfer queue processor during the processing of DeleteExecution task.

func (*Handler) RecordDecisionTaskStarted

func (h *Handler) RecordDecisionTaskStarted(ctx context.Context,
	recordRequest *hist.RecordDecisionTaskStartedRequest) (*hist.RecordDecisionTaskStartedResponse, error)

RecordDecisionTaskStarted - Record Decision Task started.

func (*Handler) RemoveSignalMutableState added in v0.3.6

func (h *Handler) RemoveSignalMutableState(ctx context.Context,
	wrappedRequest *hist.RemoveSignalMutableStateRequest) error

RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently used to clean execution info when signal decision finished.

func (*Handler) ReplicateEvents added in v0.3.11

func (h *Handler) ReplicateEvents(ctx context.Context, replicateRequest *hist.ReplicateEventsRequest) error

ReplicateEvents is called by processor to replicate history events for passive domains

func (*Handler) RequestCancelWorkflowExecution

func (h *Handler) RequestCancelWorkflowExecution(ctx context.Context,
	request *hist.RequestCancelWorkflowExecutionRequest) error

RequestCancelWorkflowExecution - requests cancellation of a workflow

func (*Handler) ResetStickyTaskList added in v0.3.7

func (h *Handler) ResetStickyTaskList(ctx context.Context, resetRequest *hist.ResetStickyTaskListRequest) (*hist.ResetStickyTaskListResponse, error)

ResetStickyTaskList resets the volatile information in the mutable state of a given workflow. Volatile information is the information related to the client, such as: 1. StickyTaskList 2. StickyScheduleToStartTimeout 3. ClientLibraryVersion 4. ClientFeatureVersion 5. ClientImpl

func (*Handler) RespondActivityTaskCanceled

func (h *Handler) RespondActivityTaskCanceled(ctx context.Context,
	wrappedRequest *hist.RespondActivityTaskCanceledRequest) error

RespondActivityTaskCanceled - records cancellation of an activity task

func (*Handler) RespondActivityTaskCompleted

func (h *Handler) RespondActivityTaskCompleted(ctx context.Context,
	wrappedRequest *hist.RespondActivityTaskCompletedRequest) error

RespondActivityTaskCompleted - records completion of an activity task

func (*Handler) RespondActivityTaskFailed

func (h *Handler) RespondActivityTaskFailed(ctx context.Context,
	wrappedRequest *hist.RespondActivityTaskFailedRequest) error

RespondActivityTaskFailed - records failure of an activity task

func (*Handler) RespondDecisionTaskCompleted

func (h *Handler) RespondDecisionTaskCompleted(ctx context.Context,
	wrappedRequest *hist.RespondDecisionTaskCompletedRequest) (*hist.RespondDecisionTaskCompletedResponse, error)

RespondDecisionTaskCompleted - records completion of a decision task

func (*Handler) RespondDecisionTaskFailed added in v0.3.3

func (h *Handler) RespondDecisionTaskFailed(ctx context.Context,
	wrappedRequest *hist.RespondDecisionTaskFailedRequest) error

RespondDecisionTaskFailed - failed response to decision task

func (*Handler) ScheduleDecisionTask

func (h *Handler) ScheduleDecisionTask(ctx context.Context, request *hist.ScheduleDecisionTaskRequest) error

ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts child execution without creating the decision task and then calls this API after updating the mutable state of parent execution.

func (*Handler) SignalWithStartWorkflowExecution added in v0.3.11

func (h *Handler) SignalWithStartWorkflowExecution(ctx context.Context,
	wrappedRequest *hist.SignalWithStartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error)

SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution. If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution. If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled event recorded in history, and a decision task being created for the execution

func (*Handler) SignalWorkflowExecution

func (h *Handler) SignalWorkflowExecution(ctx context.Context,
	wrappedRequest *hist.SignalWorkflowExecutionRequest) error

SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.

func (*Handler) Start

func (h *Handler) Start() error

Start starts the handler

func (*Handler) StartWorkflowExecution

func (h *Handler) StartWorkflowExecution(ctx context.Context,
	wrappedRequest *hist.StartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error)

StartWorkflowExecution - creates a new workflow execution

func (*Handler) Stop

func (h *Handler) Stop()

Stop stops the handler

func (*Handler) SyncShardStatus added in v0.3.14

func (h *Handler) SyncShardStatus(ctx context.Context, syncShardStatusRequest *hist.SyncShardStatusRequest) error

SyncShardStatus is called by processor to sync history shard information from another cluster

func (*Handler) TerminateWorkflowExecution

func (h *Handler) TerminateWorkflowExecution(ctx context.Context,
	wrappedRequest *hist.TerminateWorkflowExecutionRequest) error

TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event in the history and immediately terminating the execution instance.

type LocalTimerGate added in v0.3.11

// LocalTimerGate is a TimerGate that can additionally be closed.
type LocalTimerGate interface {
	TimerGate
	// Close shuts down the timer
	Close()
}

LocalTimerGate interface

func NewLocalTimerGate added in v0.3.11

func NewLocalTimerGate() LocalTimerGate

NewLocalTimerGate create a new timer gate instance

type LocalTimerGateImpl added in v0.3.11

// LocalTimerGateImpl is a timer implementation, which is basically a wrapper
// around golang's timer with additional features.
type LocalTimerGateImpl struct {
	// contains filtered or unexported fields
}

LocalTimerGateImpl is a timer implementation, which is basically a wrapper around golang's timer with additional features

func (*LocalTimerGateImpl) Close added in v0.3.11

func (timerGate *LocalTimerGateImpl) Close()

Close shutdown the timer

func (*LocalTimerGateImpl) FireAfter added in v0.3.11

func (timerGate *LocalTimerGateImpl) FireAfter(now time.Time) bool

FireAfter checks whether the timer will fire after the given time

func (*LocalTimerGateImpl) FireChan added in v0.3.11

func (timerGate *LocalTimerGateImpl) FireChan() <-chan struct{}

FireChan return the channel which will be fired when time is up

func (*LocalTimerGateImpl) Update added in v0.3.11

func (timerGate *LocalTimerGateImpl) Update(nextTime time.Time) bool

Update updates the timer gate and returns true if the update succeeds. Success means the timer is idle or the timer is set with a sooner time to fire.

type MockHistoryEngine

// MockHistoryEngine is used as a mock implementation for HistoryEngine.
// It embeds mock.Mock.
type MockHistoryEngine struct {
	mock.Mock
}

MockHistoryEngine is used as mock implementation for HistoryEngine

func (*MockHistoryEngine) DescribeMutableState added in v0.3.13

DescribeMutableState is mock implementation for DescribeMutableState of HistoryEngine

func (*MockHistoryEngine) DescribeWorkflowExecution added in v0.3.3

DescribeWorkflowExecution is mock implementation for DescribeWorkflowExecution of HistoryEngine

func (*MockHistoryEngine) GetMutableState added in v0.3.5

GetMutableState is mock implementation for GetMutableState of HistoryEngine

func (*MockHistoryEngine) RecordActivityTaskHeartbeat

RecordActivityTaskHeartbeat is mock implementation for RecordActivityTaskHeartbeat of HistoryEngine

func (*MockHistoryEngine) RecordActivityTaskStarted

RecordActivityTaskStarted is mock implementation for RecordActivityTaskStarted of HistoryEngine

func (*MockHistoryEngine) RecordChildExecutionCompleted

func (_m *MockHistoryEngine) RecordChildExecutionCompleted(ctx context.Context, request *gohistory.RecordChildExecutionCompletedRequest) error

RecordChildExecutionCompleted is mock implementation for RecordChildExecutionCompleted of HistoryEngine

func (*MockHistoryEngine) RecordDecisionTaskStarted

RecordDecisionTaskStarted is mock implementation for RecordDecisionTaskStarted of HistoryEngine

func (*MockHistoryEngine) RemoveSignalMutableState added in v0.3.6

func (_m *MockHistoryEngine) RemoveSignalMutableState(ctx context.Context, request *gohistory.RemoveSignalMutableStateRequest) error

RemoveSignalMutableState is mock implementation for RemoveSignalMutableState of HistoryEngine

func (*MockHistoryEngine) ReplicateEvents added in v0.3.11

func (_m *MockHistoryEngine) ReplicateEvents(ctx context.Context, request *gohistory.ReplicateEventsRequest) error

ReplicateEvents is mock implementation for ReplicateEvents of HistoryEngine

func (*MockHistoryEngine) RequestCancelWorkflowExecution

func (_m *MockHistoryEngine) RequestCancelWorkflowExecution(ctx context.Context, request *gohistory.RequestCancelWorkflowExecutionRequest) error

RequestCancelWorkflowExecution is mock implementation for RequestCancelWorkflowExecution of HistoryEngine

func (*MockHistoryEngine) ResetStickyTaskList added in v0.3.7

ResetStickyTaskList is mock implementation for ResetStickyTaskList of HistoryEngine

func (*MockHistoryEngine) RespondActivityTaskCanceled

func (_m *MockHistoryEngine) RespondActivityTaskCanceled(ctx context.Context, request *gohistory.RespondActivityTaskCanceledRequest) error

RespondActivityTaskCanceled is mock implementation for RespondActivityTaskCanceled of HistoryEngine

func (*MockHistoryEngine) RespondActivityTaskCompleted

func (_m *MockHistoryEngine) RespondActivityTaskCompleted(ctx context.Context, request *gohistory.RespondActivityTaskCompletedRequest) error

RespondActivityTaskCompleted is mock implementation for RespondActivityTaskCompleted of HistoryEngine

func (*MockHistoryEngine) RespondActivityTaskFailed

func (_m *MockHistoryEngine) RespondActivityTaskFailed(ctx context.Context, request *gohistory.RespondActivityTaskFailedRequest) error

RespondActivityTaskFailed is mock implementation for RespondActivityTaskFailed of HistoryEngine

func (*MockHistoryEngine) RespondDecisionTaskCompleted

RespondDecisionTaskCompleted is mock implementation for RespondDecisionTaskCompleted of HistoryEngine

func (*MockHistoryEngine) RespondDecisionTaskFailed added in v0.3.3

func (_m *MockHistoryEngine) RespondDecisionTaskFailed(ctx context.Context, request *gohistory.RespondDecisionTaskFailedRequest) error

RespondDecisionTaskFailed is mock implementation for RespondDecisionTaskFailed of HistoryEngine

func (*MockHistoryEngine) ScheduleDecisionTask

func (_m *MockHistoryEngine) ScheduleDecisionTask(ctx context.Context, request *gohistory.ScheduleDecisionTaskRequest) error

ScheduleDecisionTask is mock implementation for ScheduleDecisionTask of HistoryEngine

func (*MockHistoryEngine) SignalWithStartWorkflowExecution added in v0.3.11

SignalWithStartWorkflowExecution is mock implementation for SignalWithStartWorkflowExecution of HistoryEngine

func (*MockHistoryEngine) SignalWorkflowExecution

func (_m *MockHistoryEngine) SignalWorkflowExecution(ctx context.Context, request *gohistory.SignalWorkflowExecutionRequest) error

SignalWorkflowExecution is mock implementation for SignalWorkflowExecution of HistoryEngine

func (*MockHistoryEngine) Start

func (_m *MockHistoryEngine) Start()

Start is mock implementation for Start for HistoryEngine

func (*MockHistoryEngine) StartWorkflowExecution

StartWorkflowExecution is mock implementation for StartWorkflowExecution of HistoryEngine

func (*MockHistoryEngine) Stop

func (_m *MockHistoryEngine) Stop()

Stop is mock implementation for Stop of HistoryEngine

func (*MockHistoryEngine) SyncShardStatus added in v0.3.14

func (_m *MockHistoryEngine) SyncShardStatus(ctx context.Context, request *gohistory.SyncShardStatusRequest) error

SyncShardStatus is mock implementation for SyncShardStatus of HistoryEngine

func (*MockHistoryEngine) TerminateWorkflowExecution

func (_m *MockHistoryEngine) TerminateWorkflowExecution(ctx context.Context, request *gohistory.TerminateWorkflowExecutionRequest) error

TerminateWorkflowExecution is mock implementation for TerminateWorkflowExecution of HistoryEngine

type MockHistoryEngineFactory

// MockHistoryEngineFactory is a mock implementation for HistoryEngineFactory.
// It embeds mock.Mock.
type MockHistoryEngineFactory struct {
	mock.Mock
}

MockHistoryEngineFactory is mock implementation for HistoryEngineFactory

func (*MockHistoryEngineFactory) CreateEngine

func (_m *MockHistoryEngineFactory) CreateEngine(context ShardContext) Engine

CreateEngine is mock implementation for CreateEngine of HistoryEngineFactory

type MockProcessor added in v0.3.12

// MockProcessor is used as a mock implementation for Processor.
// It embeds mock.Mock.
type MockProcessor struct {
	mock.Mock
}

MockProcessor is used as mock implementation for Processor

type MockQueueAckMgr added in v0.3.12

// MockQueueAckMgr is used as a mock implementation for QueueAckMgr.
// It embeds mock.Mock.
type MockQueueAckMgr struct {
	mock.Mock
}

MockQueueAckMgr is used as mock implementation for QueueAckMgr

type MockTimerQueueAckMgr added in v0.3.11

// MockTimerQueueAckMgr is used as a mock implementation for TimerQueueAckMgr.
// It embeds mock.Mock.
type MockTimerQueueAckMgr struct {
	mock.Mock
}

MockTimerQueueAckMgr is used as mock implementation for TimerQueueAckMgr

type MockTimerQueueProcessor added in v0.3.12

// MockTimerQueueProcessor is used as a mock implementation for Processor.
// It embeds mock.Mock.
type MockTimerQueueProcessor struct {
	mock.Mock
}

MockTimerQueueProcessor is used as mock implementation for Processor

func (*MockTimerQueueProcessor) FailoverDomain added in v0.3.12

func (_m *MockTimerQueueProcessor) FailoverDomain(domainID string)

FailoverDomain is mock implementation for FailoverDomain of Processor

func (*MockTimerQueueProcessor) NotifyNewTimers added in v0.3.12

func (_m *MockTimerQueueProcessor) NotifyNewTimers(clusterName string, currentTime time.Time, timerTask []persistence.Task)

NotifyNewTimers is mock implementation for NotifyNewTimers of Processor

func (*MockTimerQueueProcessor) Start added in v0.3.12

func (_m *MockTimerQueueProcessor) Start()

Start is mock implementation for Start of Processor

func (*MockTimerQueueProcessor) Stop added in v0.3.12

func (_m *MockTimerQueueProcessor) Stop()

Stop is mock implementation for Stop of Processor

type MockTransferQueueProcessor added in v0.3.12

// MockTransferQueueProcessor is used as a mock implementation for Processor.
// It embeds mock.Mock.
type MockTransferQueueProcessor struct {
	mock.Mock
}

MockTransferQueueProcessor is used as mock implementation for Processor

func (*MockTransferQueueProcessor) FailoverDomain added in v0.3.12

func (_m *MockTransferQueueProcessor) FailoverDomain(domainID string)

FailoverDomain is mock implementation for FailoverDomain of Processor

func (*MockTransferQueueProcessor) NotifyNewTask added in v0.3.12

func (_m *MockTransferQueueProcessor) NotifyNewTask(clusterName string, transferTask []persistence.Task)

NotifyNewTask is mock implementation for NotifyNewTask of Processor

func (*MockTransferQueueProcessor) Start added in v0.3.12

func (_m *MockTransferQueueProcessor) Start()

Start is mock implementation for Start of Processor

func (*MockTransferQueueProcessor) Stop added in v0.3.12

func (_m *MockTransferQueueProcessor) Stop()

Stop is mock implementation for Stop of Processor

type QueueProcessorOptions added in v0.3.11

// QueueProcessorOptions holds the options passed to a queue processor
// implementation. All fields except MetricScope are dynamicconfig property
// functions.
type QueueProcessorOptions struct {
	StartDelay                       dynamicconfig.DurationPropertyFn
	BatchSize                        dynamicconfig.IntPropertyFn
	WorkerCount                      dynamicconfig.IntPropertyFn
	MaxPollRPS                       dynamicconfig.IntPropertyFn
	MaxPollInterval                  dynamicconfig.DurationPropertyFn
	MaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
	UpdateAckInterval                dynamicconfig.DurationPropertyFn
	MaxRetryCount                    dynamicconfig.IntPropertyFn
	MetricScope                      int
}

QueueProcessorOptions is options passed to queue processor implementation

type RemoteTimerGate added in v0.3.11

// RemoteTimerGate is a TimerGate whose notion of the current time is set
// explicitly through SetCurrentTime.
type RemoteTimerGate interface {
	TimerGate
	// SetCurrentTime sets the current time, and additionally fires the fire chan
	// if the new "current" time is after the next wake up time. It returns true if
	// "current" is actually updated.
	SetCurrentTime(nextTime time.Time) bool
}

RemoteTimerGate interface

func NewRemoteTimerGate added in v0.3.11

func NewRemoteTimerGate() RemoteTimerGate

NewRemoteTimerGate create a new timer gate instance

type RemoteTimerGateImpl added in v0.3.11

// RemoteTimerGateImpl is a timer implementation, which is basically a wrapper
// around golang's timer with additional features.
type RemoteTimerGateImpl struct {

	// lock for timer and next wake up time
	sync.Mutex
	// contains filtered or unexported fields
}

RemoteTimerGateImpl is a timer implementation, which is basically a wrapper around golang's timer with additional features

func (*RemoteTimerGateImpl) FireAfter added in v0.3.11

func (timerGate *RemoteTimerGateImpl) FireAfter(now time.Time) bool

FireAfter checks whether the timer will fire after the given time

func (*RemoteTimerGateImpl) FireChan added in v0.3.11

func (timerGate *RemoteTimerGateImpl) FireChan() <-chan struct{}

FireChan return the channel which will be fired when time is up

func (*RemoteTimerGateImpl) SetCurrentTime added in v0.3.11

func (timerGate *RemoteTimerGateImpl) SetCurrentTime(currentTime time.Time) bool

SetCurrentTime sets the current time and additionally fires the fire chan if the new "current" time is after the next wake-up time; it returns true if "current" is actually updated

func (*RemoteTimerGateImpl) Update added in v0.3.11

func (timerGate *RemoteTimerGateImpl) Update(nextTime time.Time) bool

Update updates the timer gate and returns true if the update is a success. Success means the timer is idle or the timer is set with a sooner time to fire.

type SequenceNumberGenerator

type SequenceNumberGenerator interface {
	NextSeq() int64
}

SequenceNumberGenerator - Generates next sequence number.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service represents the cadence-history service

func (*Service) Start

func (s *Service) Start()

Start starts the service

func (*Service) Stop

func (s *Service) Stop()

Stop stops the service

type ShardContext

type ShardContext interface {
	GetShardID() int
	GetService() service.Service
	GetExecutionManager() persistence.ExecutionManager
	GetHistoryManager() persistence.HistoryManager
	GetDomainCache() cache.DomainCache
	GetNextTransferTaskID() (int64, error)
	GetTransferMaxReadLevel() int64
	GetTransferAckLevel() int64
	UpdateTransferAckLevel(ackLevel int64) error
	GetTransferClusterAckLevel(cluster string) int64
	UpdateTransferClusterAckLevel(cluster string, ackLevel int64) error
	GetReplicatorAckLevel() int64
	UpdateReplicatorAckLevel(ackLevel int64) error
	GetTimerAckLevel() time.Time
	UpdateTimerAckLevel(ackLevel time.Time) error
	GetTimerClusterAckLevel(cluster string) time.Time
	UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
	UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error
	DeleteTransferFailoverLevel(failoverID string) error
	GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel
	UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error
	DeleteTimerFailoverLevel(failoverID string) error
	GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel
	GetDomainNotificationVersion() int64
	UpdateDomainNotificationVersion(domainNotificationVersion int64) error
	CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
		*persistence.CreateWorkflowExecutionResponse, error)
	UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error
	ResetMutableState(request *persistence.ResetMutableStateRequest) error
	AppendHistoryEvents(request *persistence.AppendHistoryEventsRequest) error
	NotifyNewHistoryEvent(event *historyEventNotification) error
	GetConfig() *Config
	GetLogger() bark.Logger
	GetMetricsClient() metrics.Client
	GetTimeSource() common.TimeSource
	SetCurrentTime(cluster string, currentTime time.Time)
	GetCurrentTime(cluster string) time.Time
}

ShardContext represents a history engine shard

type TestBase added in v0.3.0

type TestBase struct {
	persistence.TestBase
	ShardContext *TestShardContext
}

TestBase wraps the base setup needed to create workflows over engine layer.

func (*TestBase) SetupDomains added in v0.3.11

func (s *TestBase) SetupDomains()

SetupDomains sets up the domains used for testing

func (*TestBase) SetupWorkflowStore added in v0.3.0

func (s *TestBase) SetupWorkflowStore()

SetupWorkflowStore to setup workflow test base

func (*TestBase) SetupWorkflowStoreWithOptions added in v0.3.0

func (s *TestBase) SetupWorkflowStoreWithOptions(options persistence.TestBaseOptions)

SetupWorkflowStoreWithOptions to setup workflow test base

func (*TestBase) TeardownDomains added in v0.3.11

func (s *TestBase) TeardownDomains()

TeardownDomains deletes the domains used for testing

type TestShardContext added in v0.3.0

type TestShardContext struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TestShardContext shard context for testing.

func (*TestShardContext) AppendHistoryEvents added in v0.3.0

func (s *TestShardContext) AppendHistoryEvents(request *persistence.AppendHistoryEventsRequest) error

AppendHistoryEvents test implementation

func (*TestShardContext) CreateWorkflowExecution added in v0.3.0

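func (s *TestShardContext) CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
	*persistence.CreateWorkflowExecutionResponse, error)

CreateWorkflowExecution test implementation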

func (*TestShardContext) DeleteTimerFailoverLevel added in v0.3.14

func (s *TestShardContext) DeleteTimerFailoverLevel(failoverID string) error

DeleteTimerFailoverLevel test implementation

func (*TestShardContext) DeleteTransferFailoverLevel added in v0.3.14

func (s *TestShardContext) DeleteTransferFailoverLevel(failoverID string) error

DeleteTransferFailoverLevel test implementation

func (*TestShardContext) GetAllTimerFailoverLevels added in v0.3.14

func (s *TestShardContext) GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel

GetAllTimerFailoverLevels test implementation

func (*TestShardContext) GetAllTransferFailoverLevels added in v0.3.14

func (s *TestShardContext) GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel

GetAllTransferFailoverLevels test implementation

func (*TestShardContext) GetConfig added in v0.3.1

func (s *TestShardContext) GetConfig() *Config

GetConfig test implementation

func (*TestShardContext) GetCurrentTime added in v0.3.11

func (s *TestShardContext) GetCurrentTime(cluster string) time.Time

GetCurrentTime test implementation

func (*TestShardContext) GetDomainCache added in v0.3.5

func (s *TestShardContext) GetDomainCache() cache.DomainCache

GetDomainCache test implementation

func (*TestShardContext) GetDomainNotificationVersion added in v0.3.13

func (s *TestShardContext) GetDomainNotificationVersion() int64

GetDomainNotificationVersion test implementation

func (*TestShardContext) GetExecutionManager added in v0.3.0

func (s *TestShardContext) GetExecutionManager() persistence.ExecutionManager

GetExecutionManager test implementation

func (*TestShardContext) GetHistoryManager added in v0.3.0

func (s *TestShardContext) GetHistoryManager() persistence.HistoryManager

GetHistoryManager test implementation

func (*TestShardContext) GetLogger added in v0.3.0

func (s *TestShardContext) GetLogger() bark.Logger

GetLogger test implementation

func (*TestShardContext) GetMetricsClient added in v0.3.0

func (s *TestShardContext) GetMetricsClient() metrics.Client

GetMetricsClient test implementation

func (*TestShardContext) GetNextTransferTaskID added in v0.3.0

func (s *TestShardContext) GetNextTransferTaskID() (int64, error)

GetNextTransferTaskID test implementation

func (*TestShardContext) GetRangeID added in v0.3.0

func (s *TestShardContext) GetRangeID() int64

GetRangeID test implementation

func (*TestShardContext) GetReplicatorAckLevel added in v0.3.11

func (s *TestShardContext) GetReplicatorAckLevel() int64

GetReplicatorAckLevel test implementation

func (*TestShardContext) GetService added in v0.3.11

func (s *TestShardContext) GetService() service.Service

GetService test implementation

func (*TestShardContext) GetShardID added in v0.3.12

func (s *TestShardContext) GetShardID() int

GetShardID test implementation

func (*TestShardContext) GetTimeSource added in v0.3.0

func (s *TestShardContext) GetTimeSource() common.TimeSource

GetTimeSource test implementation

func (*TestShardContext) GetTimerAckLevel added in v0.3.0

func (s *TestShardContext) GetTimerAckLevel() time.Time

GetTimerAckLevel test implementation

func (*TestShardContext) GetTimerClusterAckLevel added in v0.3.12

func (s *TestShardContext) GetTimerClusterAckLevel(cluster string) time.Time

GetTimerClusterAckLevel test implementation

func (*TestShardContext) GetTransferAckLevel added in v0.3.0

func (s *TestShardContext) GetTransferAckLevel() int64

GetTransferAckLevel test implementation

func (*TestShardContext) GetTransferClusterAckLevel added in v0.3.12

func (s *TestShardContext) GetTransferClusterAckLevel(cluster string) int64

GetTransferClusterAckLevel test implementation

func (*TestShardContext) GetTransferMaxReadLevel added in v0.3.0

func (s *TestShardContext) GetTransferMaxReadLevel() int64

GetTransferMaxReadLevel test implementation

func (*TestShardContext) NotifyNewHistoryEvent added in v0.3.3

func (s *TestShardContext) NotifyNewHistoryEvent(event *historyEventNotification) error

NotifyNewHistoryEvent test implementation

func (*TestShardContext) Reset added in v0.3.0

func (s *TestShardContext) Reset()

Reset test implementation

func (*TestShardContext) ResetMutableState added in v0.3.12

func (s *TestShardContext) ResetMutableState(request *persistence.ResetMutableStateRequest) error

ResetMutableState test implementation

func (*TestShardContext) SetCurrentTime added in v0.3.11

func (s *TestShardContext) SetCurrentTime(cluster string, currentTime time.Time)

SetCurrentTime test implementation

func (*TestShardContext) UpdateDomainNotificationVersion added in v0.3.13

func (s *TestShardContext) UpdateDomainNotificationVersion(domainNotificationVersion int64) error

UpdateDomainNotificationVersion test implementation

func (*TestShardContext) UpdateReplicatorAckLevel added in v0.3.11

func (s *TestShardContext) UpdateReplicatorAckLevel(ackLevel int64) error

UpdateReplicatorAckLevel test implementation

func (*TestShardContext) UpdateTimerAckLevel added in v0.3.0

func (s *TestShardContext) UpdateTimerAckLevel(ackLevel time.Time) error

UpdateTimerAckLevel test implementation

func (*TestShardContext) UpdateTimerClusterAckLevel added in v0.3.12

func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error

UpdateTimerClusterAckLevel test implementation

func (*TestShardContext) UpdateTimerFailoverLevel added in v0.3.14

func (s *TestShardContext) UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error

UpdateTimerFailoverLevel test implementation

func (*TestShardContext) UpdateTransferAckLevel added in v0.3.0

func (s *TestShardContext) UpdateTransferAckLevel(ackLevel int64) error

UpdateTransferAckLevel test implementation

func (*TestShardContext) UpdateTransferClusterAckLevel added in v0.3.12

func (s *TestShardContext) UpdateTransferClusterAckLevel(cluster string, ackLevel int64) error

UpdateTransferClusterAckLevel test implementation

func (*TestShardContext) UpdateTransferFailoverLevel added in v0.3.14

func (s *TestShardContext) UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error

UpdateTransferFailoverLevel test implementation

func (*TestShardContext) UpdateWorkflowExecution added in v0.3.0

func (s *TestShardContext) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error

UpdateWorkflowExecution test implementation

type TimerGate added in v0.3.11

type TimerGate interface {
	// FireChan returns the channel which will be fired when time is up
	FireChan() <-chan struct{}
	// FireAfter checks whether the timer will get fired after a certain time
	FireAfter(now time.Time) bool
	// Update updates the timer gate and returns true if the update is a success.
	// Success means the timer is idle or the timer is set with a sooner time to fire.
	Update(nextTime time.Time) bool
}

TimerGate interface

type TimerSequenceID added in v0.3.11

type TimerSequenceID struct {
	VisibilityTimestamp time.Time
	TaskID              int64
}

TimerSequenceID - Visibility timestamp + Sequence Number.

func (TimerSequenceID) String added in v0.3.11

func (s TimerSequenceID) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL