persistence

package
v0.3.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2018 License: MIT Imports: 17 Imported by: 35

Documentation

Index

Constants

View Source
const (
	DomainTableVersionV1 = 1
	DomainTableVersionV2 = 2
)

TODO remove this table version this is a temp version indicating where is the domain resides either V1 or V2

View Source
const (
	DomainStatusRegistered = iota
	DomainStatusDeprecated
	DomainStatusDeleted
)

Domain status

View Source
const (
	WorkflowStateCreated = iota
	WorkflowStateRunning
	WorkflowStateCompleted
)

Workflow execution states

View Source
const (
	WorkflowCloseStatusNone = iota
	WorkflowCloseStatusCompleted
	WorkflowCloseStatusFailed
	WorkflowCloseStatusCanceled
	WorkflowCloseStatusTerminated
	WorkflowCloseStatusContinuedAsNew
	WorkflowCloseStatusTimedOut
)

Workflow execution close status

View Source
const (
	TaskListTypeDecision = iota
	TaskListTypeActivity
)

Types of task lists

View Source
const (
	TaskListKindNormal = iota
	TaskListKindSticky
)

Kinds of task lists

View Source
const (
	TransferTaskTypeDecisionTask = iota
	TransferTaskTypeActivityTask
	TransferTaskTypeCloseExecution
	TransferTaskTypeCancelExecution
	TransferTaskTypeStartChildExecution
	TransferTaskTypeSignalExecution
)

Transfer task types

View Source
const (
	ReplicationTaskTypeHistory = iota
	ReplicationTaskTypeHeartbeat
)

Types of replication tasks

View Source
const (
	TaskTypeDecisionTimeout = iota
	TaskTypeActivityTimeout
	TaskTypeUserTimer
	TaskTypeWorkflowTimeout
	TaskTypeDeleteHistoryEvent
	TaskTypeRetryTimer
)

Types of timers

View Source
const (
	// DefaultEncodingType is the default encoding format for persisted history
	DefaultEncodingType = common.EncodingTypeJSON
)

Variables

This section is empty.

Functions

func GetDefaultHistoryVersion

func GetDefaultHistoryVersion() int

GetDefaultHistoryVersion returns the default history version

func GetMaxSupportedHistoryVersion

func GetMaxSupportedHistoryVersion() int

GetMaxSupportedHistoryVersion returns the max supported version

func GetOrUseDefaultActiveCluster added in v0.3.7

func GetOrUseDefaultActiveCluster(currentClusterName string, activeClusterName string) string

GetOrUseDefaultActiveCluster return the current cluster name or use the input if valid

func GetVisibilityTSFrom

func GetVisibilityTSFrom(task Task) time.Time

GetVisibilityTSFrom - helper method to get visibility timestamp

func NewHistoryVersionCompatibilityError

func NewHistoryVersionCompatibilityError(required int, supported int) error

NewHistoryVersionCompatibilityError returns a new instance of compatibility error type

func NewUnknownEncodingTypeError

func NewUnknownEncodingTypeError(encodingType common.EncodingType) error

NewUnknownEncodingTypeError returns a new instance of encoding type error

func SetDefaultHistoryVersion

func SetDefaultHistoryVersion(version int)

SetDefaultHistoryVersion resets the default history version only intended for integration test

func SetMaxSupportedHistoryVersion

func SetMaxSupportedHistoryVersion(version int)

SetMaxSupportedHistoryVersion resets the max supported history version this method is only intended for integration test

func SetSerializedHistoryDefaults added in v0.3.11

func SetSerializedHistoryDefaults(history *SerializedHistoryEventBatch)

SetSerializedHistoryDefaults sets the version and encoding types to defaults if they are missing from persistence. This is purely for backwards compatibility

func SetVisibilityTSFrom

func SetVisibilityTSFrom(task Task, t time.Time)

SetVisibilityTSFrom - helper method to set visibility timestamp

Types

type ActivityInfo

type ActivityInfo struct {
	Version                  int64
	ScheduleID               int64
	ScheduledEvent           []byte
	ScheduledTime            time.Time
	StartedID                int64
	StartedEvent             []byte
	StartedTime              time.Time
	ActivityID               string
	RequestID                string
	Details                  []byte
	ScheduleToStartTimeout   int32
	ScheduleToCloseTimeout   int32
	StartToCloseTimeout      int32
	HeartbeatTimeout         int32
	CancelRequested          bool
	CancelRequestID          int64
	LastHeartBeatUpdatedTime time.Time
	TimerTaskStatus          int32
	// For retry
	Attempt            int32
	DomainID           string
	StartedIdentity    string
	TaskList           string
	HasRetryPolicy     bool
	InitialInterval    int32
	BackoffCoefficient float64
	MaximumInterval    int32
	ExpirationTime     time.Time
	MaximumAttempts    int32
	NonRetriableErrors []string
}

ActivityInfo details.

type ActivityTask

type ActivityTask struct {
	TaskID     int64
	DomainID   string
	TaskList   string
	ScheduleID int64
	Version    int64
}

ActivityTask identifies a transfer task for activity

func (*ActivityTask) GetTaskID

func (a *ActivityTask) GetTaskID() int64

GetTaskID returns the sequence ID of the activity task

func (*ActivityTask) GetType

func (a *ActivityTask) GetType() int

GetType returns the type of the activity task

func (*ActivityTask) GetVersion added in v0.3.12

func (a *ActivityTask) GetVersion() int64

GetVersion returns the version of the activity task

func (*ActivityTask) SetTaskID

func (a *ActivityTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the activity task

func (*ActivityTask) SetVersion added in v0.3.12

func (a *ActivityTask) SetVersion(version int64)

SetVersion returns the version of the activity task

type ActivityTimeoutTask

type ActivityTimeoutTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	TimeoutType         int
	EventID             int64
	Attempt             int64
	Version             int64
}

ActivityTimeoutTask identifies a timeout task.

func (*ActivityTimeoutTask) GetTaskID

func (a *ActivityTimeoutTask) GetTaskID() int64

GetTaskID returns the sequence ID.

func (*ActivityTimeoutTask) GetType

func (a *ActivityTimeoutTask) GetType() int

GetType returns the type of the timer task

func (*ActivityTimeoutTask) GetVersion added in v0.3.12

func (a *ActivityTimeoutTask) GetVersion() int64

GetVersion returns the version of the timer task

func (*ActivityTimeoutTask) GetVisibilityTimestamp

func (a *ActivityTimeoutTask) GetVisibilityTimestamp() time.Time

GetVisibilityTimestamp gets the visibility time stamp

func (*ActivityTimeoutTask) SetTaskID

func (a *ActivityTimeoutTask) SetTaskID(id int64)

SetTaskID sets the sequence ID.

func (*ActivityTimeoutTask) SetVersion added in v0.3.12

func (a *ActivityTimeoutTask) SetVersion(version int64)

SetVersion returns the version of the timer task

func (*ActivityTimeoutTask) SetVisibilityTimestamp

func (a *ActivityTimeoutTask) SetVisibilityTimestamp(t time.Time)

SetVisibilityTimestamp gets the visibility time stamp

type AppendHistoryEventsRequest

type AppendHistoryEventsRequest struct {
	DomainID      string
	Execution     workflow.WorkflowExecution
	FirstEventID  int64
	RangeID       int64
	TransactionID int64
	Events        *SerializedHistoryEventBatch
	Overwrite     bool
}

AppendHistoryEventsRequest is used to append new events to workflow execution history

type BufferedReplicationTask added in v0.3.12

type BufferedReplicationTask struct {
	FirstEventID  int64
	NextEventID   int64
	Version       int64
	History       *SerializedHistoryEventBatch
	NewRunHistory *SerializedHistoryEventBatch
}

BufferedReplicationTask has details to handle out of order receive of history events

type CancelExecutionTask

type CancelExecutionTask struct {
	TaskID                  int64
	TargetDomainID          string
	TargetWorkflowID        string
	TargetRunID             string
	TargetChildWorkflowOnly bool
	InitiatedID             int64
	Version                 int64
}

CancelExecutionTask identifies a transfer task for cancel of execution

func (*CancelExecutionTask) GetTaskID

func (u *CancelExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the cancel transfer task.

func (*CancelExecutionTask) GetType

func (u *CancelExecutionTask) GetType() int

GetType returns the type of the cancel transfer task

func (*CancelExecutionTask) GetVersion added in v0.3.12

func (u *CancelExecutionTask) GetVersion() int64

GetVersion returns the version of the cancel transfer task

func (*CancelExecutionTask) SetTaskID

func (u *CancelExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the cancel transfer task.

func (*CancelExecutionTask) SetVersion added in v0.3.12

func (u *CancelExecutionTask) SetVersion(version int64)

SetVersion returns the version of the cancel transfer task

type CassandraTestCluster

type CassandraTestCluster struct {
	// contains filtered or unexported fields
}

CassandraTestCluster allows executing cassandra operations in testing.

type ChildExecutionInfo

type ChildExecutionInfo struct {
	Version         int64
	InitiatedID     int64
	InitiatedEvent  []byte
	StartedID       int64
	StartedEvent    []byte
	CreateRequestID string
}

ChildExecutionInfo has details for pending child executions.

type CloseExecutionTask added in v0.3.3

type CloseExecutionTask struct {
	TaskID  int64
	Version int64
}

CloseExecutionTask identifies a transfer task for deletion of execution

func (*CloseExecutionTask) GetTaskID added in v0.3.3

func (a *CloseExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the close execution task

func (*CloseExecutionTask) GetType added in v0.3.3

func (a *CloseExecutionTask) GetType() int

GetType returns the type of the close execution task

func (*CloseExecutionTask) GetVersion added in v0.3.12

func (a *CloseExecutionTask) GetVersion() int64

GetVersion returns the version of the close execution task

func (*CloseExecutionTask) SetTaskID added in v0.3.3

func (a *CloseExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the close execution task

func (*CloseExecutionTask) SetVersion added in v0.3.12

func (a *CloseExecutionTask) SetVersion(version int64)

SetVersion returns the version of the close execution task

type Closeable

type Closeable interface {
	Close()
}

Closeable is an interface for any entity that supports a close operation to release resources

type ClusterReplicationConfig added in v0.3.7

type ClusterReplicationConfig struct {
	ClusterName string
}

ClusterReplicationConfig describes the cross DC cluster replication configuration

func GetOrUseDefaultClusters added in v0.3.7

func GetOrUseDefaultClusters(currentClusterName string, clusters []*ClusterReplicationConfig) []*ClusterReplicationConfig

GetOrUseDefaultClusters return the current cluster or use the input if valid

type CompleteReplicationTaskRequest added in v0.3.11

type CompleteReplicationTaskRequest struct {
	TaskID int64
}

CompleteReplicationTaskRequest is used to complete a task in the replication task queue

type CompleteTaskRequest

type CompleteTaskRequest struct {
	TaskList *TaskListInfo
	TaskID   int64
}

CompleteTaskRequest is used to complete a task

type CompleteTimerTaskRequest

type CompleteTimerTaskRequest struct {
	VisibilityTimestamp time.Time
	TaskID              int64
}

CompleteTimerTaskRequest is used to complete a task in the timer task queue

type CompleteTransferTaskRequest

type CompleteTransferTaskRequest struct {
	TaskID int64
}

CompleteTransferTaskRequest is used to complete a task in the transfer task queue

type ConditionFailedError

type ConditionFailedError struct {
	Msg string
}

ConditionFailedError represents a failed conditional put

func (*ConditionFailedError) Error

func (e *ConditionFailedError) Error() string

type CreateDomainRequest

type CreateDomainRequest struct {
	Info              *DomainInfo
	Config            *DomainConfig
	ReplicationConfig *DomainReplicationConfig
	IsGlobalDomain    bool
	ConfigVersion     int64
	FailoverVersion   int64
}

CreateDomainRequest is used to create the domain

type CreateDomainResponse

type CreateDomainResponse struct {
	ID string
}

CreateDomainResponse is the response for CreateDomain

type CreateShardRequest

type CreateShardRequest struct {
	ShardInfo *ShardInfo
}

CreateShardRequest is used to create a shard in executions table

type CreateTaskInfo

type CreateTaskInfo struct {
	Execution workflow.WorkflowExecution
	Data      *TaskInfo
	TaskID    int64
}

CreateTaskInfo describes a task to be created in CreateTasksRequest

type CreateTasksRequest

type CreateTasksRequest struct {
	TaskListInfo *TaskListInfo
	Tasks        []*CreateTaskInfo
}

CreateTasksRequest is used to create a new task for a workflow exectution

type CreateTasksResponse

type CreateTasksResponse struct {
}

CreateTasksResponse is the response to CreateTasksRequest

type CreateWorkflowExecutionRequest

type CreateWorkflowExecutionRequest struct {
	RequestID                   string
	DomainID                    string
	Execution                   workflow.WorkflowExecution
	ParentDomainID              string
	ParentExecution             *workflow.WorkflowExecution
	InitiatedID                 int64
	TaskList                    string
	WorkflowTypeName            string
	WorkflowTimeout             int32
	DecisionTimeoutValue        int32
	ExecutionContext            []byte
	NextEventID                 int64
	LastProcessedEvent          int64
	TransferTasks               []Task
	ReplicationTasks            []Task
	TimerTasks                  []Task
	RangeID                     int64
	DecisionVersion             int64
	DecisionScheduleID          int64
	DecisionStartedID           int64
	DecisionStartToCloseTimeout int32
	ContinueAsNew               bool
	PreviousRunID               string
	ReplicationState            *ReplicationState
}

CreateWorkflowExecutionRequest is used to write a new workflow execution

type CreateWorkflowExecutionResponse

type CreateWorkflowExecutionResponse struct {
	TaskID string
}

CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest

type DecisionTask

type DecisionTask struct {
	TaskID     int64
	DomainID   string
	TaskList   string
	ScheduleID int64
	Version    int64
}

DecisionTask identifies a transfer task for decision

func (*DecisionTask) GetTaskID

func (d *DecisionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the decision task.

func (*DecisionTask) GetType

func (d *DecisionTask) GetType() int

GetType returns the type of the decision task

func (*DecisionTask) GetVersion added in v0.3.12

func (d *DecisionTask) GetVersion() int64

GetVersion returns the version of the decision task

func (*DecisionTask) SetTaskID

func (d *DecisionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the decision task

func (*DecisionTask) SetVersion added in v0.3.12

func (d *DecisionTask) SetVersion(version int64)

SetVersion returns the version of the decision task

type DecisionTimeoutTask

type DecisionTimeoutTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	EventID             int64
	ScheduleAttempt     int64
	TimeoutType         int
	Version             int64
}

DecisionTimeoutTask identifies a timeout task.

func (*DecisionTimeoutTask) GetTaskID

func (d *DecisionTimeoutTask) GetTaskID() int64

GetTaskID returns the sequence ID.

func (*DecisionTimeoutTask) GetType

func (d *DecisionTimeoutTask) GetType() int

GetType returns the type of the timer task

func (*DecisionTimeoutTask) GetVersion added in v0.3.12

func (d *DecisionTimeoutTask) GetVersion() int64

GetVersion returns the version of the timer task

func (*DecisionTimeoutTask) GetVisibilityTimestamp

func (d *DecisionTimeoutTask) GetVisibilityTimestamp() time.Time

GetVisibilityTimestamp gets the visibility time stamp

func (*DecisionTimeoutTask) SetTaskID

func (d *DecisionTimeoutTask) SetTaskID(id int64)

SetTaskID sets the sequence ID.

func (*DecisionTimeoutTask) SetVersion added in v0.3.12

func (d *DecisionTimeoutTask) SetVersion(version int64)

SetVersion returns the version of the timer task

func (*DecisionTimeoutTask) SetVisibilityTimestamp

func (d *DecisionTimeoutTask) SetVisibilityTimestamp(t time.Time)

SetVisibilityTimestamp gets the visibility time stamp

type DeleteDomainByNameRequest

type DeleteDomainByNameRequest struct {
	Name string
}

DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table

type DeleteDomainRequest

type DeleteDomainRequest struct {
	ID string
}

DeleteDomainRequest is used to delete domain entry from domains table

type DeleteHistoryEventTask

type DeleteHistoryEventTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	Version             int64
}

DeleteHistoryEventTask identifies a timer task for deletion of history events of completed execution.

func (*DeleteHistoryEventTask) GetTaskID

func (a *DeleteHistoryEventTask) GetTaskID() int64

GetTaskID returns the sequence ID of the delete execution task

func (*DeleteHistoryEventTask) GetType

func (a *DeleteHistoryEventTask) GetType() int

GetType returns the type of the delete execution task

func (*DeleteHistoryEventTask) GetVersion added in v0.3.12

func (a *DeleteHistoryEventTask) GetVersion() int64

GetVersion returns the version of the delete execution task

func (*DeleteHistoryEventTask) SetTaskID

func (a *DeleteHistoryEventTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the delete execution task

func (*DeleteHistoryEventTask) SetVersion added in v0.3.12

func (a *DeleteHistoryEventTask) SetVersion(version int64)

SetVersion returns the version of the delete execution task

type DeleteWorkflowExecutionHistoryRequest

type DeleteWorkflowExecutionHistoryRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
}

DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history

type DeleteWorkflowExecutionRequest

type DeleteWorkflowExecutionRequest struct {
	DomainID   string
	WorkflowID string
	RunID      string
}

DeleteWorkflowExecutionRequest is used to delete a workflow execution

type DomainConfig

type DomainConfig struct {
	// NOTE: this retention is in days, not in seconds
	Retention  int32
	EmitMetric bool
}

DomainConfig describes the domain configuration

type DomainInfo

type DomainInfo struct {
	ID          string
	Name        string
	Status      int
	Description string
	OwnerEmail  string
}

DomainInfo describes the domain entity

type DomainReplicationConfig added in v0.3.7

type DomainReplicationConfig struct {
	ActiveClusterName string
	Clusters          []*ClusterReplicationConfig
}

DomainReplicationConfig describes the cross DC domain replication configuration

type ExecutionManager

type ExecutionManager interface {
	Closeable
	CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
	GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
	UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
	ResetMutableState(request *ResetMutableStateRequest) error
	DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
	GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
	GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
	CompleteTransferTask(request *CompleteTransferTaskRequest) error

	// Replication task related methods
	GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
	CompleteReplicationTask(request *CompleteReplicationTaskRequest) error

	// Timer related methods.
	GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
	CompleteTimerTask(request *CompleteTimerTaskRequest) error
}

ExecutionManager is used to manage workflow executions

func NewCassandraWorkflowExecutionPersistence

func NewCassandraWorkflowExecutionPersistence(shardID int, session *gocql.Session,
	logger bark.Logger) (ExecutionManager, error)

NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation

func NewWorkflowExecutionPersistenceClient

func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client, logger bark.Logger) ExecutionManager

NewWorkflowExecutionPersistenceClient creates a client to manage executions

type ExecutionManagerFactory

type ExecutionManagerFactory interface {
	Closeable
	CreateExecutionManager(shardID int) (ExecutionManager, error)
}

ExecutionManagerFactory creates an instance of ExecutionManager for a given shard

func NewCassandraPersistenceClientFactory added in v0.3.2

func NewCassandraPersistenceClientFactory(hosts string, port int, user, password, dc string, keyspace string,
	numConns int, logger bark.Logger, metricsClient metrics.Client) (ExecutionManagerFactory, error)

NewCassandraPersistenceClientFactory is used to create an instance of ExecutionManagerFactory implementation

type GetClosedWorkflowExecutionRequest

type GetClosedWorkflowExecutionRequest struct {
	DomainUUID string
	Execution  s.WorkflowExecution
}

GetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution

type GetClosedWorkflowExecutionResponse

type GetClosedWorkflowExecutionResponse struct {
	Execution *s.WorkflowExecutionInfo
}

GetClosedWorkflowExecutionResponse is the response to GetClosedWorkflowExecutionRequest

type GetCurrentExecutionRequest

type GetCurrentExecutionRequest struct {
	DomainID   string
	WorkflowID string
}

GetCurrentExecutionRequest is used to retrieve the current RunId for an execution

type GetCurrentExecutionResponse

type GetCurrentExecutionResponse struct {
	StartRequestID string
	RunID          string
	State          int
	CloseStatus    int
}

GetCurrentExecutionResponse is the response to GetCurrentExecution

type GetDomainRequest

type GetDomainRequest struct {
	ID   string
	Name string
}

GetDomainRequest is used to read domain

type GetDomainResponse

type GetDomainResponse struct {
	Info                        *DomainInfo
	Config                      *DomainConfig
	ReplicationConfig           *DomainReplicationConfig
	IsGlobalDomain              bool
	ConfigVersion               int64
	FailoverVersion             int64
	FailoverNotificationVersion int64
	NotificationVersion         int64
	TableVersion                int
}

GetDomainResponse is the response for GetDomain

type GetMetadataResponse added in v0.3.13

type GetMetadataResponse struct {
	NotificationVersion int64
}

GetMetadataResponse is the response for GetMetadata

type GetReplicationTasksRequest added in v0.3.11

type GetReplicationTasksRequest struct {
	ReadLevel     int64
	MaxReadLevel  int64
	BatchSize     int
	NextPageToken []byte
}

GetReplicationTasksRequest is used to read tasks from the replication task queue

type GetReplicationTasksResponse added in v0.3.11

type GetReplicationTasksResponse struct {
	Tasks         []*ReplicationTaskInfo
	NextPageToken []byte
}

GetReplicationTasksResponse is the response to GetReplicationTask

type GetShardRequest

type GetShardRequest struct {
	ShardID int
}

GetShardRequest is used to get shard information

type GetShardResponse

type GetShardResponse struct {
	ShardInfo *ShardInfo
}

GetShardResponse is the response to GetShard

type GetTasksRequest

type GetTasksRequest struct {
	DomainID     string
	TaskList     string
	TaskType     int
	ReadLevel    int64
	MaxReadLevel int64 // inclusive
	BatchSize    int
	RangeID      int64
}

GetTasksRequest is used to retrieve tasks of a task list

type GetTasksResponse

type GetTasksResponse struct {
	Tasks []*TaskInfo
}

GetTasksResponse is the response to GetTasksRequests

type GetTimerIndexTasksRequest

type GetTimerIndexTasksRequest struct {
	MinTimestamp  time.Time
	MaxTimestamp  time.Time
	BatchSize     int
	NextPageToken []byte
}

GetTimerIndexTasksRequest is the request for GetTimerIndexTasks TODO: replace this with an iterator that can configure min and max index.

type GetTimerIndexTasksResponse

type GetTimerIndexTasksResponse struct {
	Timers        []*TimerTaskInfo
	NextPageToken []byte
}

GetTimerIndexTasksResponse is the response for GetTimerIndexTasks

type GetTransferTasksRequest

type GetTransferTasksRequest struct {
	ReadLevel     int64
	MaxReadLevel  int64
	BatchSize     int
	NextPageToken []byte
}

GetTransferTasksRequest is used to read tasks from the transfer task queue

type GetTransferTasksResponse

type GetTransferTasksResponse struct {
	Tasks         []*TransferTaskInfo
	NextPageToken []byte
}

GetTransferTasksResponse is the response to GetTransferTasksRequest

type GetWorkflowExecutionHistoryRequest

type GetWorkflowExecutionHistoryRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
	// Get the history events from FirstEventID. Inclusive.
	FirstEventID int64
	// Get the history events upto NextEventID.  Not Inclusive.
	NextEventID int64
	// Maximum number of history append transactions per page
	PageSize int
	// Token to continue reading next page of history append transactions.  Pass in empty slice for first page
	NextPageToken []byte
}

GetWorkflowExecutionHistoryRequest is used to retrieve history of a workflow execution

type GetWorkflowExecutionHistoryResponse

type GetWorkflowExecutionHistoryResponse struct {
	// Slice of history append transaction batches
	Events []SerializedHistoryEventBatch
	// Token to read next page if there are more events beyond page size.
	// Use this to set NextPageToken on GetworkflowExecutionHistoryRequest to read the next page.
	NextPageToken []byte
}

GetWorkflowExecutionHistoryResponse is the response to GetWorkflowExecutionHistoryRequest

type GetWorkflowExecutionRequest

type GetWorkflowExecutionRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
}

GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution

type GetWorkflowExecutionResponse

type GetWorkflowExecutionResponse struct {
	State *WorkflowMutableState
}

GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest

type HistoryDeserializationError

type HistoryDeserializationError struct {
	// contains filtered or unexported fields
}

HistoryDeserializationError is an error type that's returned on a history deserialization failure

func (*HistoryDeserializationError) Error

type HistoryEventBatch

type HistoryEventBatch struct {
	Version int
	Events  []*workflow.HistoryEvent
}

HistoryEventBatch represents a batch of history events

func NewHistoryEventBatch

func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch

NewHistoryEventBatch returns a new instance of HistoryEventBatch

func (*HistoryEventBatch) String

func (b *HistoryEventBatch) String() string

type HistoryManager

type HistoryManager interface {
	Closeable
	AppendHistoryEvents(request *AppendHistoryEventsRequest) error
	// GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution
	GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse,
		error)
	DeleteWorkflowExecutionHistory(request *DeleteWorkflowExecutionHistoryRequest) error
}

HistoryManager is used to manage Workflow Execution HistoryEventBatch

func NewCassandraHistoryPersistence

func NewCassandraHistoryPersistence(hosts string, port int, user, password, dc string, keyspace string,
	numConns int, logger bark.Logger) (HistoryManager,
	error)

NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation

func NewHistoryPersistenceClient

func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client, logger bark.Logger) HistoryManager

NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history

type HistoryReplicationTask added in v0.3.11

type HistoryReplicationTask struct {
	TaskID              int64
	FirstEventID        int64
	NextEventID         int64
	Version             int64
	LastReplicationInfo map[string]*ReplicationInfo
}

HistoryReplicationTask is the transfer task created for shipping history replication events to other clusters

func (*HistoryReplicationTask) GetTaskID added in v0.3.11

func (a *HistoryReplicationTask) GetTaskID() int64

GetTaskID returns the sequence ID of the history replication task

func (*HistoryReplicationTask) GetType added in v0.3.11

func (a *HistoryReplicationTask) GetType() int

GetType returns the type of the history replication task

func (*HistoryReplicationTask) GetVersion added in v0.3.12

func (a *HistoryReplicationTask) GetVersion() int64

GetVersion returns the version of the history replication task

func (*HistoryReplicationTask) SetTaskID added in v0.3.11

func (a *HistoryReplicationTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the history replication task

func (*HistoryReplicationTask) SetVersion added in v0.3.12

func (a *HistoryReplicationTask) SetVersion(version int64)

SetVersion returns the version of the history replication task

type HistorySerializationError

type HistorySerializationError struct {
	// contains filtered or unexported fields
}

HistorySerializationError is an error type that's returned on a history serialization failure

func (*HistorySerializationError) Error

func (e *HistorySerializationError) Error() string

type HistorySerializer

type HistorySerializer interface {
	Serialize(batch *HistoryEventBatch) (*SerializedHistoryEventBatch, error)
	Deserialize(batch *SerializedHistoryEventBatch) (*HistoryEventBatch, error)
}

HistorySerializer is used to serialize/deserialize history

func NewJSONHistorySerializer

func NewJSONHistorySerializer() HistorySerializer

NewJSONHistorySerializer returns a JSON HistorySerializer

type HistorySerializerFactory

type HistorySerializerFactory interface {
	// Get returns a history serializer corresponding
	// to a given encoding type
	Get(encodingType common.EncodingType) (HistorySerializer, error)
}

HistorySerializerFactory is a factory that vends HistorySerializers based on encoding type.

func NewHistorySerializerFactory

func NewHistorySerializerFactory() HistorySerializerFactory

NewHistorySerializerFactory creates and returns an instance of HistorySerializerFactory

type HistoryVersionCompatibilityError

type HistoryVersionCompatibilityError struct {
	// contains filtered or unexported fields
}

HistoryVersionCompatibilityError is an error type that's returned when history serialization or deserialization cannot proceed due to version incompatibility

func (*HistoryVersionCompatibilityError) Error

type LeaseTaskListRequest

type LeaseTaskListRequest struct {
	DomainID     string
	TaskList     string
	TaskType     int
	TaskListKind int
}

LeaseTaskListRequest is used to request lease of a task list

type LeaseTaskListResponse

type LeaseTaskListResponse struct {
	TaskListInfo *TaskListInfo
}

LeaseTaskListResponse is response to LeaseTaskListRequest

type ListClosedWorkflowExecutionsByStatusRequest

type ListClosedWorkflowExecutionsByStatusRequest struct {
	ListWorkflowExecutionsRequest
	Status s.WorkflowExecutionCloseStatus
}

ListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status

type ListDomainRequest added in v0.3.13

type ListDomainRequest struct {
	PageSize      int
	NextPageToken []byte
}

ListDomainRequest is used to list domains

type ListDomainResponse added in v0.3.13

type ListDomainResponse struct {
	Domains       []*GetDomainResponse
	NextPageToken []byte
}

ListDomainResponse is the response for GetDomain

type ListWorkflowExecutionsByTypeRequest

type ListWorkflowExecutionsByTypeRequest struct {
	ListWorkflowExecutionsRequest
	WorkflowTypeName string
}

ListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain

type ListWorkflowExecutionsByWorkflowIDRequest

type ListWorkflowExecutionsByWorkflowIDRequest struct {
	ListWorkflowExecutionsRequest
	WorkflowID string
}

ListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain

type ListWorkflowExecutionsRequest

type ListWorkflowExecutionsRequest struct {
	DomainUUID        string
	EarliestStartTime int64
	LatestStartTime   int64
	// Maximum number of workflow executions per page
	PageSize int
	// Token to continue reading next page of workflow executions.
	// Pass in empty slice for first page.
	NextPageToken []byte
}

ListWorkflowExecutionsRequest is used to list executions in a domain

type ListWorkflowExecutionsResponse

type ListWorkflowExecutionsResponse struct {
	Executions []*s.WorkflowExecutionInfo
	// Token to read next page if there are more workflow executions beyond page size.
	// Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
	NextPageToken []byte
}

ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest

type MetadataManager

type MetadataManager interface {
	Closeable
	CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error)
	GetDomain(request *GetDomainRequest) (*GetDomainResponse, error)
	UpdateDomain(request *UpdateDomainRequest) error
	DeleteDomain(request *DeleteDomainRequest) error
	DeleteDomainByName(request *DeleteDomainByNameRequest) error
	ListDomain(request *ListDomainRequest) (*ListDomainResponse, error)
	GetMetadata() (*GetMetadataResponse, error)
}

MetadataManager is used to manage metadata CRUD for domain entities

func NewCassandraMetadataPersistence

func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string,
	currentClusterName string, logger bark.Logger) (MetadataManager,
	error)

NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation

func NewCassandraMetadataPersistenceV2 added in v0.3.13

func NewCassandraMetadataPersistenceV2(hosts string, port int, user, password, dc string, keyspace string,
	currentClusterName string, logger bark.Logger) (MetadataManager, error)

NewCassandraMetadataPersistenceV2 is used to create an instance of HistoryManager implementation

func NewMetadataManagerProxy added in v0.3.13

func NewMetadataManagerProxy(hosts string, port int, user, password, dc string, keyspace string,
	currentClusterName string, logger bark.Logger) (MetadataManager, error)

NewMetadataManagerProxy is used for merging the functionality the v1 and v2 MetadataManager

func NewMetadataPersistenceClient

func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client, logger bark.Logger) MetadataManager

NewMetadataPersistenceClient creates a MetadataManager client to manage metadata

type RecordWorkflowExecutionClosedRequest

type RecordWorkflowExecutionClosedRequest struct {
	DomainUUID       string
	Execution        s.WorkflowExecution
	WorkflowTypeName string
	StartTimestamp   int64
	CloseTimestamp   int64
	Status           s.WorkflowExecutionCloseStatus
	HistoryLength    int64
	RetentionSeconds int64
}

RecordWorkflowExecutionClosedRequest is used to add a record of a newly closed execution

type RecordWorkflowExecutionStartedRequest

type RecordWorkflowExecutionStartedRequest struct {
	DomainUUID       string
	Execution        s.WorkflowExecution
	WorkflowTypeName string
	StartTimestamp   int64
	WorkflowTimeout  int64
}

RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution

type ReplicationInfo added in v0.3.11

type ReplicationInfo struct {
	Version     int64
	LastEventID int64
}

ReplicationInfo represents the information stored for last replication event details per cluster

type ReplicationState added in v0.3.11

type ReplicationState struct {
	CurrentVersion      int64
	StartVersion        int64
	LastWriteVersion    int64
	LastWriteEventID    int64
	LastReplicationInfo map[string]*ReplicationInfo
}

ReplicationState represents mutable state information for global domains. This information is used by replication protocol when applying events from remote clusters

type ReplicationTaskInfo added in v0.3.11

type ReplicationTaskInfo struct {
	DomainID            string
	WorkflowID          string
	RunID               string
	TaskID              int64
	TaskType            int
	FirstEventID        int64
	NextEventID         int64
	Version             int64
	LastReplicationInfo map[string]*ReplicationInfo
}

ReplicationTaskInfo describes the replication task created for replication of history events

func (*ReplicationTaskInfo) GetTaskID added in v0.3.11

func (t *ReplicationTaskInfo) GetTaskID() int64

GetTaskID returns the task ID for replication task

func (*ReplicationTaskInfo) GetTaskType added in v0.3.11

func (t *ReplicationTaskInfo) GetTaskType() int

GetTaskType returns the task type for replication task

func (*ReplicationTaskInfo) GetVersion added in v0.3.12

func (t *ReplicationTaskInfo) GetVersion() int64

GetVersion returns the task version for replication task

type RequestCancelInfo

type RequestCancelInfo struct {
	Version         int64
	InitiatedID     int64
	CancelRequestID string
}

RequestCancelInfo has details for pending external workflow cancellations

type ResetMutableStateRequest added in v0.3.12

type ResetMutableStateRequest struct {
	ExecutionInfo    *WorkflowExecutionInfo
	ReplicationState *ReplicationState
	Condition        int64
	RangeID          int64

	// Mutable state
	InsertActivityInfos       []*ActivityInfo
	InsertTimerInfos          []*TimerInfo
	InsertChildExecutionInfos []*ChildExecutionInfo
	InsertRequestCancelInfos  []*RequestCancelInfo
	InsertSignalInfos         []*SignalInfo
	InsertSignalRequestedIDs  []string
}

ResetMutableStateRequest is used to reset workflow execution state

type RetryTimerTask added in v0.3.12

type RetryTimerTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	EventID             int64
	Version             int64
	Attempt             int32
}

RetryTimerTask to schedule a retry task for activity

func (*RetryTimerTask) GetTaskID added in v0.3.12

func (r *RetryTimerTask) GetTaskID() int64

GetTaskID returns the sequence ID.

func (*RetryTimerTask) GetType added in v0.3.12

func (r *RetryTimerTask) GetType() int

GetType returns the type of the retry timer task

func (*RetryTimerTask) GetVersion added in v0.3.12

func (r *RetryTimerTask) GetVersion() int64

GetVersion returns the version of the retry timer task

func (*RetryTimerTask) GetVisibilityTimestamp added in v0.3.12

func (r *RetryTimerTask) GetVisibilityTimestamp() time.Time

GetVisibilityTimestamp gets the visibility time stamp

func (*RetryTimerTask) SetTaskID added in v0.3.12

func (r *RetryTimerTask) SetTaskID(id int64)

SetTaskID sets the sequence ID.

func (*RetryTimerTask) SetVersion added in v0.3.12

func (r *RetryTimerTask) SetVersion(version int64)

SetVersion returns the version of the retry timer task

func (*RetryTimerTask) SetVisibilityTimestamp added in v0.3.12

func (r *RetryTimerTask) SetVisibilityTimestamp(t time.Time)

SetVisibilityTimestamp gets the visibility time stamp

type SerializedHistoryEventBatch

type SerializedHistoryEventBatch struct {
	EncodingType common.EncodingType
	Version      int
	Data         []byte
}

SerializedHistoryEventBatch represents a serialized batch of history events

func NewSerializedHistoryEventBatch

func NewSerializedHistoryEventBatch(data []byte, encoding common.EncodingType, version int) *SerializedHistoryEventBatch

NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch

func (*SerializedHistoryEventBatch) String

func (h *SerializedHistoryEventBatch) String() string

type ShardAlreadyExistError

type ShardAlreadyExistError struct {
	Msg string
}

ShardAlreadyExistError is returned when conditionally creating a shard fails

func (*ShardAlreadyExistError) Error

func (e *ShardAlreadyExistError) Error() string

type ShardInfo

type ShardInfo struct {
	ShardID                   int
	Owner                     string
	RangeID                   int64
	StolenSinceRenew          int
	UpdatedAt                 time.Time
	ReplicationAckLevel       int64
	TransferAckLevel          int64     // TO BE DEPRECATED IN FAVOR OF ClusterTransferAckLevel
	TimerAckLevel             time.Time // TO BE DEPRECATED IN FAVOR OF ClusteerTimerAckLevel
	ClusterTransferAckLevel   map[string]int64
	ClusterTimerAckLevel      map[string]time.Time
	DomainNotificationVersion int64
}

ShardInfo describes a shard

type ShardManager

type ShardManager interface {
	Closeable
	CreateShard(request *CreateShardRequest) error
	GetShard(request *GetShardRequest) (*GetShardResponse, error)
	UpdateShard(request *UpdateShardRequest) error
}

ShardManager is used to manage all shards

func NewCassandraShardPersistence

func NewCassandraShardPersistence(hosts string, port int, user, password, dc string, keyspace string,
	currentClusterName string, logger bark.Logger) (ShardManager, error)

NewCassandraShardPersistence is used to create an instance of ShardManager implementation

func NewShardPersistenceClient

func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client, logger bark.Logger) ShardManager

NewShardPersistenceClient creates a client to manage shards

type ShardOwnershipLostError

type ShardOwnershipLostError struct {
	ShardID int
	Msg     string
}

ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard

func (*ShardOwnershipLostError) Error

func (e *ShardOwnershipLostError) Error() string

type SignalExecutionTask added in v0.3.6

type SignalExecutionTask struct {
	TaskID                  int64
	TargetDomainID          string
	TargetWorkflowID        string
	TargetRunID             string
	TargetChildWorkflowOnly bool
	InitiatedID             int64
	Version                 int64
}

SignalExecutionTask identifies a transfer task for signal execution

func (*SignalExecutionTask) GetTaskID added in v0.3.6

func (u *SignalExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the signal transfer task.

func (*SignalExecutionTask) GetType added in v0.3.6

func (u *SignalExecutionTask) GetType() int

GetType returns the type of the signal transfer task

func (*SignalExecutionTask) GetVersion added in v0.3.12

func (u *SignalExecutionTask) GetVersion() int64

GetVersion returns the version of the signal transfer task

func (*SignalExecutionTask) SetTaskID added in v0.3.6

func (u *SignalExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the signal transfer task.

func (*SignalExecutionTask) SetVersion added in v0.3.12

func (u *SignalExecutionTask) SetVersion(version int64)

SetVersion returns the version of the signal transfer task

type SignalInfo added in v0.3.6

type SignalInfo struct {
	Version         int64
	InitiatedID     int64
	SignalRequestID string
	SignalName      string
	Input           []byte
	Control         []byte
}

SignalInfo has details for pending external workflow signal

type StartChildExecutionTask

type StartChildExecutionTask struct {
	TaskID           int64
	TargetDomainID   string
	TargetWorkflowID string
	InitiatedID      int64
	Version          int64
}

StartChildExecutionTask identifies a transfer task for starting child execution

func (*StartChildExecutionTask) GetTaskID

func (u *StartChildExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the start child transfer task

func (*StartChildExecutionTask) GetType

func (u *StartChildExecutionTask) GetType() int

GetType returns the type of the start child transfer task

func (*StartChildExecutionTask) GetVersion added in v0.3.12

func (u *StartChildExecutionTask) GetVersion() int64

GetVersion returns the version of the start child transfer task

func (*StartChildExecutionTask) SetTaskID

func (u *StartChildExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the start child transfer task

func (*StartChildExecutionTask) SetVersion added in v0.3.12

func (u *StartChildExecutionTask) SetVersion(version int64)

SetVersion returns the version of the start child transfer task

type Task

type Task interface {
	GetType() int
	GetVersion() int64
	SetVersion(version int64)
	GetTaskID() int64
	SetTaskID(id int64)
}

Task is the generic interface for workflow tasks

type TaskInfo

type TaskInfo struct {
	DomainID               string
	WorkflowID             string
	RunID                  string
	TaskID                 int64
	ScheduleID             int64
	ScheduleToStartTimeout int32
}

TaskInfo describes either activity or decision task

type TaskListInfo

type TaskListInfo struct {
	DomainID string
	Name     string
	TaskType int
	RangeID  int64
	AckLevel int64
	Kind     int
}

TaskListInfo describes a state of a task list implementation.

type TaskManager

type TaskManager interface {
	Closeable
	LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
	UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
	CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error)
	GetTasks(request *GetTasksRequest) (*GetTasksResponse, error)
	CompleteTask(request *CompleteTaskRequest) error
}

TaskManager is used to manage tasks

func NewCassandraTaskPersistence

func NewCassandraTaskPersistence(hosts string, port int, user, password, dc string, keyspace string,
	logger bark.Logger) (TaskManager, error)

NewCassandraTaskPersistence is used to create an instance of TaskManager implementation

func NewTaskPersistenceClient

func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client, logger bark.Logger) TaskManager

NewTaskPersistenceClient creates a client to manage tasks

type TestBase

type TestBase struct {
	ShardMgr            ShardManager
	ExecutionMgrFactory ExecutionManagerFactory
	WorkflowMgr         ExecutionManager
	TaskMgr             TaskManager
	HistoryMgr          HistoryManager
	MetadataManager     MetadataManager
	MetadataManagerV2   MetadataManager
	MetadataProxy       MetadataManager
	VisibilityMgr       VisibilityManager
	ShardInfo           *ShardInfo
	TaskIDGenerator     TransferTaskIDGenerator
	ClusterMetadata     cluster.Metadata

	CassandraTestCluster
	// contains filtered or unexported fields
}

TestBase wraps the base setup needed to create workflows over persistence layer.

func (*TestBase) ClearReplicationQueue added in v0.3.11

func (s *TestBase) ClearReplicationQueue()

ClearReplicationQueue completes all tasks in replication queue

func (*TestBase) ClearTasks added in v0.3.11

func (s *TestBase) ClearTasks()

ClearTasks completes all transfer tasks and replication tasks

func (*TestBase) ClearTransferQueue

func (s *TestBase) ClearTransferQueue()

ClearTransferQueue completes all tasks in transfer queue

func (*TestBase) CompleteReplicationTask added in v0.3.11

func (s *TestBase) CompleteReplicationTask(taskID int64) error

CompleteReplicationTask is a utility method to complete a replication task

func (*TestBase) CompleteTask

func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error

CompleteTask is a utility method to complete a task

func (*TestBase) CompleteTimerTask

func (s *TestBase) CompleteTimerTask(ts time.Time, taskID int64) error

CompleteTimerTask is a utility method to complete a timer task

func (*TestBase) CompleteTransferTask

func (s *TestBase) CompleteTransferTask(taskID int64) error

CompleteTransferTask is a utility method to complete a transfer task

func (*TestBase) ContinueAsNewExecution

func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64,
	newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64) error

ContinueAsNewExecution is a utility method to create workflow executions

func (*TestBase) CreateActivityTasks

func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution,
	activities map[int64]string) ([]int64, error)

CreateActivityTasks is a utility method to create tasks

func (*TestBase) CreateChildWorkflowExecution

func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution,
	parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string,
	wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error)

CreateChildWorkflowExecution is a utility method to create child workflow executions

func (*TestBase) CreateDecisionTask

func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string,
	decisionScheduleID int64) (int64, error)

CreateDecisionTask is a utility method to create a task

func (*TestBase) CreateShard

func (s *TestBase) CreateShard(shardID int, owner string, rangeID int64) error

CreateShard is a utility method to create the shard using persistence layer

func (*TestBase) CreateWorkflowExecution

func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList,
	wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error)

CreateWorkflowExecution is a utility method to create workflow executions

func (*TestBase) CreateWorkflowExecutionManyTasks

func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution,
	taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleIDs []int64, activityScheduleIDs []int64) (*CreateWorkflowExecutionResponse, error)

CreateWorkflowExecutionManyTasks is a utility method to create workflow executions

func (*TestBase) CreateWorkflowExecutionWithReplication added in v0.3.11

func (s *TestBase) CreateWorkflowExecutionWithReplication(domainID string, workflowExecution workflow.WorkflowExecution,
	taskList, wType string, wTimeout int32, decisionTimeout int32, nextEventID int64,
	lastProcessedEventID int64, decisionScheduleID int64, state *ReplicationState, txTasks []Task) (
	*CreateWorkflowExecutionResponse, error)

CreateWorkflowExecutionWithReplication is a utility method to create workflow executions

func (*TestBase) DeleteCancelState

func (s *TestBase) DeleteCancelState(updatedInfo *WorkflowExecutionInfo, condition int64,
	deleteCancelInfo int64) error

DeleteCancelState is a utility method to delete request cancel state from mutable state

func (*TestBase) DeleteChildExecutionsState

func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64,
	deleteChildInfo int64) error

DeleteChildExecutionsState is a utility method to delete child execution from mutable state

func (*TestBase) DeleteSignalState added in v0.3.6

func (s *TestBase) DeleteSignalState(updatedInfo *WorkflowExecutionInfo, condition int64,
	deleteSignalInfo int64) error

DeleteSignalState is a utility method to delete request cancel state from mutable state

func (*TestBase) DeleteSignalsRequestedState added in v0.3.6

func (s *TestBase) DeleteSignalsRequestedState(updatedInfo *WorkflowExecutionInfo, condition int64,
	deleteSignalsRequestedID string) error

DeleteSignalsRequestedState is a utility method to delete mutable state of workflow execution

func (*TestBase) DeleteWorkflowExecution

func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error

DeleteWorkflowExecution is a utility method to delete a workflow execution

func (*TestBase) GetCurrentWorkflowRunID added in v0.3.5

func (s *TestBase) GetCurrentWorkflowRunID(domainID, workflowID string) (string, error)

GetCurrentWorkflowRunID returns the workflow run ID for the given params

func (*TestBase) GetNextSequenceNumber

func (s *TestBase) GetNextSequenceNumber() int64

GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId

func (*TestBase) GetReplicationReadLevel added in v0.3.11

func (s *TestBase) GetReplicationReadLevel() int64

GetReplicationReadLevel returns the current read level for shard

func (*TestBase) GetReplicationTasks added in v0.3.11

func (s *TestBase) GetReplicationTasks(batchSize int, getAll bool) ([]*ReplicationTaskInfo, error)

GetReplicationTasks is a utility method to get tasks from replication task queue

func (*TestBase) GetShard

func (s *TestBase) GetShard(shardID int) (*ShardInfo, error)

GetShard is a utility method to get the shard using persistence layer

func (*TestBase) GetTasks

func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)

GetTasks is a utility method to get tasks from persistence

func (*TestBase) GetTimerIndexTasks

func (s *TestBase) GetTimerIndexTasks(batchSize int, getAll bool) ([]*TimerTaskInfo, error)

GetTimerIndexTasks is a utility method to get tasks from transfer task queue

func (*TestBase) GetTransferReadLevel added in v0.3.11

func (s *TestBase) GetTransferReadLevel() int64

GetTransferReadLevel returns the current read level for shard

func (*TestBase) GetTransferTasks

func (s *TestBase) GetTransferTasks(batchSize int, getAll bool) ([]*TransferTaskInfo, error)

GetTransferTasks is a utility method to get tasks from transfer task queue

func (*TestBase) GetWorkflowExecutionInfo

func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) (
	*WorkflowMutableState, error)

GetWorkflowExecutionInfo is a utility method to retrieve execution info

func (*TestBase) ResetMutableState added in v0.3.12

func (s *TestBase) ResetMutableState(info *WorkflowExecutionInfo, replicationState *ReplicationState, nextEventID int64,
	activityInfos []*ActivityInfo, timerInfos []*TimerInfo, childExecutionInfos []*ChildExecutionInfo,
	requestCancelInfos []*RequestCancelInfo, signalInfos []*SignalInfo, ids []string) error

ResetMutableState is utility method to reset mutable state

func (*TestBase) SetupWorkflowStore

func (s *TestBase) SetupWorkflowStore()

SetupWorkflowStore to setup workflow test base

func (*TestBase) SetupWorkflowStoreWithOptions

func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions, metadata cluster.Metadata)

SetupWorkflowStoreWithOptions to setup workflow test base

func (*TestBase) TearDownWorkflowStore

func (s *TestBase) TearDownWorkflowStore()

TearDownWorkflowStore to cleanup

func (*TestBase) UpdateAllMutableState added in v0.3.12

func (s *TestBase) UpdateAllMutableState(updatedMutableState *WorkflowMutableState, condition int64) error

UpdateAllMutableState is a utility method to update workflow execution

func (*TestBase) UpdateShard

func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) error

UpdateShard is a utility method to update the shard using persistence layer

func (*TestBase) UpdateWorkflowExecution

func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
	activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task,
	upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64,
	upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error

UpdateWorkflowExecution is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionAndFinish added in v0.3.5

func (s *TestBase) UpdateWorkflowExecutionAndFinish(updatedInfo *WorkflowExecutionInfo, condition int64) error

UpdateWorkflowExecutionAndFinish is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated added in v0.3.2

func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated(
	updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, childInfos []*ChildExecutionInfo) error

UpdateWorkflowExecutionForChildExecutionsInitiated is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionForRequestCancel

func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(
	updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task,
	upsertRequestCancelInfo []*RequestCancelInfo) error

UpdateWorkflowExecutionForRequestCancel is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionForSignal added in v0.3.6

func (s *TestBase) UpdateWorkflowExecutionForSignal(
	updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task,
	upsertSignalInfos []*SignalInfo) error

UpdateWorkflowExecutionForSignal is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionWithRangeID

func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
	activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task,
	upsertActivityInfos []*ActivityInfo, deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo,
	deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64,
	upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64,
	upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64,
	upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error

UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionWithReplication added in v0.3.11

func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowExecutionInfo,
	updatedReplicationState *ReplicationState, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID,
	condition int64, timerTasks []Task, txTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo,
	deleteActivityInfos []int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string,
	upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo,
	deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string,
	deleteSignalRequestedID string, newBufferedReplicationTask *BufferedReplicationTask,
	deleteBufferedReplicationTask *int64) error

UpdateWorkflowExecutionWithReplication is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionWithTransferTasks

func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
	updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertActivityInfo []*ActivityInfo) error

UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution

func (*TestBase) UpdateWorklowStateAndReplication added in v0.3.11

func (s *TestBase) UpdateWorklowStateAndReplication(updatedInfo *WorkflowExecutionInfo,
	updatedReplicationState *ReplicationState, newBufferedReplicationTask *BufferedReplicationTask,
	deleteBufferedReplicationTask *int64, condition int64, txTasks []Task) error

UpdateWorklowStateAndReplication is a utility method to update workflow execution

func (*TestBase) UpsertChildExecutionsState

func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64,
	upsertChildInfos []*ChildExecutionInfo) error

UpsertChildExecutionsState is a utility method to update mutable state of workflow execution

func (*TestBase) UpsertRequestCancelState

func (s *TestBase) UpsertRequestCancelState(updatedInfo *WorkflowExecutionInfo, condition int64,
	upsertCancelInfos []*RequestCancelInfo) error

UpsertRequestCancelState is a utility method to update mutable state of workflow execution

func (*TestBase) UpsertSignalInfoState added in v0.3.6

func (s *TestBase) UpsertSignalInfoState(updatedInfo *WorkflowExecutionInfo, condition int64,
	upsertSignalInfos []*SignalInfo) error

UpsertSignalInfoState is a utility method to update mutable state of workflow execution

func (*TestBase) UpsertSignalsRequestedState added in v0.3.6

func (s *TestBase) UpsertSignalsRequestedState(updatedInfo *WorkflowExecutionInfo, condition int64,
	upsertSignalsRequested []string) error

UpsertSignalsRequestedState is a utility method to update mutable state of workflow execution

type TestBaseOptions

type TestBaseOptions struct {
	ClusterHost     string
	ClusterPort     int
	ClusterUser     string
	ClusterPassword string
	KeySpace        string
	Datacenter      string
	DropKeySpace    bool
	SchemaDir       string
	// TODO this is used for global domain test
	// when crtoss DC is public, remove EnableGlobalDomain
	EnableGlobalDomain bool
	IsMasterCluster    bool
}

TestBaseOptions options to configure workflow test base.

type TimeoutError

type TimeoutError struct {
	Msg string
}

TimeoutError is returned when a write operation fails due to a timeout

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

type TimerInfo

type TimerInfo struct {
	Version    int64
	TimerID    string
	StartedID  int64
	ExpiryTime time.Time
	TaskID     int64
}

TimerInfo details - metadata about user timer info.

type TimerTaskInfo

type TimerTaskInfo struct {
	DomainID            string
	WorkflowID          string
	RunID               string
	VisibilityTimestamp time.Time
	TaskID              int64
	TaskType            int
	TimeoutType         int
	EventID             int64
	ScheduleAttempt     int64
	Version             int64
}

TimerTaskInfo describes a timer task.

func (*TimerTaskInfo) GetTaskID added in v0.3.12

func (t *TimerTaskInfo) GetTaskID() int64

GetTaskID returns the task ID for timer task

func (*TimerTaskInfo) GetTaskType added in v0.3.12

func (t *TimerTaskInfo) GetTaskType() int

GetTaskType returns the task type for timer task

func (*TimerTaskInfo) GetVersion added in v0.3.12

func (t *TimerTaskInfo) GetVersion() int64

GetVersion returns the task version for timer task

type TransferTaskIDGenerator added in v0.3.0

type TransferTaskIDGenerator interface {
	GetNextTransferTaskID() (int64, error)
}

TransferTaskIDGenerator generates IDs for transfer tasks written by helper methods

type TransferTaskInfo

type TransferTaskInfo struct {
	DomainID                string
	WorkflowID              string
	RunID                   string
	TaskID                  int64
	TargetDomainID          string
	TargetWorkflowID        string
	TargetRunID             string
	TargetChildWorkflowOnly bool
	TaskList                string
	TaskType                int
	ScheduleID              int64
	Version                 int64
}

TransferTaskInfo describes a transfer task

func (*TransferTaskInfo) GetTaskID added in v0.3.11

func (t *TransferTaskInfo) GetTaskID() int64

GetTaskID returns the task ID for transfer task

func (*TransferTaskInfo) GetTaskType added in v0.3.11

func (t *TransferTaskInfo) GetTaskType() int

GetTaskType returns the task type for transfer task

func (*TransferTaskInfo) GetVersion added in v0.3.12

func (t *TransferTaskInfo) GetVersion() int64

GetVersion returns the task version for transfer task

type UnknownEncodingTypeError

type UnknownEncodingTypeError struct {
	// contains filtered or unexported fields
}

UnknownEncodingTypeError is an error type that's returned when the encoding type provided as input is unknown or unsupported

func (*UnknownEncodingTypeError) Error

func (e *UnknownEncodingTypeError) Error() string

type UpdateDomainRequest

type UpdateDomainRequest struct {
	Info                        *DomainInfo
	Config                      *DomainConfig
	ReplicationConfig           *DomainReplicationConfig
	ConfigVersion               int64
	FailoverVersion             int64
	FailoverNotificationVersion int64
	NotificationVersion         int64
	TableVersion                int
}

UpdateDomainRequest is used to update domain

type UpdateShardRequest

type UpdateShardRequest struct {
	ShardInfo       *ShardInfo
	PreviousRangeID int64
}

UpdateShardRequest is used to update shard information

type UpdateTaskListRequest

type UpdateTaskListRequest struct {
	TaskListInfo *TaskListInfo
}

UpdateTaskListRequest is used to update task list implementation information

type UpdateTaskListResponse

type UpdateTaskListResponse struct {
}

UpdateTaskListResponse is the response to UpdateTaskList

type UpdateWorkflowExecutionRequest

type UpdateWorkflowExecutionRequest struct {
	ExecutionInfo        *WorkflowExecutionInfo
	ReplicationState     *ReplicationState
	TransferTasks        []Task
	TimerTasks           []Task
	ReplicationTasks     []Task
	DeleteTimerTask      Task
	Condition            int64
	RangeID              int64
	ContinueAsNew        *CreateWorkflowExecutionRequest
	FinishExecution      bool
	FinishedExecutionTTL int32

	// Mutable state
	UpsertActivityInfos           []*ActivityInfo
	DeleteActivityInfos           []int64
	UpserTimerInfos               []*TimerInfo
	DeleteTimerInfos              []string
	UpsertChildExecutionInfos     []*ChildExecutionInfo
	DeleteChildExecutionInfo      *int64
	UpsertRequestCancelInfos      []*RequestCancelInfo
	DeleteRequestCancelInfo       *int64
	UpsertSignalInfos             []*SignalInfo
	DeleteSignalInfo              *int64
	UpsertSignalRequestedIDs      []string
	DeleteSignalRequestedID       string
	NewBufferedEvents             *SerializedHistoryEventBatch
	ClearBufferedEvents           bool
	NewBufferedReplicationTask    *BufferedReplicationTask
	DeleteBufferedReplicationTask *int64
}

UpdateWorkflowExecutionRequest is used to update a workflow execution

type UserTimerTask

type UserTimerTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	EventID             int64
	Version             int64
}

UserTimerTask identifies a timeout task.

func (*UserTimerTask) GetTaskID

func (u *UserTimerTask) GetTaskID() int64

GetTaskID returns the sequence ID of the timer task.

func (*UserTimerTask) GetType

func (u *UserTimerTask) GetType() int

GetType returns the type of the timer task

func (*UserTimerTask) GetVersion added in v0.3.12

func (u *UserTimerTask) GetVersion() int64

GetVersion returns the version of the timer task

func (*UserTimerTask) GetVisibilityTimestamp

func (u *UserTimerTask) GetVisibilityTimestamp() time.Time

GetVisibilityTimestamp gets the visibility time stamp

func (*UserTimerTask) SetTaskID

func (u *UserTimerTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the timer task.

func (*UserTimerTask) SetVersion added in v0.3.12

func (u *UserTimerTask) SetVersion(version int64)

SetVersion returns the version of the timer task

func (*UserTimerTask) SetVisibilityTimestamp

func (u *UserTimerTask) SetVisibilityTimestamp(t time.Time)

SetVisibilityTimestamp gets the visibility time stamp

type VisibilityManager

type VisibilityManager interface {
	Closeable
	RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
	RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error
	ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
	ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
	ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
	GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error)
}

VisibilityManager is used to manage the visibility store

func NewCassandraVisibilityPersistence

func NewCassandraVisibilityPersistence(
	hosts string, port int, user, password, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error)

NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation

func NewVisibilityPersistenceClient added in v0.3.5

func NewVisibilityPersistenceClient(persistence VisibilityManager, metricClient metrics.Client, logger bark.Logger) VisibilityManager

NewVisibilityPersistenceClient creates a client to manage visibility

type WorkflowExecutionAlreadyStartedError added in v0.3.5

type WorkflowExecutionAlreadyStartedError struct {
	Msg            string
	StartRequestID string
	RunID          string
	State          int
	CloseStatus    int
}

WorkflowExecutionAlreadyStartedError is returned when creating a new workflow failed.

func (*WorkflowExecutionAlreadyStartedError) Error added in v0.3.5

type WorkflowExecutionInfo

type WorkflowExecutionInfo struct {
	DomainID                     string
	WorkflowID                   string
	RunID                        string
	ParentDomainID               string
	ParentWorkflowID             string
	ParentRunID                  string
	InitiatedID                  int64
	CompletionEvent              []byte
	TaskList                     string
	WorkflowTypeName             string
	WorkflowTimeout              int32
	DecisionTimeoutValue         int32
	ExecutionContext             []byte
	State                        int
	CloseStatus                  int
	LastFirstEventID             int64
	NextEventID                  int64
	LastProcessedEvent           int64
	StartTimestamp               time.Time
	LastUpdatedTimestamp         time.Time
	CreateRequestID              string
	DecisionVersion              int64
	DecisionScheduleID           int64
	DecisionStartedID            int64
	DecisionRequestID            string
	DecisionTimeout              int32
	DecisionAttempt              int64
	DecisionTimestamp            int64
	CancelRequested              bool
	CancelRequestID              string
	StickyTaskList               string
	StickyScheduleToStartTimeout int32
	ClientLibraryVersion         string
	ClientFeatureVersion         string
	ClientImpl                   string
}

WorkflowExecutionInfo describes a workflow execution

type WorkflowMutableState

type WorkflowMutableState struct {
	ActivitInfos             map[int64]*ActivityInfo
	TimerInfos               map[string]*TimerInfo
	ChildExecutionInfos      map[int64]*ChildExecutionInfo
	RequestCancelInfos       map[int64]*RequestCancelInfo
	SignalInfos              map[int64]*SignalInfo
	SignalRequestedIDs       map[string]struct{}
	ExecutionInfo            *WorkflowExecutionInfo
	ReplicationState         *ReplicationState
	BufferedEvents           []*SerializedHistoryEventBatch
	BufferedReplicationTasks map[int64]*BufferedReplicationTask
}

WorkflowMutableState indicates workflow related state

type WorkflowTimeoutTask

type WorkflowTimeoutTask struct {
	VisibilityTimestamp time.Time
	TaskID              int64
	Version             int64
}

WorkflowTimeoutTask identifies a timeout task.

func (*WorkflowTimeoutTask) GetTaskID

func (u *WorkflowTimeoutTask) GetTaskID() int64

GetTaskID returns the sequence ID of the cancel transfer task.

func (*WorkflowTimeoutTask) GetType

func (u *WorkflowTimeoutTask) GetType() int

GetType returns the type of the timeout task.

func (*WorkflowTimeoutTask) GetVersion added in v0.3.12

func (u *WorkflowTimeoutTask) GetVersion() int64

GetVersion returns the version of the timeout task

func (*WorkflowTimeoutTask) GetVisibilityTimestamp

func (u *WorkflowTimeoutTask) GetVisibilityTimestamp() time.Time

GetVisibilityTimestamp gets the visibility time stamp

func (*WorkflowTimeoutTask) SetTaskID

func (u *WorkflowTimeoutTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the cancel transfer task.

func (*WorkflowTimeoutTask) SetVersion added in v0.3.12

func (u *WorkflowTimeoutTask) SetVersion(version int64)

SetVersion returns the version of the timeout task

func (*WorkflowTimeoutTask) SetVisibilityTimestamp

func (u *WorkflowTimeoutTask) SetVisibilityTimestamp(t time.Time)

SetVisibilityTimestamp gets the visibility time stamp

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL