Documentation ¶
Index ¶
- Constants
- Variables
- func DeconstructTimerKey(key SequenceID) (expiryTime int64, seqNum int64)
- func NewExecutionManagerFactory(config *config.Cassandra, logger bark.Logger, mClient metrics.Client) persistence.ExecutionManagerFactory
- func NewService(params *service.BootstrapParams) common.Daemon
- type Engine
- type EngineFactory
- type Handler
- func (h *Handler) CreateEngine(context ShardContext) Engine
- func (h *Handler) GetWorkflowExecutionNextEventID(ctx thrift.Context, getRequest *hist.GetWorkflowExecutionNextEventIDRequest) (*hist.GetWorkflowExecutionNextEventIDResponse, error)
- func (h *Handler) IsHealthy(ctx thrift.Context) (bool, error)
- func (h *Handler) RecordActivityTaskHeartbeat(ctx thrift.Context, wrappedRequest *hist.RecordActivityTaskHeartbeatRequest) (*gen.RecordActivityTaskHeartbeatResponse, error)
- func (h *Handler) RecordActivityTaskStarted(ctx thrift.Context, recordRequest *hist.RecordActivityTaskStartedRequest) (*hist.RecordActivityTaskStartedResponse, error)
- func (h *Handler) RecordChildExecutionCompleted(ctx thrift.Context, request *hist.RecordChildExecutionCompletedRequest) error
- func (h *Handler) RecordDecisionTaskStarted(ctx thrift.Context, recordRequest *hist.RecordDecisionTaskStartedRequest) (*hist.RecordDecisionTaskStartedResponse, error)
- func (h *Handler) RequestCancelWorkflowExecution(ctx thrift.Context, request *hist.RequestCancelWorkflowExecutionRequest) error
- func (h *Handler) RespondActivityTaskCanceled(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskCanceledRequest) error
- func (h *Handler) RespondActivityTaskCompleted(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskCompletedRequest) error
- func (h *Handler) RespondActivityTaskFailed(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskFailedRequest) error
- func (h *Handler) RespondDecisionTaskCompleted(ctx thrift.Context, wrappedRequest *hist.RespondDecisionTaskCompletedRequest) error
- func (h *Handler) ScheduleDecisionTask(ctx thrift.Context, request *hist.ScheduleDecisionTaskRequest) error
- func (h *Handler) SignalWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.SignalWorkflowExecutionRequest) error
- func (h *Handler) Start(thriftService []thrift.TChanServer) error
- func (h *Handler) StartWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.StartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error)
- func (h *Handler) Stop()
- func (h *Handler) TerminateWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.TerminateWorkflowExecutionRequest) error
- type MockHistoryEngine
- func (_m *MockHistoryEngine) GetWorkflowExecutionNextEventID(request *gohistory.GetWorkflowExecutionNextEventIDRequest) (*gohistory.GetWorkflowExecutionNextEventIDResponse, error)
- func (_m *MockHistoryEngine) RecordActivityTaskHeartbeat(request *gohistory.RecordActivityTaskHeartbeatRequest) (*shared.RecordActivityTaskHeartbeatResponse, error)
- func (_m *MockHistoryEngine) RecordActivityTaskStarted(request *gohistory.RecordActivityTaskStartedRequest) (*gohistory.RecordActivityTaskStartedResponse, error)
- func (_m *MockHistoryEngine) RecordChildExecutionCompleted(request *gohistory.RecordChildExecutionCompletedRequest) error
- func (_m *MockHistoryEngine) RecordDecisionTaskStarted(request *gohistory.RecordDecisionTaskStartedRequest) (*gohistory.RecordDecisionTaskStartedResponse, error)
- func (_m *MockHistoryEngine) RequestCancelWorkflowExecution(request *gohistory.RequestCancelWorkflowExecutionRequest) error
- func (_m *MockHistoryEngine) RespondActivityTaskCanceled(request *gohistory.RespondActivityTaskCanceledRequest) error
- func (_m *MockHistoryEngine) RespondActivityTaskCompleted(request *gohistory.RespondActivityTaskCompletedRequest) error
- func (_m *MockHistoryEngine) RespondActivityTaskFailed(request *gohistory.RespondActivityTaskFailedRequest) error
- func (_m *MockHistoryEngine) RespondDecisionTaskCompleted(request *gohistory.RespondDecisionTaskCompletedRequest) error
- func (_m *MockHistoryEngine) ScheduleDecisionTask(request *gohistory.ScheduleDecisionTaskRequest) error
- func (_m *MockHistoryEngine) SignalWorkflowExecution(request *gohistory.SignalWorkflowExecutionRequest) error
- func (_m *MockHistoryEngine) Start()
- func (_m *MockHistoryEngine) StartWorkflowExecution(request *gohistory.StartWorkflowExecutionRequest) (*shared.StartWorkflowExecutionResponse, error)
- func (_m *MockHistoryEngine) Stop()
- func (_m *MockHistoryEngine) TerminateWorkflowExecution(request *gohistory.TerminateWorkflowExecutionRequest) error
- type MockHistoryEngineFactory
- type SequenceID
- type SequenceNumberGenerator
- type Service
- type ShardContext
Constants ¶
const ( TimerQueueSeqNumBits = 26 // For timer-queues, use 38 bits of (expiry) timestamp, 26 bits of seqnum TimerQueueSeqNumBitmask = (int64(1) << TimerQueueSeqNumBits) - 1 TimerQueueTimeStampBitmask = math.MaxInt64 &^ TimerQueueSeqNumBitmask SeqNumMax = math.MaxInt64 & TimerQueueSeqNumBitmask // The max allowed seqnum (subject to mode-specific bitmask) MinTimerKey SequenceID = -1 MaxTimerKey SequenceID = math.MaxInt64 DefaultScheduleToStartActivityTimeoutInSecs = 10 DefaultScheduleToCloseActivityTimeoutInSecs = 10 DefaultStartToCloseActivityTimeoutInSecs = 10 )
Timer constants
Variables ¶
var ( // ErrDuplicate is exported temporarily for integration test ErrDuplicate = errors.New("Duplicate task, completing it") // ErrConflict is exported temporarily for integration test ErrConflict = errors.New("Conditional update failed") // ErrMaxAttemptsExceeded is exported temporarily for integration test ErrMaxAttemptsExceeded = errors.New("Maximum attempts exceeded to update history") )
var ( // ErrTryLock is a temporary error that is thrown by the API // when it loses the race to create workflow execution context ErrTryLock = &workflow.InternalServiceError{Message: "Failed to acquire lock, backoff and retry"} )
Functions ¶
func DeconstructTimerKey ¶
func DeconstructTimerKey(key SequenceID) (expiryTime int64, seqNum int64)
DeconstructTimerKey decomposes a unique sequence number into an expiry time and a sequence number.
func NewExecutionManagerFactory ¶
func NewExecutionManagerFactory(config *config.Cassandra, logger bark.Logger, mClient metrics.Client) persistence.ExecutionManagerFactory
NewExecutionManagerFactory builds and returns a factory object
func NewService ¶
func NewService(params *service.BootstrapParams) common.Daemon
NewService builds a new cadence-history service
Types ¶
type Engine ¶
type Engine interface { common.Daemon // TODO: Convert workflow.WorkflowExecution to pointer all over the place StartWorkflowExecution(request *h.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) GetWorkflowExecutionNextEventID( request *h.GetWorkflowExecutionNextEventIDRequest) (*h.GetWorkflowExecutionNextEventIDResponse, error) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) RecordActivityTaskStarted(request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error) RespondDecisionTaskCompleted(request *h.RespondDecisionTaskCompletedRequest) error RespondActivityTaskCompleted(request *h.RespondActivityTaskCompletedRequest) error RespondActivityTaskFailed(request *h.RespondActivityTaskFailedRequest) error RespondActivityTaskCanceled(request *h.RespondActivityTaskCanceledRequest) error RecordActivityTaskHeartbeat(request *h.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error) RequestCancelWorkflowExecution(request *h.RequestCancelWorkflowExecutionRequest) error SignalWorkflowExecution(request *h.SignalWorkflowExecutionRequest) error TerminateWorkflowExecution(request *h.TerminateWorkflowExecutionRequest) error ScheduleDecisionTask(request *h.ScheduleDecisionTaskRequest) error RecordChildExecutionCompleted(request *h.RecordChildExecutionCompletedRequest) error }
Engine represents an interface for managing workflow execution history.
func NewEngineWithShardContext ¶
func NewEngineWithShardContext(shard ShardContext, metadataMgr persistence.MetadataManager, visibilityMgr persistence.VisibilityManager, matching matching.Client, historyClient hc.Client) Engine
NewEngineWithShardContext creates an instance of history engine
type EngineFactory ¶
type EngineFactory interface {
CreateEngine(context ShardContext) Engine
}
EngineFactory is used to create an instance of sharded history engine
type Handler ¶
Handler - Thrift handler interface for history service
func NewHandler ¶
func NewHandler(sVice service.Service, shardManager persistence.ShardManager, metadataMgr persistence.MetadataManager, visibilityMgr persistence.VisibilityManager, historyMgr persistence.HistoryManager, executionMgrFactory persistence.ExecutionManagerFactory, numberOfShards int) (*Handler, []thrift.TChanServer)
NewHandler creates a thrift handler for the history service
func (*Handler) CreateEngine ¶
func (h *Handler) CreateEngine(context ShardContext) Engine
CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
func (*Handler) GetWorkflowExecutionNextEventID ¶
func (h *Handler) GetWorkflowExecutionNextEventID(ctx thrift.Context, getRequest *hist.GetWorkflowExecutionNextEventIDRequest) (*hist.GetWorkflowExecutionNextEventIDResponse, error)
GetWorkflowExecutionNextEventID - returns the id of the next event in the execution's history
func (*Handler) RecordActivityTaskHeartbeat ¶
func (h *Handler) RecordActivityTaskHeartbeat(ctx thrift.Context, wrappedRequest *hist.RecordActivityTaskHeartbeatRequest) (*gen.RecordActivityTaskHeartbeatResponse, error)
RecordActivityTaskHeartbeat - Record Activity Task heartbeat.
func (*Handler) RecordActivityTaskStarted ¶
func (h *Handler) RecordActivityTaskStarted(ctx thrift.Context, recordRequest *hist.RecordActivityTaskStartedRequest) (*hist.RecordActivityTaskStartedResponse, error)
RecordActivityTaskStarted - Record Activity Task started.
func (*Handler) RecordChildExecutionCompleted ¶
func (h *Handler) RecordChildExecutionCompleted(ctx thrift.Context, request *hist.RecordChildExecutionCompletedRequest) error
RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent. This is mainly called by transfer queue processor during the processing of DeleteExecution task.
func (*Handler) RecordDecisionTaskStarted ¶
func (h *Handler) RecordDecisionTaskStarted(ctx thrift.Context, recordRequest *hist.RecordDecisionTaskStartedRequest) (*hist.RecordDecisionTaskStartedResponse, error)
RecordDecisionTaskStarted - Record Decision Task started.
func (*Handler) RequestCancelWorkflowExecution ¶
func (h *Handler) RequestCancelWorkflowExecution(ctx thrift.Context, request *hist.RequestCancelWorkflowExecutionRequest) error
RequestCancelWorkflowExecution - requests cancellation of a workflow
func (*Handler) RespondActivityTaskCanceled ¶
func (h *Handler) RespondActivityTaskCanceled(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskCanceledRequest) error
RespondActivityTaskCanceled - records cancellation of an activity task
func (*Handler) RespondActivityTaskCompleted ¶
func (h *Handler) RespondActivityTaskCompleted(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskCompletedRequest) error
RespondActivityTaskCompleted - records completion of an activity task
func (*Handler) RespondActivityTaskFailed ¶
func (h *Handler) RespondActivityTaskFailed(ctx thrift.Context, wrappedRequest *hist.RespondActivityTaskFailedRequest) error
RespondActivityTaskFailed - records failure of an activity task
func (*Handler) RespondDecisionTaskCompleted ¶
func (h *Handler) RespondDecisionTaskCompleted(ctx thrift.Context, wrappedRequest *hist.RespondDecisionTaskCompletedRequest) error
RespondDecisionTaskCompleted - records completion of a decision task
func (*Handler) ScheduleDecisionTask ¶
func (h *Handler) ScheduleDecisionTask(ctx thrift.Context, request *hist.ScheduleDecisionTaskRequest) error
ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts child execution without creating the decision task and then calls this API after updating the mutable state of parent execution.
func (*Handler) SignalWorkflowExecution ¶
func (h *Handler) SignalWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.SignalWorkflowExecutionRequest) error
SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
func (*Handler) Start ¶
func (h *Handler) Start(thriftService []thrift.TChanServer) error
Start starts the handler
func (*Handler) StartWorkflowExecution ¶
func (h *Handler) StartWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.StartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error)
StartWorkflowExecution - creates a new workflow execution
func (*Handler) TerminateWorkflowExecution ¶
func (h *Handler) TerminateWorkflowExecution(ctx thrift.Context, wrappedRequest *hist.TerminateWorkflowExecutionRequest) error
TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event in the history and immediately terminating the execution instance.
type MockHistoryEngine ¶
MockHistoryEngine is used as mock implementation for HistoryEngine
func (*MockHistoryEngine) GetWorkflowExecutionNextEventID ¶
func (_m *MockHistoryEngine) GetWorkflowExecutionNextEventID(request *gohistory.GetWorkflowExecutionNextEventIDRequest) (*gohistory.GetWorkflowExecutionNextEventIDResponse, error)
GetWorkflowExecutionNextEventID is mock implementation for GetWorkflowExecutionNextEventID of HistoryEngine
func (*MockHistoryEngine) RecordActivityTaskHeartbeat ¶
func (_m *MockHistoryEngine) RecordActivityTaskHeartbeat(request *gohistory.RecordActivityTaskHeartbeatRequest) (*shared.RecordActivityTaskHeartbeatResponse, error)
RecordActivityTaskHeartbeat is mock implementation for RecordActivityTaskHeartbeat of HistoryEngine
func (*MockHistoryEngine) RecordActivityTaskStarted ¶
func (_m *MockHistoryEngine) RecordActivityTaskStarted(request *gohistory.RecordActivityTaskStartedRequest) (*gohistory.RecordActivityTaskStartedResponse, error)
RecordActivityTaskStarted is mock implementation for RecordActivityTaskStarted of HistoryEngine
func (*MockHistoryEngine) RecordChildExecutionCompleted ¶
func (_m *MockHistoryEngine) RecordChildExecutionCompleted(request *gohistory.RecordChildExecutionCompletedRequest) error
RecordChildExecutionCompleted is mock implementation for RecordChildExecutionCompleted of HistoryEngine
func (*MockHistoryEngine) RecordDecisionTaskStarted ¶
func (_m *MockHistoryEngine) RecordDecisionTaskStarted(request *gohistory.RecordDecisionTaskStartedRequest) (*gohistory.RecordDecisionTaskStartedResponse, error)
RecordDecisionTaskStarted is mock implementation for RecordDecisionTaskStarted of HistoryEngine
func (*MockHistoryEngine) RequestCancelWorkflowExecution ¶
func (_m *MockHistoryEngine) RequestCancelWorkflowExecution(request *gohistory.RequestCancelWorkflowExecutionRequest) error
RequestCancelWorkflowExecution is mock implementation for RequestCancelWorkflowExecution of HistoryEngine
func (*MockHistoryEngine) RespondActivityTaskCanceled ¶
func (_m *MockHistoryEngine) RespondActivityTaskCanceled(request *gohistory.RespondActivityTaskCanceledRequest) error
RespondActivityTaskCanceled is mock implementation for RespondActivityTaskCanceled of HistoryEngine
func (*MockHistoryEngine) RespondActivityTaskCompleted ¶
func (_m *MockHistoryEngine) RespondActivityTaskCompleted(request *gohistory.RespondActivityTaskCompletedRequest) error
RespondActivityTaskCompleted is mock implementation for RespondActivityTaskCompleted of HistoryEngine
func (*MockHistoryEngine) RespondActivityTaskFailed ¶
func (_m *MockHistoryEngine) RespondActivityTaskFailed(request *gohistory.RespondActivityTaskFailedRequest) error
RespondActivityTaskFailed is mock implementation for RespondActivityTaskFailed of HistoryEngine
func (*MockHistoryEngine) RespondDecisionTaskCompleted ¶
func (_m *MockHistoryEngine) RespondDecisionTaskCompleted(request *gohistory.RespondDecisionTaskCompletedRequest) error
RespondDecisionTaskCompleted is mock implementation for RespondDecisionTaskCompleted of HistoryEngine
func (*MockHistoryEngine) ScheduleDecisionTask ¶
func (_m *MockHistoryEngine) ScheduleDecisionTask(request *gohistory.ScheduleDecisionTaskRequest) error
ScheduleDecisionTask is mock implementation for ScheduleDecisionTask of HistoryEngine
func (*MockHistoryEngine) SignalWorkflowExecution ¶
func (_m *MockHistoryEngine) SignalWorkflowExecution(request *gohistory.SignalWorkflowExecutionRequest) error
SignalWorkflowExecution is mock implementation for SignalWorkflowExecution of HistoryEngine
func (*MockHistoryEngine) Start ¶
func (_m *MockHistoryEngine) Start()
Start is mock implementation for Start of HistoryEngine
func (*MockHistoryEngine) StartWorkflowExecution ¶
func (_m *MockHistoryEngine) StartWorkflowExecution(request *gohistory.StartWorkflowExecutionRequest) (*shared.StartWorkflowExecutionResponse, error)
StartWorkflowExecution is mock implementation for StartWorkflowExecution of HistoryEngine
func (*MockHistoryEngine) Stop ¶
func (_m *MockHistoryEngine) Stop()
Stop is mock implementation for Stop of HistoryEngine
func (*MockHistoryEngine) TerminateWorkflowExecution ¶
func (_m *MockHistoryEngine) TerminateWorkflowExecution(request *gohistory.TerminateWorkflowExecutionRequest) error
TerminateWorkflowExecution is mock implementation for TerminateWorkflowExecution of HistoryEngine
type MockHistoryEngineFactory ¶
MockHistoryEngineFactory is mock implementation for HistoryEngineFactory
func (*MockHistoryEngineFactory) CreateEngine ¶
func (_m *MockHistoryEngineFactory) CreateEngine(context ShardContext) Engine
CreateEngine is mock implementation for CreateEngine of HistoryEngineFactory
type SequenceID ¶
type SequenceID int64
SequenceID - Visibility timestamp + Sequence Number.
func ConstructTimerKey ¶
func ConstructTimerKey(expiryTime int64, seqNum int64) SequenceID
ConstructTimerKey forms a unique sequence number given an expiry time and a sequence number.
func (SequenceID) String ¶
func (s SequenceID) String() string
type SequenceNumberGenerator ¶
type SequenceNumberGenerator interface {
NextSeq() int64
}
SequenceNumberGenerator - Generates next sequence number.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service represents the cadence-history service
type ShardContext ¶
type ShardContext interface { GetExecutionManager() persistence.ExecutionManager GetHistoryManager() persistence.HistoryManager GetNextTransferTaskID() (int64, error) GetTransferSequenceNumber() int64 GetTransferMaxReadLevel() int64 GetTransferAckLevel() int64 UpdateAckLevel(ackLevel int64) error GetTimerSequenceNumber() int64 CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) ( *persistence.CreateWorkflowExecutionResponse, error) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error AppendHistoryEvents(request *persistence.AppendHistoryEventsRequest) error GetLogger() bark.Logger GetMetricsClient() metrics.Client }
ShardContext represents a history engine shard
Source Files ¶
- MockHistoryEngine.go
- MockHistoryEngineFactory.go
- execMgrFactory.go
- handler.go
- historyBuilder.go
- historyCache.go
- historyEngine.go
- historyEngineInterfaces.go
- jsonHistoryEventSerializer.go
- mutableStateBuilder.go
- service.go
- shardContext.go
- shardController.go
- timerBuilder.go
- timerQueueProcessor.go
- transferQueueProcessor.go
- workflowExecutionContext.go