Documentation ¶
Index ¶
- Constants
- func GetDefaultHistoryVersion() int
- func GetMaxSupportedHistoryVersion() int
- func GetVisibilityTSFrom(task Task) time.Time
- func NewHistoryVersionCompatibilityError(required int, supported int) error
- func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
- func SetDefaultHistoryVersion(version int)
- func SetMaxSupportedHistoryVersion(version int)
- func SetVisibilityTSFrom(task Task, t time.Time)
- type ActivityInfo
- type ActivityTask
- type ActivityTimeoutTask
- type AppendHistoryEventsRequest
- type CancelExecutionTask
- type CassandraTestCluster
- type ChildExecutionInfo
- type CloseExecutionTask
- type Closeable
- type CompleteTaskRequest
- type CompleteTimerTaskRequest
- type CompleteTransferTaskRequest
- type ConditionFailedError
- type CreateDomainRequest
- type CreateDomainResponse
- type CreateShardRequest
- type CreateTaskInfo
- type CreateTasksRequest
- type CreateTasksResponse
- type CreateWorkflowExecutionRequest
- type CreateWorkflowExecutionResponse
- type DecisionTask
- type DecisionTimeoutTask
- type DeleteDomainByNameRequest
- type DeleteDomainRequest
- type DeleteHistoryEventTask
- type DeleteWorkflowExecutionHistoryRequest
- type DeleteWorkflowExecutionRequest
- type DomainConfig
- type DomainInfo
- type ExecutionManager
- type ExecutionManagerFactory
- type GetClosedWorkflowExecutionRequest
- type GetClosedWorkflowExecutionResponse
- type GetCurrentExecutionRequest
- type GetCurrentExecutionResponse
- type GetDomainRequest
- type GetDomainResponse
- type GetShardRequest
- type GetShardResponse
- type GetTasksRequest
- type GetTasksResponse
- type GetTimerIndexTasksRequest
- type GetTimerIndexTasksResponse
- type GetTransferTasksRequest
- type GetTransferTasksResponse
- type GetWorkflowExecutionHistoryRequest
- type GetWorkflowExecutionHistoryResponse
- type GetWorkflowExecutionRequest
- type GetWorkflowExecutionResponse
- type HistoryDeserializationError
- type HistoryEventBatch
- type HistoryManager
- type HistorySerializationError
- type HistorySerializer
- type HistorySerializerFactory
- type HistoryVersionCompatibilityError
- type LeaseTaskListRequest
- type LeaseTaskListResponse
- type ListClosedWorkflowExecutionsByStatusRequest
- type ListWorkflowExecutionsByTypeRequest
- type ListWorkflowExecutionsByWorkflowIDRequest
- type ListWorkflowExecutionsRequest
- type ListWorkflowExecutionsResponse
- type MetadataManager
- type RecordWorkflowExecutionClosedRequest
- type RecordWorkflowExecutionStartedRequest
- type RequestCancelInfo
- type SerializedHistoryEventBatch
- type ShardAlreadyExistError
- type ShardInfo
- type ShardManager
- type ShardOwnershipLostError
- type StartChildExecutionTask
- type Task
- type TaskInfo
- type TaskListInfo
- type TaskManager
- type TestBase
- func (s *TestBase) ClearTransferQueue()
- func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
- func (s *TestBase) CompleteTimerTask(ts time.Time, taskID int64) error
- func (s *TestBase) CompleteTransferTask(taskID int64) error
- func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, ...) ([]int64, error)
- func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (int64, error)
- func (s *TestBase) CreateShard(shardID int, owner string, rangeID int64) error
- func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, ...) (string, error)
- func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, ...) (string, error)
- func (s *TestBase) DeleteCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteCancelInfo int64) error
- func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
- func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
- func (s *TestBase) GetCurrentWorkflow(domainID, workflowID string) (string, error)
- func (s *TestBase) GetNextSequenceNumber() int64
- func (s *TestBase) GetReadLevel() int64
- func (s *TestBase) GetShard(shardID int) (*ShardInfo, error)
- func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
- func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error)
- func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
- func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) (*WorkflowMutableState, error)
- func (s *TestBase) SetupWorkflowStore()
- func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
- func (s *TestBase) TearDownWorkflowStore()
- func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) error
- func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
- func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, ...) error
- func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, ...) error
- func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- func (s *TestBase) UpsertRequestCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, ...) error
- type TestBaseOptions
- type TimeoutError
- type TimerInfo
- type TimerTaskInfo
- type TransferTaskIDGenerator
- type TransferTaskInfo
- type UnknownEncodingTypeError
- type UpdateDomainRequest
- type UpdateShardRequest
- type UpdateTaskListRequest
- type UpdateTaskListResponse
- type UpdateWorkflowExecutionRequest
- type UserTimerTask
- type VisibilityManager
- type WorkflowExecutionInfo
- type WorkflowMutableState
- type WorkflowTimeoutTask
Constants ¶
const ( DomainStatusRegistered = iota DomainStatusDeprecated DomainStatusDeleted )
Domain status
const ( WorkflowStateCreated = iota WorkflowStateRunning WorkflowStateCompleted )
Workflow execution states
const ( WorkflowCloseStatusNone = iota WorkflowCloseStatusCompleted WorkflowCloseStatusFailed WorkflowCloseStatusCanceled WorkflowCloseStatusTerminated WorkflowCloseStatusContinuedAsNew WorkflowCloseStatusTimedOut )
Workflow execution close status
const ( TaskListTypeDecision = iota TaskListTypeActivity )
Types of task lists
const ( TransferTaskTypeDecisionTask = iota TransferTaskTypeActivityTask TransferTaskTypeCloseExecution TransferTaskTypeCancelExecution TransferTaskTypeStartChildExecution )
Transfer task types
const ( TaskTypeDecisionTimeout = iota TaskTypeActivityTimeout TaskTypeUserTimer TaskTypeWorkflowTimeout TaskTypeDeleteHistoryEvent )
Types of timers
const ( // DefaultEncodingType is the default encoding format for persisted history DefaultEncodingType = common.EncodingTypeJSON )
Variables ¶
This section is empty.
Functions ¶
func GetDefaultHistoryVersion ¶
func GetDefaultHistoryVersion() int
GetDefaultHistoryVersion returns the default history version
func GetMaxSupportedHistoryVersion ¶
func GetMaxSupportedHistoryVersion() int
GetMaxSupportedHistoryVersion returns the max supported version
func GetVisibilityTSFrom ¶
GetVisibilityTSFrom - helper method to get visibility timestamp
func NewHistoryVersionCompatibilityError ¶
NewHistoryVersionCompatibilityError returns a new instance of compatibility error type
func NewUnknownEncodingTypeError ¶
func NewUnknownEncodingTypeError(encodingType common.EncodingType) error
NewUnknownEncodingTypeError returns a new instance of encoding type error
func SetDefaultHistoryVersion ¶
func SetDefaultHistoryVersion(version int)
SetDefaultHistoryVersion resets the default history version only intended for integration test
func SetMaxSupportedHistoryVersion ¶
func SetMaxSupportedHistoryVersion(version int)
SetMaxSupportedHistoryVersion resets the max supported history version this method is only intended for integration test
func SetVisibilityTSFrom ¶
SetVisibilityTSFrom - helper method to set visibility timestamp
Types ¶
type ActivityInfo ¶
type ActivityInfo struct { ScheduleID int64 ScheduledEvent []byte ScheduledTime time.Time StartedID int64 StartedEvent []byte StartedTime time.Time ActivityID string RequestID string Details []byte ScheduleToStartTimeout int32 ScheduleToCloseTimeout int32 StartToCloseTimeout int32 HeartbeatTimeout int32 CancelRequested bool CancelRequestID int64 LastHeartBeatUpdatedTime time.Time TimerTaskStatus int32 }
ActivityInfo details.
type ActivityTask ¶
ActivityTask identifies a transfer task for activity
func (*ActivityTask) GetTaskID ¶
func (a *ActivityTask) GetTaskID() int64
GetTaskID returns the sequence ID of the activity task
func (*ActivityTask) GetType ¶
func (a *ActivityTask) GetType() int
GetType returns the type of the activity task
func (*ActivityTask) SetTaskID ¶
func (a *ActivityTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the activity task
type ActivityTimeoutTask ¶
type ActivityTimeoutTask struct { VisibilityTimestamp time.Time TaskID int64 TimeoutType int EventID int64 }
ActivityTimeoutTask identifies a timeout task.
func (*ActivityTimeoutTask) GetTaskID ¶
func (a *ActivityTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*ActivityTimeoutTask) GetType ¶
func (a *ActivityTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*ActivityTimeoutTask) GetVisibilityTimestamp ¶
func (a *ActivityTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*ActivityTimeoutTask) SetTaskID ¶
func (a *ActivityTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
func (*ActivityTimeoutTask) SetVisibilityTimestamp ¶
func (a *ActivityTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type AppendHistoryEventsRequest ¶
type AppendHistoryEventsRequest struct { DomainID string Execution workflow.WorkflowExecution FirstEventID int64 RangeID int64 TransactionID int64 Events *SerializedHistoryEventBatch Overwrite bool }
AppendHistoryEventsRequest is used to append new events to workflow execution history
type CancelExecutionTask ¶
type CancelExecutionTask struct { TaskID int64 TargetDomainID string TargetWorkflowID string TargetRunID string ScheduleID int64 }
CancelExecutionTask identifies a transfer task for cancel of execution
func (*CancelExecutionTask) GetTaskID ¶
func (u *CancelExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*CancelExecutionTask) GetType ¶
func (u *CancelExecutionTask) GetType() int
GetType returns the type of the cancel transfer task
func (*CancelExecutionTask) SetTaskID ¶
func (u *CancelExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
type CassandraTestCluster ¶
type CassandraTestCluster struct {
// contains filtered or unexported fields
}
CassandraTestCluster allows executing cassandra operations in testing.
type ChildExecutionInfo ¶
type ChildExecutionInfo struct { InitiatedID int64 InitiatedEvent []byte StartedID int64 StartedEvent []byte CreateRequestID string }
ChildExecutionInfo has details for pending child executions.
type CloseExecutionTask ¶ added in v0.3.3
type CloseExecutionTask struct {
TaskID int64
}
CloseExecutionTask identifies a transfer task for deletion of execution
func (*CloseExecutionTask) GetTaskID ¶ added in v0.3.3
func (a *CloseExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the close execution task
func (*CloseExecutionTask) GetType ¶ added in v0.3.3
func (a *CloseExecutionTask) GetType() int
GetType returns the type of the close execution task
func (*CloseExecutionTask) SetTaskID ¶ added in v0.3.3
func (a *CloseExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the close execution task
type Closeable ¶
type Closeable interface {
Close()
}
Closeable is an interface for any entity that supports a close operation to release resources
type CompleteTaskRequest ¶
type CompleteTaskRequest struct { TaskList *TaskListInfo TaskID int64 }
CompleteTaskRequest is used to complete a task
type CompleteTimerTaskRequest ¶
CompleteTimerTaskRequest is used to complete a task in the timer task queue
type CompleteTransferTaskRequest ¶
type CompleteTransferTaskRequest struct {
TaskID int64
}
CompleteTransferTaskRequest is used to complete a task in the transfer task queue
type ConditionFailedError ¶
type ConditionFailedError struct {
Msg string
}
ConditionFailedError represents a failed conditional put
func (*ConditionFailedError) Error ¶
func (e *ConditionFailedError) Error() string
type CreateDomainRequest ¶
type CreateDomainRequest struct { Name string Status int Description string OwnerEmail string Retention int32 EmitMetric bool }
CreateDomainRequest is used to create the domain
type CreateDomainResponse ¶
type CreateDomainResponse struct {
ID string
}
CreateDomainResponse is the response for CreateDomain
type CreateShardRequest ¶
type CreateShardRequest struct {
ShardInfo *ShardInfo
}
CreateShardRequest is used to create a shard in executions table
type CreateTaskInfo ¶
type CreateTaskInfo struct { Execution workflow.WorkflowExecution Data *TaskInfo TaskID int64 }
CreateTaskInfo describes a task to be created in CreateTasksRequest
type CreateTasksRequest ¶
type CreateTasksRequest struct { TaskListInfo *TaskListInfo Tasks []*CreateTaskInfo }
CreateTasksRequest is used to create a new task for a workflow exectution
type CreateTasksResponse ¶
type CreateTasksResponse struct { }
CreateTasksResponse is the response to CreateTasksRequest
type CreateWorkflowExecutionRequest ¶
type CreateWorkflowExecutionRequest struct { RequestID string DomainID string Execution workflow.WorkflowExecution ParentDomainID string ParentExecution *workflow.WorkflowExecution InitiatedID int64 TaskList string WorkflowTypeName string WorkflowTimeout int32 DecisionTimeoutValue int32 ExecutionContext []byte NextEventID int64 LastProcessedEvent int64 TransferTasks []Task TimerTasks []Task RangeID int64 DecisionScheduleID int64 DecisionStartedID int64 DecisionStartToCloseTimeout int32 ContinueAsNew bool }
CreateWorkflowExecutionRequest is used to write a new workflow execution
type CreateWorkflowExecutionResponse ¶
type CreateWorkflowExecutionResponse struct {
TaskID string
}
CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
type DecisionTask ¶
DecisionTask identifies a transfer task for decision
func (*DecisionTask) GetTaskID ¶
func (d *DecisionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the decision task.
func (*DecisionTask) GetType ¶
func (d *DecisionTask) GetType() int
GetType returns the type of the decision task
func (*DecisionTask) SetTaskID ¶
func (d *DecisionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the decision task
type DecisionTimeoutTask ¶
type DecisionTimeoutTask struct { VisibilityTimestamp time.Time TaskID int64 EventID int64 ScheduleAttempt int64 TimeoutType int }
DecisionTimeoutTask identifies a timeout task.
func (*DecisionTimeoutTask) GetTaskID ¶
func (d *DecisionTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID.
func (*DecisionTimeoutTask) GetType ¶
func (d *DecisionTimeoutTask) GetType() int
GetType returns the type of the timer task
func (*DecisionTimeoutTask) GetVisibilityTimestamp ¶
func (d *DecisionTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*DecisionTimeoutTask) SetTaskID ¶
func (d *DecisionTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID.
func (*DecisionTimeoutTask) SetVisibilityTimestamp ¶
func (d *DecisionTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type DeleteDomainByNameRequest ¶
type DeleteDomainByNameRequest struct {
Name string
}
DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table
type DeleteDomainRequest ¶
type DeleteDomainRequest struct {
ID string
}
DeleteDomainRequest is used to delete domain entry from domains table
type DeleteHistoryEventTask ¶
DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.
func (*DeleteHistoryEventTask) GetTaskID ¶
func (a *DeleteHistoryEventTask) GetTaskID() int64
GetTaskID returns the sequence ID of the delete execution task
func (*DeleteHistoryEventTask) GetType ¶
func (a *DeleteHistoryEventTask) GetType() int
GetType returns the type of the delete execution task
func (*DeleteHistoryEventTask) SetTaskID ¶
func (a *DeleteHistoryEventTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the delete execution task
type DeleteWorkflowExecutionHistoryRequest ¶
type DeleteWorkflowExecutionHistoryRequest struct { DomainID string Execution workflow.WorkflowExecution }
DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history
type DeleteWorkflowExecutionRequest ¶
DeleteWorkflowExecutionRequest is used to delete a workflow execution
type DomainConfig ¶
DomainConfig describes the domain configuration
type DomainInfo ¶
DomainInfo describes the domain entity
type ExecutionManager ¶
type ExecutionManager interface { Closeable CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error) GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) CompleteTransferTask(request *CompleteTransferTaskRequest) error // Timer related methods. GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) CompleteTimerTask(request *CompleteTimerTaskRequest) error }
ExecutionManager is used to manage workflow executions
func NewCassandraWorkflowExecutionPersistence ¶
func NewCassandraWorkflowExecutionPersistence(shardID int, session *gocql.Session, logger bark.Logger) (ExecutionManager, error)
NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
func NewWorkflowExecutionPersistenceClient ¶
func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client) ExecutionManager
NewWorkflowExecutionPersistenceClient creates a client to manage executions
type ExecutionManagerFactory ¶
type ExecutionManagerFactory interface { Closeable CreateExecutionManager(shardID int) (ExecutionManager, error) }
ExecutionManagerFactory creates an instance of ExecutionManager for a given shard
func NewCassandraPersistenceClientFactory ¶ added in v0.3.2
func NewCassandraPersistenceClientFactory(hosts string, port int, user, password, dc string, keyspace string, numConns int, logger bark.Logger, mClient metrics.Client) (ExecutionManagerFactory, error)
NewCassandraPersistenceClientFactory is used to create an instance of ExecutionManagerFactory implementation
type GetClosedWorkflowExecutionRequest ¶
type GetClosedWorkflowExecutionRequest struct { DomainUUID string Execution s.WorkflowExecution }
GetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution
type GetClosedWorkflowExecutionResponse ¶
type GetClosedWorkflowExecutionResponse struct {
Execution *s.WorkflowExecutionInfo
}
GetClosedWorkflowExecutionResponse is the response to GetClosedWorkflowExecutionRequest
type GetCurrentExecutionRequest ¶
GetCurrentExecutionRequest is used to retrieve the current RunId for an execution
type GetCurrentExecutionResponse ¶
type GetCurrentExecutionResponse struct {
RunID string
}
GetCurrentExecutionResponse is the response to GetCurrentExecution
type GetDomainRequest ¶
GetDomainRequest is used to read domain
type GetDomainResponse ¶
type GetDomainResponse struct { Info *DomainInfo Config *DomainConfig }
GetDomainResponse is the response for GetDomain
type GetShardRequest ¶
type GetShardRequest struct {
ShardID int
}
GetShardRequest is used to get shard information
type GetShardResponse ¶
type GetShardResponse struct {
ShardInfo *ShardInfo
}
GetShardResponse is the response to GetShard
type GetTasksRequest ¶
type GetTasksRequest struct { DomainID string TaskList string TaskType int ReadLevel int64 MaxReadLevel int64 // inclusive BatchSize int RangeID int64 }
GetTasksRequest is used to retrieve tasks of a task list
type GetTasksResponse ¶
type GetTasksResponse struct {
Tasks []*TaskInfo
}
GetTasksResponse is the response to GetTasksRequests
type GetTimerIndexTasksRequest ¶
type GetTimerIndexTasksRequest struct { MinTimestamp time.Time MaxTimestamp time.Time BatchSize int }
GetTimerIndexTasksRequest is the request for GetTimerIndexTasks TODO: replace this with an iterator that can configure min and max index.
type GetTimerIndexTasksResponse ¶
type GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
}
GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
type GetTransferTasksRequest ¶
GetTransferTasksRequest is used to read tasks from the transfer task queue
type GetTransferTasksResponse ¶
type GetTransferTasksResponse struct {
Tasks []*TransferTaskInfo
}
GetTransferTasksResponse is the response to GetTransferTasksRequest
type GetWorkflowExecutionHistoryRequest ¶
type GetWorkflowExecutionHistoryRequest struct { DomainID string Execution workflow.WorkflowExecution // Get the history events from FirstEventID. Inclusive. FirstEventID int64 // Get the history events upto NextEventID. Not Inclusive. NextEventID int64 // Maximum number of history append transactions per page PageSize int // Token to continue reading next page of history append transactions. Pass in empty slice for first page NextPageToken []byte }
GetWorkflowExecutionHistoryRequest is used to retrieve history of a workflow execution
type GetWorkflowExecutionHistoryResponse ¶
type GetWorkflowExecutionHistoryResponse struct { // Slice of history append transaction batches Events []SerializedHistoryEventBatch // Token to read next page if there are more events beyond page size. // Use this to set NextPageToken on GetworkflowExecutionHistoryRequest to read the next page. NextPageToken []byte }
GetWorkflowExecutionHistoryResponse is the response to GetWorkflowExecutionHistoryRequest
type GetWorkflowExecutionRequest ¶
type GetWorkflowExecutionRequest struct { DomainID string Execution workflow.WorkflowExecution }
GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
type GetWorkflowExecutionResponse ¶
type GetWorkflowExecutionResponse struct {
State *WorkflowMutableState
}
GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest
type HistoryDeserializationError ¶
type HistoryDeserializationError struct {
// contains filtered or unexported fields
}
HistoryDeserializationError is an error type that's returned on a history deserialization failure
func (*HistoryDeserializationError) Error ¶
func (e *HistoryDeserializationError) Error() string
type HistoryEventBatch ¶
type HistoryEventBatch struct { Version int Events []*workflow.HistoryEvent }
HistoryEventBatch represents a batch of history events
func NewHistoryEventBatch ¶
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch
NewHistoryEventBatch returns a new instance of HistoryEventBatch
func (*HistoryEventBatch) String ¶
func (b *HistoryEventBatch) String() string
type HistoryManager ¶
type HistoryManager interface { Closeable AppendHistoryEvents(request *AppendHistoryEventsRequest) error // GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse, error) DeleteWorkflowExecutionHistory(request *DeleteWorkflowExecutionHistoryRequest) error }
HistoryManager is used to manage Workflow Execution HistoryEventBatch
func NewCassandraHistoryPersistence ¶
func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string, numConns int, logger bark.Logger) (HistoryManager, error)
NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation
func NewHistoryPersistenceClient ¶
func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client) HistoryManager
NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history
type HistorySerializationError ¶
type HistorySerializationError struct {
// contains filtered or unexported fields
}
HistorySerializationError is an error type that's returned on a history serialization failure
func (*HistorySerializationError) Error ¶
func (e *HistorySerializationError) Error() string
type HistorySerializer ¶
type HistorySerializer interface { Serialize(batch *HistoryEventBatch) (*SerializedHistoryEventBatch, error) Deserialize(batch *SerializedHistoryEventBatch) (*HistoryEventBatch, error) }
HistorySerializer is used to serialize/deserialize history
func NewJSONHistorySerializer ¶
func NewJSONHistorySerializer() HistorySerializer
NewJSONHistorySerializer returns a JSON HistorySerializer
type HistorySerializerFactory ¶
type HistorySerializerFactory interface { // Get returns a history serializer corresponding // to a given encoding type Get(encodingType common.EncodingType) (HistorySerializer, error) }
HistorySerializerFactory is a factory that vends HistorySerializers based on encoding type.
func NewHistorySerializerFactory ¶
func NewHistorySerializerFactory() HistorySerializerFactory
NewHistorySerializerFactory creates and returns an instance of HistorySerializerFactory
type HistoryVersionCompatibilityError ¶
type HistoryVersionCompatibilityError struct {
// contains filtered or unexported fields
}
HistoryVersionCompatibilityError is an error type that's returned when history serialization or deserialization cannot proceed due to version incompatibility
func (*HistoryVersionCompatibilityError) Error ¶
func (e *HistoryVersionCompatibilityError) Error() string
type LeaseTaskListRequest ¶
LeaseTaskListRequest is used to request lease of a task list
type LeaseTaskListResponse ¶
type LeaseTaskListResponse struct {
TaskListInfo *TaskListInfo
}
LeaseTaskListResponse is response to LeaseTaskListRequest
type ListClosedWorkflowExecutionsByStatusRequest ¶
type ListClosedWorkflowExecutionsByStatusRequest struct { ListWorkflowExecutionsRequest Status s.WorkflowExecutionCloseStatus }
ListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status
type ListWorkflowExecutionsByTypeRequest ¶
type ListWorkflowExecutionsByTypeRequest struct { ListWorkflowExecutionsRequest WorkflowTypeName string }
ListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain
type ListWorkflowExecutionsByWorkflowIDRequest ¶
type ListWorkflowExecutionsByWorkflowIDRequest struct { ListWorkflowExecutionsRequest WorkflowID string }
ListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain
type ListWorkflowExecutionsRequest ¶
type ListWorkflowExecutionsRequest struct { DomainUUID string EarliestStartTime int64 LatestStartTime int64 // Maximum number of workflow executions per page PageSize int // Token to continue reading next page of workflow executions. // Pass in empty slice for first page. NextPageToken []byte }
ListWorkflowExecutionsRequest is used to list executions in a domain
type ListWorkflowExecutionsResponse ¶
type ListWorkflowExecutionsResponse struct { Executions []*s.WorkflowExecutionInfo // Token to read next page if there are more workflow executions beyond page size. // Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page. NextPageToken []byte }
ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest
type MetadataManager ¶
type MetadataManager interface { Closeable CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) GetDomain(request *GetDomainRequest) (*GetDomainResponse, error) UpdateDomain(request *UpdateDomainRequest) error DeleteDomain(request *DeleteDomainRequest) error DeleteDomainByName(request *DeleteDomainByNameRequest) error }
MetadataManager is used to manage metadata CRUD for various entities
func NewCassandraMetadataPersistence ¶
func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (MetadataManager, error)
NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation
func NewMetadataPersistenceClient ¶
func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client) MetadataManager
NewMetadataPersistenceClient creates a HistoryManager client to manage workflow execution history
type RecordWorkflowExecutionClosedRequest ¶
type RecordWorkflowExecutionClosedRequest struct { DomainUUID string Execution s.WorkflowExecution WorkflowTypeName string StartTimestamp int64 CloseTimestamp int64 Status s.WorkflowExecutionCloseStatus HistoryLength int64 RetentionSeconds int64 }
RecordWorkflowExecutionClosedRequest is used to add a record of a newly closed execution
type RecordWorkflowExecutionStartedRequest ¶
type RecordWorkflowExecutionStartedRequest struct { DomainUUID string Execution s.WorkflowExecution WorkflowTypeName string StartTimestamp int64 WorkflowTimeout int64 }
RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution
type RequestCancelInfo ¶
RequestCancelInfo has details for pending external workflow cancellations
type SerializedHistoryEventBatch ¶
type SerializedHistoryEventBatch struct { EncodingType common.EncodingType Version int Data []byte }
SerializedHistoryEventBatch represents a serialized batch of history events
func NewSerializedHistoryEventBatch ¶
func NewSerializedHistoryEventBatch(data []byte, encoding common.EncodingType, version int) *SerializedHistoryEventBatch
NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch
func (*SerializedHistoryEventBatch) String ¶
func (h *SerializedHistoryEventBatch) String() string
type ShardAlreadyExistError ¶
type ShardAlreadyExistError struct {
Msg string
}
ShardAlreadyExistError is returned when conditionally creating a shard fails
func (*ShardAlreadyExistError) Error ¶
func (e *ShardAlreadyExistError) Error() string
type ShardInfo ¶
type ShardInfo struct { ShardID int Owner string RangeID int64 StolenSinceRenew int UpdatedAt time.Time TransferAckLevel int64 TimerAckLevel time.Time }
ShardInfo describes a shard
type ShardManager ¶
type ShardManager interface { Closeable CreateShard(request *CreateShardRequest) error GetShard(request *GetShardRequest) (*GetShardResponse, error) UpdateShard(request *UpdateShardRequest) error }
ShardManager is used to manage all shards
func NewCassandraShardPersistence ¶
func NewCassandraShardPersistence(hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (ShardManager, error)
NewCassandraShardPersistence is used to create an instance of ShardManager implementation
func NewShardPersistenceClient ¶
func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client) ShardManager
NewShardPersistenceClient creates a client to manage shards
type ShardOwnershipLostError ¶
ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard
func (*ShardOwnershipLostError) Error ¶
func (e *ShardOwnershipLostError) Error() string
type StartChildExecutionTask ¶
type StartChildExecutionTask struct { TaskID int64 TargetDomainID string TargetWorkflowID string InitiatedID int64 }
StartChildExecutionTask identifies a transfer task for starting child execution
func (*StartChildExecutionTask) GetTaskID ¶
func (u *StartChildExecutionTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*StartChildExecutionTask) GetType ¶
func (u *StartChildExecutionTask) GetType() int
GetType returns the type of the cancel transfer task
func (*StartChildExecutionTask) SetTaskID ¶
func (u *StartChildExecutionTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
type TaskInfo ¶
type TaskInfo struct { DomainID string WorkflowID string RunID string TaskID int64 ScheduleID int64 ScheduleToStartTimeout int32 }
TaskInfo describes either activity or decision task
type TaskListInfo ¶
TaskListInfo describes a state of a task list implementation.
type TaskManager ¶
type TaskManager interface { Closeable LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error) UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error) GetTasks(request *GetTasksRequest) (*GetTasksResponse, error) CompleteTask(request *CompleteTaskRequest) error }
TaskManager is used to manage tasks
func NewCassandraTaskPersistence ¶
func NewCassandraTaskPersistence(hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (TaskManager, error)
NewCassandraTaskPersistence is used to create an instance of TaskManager implementation
func NewTaskPersistenceClient ¶
func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client) TaskManager
NewTaskPersistenceClient creates a client to manage tasks
type TestBase ¶
type TestBase struct { ShardMgr ShardManager ExecutionMgrFactory ExecutionManagerFactory WorkflowMgr ExecutionManager TaskMgr TaskManager HistoryMgr HistoryManager MetadataManager MetadataManager VisibilityMgr VisibilityManager ShardInfo *ShardInfo TaskIDGenerator TransferTaskIDGenerator CassandraTestCluster // contains filtered or unexported fields }
TestBase wraps the base setup needed to create workflows over persistence layer.
func (*TestBase) ClearTransferQueue ¶
func (s *TestBase) ClearTransferQueue()
ClearTransferQueue completes all tasks in transfer queue
func (*TestBase) CompleteTask ¶
func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error
CompleteTask is a utility method to complete a task
func (*TestBase) CompleteTimerTask ¶
CompleteTimerTask is a utility method to complete a timer task
func (*TestBase) CompleteTransferTask ¶
CompleteTransferTask is a utility method to complete a transfer task
func (*TestBase) ContinueAsNewExecution ¶
func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64, newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64) error
ContinueAsNewExecution is a utility method to create workflow executions
func (*TestBase) CreateActivityTasks ¶
func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution, activities map[int64]string) ([]int64, error)
CreateActivityTasks is a utility method to create tasks
func (*TestBase) CreateChildWorkflowExecution ¶
func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateChildWorkflowExecution is a utility method to create child workflow executions
func (*TestBase) CreateDecisionTask ¶
func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, decisionScheduleID int64) (int64, error)
CreateDecisionTask is a utility method to create a task
func (*TestBase) CreateShard ¶
CreateShard is a utility method to create the shard using persistence layer
func (*TestBase) CreateWorkflowExecution ¶
func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleID int64, timerTasks []Task) (string, error)
CreateWorkflowExecution is a utility method to create workflow executions
func (*TestBase) CreateWorkflowExecutionManyTasks ¶
func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error)
CreateWorkflowExecutionManyTasks is a utility method to create workflow executions
func (*TestBase) DeleteCancelState ¶
func (s *TestBase) DeleteCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteCancelInfo int64) error
DeleteCancelState is a utility method to delete request cancel state from mutable state
func (*TestBase) DeleteChildExecutionsState ¶
func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, deleteChildInfo int64) error
DeleteChildExecutionsState is a utility method to delete child execution from mutable state
func (*TestBase) DeleteWorkflowExecution ¶
func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error
DeleteWorkflowExecution is a utility method to delete a workflow execution
func (*TestBase) GetCurrentWorkflow ¶
GetCurrentWorkflow returns the workflow state for the given params
func (*TestBase) GetNextSequenceNumber ¶
GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId
func (*TestBase) GetReadLevel ¶
GetReadLevel returns the current read level for shard
func (*TestBase) GetTasks ¶
func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)
GetTasks is a utility method to get tasks from persistence
func (*TestBase) GetTimerIndexTasks ¶
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error)
GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (*TestBase) GetTransferTasks ¶
func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
GetTransferTasks is a utility method to get tasks from transfer task queue
func (*TestBase) GetWorkflowExecutionInfo ¶
func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) ( *WorkflowMutableState, error)
GetWorkflowExecutionInfo is a utility method to retrieve execution info
func (*TestBase) SetupWorkflowStore ¶
func (s *TestBase) SetupWorkflowStore()
SetupWorkflowStore to setup workflow test base
func (*TestBase) SetupWorkflowStoreWithOptions ¶
func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)
SetupWorkflowStoreWithOptions to setup workflow test base
func (*TestBase) TearDownWorkflowStore ¶
func (s *TestBase) TearDownWorkflowStore()
TearDownWorkflowStore to cleanup
func (*TestBase) UpdateShard ¶
UpdateShard is a utility method to update the shard using persistence layer
func (*TestBase) UpdateWorkflowExecution ¶
func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error
UpdateWorkflowExecution is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionAndDelete ¶
func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error
UpdateWorkflowExecutionAndDelete is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated ¶ added in v0.3.2
func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, childInfos []*ChildExecutionInfo) error
UpdateWorkflowExecutionForChildExecutionsInitiated is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionForRequestCancel ¶
func (s *TestBase) UpdateWorkflowExecutionForRequestCancel( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertRequestCancelInfo []*RequestCancelInfo) error
UpdateWorkflowExecutionForRequestCancel is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithRangeID ¶
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64) error
UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (*TestBase) UpdateWorkflowExecutionWithTransferTasks ¶
func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks( updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertActivityInfo []*ActivityInfo) error
UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution
func (*TestBase) UpsertChildExecutionsState ¶
func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64, upsertChildInfos []*ChildExecutionInfo) error
UpsertChildExecutionsState is a utility method to update mutable state of workflow execution
func (*TestBase) UpsertRequestCancelState ¶
func (s *TestBase) UpsertRequestCancelState(updatedInfo *WorkflowExecutionInfo, condition int64, upsertCancelInfos []*RequestCancelInfo) error
UpsertRequestCancelState is a utility method to update mutable state of workflow execution
type TestBaseOptions ¶
type TestBaseOptions struct { ClusterHost string ClusterPort int ClusterUser string ClusterPassword string KeySpace string Datacenter string DropKeySpace bool SchemaDir string }
TestBaseOptions options to configure workflow test base.
type TimeoutError ¶
type TimeoutError struct {
Msg string
}
TimeoutError is returned when a write operation fails due to a timeout
func (*TimeoutError) Error ¶
func (e *TimeoutError) Error() string
type TimerTaskInfo ¶
type TimerTaskInfo struct { DomainID string WorkflowID string RunID string VisibilityTimestamp time.Time TaskID int64 TaskType int TimeoutType int EventID int64 ScheduleAttempt int64 }
TimerTaskInfo describes a timer task.
type TransferTaskIDGenerator ¶ added in v0.3.0
TransferTaskIDGenerator generates IDs for transfer tasks written by helper methods
type TransferTaskInfo ¶
type TransferTaskInfo struct { DomainID string WorkflowID string RunID string TaskID int64 TargetDomainID string TargetWorkflowID string TargetRunID string TaskList string TaskType int ScheduleID int64 }
TransferTaskInfo describes a transfer task
type UnknownEncodingTypeError ¶
type UnknownEncodingTypeError struct {
// contains filtered or unexported fields
}
UnknownEncodingTypeError is an error type that's returned when the encoding type provided as input is unknown or unsupported
func (*UnknownEncodingTypeError) Error ¶
func (e *UnknownEncodingTypeError) Error() string
type UpdateDomainRequest ¶
type UpdateDomainRequest struct { Info *DomainInfo Config *DomainConfig }
UpdateDomainRequest is used to update domain
type UpdateShardRequest ¶
UpdateShardRequest is used to update shard information
type UpdateTaskListRequest ¶
type UpdateTaskListRequest struct {
TaskListInfo *TaskListInfo
}
UpdateTaskListRequest is used to update task list implementation information
type UpdateTaskListResponse ¶
type UpdateTaskListResponse struct { }
UpdateTaskListResponse is the response to UpdateTaskList
type UpdateWorkflowExecutionRequest ¶
type UpdateWorkflowExecutionRequest struct { ExecutionInfo *WorkflowExecutionInfo TransferTasks []Task TimerTasks []Task DeleteTimerTask Task Condition int64 RangeID int64 ContinueAsNew *CreateWorkflowExecutionRequest CloseExecution bool // Mutable state UpsertActivityInfos []*ActivityInfo DeleteActivityInfo *int64 UpserTimerInfos []*TimerInfo DeleteTimerInfos []string UpsertChildExecutionInfos []*ChildExecutionInfo DeleteChildExecutionInfo *int64 UpsertRequestCancelInfos []*RequestCancelInfo DeleteRequestCancelInfo *int64 NewBufferedEvents *SerializedHistoryEventBatch ClearBufferedEvents bool }
UpdateWorkflowExecutionRequest is used to update a workflow execution
type UserTimerTask ¶
UserTimerTask identifies a timeout task.
func (*UserTimerTask) GetTaskID ¶
func (u *UserTimerTask) GetTaskID() int64
GetTaskID returns the sequence ID of the timer task.
func (*UserTimerTask) GetType ¶
func (u *UserTimerTask) GetType() int
GetType returns the type of the timer task
func (*UserTimerTask) GetVisibilityTimestamp ¶
func (u *UserTimerTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*UserTimerTask) SetTaskID ¶
func (u *UserTimerTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the timer task.
func (*UserTimerTask) SetVisibilityTimestamp ¶
func (u *UserTimerTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp
type VisibilityManager ¶
type VisibilityManager interface { Closeable RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error) }
VisibilityManager is used to manage the visibility store
func NewCassandraVisibilityPersistence ¶
func NewCassandraVisibilityPersistence( hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error)
NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation
type WorkflowExecutionInfo ¶
type WorkflowExecutionInfo struct { DomainID string WorkflowID string RunID string ParentDomainID string ParentWorkflowID string ParentRunID string InitiatedID int64 CompletionEvent []byte TaskList string WorkflowTypeName string WorkflowTimeout int32 DecisionTimeoutValue int32 ExecutionContext []byte State int CloseStatus int NextEventID int64 LastProcessedEvent int64 StartTimestamp time.Time LastUpdatedTimestamp time.Time CreateRequestID string DecisionScheduleID int64 DecisionStartedID int64 DecisionRequestID string DecisionTimeout int32 DecisionAttempt int64 DecisionTimestamp int64 CancelRequested bool CancelRequestID string StickyTaskList string StickyScheduleToStartTimeout int32 }
WorkflowExecutionInfo describes a workflow execution
type WorkflowMutableState ¶
type WorkflowMutableState struct { ActivitInfos map[int64]*ActivityInfo TimerInfos map[string]*TimerInfo ChildExecutionInfos map[int64]*ChildExecutionInfo RequestCancelInfos map[int64]*RequestCancelInfo ExecutionInfo *WorkflowExecutionInfo BufferedEvents []*SerializedHistoryEventBatch }
WorkflowMutableState indicates workflow related state
type WorkflowTimeoutTask ¶
WorkflowTimeoutTask identifies a timeout task.
func (*WorkflowTimeoutTask) GetTaskID ¶
func (u *WorkflowTimeoutTask) GetTaskID() int64
GetTaskID returns the sequence ID of the cancel transfer task.
func (*WorkflowTimeoutTask) GetType ¶
func (u *WorkflowTimeoutTask) GetType() int
GetType returns the type of the timeout task.
func (*WorkflowTimeoutTask) GetVisibilityTimestamp ¶
func (u *WorkflowTimeoutTask) GetVisibilityTimestamp() time.Time
GetVisibilityTimestamp gets the visibility time stamp
func (*WorkflowTimeoutTask) SetTaskID ¶
func (u *WorkflowTimeoutTask) SetTaskID(id int64)
SetTaskID sets the sequence ID of the cancel transfer task.
func (*WorkflowTimeoutTask) SetVisibilityTimestamp ¶
func (u *WorkflowTimeoutTask) SetVisibilityTimestamp(t time.Time)
SetVisibilityTimestamp gets the visibility time stamp