Documentation ¶
Index ¶
- Constants
- func GetDefaultHistoryVersion() int
- func GetMaxSupportedHistoryVersion() int
- func NewHistoryVersionCompatibilityError(required int, supported int) error
- func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
- func SetDefaultHistoryVersion(version int)
- func SetMaxSupportedHistoryVersion(version int)
- type ActivityInfo
- type ActivityTask
- type ActivityTimeoutTask
- type AppendHistoryEventsRequest
- type CancelExecutionTask
- type CassandraTestCluster
- type ChildExecutionInfo
- type CompleteTaskRequest
- type CompleteTimerTaskRequest
- type CompleteTransferTaskRequest
- type ConditionFailedError
- type CreateDomainRequest
- type CreateDomainResponse
- type CreateShardRequest
- type CreateTaskInfo
- type CreateTasksRequest
- type CreateTasksResponse
- type CreateWorkflowExecutionRequest
- type CreateWorkflowExecutionResponse
- type DecisionTask
- type DecisionTimeoutTask
- type DeleteDomainByNameRequest
- type DeleteDomainRequest
- type DeleteExecutionTask
- type DeleteWorkflowExecutionHistoryRequest
- type DeleteWorkflowExecutionRequest
- type DomainConfig
- type DomainInfo
- type ExecutionManager
- type ExecutionManagerFactory
- type GetCurrentExecutionRequest
- type GetCurrentExecutionResponse
- type GetDomainRequest
- type GetDomainResponse
- type GetShardRequest
- type GetShardResponse
- type GetTasksRequest
- type GetTasksResponse
- type GetTimerIndexTasksRequest
- type GetTimerIndexTasksResponse
- type GetTransferTasksRequest
- type GetTransferTasksResponse
- type GetWorkflowExecutionHistoryRequest
- type GetWorkflowExecutionHistoryResponse
- type GetWorkflowExecutionRequest
- type GetWorkflowExecutionResponse
- type HistoryDeserializationError
- type HistoryEventBatch
- type HistoryManager
- type HistorySerializationError
- type HistorySerializer
- type HistorySerializerFactory
- type HistoryVersionCompatibilityError
- type LeaseTaskListRequest
- type LeaseTaskListResponse
- type ListClosedWorkflowExecutionsByStatusRequest
- type ListWorkflowExecutionsByTypeRequest
- type ListWorkflowExecutionsByWorkflowIDRequest
- type ListWorkflowExecutionsRequest
- type ListWorkflowExecutionsResponse
- type MetadataManager
- type RecordWorkflowExecutionClosedRequest
- type RecordWorkflowExecutionStartedRequest
- type SerializedHistoryEventBatch
- type ShardAlreadyExistError
- type ShardInfo
- type ShardManager
- type ShardOwnershipLostError
- type StartChildExecutionTask
- type Task
- type TaskInfo
- type TaskListInfo
- type TaskManager
- type TestBase
- func (s *TestBase) ClearTransferQueue()
- func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
- func (s *TestBase) CompleteTransferTask(taskID int64) error
- func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, ...) ([]int64, error)
- func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (int64, error)
- func (s *TestBase) CreateShard(shardID int, owner string, rangeID int64) error
- func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (string, error)
- func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
- func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
- func (s *TestBase) GetCurrentWorkflow(domainID, workflowID string) (string, error)
- func (s *TestBase) GetNextSequenceNumber() int64
- func (s *TestBase) GetReadLevel() int64
- func (s *TestBase) GetShard(shardID int) (*ShardInfo, error)
- func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
- func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskInfo, error)
- func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
- func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) (*WorkflowMutableState, error)
- func (s *TestBase) SetupWorkflowStore()
- func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
- func (s *TestBase) TearDownWorkflowStore()
- func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) error
- func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
- func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- type TestBaseOptions
- type TimeoutError
- type TimerInfo
- type TimerTaskInfo
- type TransferTaskInfo
- type UnknownEncodingTypeError
- type UpdateDomainRequest
- type UpdateShardRequest
- type UpdateTaskListRequest
- type UpdateTaskListResponse
- type UpdateWorkflowExecutionRequest
- type UserTimerTask
- type VisibilityManager
- type WorkflowExecutionInfo
- type WorkflowMutableState
Constants ¶
const ( DomainStatusRegistered = iota DomainStatusDeprecated DomainStatusDeleted )
Domain status
const ( WorkflowStateCreated = iota WorkflowStateRunning WorkflowStateCompleted )
Workflow execution states
const ( WorkflowCloseStatusNone = iota WorkflowCloseStatusCompleted WorkflowCloseStatusFailed WorkflowCloseStatusCanceled WorkflowCloseStatusTerminated WorkflowCloseStatusContinuedAsNew WorkflowCloseStatusTimedOut )
Workflow execution close status
const ( TaskListTypeDecision = iota TaskListTypeActivity )
Types of task lists
const ( TransferTaskTypeDecisionTask = iota TransferTaskTypeActivityTask TransferTaskTypeDeleteExecution TransferTaskTypeCancelExecution TransferTaskTypeStartChildExecution )
Transfer task types
const ( TaskTypeDecisionTimeout = iota TaskTypeActivityTimeout TaskTypeUserTimer )
Types of timers
const ( // DefaultEncodingType is the default encoding format for persisted history DefaultEncodingType = common.EncodingTypeJSON )
Variables ¶
This section is empty.
Functions ¶
func GetDefaultHistoryVersion ¶
func GetDefaultHistoryVersion() int
GetDefaultHistoryVersion returns the default history version
func GetMaxSupportedHistoryVersion ¶
func GetMaxSupportedHistoryVersion() int
GetMaxSupportedHistoryVersion returns the max supported version
func NewHistoryVersionCompatibilityError ¶
NewHistoryVersionCompatibilityError returns a new instance of compatibility error type
func NewUnknownEncodingTypeError ¶
func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
NewUnknownEncodingTypeError returns a new instance of encoding type error
func SetDefaultHistoryVersion ¶
func SetDefaultHistoryVersion(version int)
SetDefaultHistoryVersion resets the default history version; this method is only intended for integration tests
func SetMaxSupportedHistoryVersion ¶
func SetMaxSupportedHistoryVersion(version int)
SetMaxSupportedHistoryVersion resets the max supported history version; this method is only intended for integration tests
Types ¶
type ActivityInfo ¶
type ActivityInfo struct { ScheduleID int64 ScheduledEvent []byte StartedID int64 StartedEvent []byte ActivityID string RequestID string Details []byte ScheduleToStartTimeout int32 ScheduleToCloseTimeout int32 StartToCloseTimeout int32 HeartbeatTimeout int32 CancelRequested bool CancelRequestID int64 LastHeartBeatUpdatedTime time.Time }
ActivityInfo details.
type ActivityTask ¶
ActivityTask identifies a transfer task for activity
func (*ActivityTask) GetTaskID ¶
func (a *ActivityTask) GetTaskID() int64
GetTaskID returns the sequence ID of the activity task
func (*ActivityTask) GetType ¶
func (a *ActivityTask) GetType() int
GetType returns the type of the activity task
func (*ActivityTask) SetTaskID ¶
func (a *ActivityTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the activity task
type ActivityTimeoutTask ¶
ActivityTimeoutTask identifies a timeout task.
func (*ActivityTimeoutTask) GetTaskID ¶
func (a *ActivityTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*ActivityTimeoutTask) GetType ¶
func (a *ActivityTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*ActivityTimeoutTask) SetTaskID ¶
func (a *ActivityTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
type AppendHistoryEventsRequest ¶
type AppendHistoryEventsRequest struct { DomainID string Execution workflow.WorkflowExecution FirstEventID int64 RangeID int64 TransactionID int64 Events *SerializedHistoryEventBatch Overwrite bool }
AppendHistoryEventsRequest is used to append new events to workflow execution history
type CancelExecutionTask ¶
type CancelExecutionTask struct { TaskID int64 TargetDomainID string TargetWorkflowID string TargetRunID string ScheduleID int64 }
CancelExecutionTask identifies a transfer task for cancel of execution
func (*CancelExecutionTask) GetTaskID ¶
func (u *CancelExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*CancelExecutionTask) GetType ¶
func (u *CancelExecutionTask) GetType() int
GetType returns the type of the cancel transfer task
func (*CancelExecutionTask) SetTaskID ¶
func (u *CancelExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
type CassandraTestCluster ¶
type CassandraTestCluster struct {
// contains filtered or unexported fields
}
CassandraTestCluster allows executing cassandra operations in testing.
type ChildExecutionInfo ¶
type ChildExecutionInfo struct { InitiatedID int64 InitiatedEvent []byte StartedID int64 StartedEvent []byte CreateRequestID string }
ChildExecutionInfo has details for pending child executions.
type CompleteTaskRequest ¶
type CompleteTaskRequest struct { TaskList *TaskListInfo TaskID int64 }
CompleteTaskRequest is used to complete a task
type CompleteTimerTaskRequest ¶
type CompleteTimerTaskRequest struct {
TaskID int64
}
CompleteTimerTaskRequest is used to complete a task in the timer task queue
type CompleteTransferTaskRequest ¶
type CompleteTransferTaskRequest struct {
TaskID int64
}
CompleteTransferTaskRequest is used to complete a task in the transfer task queue
type ConditionFailedError ¶
type ConditionFailedError struct {
Msg string
}
ConditionFailedError represents a failed conditional put
func (*ConditionFailedError) Error ¶
func (e *ConditionFailedError) Error() string
type CreateDomainRequest ¶
type CreateDomainRequest struct { Name string Status int Description string OwnerEmail string Retention int32 EmitMetric bool }
CreateDomainRequest is used to create the domain
type CreateDomainResponse ¶
type CreateDomainResponse struct {
ID string
}
CreateDomainResponse is the response for CreateDomain
type CreateShardRequest ¶
type CreateShardRequest struct {
ShardInfo *ShardInfo
}
CreateShardRequest is used to create a shard in executions table
type CreateTaskInfo ¶
type CreateTaskInfo struct { Execution workflow.WorkflowExecution Data *TaskInfo TaskID int64 }
CreateTaskInfo describes a task to be created in CreateTasksRequest
type CreateTasksRequest ¶
type CreateTasksRequest struct { DomainID string TaskList string TaskListType int RangeID int64 Tasks []*CreateTaskInfo }
CreateTasksRequest is used to create a new task for a workflow execution
type CreateTasksResponse ¶
type CreateTasksResponse struct { }
CreateTasksResponse is the response to CreateTasksRequest
type CreateWorkflowExecutionRequest ¶
type CreateWorkflowExecutionRequest struct { RequestID string DomainID string Execution workflow.WorkflowExecution ParentDomainID string ParentExecution *workflow.WorkflowExecution InitiatedID int64 TaskList string WorkflowTypeName string DecisionTimeoutValue int32 ExecutionContext []byte NextEventID int64 LastProcessedEvent int64 TransferTasks []Task TimerTasks []Task RangeID int64 DecisionScheduleID int64 DecisionStartedID int64 DecisionStartToCloseTimeout int32 ContinueAsNew bool }
CreateWorkflowExecutionRequest is used to write a new workflow execution
type CreateWorkflowExecutionResponse ¶
type CreateWorkflowExecutionResponse struct {
TaskID string
}
CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
type DecisionTask ¶
DecisionTask identifies a transfer task for decision
func (*DecisionTask) GetTaskID ¶
func (d *DecisionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the decision task.
func (*DecisionTask) GetType ¶
func (d *DecisionTask) GetType() int
GetType returns the type of the decision task
func (*DecisionTask) SetTaskID ¶
func (d *DecisionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the decision task
type DecisionTimeoutTask ¶
DecisionTimeoutTask identifies a timeout task.
func (*DecisionTimeoutTask) GetTaskID ¶
func (d *DecisionTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*DecisionTimeoutTask) GetType ¶
func (d *DecisionTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*DecisionTimeoutTask) SetTaskID ¶
func (d *DecisionTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
type DeleteDomainByNameRequest ¶
type DeleteDomainByNameRequest struct {
Name string
}
DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table
type DeleteDomainRequest ¶
type DeleteDomainRequest struct {
ID string
}
DeleteDomainRequest is used to delete domain entry from domains table
type DeleteExecutionTask ¶
type DeleteExecutionTask struct {
TaskID int64
}
DeleteExecutionTask identifies a transfer task for deletion of execution
func (*DeleteExecutionTask) GetTaskID ¶
func (a *DeleteExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the delete execution task
func (*DeleteExecutionTask) GetType ¶
func (a *DeleteExecutionTask) GetType() int
GetType returns the type of the delete execution task
func (*DeleteExecutionTask) SetTaskID ¶
func (a *DeleteExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the delete execution task
type DeleteWorkflowExecutionHistoryRequest ¶
type DeleteWorkflowExecutionHistoryRequest struct { DomainID string Execution workflow.WorkflowExecution }
DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history
type DeleteWorkflowExecutionRequest ¶
type DeleteWorkflowExecutionRequest struct {
ExecutionInfo *WorkflowExecutionInfo
}
DeleteWorkflowExecutionRequest is used to delete a workflow execution
type DomainConfig ¶
DomainConfig describes the domain configuration
type DomainInfo ¶
DomainInfo describes the domain entity
type ExecutionManager ¶
type ExecutionManager interface { CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error) GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) CompleteTransferTask(request *CompleteTransferTaskRequest) error // Timer related methods. GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) CompleteTimerTask(request *CompleteTimerTaskRequest) error }
ExecutionManager is used to manage workflow executions
func NewCassandraWorkflowExecutionPersistence ¶
func NewCassandraWorkflowExecutionPersistence(hosts string, dc string, keyspace string, shardID int, logger bark.Logger) (ExecutionManager, error)
NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
func NewWorkflowExecutionPersistenceClient ¶
func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client) ExecutionManager
NewWorkflowExecutionPersistenceClient creates a client to manage executions
type ExecutionManagerFactory ¶
type ExecutionManagerFactory interface {
CreateExecutionManager(shardID int) (ExecutionManager, error)
}
ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
type GetCurrentExecutionRequest ¶
GetCurrentExecutionRequest is used to retrieve the current RunId for an execution
type GetCurrentExecutionResponse ¶
type GetCurrentExecutionResponse struct {
RunID string
}
GetCurrentExecutionResponse is the response to GetCurrentExecution
type GetDomainRequest ¶
GetDomainRequest is used to read domain
type GetDomainResponse ¶
type GetDomainResponse struct { Info *DomainInfo Config *DomainConfig }
GetDomainResponse is the response for GetDomain
type GetShardRequest ¶
type GetShardRequest struct {
ShardID int
}
GetShardRequest is used to get shard information
type GetShardResponse ¶
type GetShardResponse struct {
ShardInfo *ShardInfo
}
GetShardResponse is the response to GetShard
type GetTasksRequest ¶
type GetTasksRequest struct { DomainID string TaskList string TaskType int ReadLevel int64 MaxReadLevel int64 // inclusive BatchSize int RangeID int64 }
GetTasksRequest is used to retrieve tasks of a task list
type GetTasksResponse ¶
type GetTasksResponse struct {
Tasks []*TaskInfo
}
GetTasksResponse is the response to GetTasksRequest
type GetTimerIndexTasksRequest ¶
GetTimerIndexTasksRequest is the request for GetTimerIndexTasks. TODO: replace this with an iterator that can configure min and max index.
type GetTimerIndexTasksResponse ¶
type GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
}
GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
type GetTransferTasksRequest ¶
GetTransferTasksRequest is used to read tasks from the transfer task queue
type GetTransferTasksResponse ¶
type GetTransferTasksResponse struct {
Tasks []*TransferTaskInfo
}
GetTransferTasksResponse is the response to GetTransferTasksRequest
type GetWorkflowExecutionHistoryRequest ¶
type GetWorkflowExecutionHistoryRequest struct { DomainID string Execution workflow.WorkflowExecution // Get the history events upto NextEventID. Not Inclusive. NextEventID int64 // Maximum number of history append transactions per page PageSize int // Token to continue reading next page of history append transactions. Pass in empty slice for first page NextPageToken []byte }
GetWorkflowExecutionHistoryRequest is used to retrieve history of a workflow execution
type GetWorkflowExecutionHistoryResponse ¶
type GetWorkflowExecutionHistoryResponse struct { // Slice of history append transaction batches Events []SerializedHistoryEventBatch // Token to read next page if there are more events beyond page size. // Use this to set NextPageToken on GetworkflowExecutionHistoryRequest to read the next page. NextPageToken []byte }
GetWorkflowExecutionHistoryResponse is the response to GetWorkflowExecutionHistoryRequest
type GetWorkflowExecutionRequest ¶
type GetWorkflowExecutionRequest struct { DomainID string Execution workflow.WorkflowExecution }
GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
type GetWorkflowExecutionResponse ¶
type GetWorkflowExecutionResponse struct {
State *WorkflowMutableState
}
GetWorkflowExecutionResponse is the response to GetWorkflowExecutionRequest
type HistoryDeserializationError ¶
type HistoryDeserializationError struct {
// contains filtered or unexported fields
}
HistoryDeserializationError is an error type that's returned on a history deserialization failure
func (*HistoryDeserializationError) Error ¶
func (e *HistoryDeserializationError) Error() string
type HistoryEventBatch ¶
type HistoryEventBatch struct { Version int Events []*workflow.HistoryEvent }
HistoryEventBatch represents a batch of history events
func NewHistoryEventBatch ¶
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch
NewHistoryEventBatch returns a new instance of HistoryEventBatch
func (*HistoryEventBatch) String ¶
func (b *HistoryEventBatch) String() string
type HistoryManager ¶
type HistoryManager interface { AppendHistoryEvents(request *AppendHistoryEventsRequest) error // GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse, error) DeleteWorkflowExecutionHistory(request *DeleteWorkflowExecutionHistoryRequest) error }
HistoryManager is used to manage Workflow Execution HistoryEventBatch
func NewCassandraHistoryPersistence ¶
func NewCassandraHistoryPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (HistoryManager, error)
NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation
func NewHistoryPersistenceClient ¶
func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client) HistoryManager
NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history
type HistorySerializationError ¶
type HistorySerializationError struct {
// contains filtered or unexported fields
}
HistorySerializationError is an error type that's returned on a history serialization failure
func (*HistorySerializationError) Error ¶
func (e *HistorySerializationError) Error() string
type HistorySerializer ¶
type HistorySerializer interface { Serialize(batch *HistoryEventBatch) (*SerializedHistoryEventBatch, error) Deserialize(batch *SerializedHistoryEventBatch) (*HistoryEventBatch, error) }
HistorySerializer is used to serialize/deserialize history
func NewJSONHistorySerializer ¶
func NewJSONHistorySerializer() HistorySerializer
NewJSONHistorySerializer returns a JSON HistorySerializer
type HistorySerializerFactory ¶
type HistorySerializerFactory interface { // Get returns a history serializer corresponding // to a given encoding type Get(encodingType common.EncodingType) (HistorySerializer, error) }
HistorySerializerFactory is a factory that vends HistorySerializers based on encoding type.
func NewHistorySerializerFactory ¶
func NewHistorySerializerFactory() HistorySerializerFactory
NewHistorySerializerFactory creates and returns an instance of HistorySerializerFactory
type HistoryVersionCompatibilityError ¶
type HistoryVersionCompatibilityError struct {
// contains filtered or unexported fields
}
HistoryVersionCompatibilityError is an error type that's returned when history serialization or deserialization cannot proceed due to version incompatibility
func (*HistoryVersionCompatibilityError) Error ¶
func (e *HistoryVersionCompatibilityError) Error() string
type LeaseTaskListRequest ¶
LeaseTaskListRequest is used to request lease of a task list
type LeaseTaskListResponse ¶
type LeaseTaskListResponse struct {
TaskListInfo *TaskListInfo
}
LeaseTaskListResponse is the response to LeaseTaskListRequest
type ListClosedWorkflowExecutionsByStatusRequest ¶
type ListClosedWorkflowExecutionsByStatusRequest struct { ListWorkflowExecutionsRequest Status s.WorkflowExecutionCloseStatus }
ListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status
type ListWorkflowExecutionsByTypeRequest ¶
type ListWorkflowExecutionsByTypeRequest struct { ListWorkflowExecutionsRequest WorkflowTypeName string }
ListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain
type ListWorkflowExecutionsByWorkflowIDRequest ¶
type ListWorkflowExecutionsByWorkflowIDRequest struct { ListWorkflowExecutionsRequest WorkflowID string }
ListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain
type ListWorkflowExecutionsRequest ¶
type ListWorkflowExecutionsRequest struct { DomainUUID string EarliestStartTime int64 LatestStartTime int64 // Maximum number of workflow executions per page PageSize int // Token to continue reading next page of workflow executions. // Pass in empty slice for first page. NextPageToken []byte }
ListWorkflowExecutionsRequest is used to list executions in a domain
type ListWorkflowExecutionsResponse ¶
type ListWorkflowExecutionsResponse struct { Executions []*s.WorkflowExecutionInfo // Token to read next page if there are more workflow executions beyond page size. // Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page. NextPageToken []byte }
ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest
type MetadataManager ¶
type MetadataManager interface { CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) GetDomain(request *GetDomainRequest) (*GetDomainResponse, error) UpdateDomain(request *UpdateDomainRequest) error DeleteDomain(request *DeleteDomainRequest) error DeleteDomainByName(request *DeleteDomainByNameRequest) error }
MetadataManager is used to manage metadata CRUD for various entities
func NewCassandraMetadataPersistence ¶
func NewCassandraMetadataPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (MetadataManager, error)
NewCassandraMetadataPersistence is used to create an instance of MetadataManager implementation
func NewMetadataPersistenceClient ¶
func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client) MetadataManager
NewMetadataPersistenceClient creates a MetadataManager client to manage domain metadata
type RecordWorkflowExecutionClosedRequest ¶
type RecordWorkflowExecutionClosedRequest struct { DomainUUID string Execution s.WorkflowExecution WorkflowTypeName string StartTimestamp int64 CloseTimestamp int64 Status s.WorkflowExecutionCloseStatus RetentionSeconds int64 }
RecordWorkflowExecutionClosedRequest is used to add a record of a newly closed execution
type RecordWorkflowExecutionStartedRequest ¶
type RecordWorkflowExecutionStartedRequest struct { DomainUUID string Execution s.WorkflowExecution WorkflowTypeName string StartTimestamp int64 }
RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution
type SerializedHistoryEventBatch ¶
type SerializedHistoryEventBatch struct { EncodingType common.EncodingType Version int Data []byte }
SerializedHistoryEventBatch represents a serialized batch of history events
func NewSerializedHistoryEventBatch ¶
func NewSerializedHistoryEventBatch(data []byte, encoding common.EncodingType, version int) *SerializedHistoryEventBatch
NewSerializedHistoryEventBatch constructs and returns a new instance of SerializedHistoryEventBatch
func (*SerializedHistoryEventBatch) String ¶
func (h *SerializedHistoryEventBatch) String() string
type ShardAlreadyExistError ¶
type ShardAlreadyExistError struct {
Msg string
}
ShardAlreadyExistError is returned when conditionally creating a shard fails
func (*ShardAlreadyExistError) Error ¶
func (e *ShardAlreadyExistError) Error() string
type ShardInfo ¶
type ShardInfo struct { ShardID int Owner string RangeID int64 StolenSinceRenew int UpdatedAt time.Time TransferAckLevel int64 }
ShardInfo describes a shard
type ShardManager ¶
type ShardManager interface { CreateShard(request *CreateShardRequest) error GetShard(request *GetShardRequest) (*GetShardResponse, error) UpdateShard(request *UpdateShardRequest) error }
ShardManager is used to manage all shards
func NewCassandraShardPersistence ¶
func NewCassandraShardPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (ShardManager, error)
NewCassandraShardPersistence is used to create an instance of ShardManager implementation
func NewShardPersistenceClient ¶
func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client) ShardManager
NewShardPersistenceClient creates a client to manage shards
type ShardOwnershipLostError ¶
ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard
func (*ShardOwnershipLostError) Error ¶
func (e *ShardOwnershipLostError) Error() string
type StartChildExecutionTask ¶
type StartChildExecutionTask struct { TaskID int64 TargetDomainID string TargetWorkflowID string InitiatedID int64 }
StartChildExecutionTask identifies a transfer task for starting child execution
func (*StartChildExecutionTask) GetTaskID ¶
func (u *StartChildExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the start child execution transfer task.
func (*StartChildExecutionTask) GetType ¶
func (u *StartChildExecutionTask) GetType() int
GetType returns the type of the start child execution transfer task
func (*StartChildExecutionTask) SetTaskID ¶
func (u *StartChildExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the start child execution transfer task.
type TaskInfo ¶
type TaskInfo struct { DomainID string WorkflowID string RunID string TaskID int64 ScheduleID int64 ScheduleToStartTimeout int32 }
TaskInfo describes either activity or decision task
type TaskListInfo ¶
TaskListInfo describes a state of a task list implementation.
type TaskManager ¶
type TaskManager interface { LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error) UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error) GetTasks(request *GetTasksRequest) (*GetTasksResponse, error) CompleteTask(request *CompleteTaskRequest) error }
TaskManager is used to manage tasks
func NewCassandraTaskPersistence ¶
func NewCassandraTaskPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (TaskManager, error)
NewCassandraTaskPersistence is used to create an instance of TaskManager implementation
func NewTaskPersistenceClient ¶
func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client) TaskManager
NewTaskPersistenceClient creates a client to manage tasks
type TestBase ¶
type TestBase struct { ShardMgr ShardManager ExecutionMgrFactory ExecutionManagerFactory WorkflowMgr ExecutionManager TaskMgr TaskManager HistoryMgr HistoryManager MetadataManager MetadataManager VisibilityMgr VisibilityManager ShardInfo *ShardInfo ShardContext *testShardContext CassandraTestCluster // contains filtered or unexported fields }
TestBase wraps the base setup needed to create workflows over engine layer.
func (*TestBase) ClearTransferQueue ¶
func (s *TestBase) ClearTransferQueue()
ClearTransferQueue completes all tasks in transfer queue
func (*TestBase) CompleteTask ¶
func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
CompleteTask is a utility method to complete a task
func (*TestBase) CompleteTransferTask ¶
CompleteTransferTask is a utility method to complete a transfer task
func (*TestBase) ContinueAsNewExecution ¶
func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64) error
ContinueAsNewExecution is a utility method to create workflow executions
func (*TestBase) CreateActivityTasks ¶
func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, activities map[int64]string) ([]int64, error)
CreateActivityTasks is a utility method to create tasks
func (*TestBase) CreateChildWorkflowExecution ¶
func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateChildWorkflowExecution is a utility method to create child workflow executions
func (*TestBase) CreateDecisionTask ¶
func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, decisionScheduleID int64) (int64, error)
CreateDecisionTask is a utility method to create a task
func (*TestBase) CreateShard ¶
CreateShard is a utility method to create the shard using persistence layer
func (*TestBase) CreateWorkflowExecution ¶
func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList, wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateWorkflowExecution is a utility method to create workflow executions
func (*TestBase) CreateWorkflowExecutionManyTasks ¶
func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error)
CreateWorkflowExecutionManyTasks is a utility method to create workflow executions
func (*TestBase) DeleteChildExecutionsState ¶
func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
DeleteChildExecutionsState is a utility method to delete child execution from mutable state
func (*TestBase) DeleteWorkflowExecution ¶
func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
DeleteWorkflowExecution is a utility method to delete a workflow execution
func (*TestBase) GetCurrentWorkflow ¶
GetCurrentWorkflow returns the workflow state for the given params
func (*TestBase) GetNextSequenceNumber ¶
GetNextSequenceNumber generates a unique sequence number that can be used as a transfer queue task ID
func (*TestBase) GetReadLevel ¶
GetReadLevel returns the current read level for shard
func (*TestBase) GetTasks ¶
func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
GetTasks is a utility method to get tasks from persistence
func (*TestBase) GetTimerIndexTasks ¶
func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskInfo, error)
GetTimerIndexTasks is a utility method to get timer tasks from the timer index
func (*TestBase) GetTransferTasks ¶
func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
GetTransferTasks is a utility method to get tasks from transfer task queue
func (*TestBase) GetWorkflowExecutionInfo ¶
func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) ( *WorkflowMutableState, error)
GetWorkflowExecutionInfo is a utility method to retrieve execution info
func (*TestBase) SetupWorkflowStore ¶
func (s *TestBase) SetupWorkflowStore()
SetupWorkflowStore sets up the workflow test base
func (*TestBase) SetupWorkflowStoreWithOptions ¶
func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
SetupWorkflowStoreWithOptions sets up the workflow test base using the given options
func (*TestBase) TearDownWorkflowStore ¶
func (s *TestBase) TearDownWorkflowStore()
TearDownWorkflowStore performs cleanup
func (*TestBase) UpdateShard ¶
UpdateShard is a utility method to update the shard using persistence layer
func (*TestBase) UpdateWorkflowExecution ¶
func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error
UpdateWorkflowExecution is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionAndDelete ¶
func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
UpdateWorkflowExecutionAndDelete is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithRangeID ¶
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64) error
UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithTransferTasks ¶
func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertActivityInfo []*ActivityInfo) error
UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution
func (*TestBase) UpsertChildExecutionsState ¶
func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, upsertChildInfos []*ChildExecutionInfo) error
UpsertChildExecutionsState is a utility method to update mutable state of workflow execution
type TestBaseOptions ¶
type TestBaseOptions struct { ClusterHost string KeySpace string Datacenter string DropKeySpace bool SchemaDir string }
TestBaseOptions holds the options used to configure the workflow test base.
type TimeoutError ¶
type TimeoutError struct {
Msg string
}
TimeoutError is returned when a write operation fails due to a timeout
func (*TimeoutError) Error ¶
func (e *TimeoutError) Error() string
type TimerTaskInfo ¶
type TimerTaskInfo struct { DomainID string WorkflowID string RunID string TaskID int64 TaskType int TimeoutType int EventID int64 }
TimerTaskInfo describes a timer task.
type TransferTaskInfo ¶
type TransferTaskInfo struct { DomainID string WorkflowID string RunID string TaskID int64 TargetDomainID string TargetWorkflowID string TargetRunID string TaskList string TaskType int ScheduleID int64 }
TransferTaskInfo describes a transfer task
type UnknownEncodingTypeError ¶
type UnknownEncodingTypeError struct {
// contains filtered or unexported fields
}
UnknownEncodingTypeError is an error type that's returned when the encoding type provided as input is unknown or unsupported
func (*UnknownEncodingTypeError) Error ¶
func (e *UnknownEncodingTypeError) Error() string
type UpdateDomainRequest ¶
type UpdateDomainRequest struct { Info *DomainInfo Config *DomainConfig }
UpdateDomainRequest is used to update domain
type UpdateShardRequest ¶
UpdateShardRequest is used to update shard information
type UpdateTaskListRequest ¶
type UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
}
UpdateTaskListRequest is used to update task list implementation information
type UpdateTaskListResponse ¶
type UpdateTaskListResponse struct { }
UpdateTaskListResponse is the response to UpdateTaskList
type UpdateWorkflowExecutionRequest ¶
type UpdateWorkflowExecutionRequest struct { ExecutionInfo *WorkflowExecutionInfo TransferTasks []Task TimerTasks []Task DeleteTimerTask Task Condition int64 RangeID int64 ContinueAsNew *CreateWorkflowExecutionRequest CloseExecution bool // Mutable state UpsertActivityInfos []*ActivityInfo DeleteActivityInfo *int64 UpserTimerInfos []*TimerInfo DeleteTimerInfos []string UpsertChildExecutionInfos []*ChildExecutionInfo DeleteChildExecutionInfo *int64 }
UpdateWorkflowExecutionRequest is used to update a workflow execution
type UserTimerTask ¶
UserTimerTask identifies a user timer task.
func (*UserTimerTask) GetTaskID ¶
func (u *UserTimerTask) GetTaskID() int64
GetTaskID returns the sequence ID of the timer task.
func (*UserTimerTask) GetType ¶
func (u *UserTimerTask) GetType() int
GetType returns the type of the timer task
func (*UserTimerTask) SetTaskID ¶
func (u *UserTimerTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the timer task.
type VisibilityManager ¶
type VisibilityManager interface { RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) }
VisibilityManager is used to manage the visibility store
func NewCassandraVisibilityPersistence ¶
func NewCassandraVisibilityPersistence( hosts string, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error)
NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation
type WorkflowExecutionInfo ¶
type WorkflowExecutionInfo struct { DomainID string WorkflowID string RunID string ParentDomainID string ParentWorkflowID string ParentRunID string InitiatedID int64 CompletionEvent []byte TaskList string WorkflowTypeName string DecisionTimeoutValue int32 ExecutionContext []byte State int CloseStatus int NextEventID int64 LastProcessedEvent int64 StartTimestamp time.Time LastUpdatedTimestamp time.Time CreateRequestID string DecisionScheduleID int64 DecisionStartedID int64 DecisionRequestID string DecisionTimeout int32 }
WorkflowExecutionInfo describes a workflow execution
type WorkflowMutableState ¶
type WorkflowMutableState struct { ActivitInfos map[int64]*ActivityInfo TimerInfos map[string]*TimerInfo ChildExecutionInfos map[int64]*ChildExecutionInfo ExecutionInfo *WorkflowExecutionInfo }
WorkflowMutableState indicates workflow-related state