persistence

package
v0.1.1-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2017 License: MIT Imports: 16 Imported by: 35

Documentation

Index

Constants

View Source
const (
	DomainStatusRegistered = iota
	DomainStatusDeprecated
	DomainStatusDeleted
)

Domain status

View Source
const (
	WorkflowStateCreated = iota
	WorkflowStateRunning
	WorkflowStateCompleted
)

Workflow execution states

View Source
const (
	WorkflowCloseStatusNone = iota
	WorkflowCloseStatusCompleted
	WorkflowCloseStatusFailed
	WorkflowCloseStatusCanceled
	WorkflowCloseStatusTerminated
	WorkflowCloseStatusContinuedAsNew
	WorkflowCloseStatusTimedOut
)

Workflow execution close status

View Source
const (
	TaskListTypeDecision = iota
	TaskListTypeActivity
)

Types of task lists

View Source
const (
	TransferTaskTypeDecisionTask = iota
	TransferTaskTypeActivityTask
	TransferTaskTypeDeleteExecution
	TransferTaskTypeCancelExecution
	TransferTaskTypeStartChildExecution
)

Transfer task types

View Source
const (
	TaskTypeDecisionTimeout = iota
	TaskTypeActivityTimeout
	TaskTypeUserTimer
)

Types of timers

View Source
const (
	// DefaultEncodingType is the default encoding format for persisted history
	DefaultEncodingType = common.EncodingTypeJSON
)

Variables

This section is empty.

Functions

func GetDefaultHistoryVersion

func GetDefaultHistoryVersion() int

GetDefaultHistoryVersion returns the default history version

func GetMaxSupportedHistoryVersion

func GetMaxSupportedHistoryVersion() int

GetMaxSupportedHistoryVersion returns the max supported version

func NewHistoryVersionCompatibilityError

func NewHistoryVersionCompatibilityError(required int, supported int) error

NewHistoryVersionCompatibilityError returns a new instance of compatibility error type

func NewUnknownEncodingTypeError

func NewUnknownEncodingTypeError(encodingType common.EncodingType) error

NewUnknownEncodingTypeError returns a new instance of encoding type error

func SetDefaultHistoryVersion

func SetDefaultHistoryVersion(version int)

SetDefaultHistoryVersion resets the default history version only intended for integration test

func SetMaxSupportedHistoryVersion

func SetMaxSupportedHistoryVersion(version int)

SetMaxSupportedHistoryVersion resets the max supported history version this method is only intended for integration test

Types

type ActivityInfo

type ActivityInfo struct {
	ScheduleID               int64
	ScheduledEvent           []byte
	StartedID                int64
	StartedEvent             []byte
	ActivityID               string
	RequestID                string
	Details                  []byte
	ScheduleToStartTimeout   int32
	ScheduleToCloseTimeout   int32
	StartToCloseTimeout      int32
	HeartbeatTimeout         int32
	CancelRequested          bool
	CancelRequestID          int64
	LastHeartBeatUpdatedTime time.Time
}

ActivityInfo details.

type ActivityTask

type ActivityTask struct {
	TaskID     int64
	DomainID   string
	TaskList   string
	ScheduleID int64
}

ActivityTask identifies a transfer task for activity

func (*ActivityTask) GetTaskID

func (a *ActivityTask) GetTaskID() int64

GetTaskID returns the sequence ID of the activity task

func (*ActivityTask) GetType

func (a *ActivityTask) GetType() int

GetType returns the type of the activity task

func (*ActivityTask) SetTaskID

func (a *ActivityTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the activity task

type ActivityTimeoutTask

type ActivityTimeoutTask struct {
	TaskID      int64
	TimeoutType int
	EventID     int64
}

ActivityTimeoutTask identifies a timeout task.

func (*ActivityTimeoutTask) GetTaskID

func (a *ActivityTimeoutTask) GetTaskID() int64

GetTaskID returns the sequence ID.

func (*ActivityTimeoutTask) GetType

func (a *ActivityTimeoutTask) GetType() int

GetType returns the type of the timer task

func (*ActivityTimeoutTask) SetTaskID

func (a *ActivityTimeoutTask) SetTaskID(id int64)

SetTaskID sets the sequence ID.

type AppendHistoryEventsRequest

type AppendHistoryEventsRequest struct {
	DomainID      string
	Execution     workflow.WorkflowExecution
	FirstEventID  int64
	RangeID       int64
	TransactionID int64
	Events        *SerializedHistoryEventBatch
	Overwrite     bool
}

AppendHistoryEventsRequest is used to append new events to workflow execution history

type CancelExecutionTask

type CancelExecutionTask struct {
	TaskID           int64
	TargetDomainID   string
	TargetWorkflowID string
	TargetRunID      string
	ScheduleID       int64
}

CancelExecutionTask identifies a transfer task for cancel of execution

func (*CancelExecutionTask) GetTaskID

func (u *CancelExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the cancel transfer task.

func (*CancelExecutionTask) GetType

func (u *CancelExecutionTask) GetType() int

GetType returns the type of the cancel transfer task

func (*CancelExecutionTask) SetTaskID

func (u *CancelExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the cancel transfer task.

type CassandraTestCluster

type CassandraTestCluster struct {
	// contains filtered or unexported fields
}

CassandraTestCluster allows executing cassandra operations in testing.

type ChildExecutionInfo

type ChildExecutionInfo struct {
	InitiatedID     int64
	InitiatedEvent  []byte
	StartedID       int64
	StartedEvent    []byte
	CreateRequestID string
}

ChildExecutionInfo has details for pending child executions.

type CompleteTaskRequest

type CompleteTaskRequest struct {
	TaskList *TaskListInfo
	TaskID   int64
}

CompleteTaskRequest is used to complete a task

type CompleteTimerTaskRequest

type CompleteTimerTaskRequest struct {
	TaskID int64
}

CompleteTimerTaskRequest is used to complete a task in the timer task queue

type CompleteTransferTaskRequest

type CompleteTransferTaskRequest struct {
	TaskID int64
}

CompleteTransferTaskRequest is used to complete a task in the transfer task queue

type ConditionFailedError

type ConditionFailedError struct {
	Msg string
}

ConditionFailedError represents a failed conditional put

func (*ConditionFailedError) Error

func (e *ConditionFailedError) Error() string

type CreateDomainRequest

type CreateDomainRequest struct {
	Name        string
	Status      int
	Description string
	OwnerEmail  string
	Retention   int32
	EmitMetric  bool
}

CreateDomainRequest is used to create the domain

type CreateDomainResponse

type CreateDomainResponse struct {
	ID string
}

CreateDomainResponse is the response for CreateDomain

type CreateShardRequest

type CreateShardRequest struct {
	ShardInfo *ShardInfo
}

CreateShardRequest is used to create a shard in executions table

type CreateTaskInfo

type CreateTaskInfo struct {
	Execution workflow.WorkflowExecution
	Data      *TaskInfo
	TaskID    int64
}

CreateTaskInfo describes a task to be created in CreateTasksRequest

type CreateTasksRequest

type CreateTasksRequest struct {
	DomainID     string
	TaskList     string
	TaskListType int
	RangeID      int64
	Tasks        []*CreateTaskInfo
}

CreateTasksRequest is used to create a new task for a workflow exectution

type CreateTasksResponse

type CreateTasksResponse struct {
}

CreateTasksResponse is the response to CreateTasksRequest

type CreateWorkflowExecutionRequest

type CreateWorkflowExecutionRequest struct {
	RequestID                   string
	DomainID                    string
	Execution                   workflow.WorkflowExecution
	ParentDomainID              string
	ParentExecution             *workflow.WorkflowExecution
	InitiatedID                 int64
	TaskList                    string
	WorkflowTypeName            string
	DecisionTimeoutValue        int32
	ExecutionContext            []byte
	NextEventID                 int64
	LastProcessedEvent          int64
	TransferTasks               []Task
	TimerTasks                  []Task
	RangeID                     int64
	DecisionScheduleID          int64
	DecisionStartedID           int64
	DecisionStartToCloseTimeout int32
	ContinueAsNew               bool
}

CreateWorkflowExecutionRequest is used to write a new workflow execution

type CreateWorkflowExecutionResponse

type CreateWorkflowExecutionResponse struct {
	TaskID string
}

CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest

type DecisionTask

type DecisionTask struct {
	TaskID     int64
	DomainID   string
	TaskList   string
	ScheduleID int64
}

DecisionTask identifies a transfer task for decision

func (*DecisionTask) GetTaskID

func (d *DecisionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the decision task.

func (*DecisionTask) GetType

func (d *DecisionTask) GetType() int

GetType returns the type of the decision task

func (*DecisionTask) SetTaskID

func (d *DecisionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the decision task

type DecisionTimeoutTask

type DecisionTimeoutTask struct {
	TaskID  int64
	EventID int64
}

DecisionTimeoutTask identifies a timeout task.

func (*DecisionTimeoutTask) GetTaskID

func (d *DecisionTimeoutTask) GetTaskID() int64

GetTaskID returns the sequence ID.

func (*DecisionTimeoutTask) GetType

func (d *DecisionTimeoutTask) GetType() int

GetType returns the type of the timer task

func (*DecisionTimeoutTask) SetTaskID

func (d *DecisionTimeoutTask) SetTaskID(id int64)

SetTaskID sets the sequence ID.

type DeleteDomainByNameRequest

type DeleteDomainByNameRequest struct {
	Name string
}

DeleteDomainByNameRequest is used to delete domain entry from domains_by_name table

type DeleteDomainRequest

type DeleteDomainRequest struct {
	ID string
}

DeleteDomainRequest is used to delete domain entry from domains table

type DeleteExecutionTask

type DeleteExecutionTask struct {
	TaskID int64
}

DeleteExecutionTask identifies a transfer task for deletion of execution

func (*DeleteExecutionTask) GetTaskID

func (a *DeleteExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the delete execution task

func (*DeleteExecutionTask) GetType

func (a *DeleteExecutionTask) GetType() int

GetType returns the type of the delete execution task

func (*DeleteExecutionTask) SetTaskID

func (a *DeleteExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the delete execution task

type DeleteWorkflowExecutionHistoryRequest

type DeleteWorkflowExecutionHistoryRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
}

DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history

type DeleteWorkflowExecutionRequest

type DeleteWorkflowExecutionRequest struct {
	ExecutionInfo *WorkflowExecutionInfo
}

DeleteWorkflowExecutionRequest is used to delete a workflow execution

type DomainConfig

type DomainConfig struct {
	Retention  int32
	EmitMetric bool
}

DomainConfig describes the domain configuration

type DomainInfo

type DomainInfo struct {
	ID          string
	Name        string
	Status      int
	Description string
	OwnerEmail  string
}

DomainInfo describes the domain entity

type ExecutionManager

type ExecutionManager interface {
	CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
	GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error)
	UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error
	DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error
	GetCurrentExecution(request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
	GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
	CompleteTransferTask(request *CompleteTransferTaskRequest) error

	// Timer related methods.
	GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
	CompleteTimerTask(request *CompleteTimerTaskRequest) error
}

ExecutionManager is used to manage workflow executions

func NewCassandraWorkflowExecutionPersistence

func NewCassandraWorkflowExecutionPersistence(hosts string, dc string, keyspace string, shardID int, logger bark.Logger) (ExecutionManager, error)

NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation

func NewWorkflowExecutionPersistenceClient

func NewWorkflowExecutionPersistenceClient(persistence ExecutionManager, metricClient metrics.Client) ExecutionManager

NewWorkflowExecutionPersistenceClient creates a client to manage executions

type ExecutionManagerFactory

type ExecutionManagerFactory interface {
	CreateExecutionManager(shardID int) (ExecutionManager, error)
}

ExecutionManagerFactory creates an instance of ExecutionManager for a given shard

type GetCurrentExecutionRequest

type GetCurrentExecutionRequest struct {
	DomainID   string
	WorkflowID string
}

GetCurrentExecutionRequest is used to retrieve the current RunId for an execution

type GetCurrentExecutionResponse

type GetCurrentExecutionResponse struct {
	RunID string
}

GetCurrentExecutionResponse is the response to GetCurrentExecution

type GetDomainRequest

type GetDomainRequest struct {
	ID   string
	Name string
}

GetDomainRequest is used to read domain

type GetDomainResponse

type GetDomainResponse struct {
	Info   *DomainInfo
	Config *DomainConfig
}

GetDomainResponse is the response for GetDomain

type GetShardRequest

type GetShardRequest struct {
	ShardID int
}

GetShardRequest is used to get shard information

type GetShardResponse

type GetShardResponse struct {
	ShardInfo *ShardInfo
}

GetShardResponse is the response to GetShard

type GetTasksRequest

type GetTasksRequest struct {
	DomainID     string
	TaskList     string
	TaskType     int
	ReadLevel    int64
	MaxReadLevel int64 // inclusive
	BatchSize    int
	RangeID      int64
}

GetTasksRequest is used to retrieve tasks of a task list

type GetTasksResponse

type GetTasksResponse struct {
	Tasks []*TaskInfo
}

GetTasksResponse is the response to GetTasksRequests

type GetTimerIndexTasksRequest

type GetTimerIndexTasksRequest struct {
	MinKey    int64
	MaxKey    int64
	BatchSize int
}

GetTimerIndexTasksRequest is the request for GetTimerIndexTasks TODO: replace this with an iterator that can configure min and max index.

type GetTimerIndexTasksResponse

type GetTimerIndexTasksResponse struct {
	Timers []*TimerTaskInfo
}

GetTimerIndexTasksResponse is the response for GetTimerIndexTasks

type GetTransferTasksRequest

type GetTransferTasksRequest struct {
	ReadLevel    int64
	MaxReadLevel int64
	BatchSize    int
}

GetTransferTasksRequest is used to read tasks from the transfer task queue

type GetTransferTasksResponse

type GetTransferTasksResponse struct {
	Tasks []*TransferTaskInfo
}

GetTransferTasksResponse is the response to GetTransferTasksRequest

type GetWorkflowExecutionHistoryRequest

type GetWorkflowExecutionHistoryRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
	// Get the history events upto NextEventID.  Not Inclusive.
	NextEventID int64
	// Maximum number of history append transactions per page
	PageSize int
	// Token to continue reading next page of history append transactions.  Pass in empty slice for first page
	NextPageToken []byte
}

GetWorkflowExecutionHistoryRequest is used to retrieve history of a workflow execution

type GetWorkflowExecutionHistoryResponse

type GetWorkflowExecutionHistoryResponse struct {
	// Slice of history append transaction batches
	Events []SerializedHistoryEventBatch
	// Token to read next page if there are more events beyond page size.
	// Use this to set NextPageToken on GetworkflowExecutionHistoryRequest to read the next page.
	NextPageToken []byte
}

GetWorkflowExecutionHistoryResponse is the response to GetWorkflowExecutionHistoryRequest

type GetWorkflowExecutionRequest

type GetWorkflowExecutionRequest struct {
	DomainID  string
	Execution workflow.WorkflowExecution
}

GetWorkflowExecutionRequest is used to retrieve the info of a workflow execution

type GetWorkflowExecutionResponse

type GetWorkflowExecutionResponse struct {
	State *WorkflowMutableState
}

GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest

type HistoryDeserializationError

type HistoryDeserializationError struct {
	// contains filtered or unexported fields
}

HistoryDeserializationError is an error type that's returned on a history deserialization failure

func (*HistoryDeserializationError) Error

type HistoryEventBatch

type HistoryEventBatch struct {
	Version int
	Events  []*workflow.HistoryEvent
}

HistoryEventBatch represents a batch of history events

func NewHistoryEventBatch

func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch

NewHistoryEventBatch returns a new instance of HistoryEventBatch

func (*HistoryEventBatch) String

func (b *HistoryEventBatch) String() string

type HistoryManager

type HistoryManager interface {
	AppendHistoryEvents(request *AppendHistoryEventsRequest) error
	// GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution
	GetWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest) (*GetWorkflowExecutionHistoryResponse,
		error)
	DeleteWorkflowExecutionHistory(request *DeleteWorkflowExecutionHistoryRequest) error
}

HistoryManager is used to manage Workflow Execution HistoryEventBatch

func NewCassandraHistoryPersistence

func NewCassandraHistoryPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (HistoryManager,
	error)

NewCassandraHistoryPersistence is used to create an instance of HistoryManager implementation

func NewHistoryPersistenceClient

func NewHistoryPersistenceClient(persistence HistoryManager, metricClient metrics.Client) HistoryManager

NewHistoryPersistenceClient creates a HistoryManager client to manage workflow execution history

type HistorySerializationError

type HistorySerializationError struct {
	// contains filtered or unexported fields
}

HistorySerializationError is an error type that's returned on a history serialization failure

func (*HistorySerializationError) Error

func (e *HistorySerializationError) Error() string

type HistorySerializer

type HistorySerializer interface {
	Serialize(batch *HistoryEventBatch) (*SerializedHistoryEventBatch, error)
	Deserialize(batch *SerializedHistoryEventBatch) (*HistoryEventBatch, error)
}

HistorySerializer is used to serialize/deserialize history

func NewJSONHistorySerializer

func NewJSONHistorySerializer() HistorySerializer

NewJSONHistorySerializer returns a JSON HistorySerializer

type HistorySerializerFactory

type HistorySerializerFactory interface {
	// Get returns a history serializer corresponding
	// to a given encoding type
	Get(encodingType common.EncodingType) (HistorySerializer, error)
}

HistorySerializerFactory is a factory that vends HistorySerializers based on encoding type.

func NewHistorySerializerFactory

func NewHistorySerializerFactory() HistorySerializerFactory

NewHistorySerializerFactory creates and returns an instance of HistorySerializerFactory

type HistoryVersionCompatibilityError

type HistoryVersionCompatibilityError struct {
	// contains filtered or unexported fields
}

HistoryVersionCompatibilityError is an error type that's returned when history serialization or deserialization cannot proceed due to version incompatibility

func (*HistoryVersionCompatibilityError) Error

type LeaseTaskListRequest

type LeaseTaskListRequest struct {
	DomainID string
	TaskList string
	TaskType int
}

LeaseTaskListRequest is used to request lease of a task list

type LeaseTaskListResponse

type LeaseTaskListResponse struct {
	TaskListInfo *TaskListInfo
}

LeaseTaskListResponse is response to LeaseTaskListRequest

type ListClosedWorkflowExecutionsByStatusRequest

type ListClosedWorkflowExecutionsByStatusRequest struct {
	ListWorkflowExecutionsRequest
	Status s.WorkflowExecutionCloseStatus
}

ListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status

type ListWorkflowExecutionsByTypeRequest

type ListWorkflowExecutionsByTypeRequest struct {
	ListWorkflowExecutionsRequest
	WorkflowTypeName string
}

ListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain

type ListWorkflowExecutionsByWorkflowIDRequest

type ListWorkflowExecutionsByWorkflowIDRequest struct {
	ListWorkflowExecutionsRequest
	WorkflowID string
}

ListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain

type ListWorkflowExecutionsRequest

type ListWorkflowExecutionsRequest struct {
	DomainUUID        string
	EarliestStartTime int64
	LatestStartTime   int64
	// Maximum number of workflow executions per page
	PageSize int
	// Token to continue reading next page of workflow executions.
	// Pass in empty slice for first page.
	NextPageToken []byte
}

ListWorkflowExecutionsRequest is used to list executions in a domain

type ListWorkflowExecutionsResponse

type ListWorkflowExecutionsResponse struct {
	Executions []*s.WorkflowExecutionInfo
	// Token to read next page if there are more workflow executions beyond page size.
	// Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
	NextPageToken []byte
}

ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest

type MetadataManager

type MetadataManager interface {
	CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error)
	GetDomain(request *GetDomainRequest) (*GetDomainResponse, error)
	UpdateDomain(request *UpdateDomainRequest) error
	DeleteDomain(request *DeleteDomainRequest) error
	DeleteDomainByName(request *DeleteDomainByNameRequest) error
}

MetadataManager is used to manage metadata CRUD for various entities

func NewCassandraMetadataPersistence

func NewCassandraMetadataPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (MetadataManager,
	error)

NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation

func NewMetadataPersistenceClient

func NewMetadataPersistenceClient(persistence MetadataManager, metricClient metrics.Client) MetadataManager

NewMetadataPersistenceClient creates a HistoryManager client to manage workflow execution history

type RecordWorkflowExecutionClosedRequest

type RecordWorkflowExecutionClosedRequest struct {
	DomainUUID       string
	Execution        s.WorkflowExecution
	WorkflowTypeName string
	StartTimestamp   int64
	CloseTimestamp   int64
	Status           s.WorkflowExecutionCloseStatus
	RetentionSeconds int64
}

RecordWorkflowExecutionClosedRequest is used to add a record of a newly closed execution

type RecordWorkflowExecutionStartedRequest

type RecordWorkflowExecutionStartedRequest struct {
	DomainUUID       string
	Execution        s.WorkflowExecution
	WorkflowTypeName string
	StartTimestamp   int64
}

RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution

type SerializedHistoryEventBatch

type SerializedHistoryEventBatch struct {
	EncodingType common.EncodingType
	Version      int
	Data         []byte
}

SerializedHistoryEventBatch represents a serialized batch of history events

func NewSerializedHistoryEventBatch

func NewSerializedHistoryEventBatch(data []byte, encoding common.EncodingType, version int) *SerializedHistoryEventBatch

NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch

func (*SerializedHistoryEventBatch) String

func (h *SerializedHistoryEventBatch) String() string

type ShardAlreadyExistError

type ShardAlreadyExistError struct {
	Msg string
}

ShardAlreadyExistError is returned when conditionally creating a shard fails

func (*ShardAlreadyExistError) Error

func (e *ShardAlreadyExistError) Error() string

type ShardInfo

type ShardInfo struct {
	ShardID          int
	Owner            string
	RangeID          int64
	StolenSinceRenew int
	UpdatedAt        time.Time
	TransferAckLevel int64
}

ShardInfo describes a shard

type ShardManager

type ShardManager interface {
	CreateShard(request *CreateShardRequest) error
	GetShard(request *GetShardRequest) (*GetShardResponse, error)
	UpdateShard(request *UpdateShardRequest) error
}

ShardManager is used to manage all shards

func NewCassandraShardPersistence

func NewCassandraShardPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (ShardManager, error)

NewCassandraShardPersistence is used to create an instance of ShardManager implementation

func NewShardPersistenceClient

func NewShardPersistenceClient(persistence ShardManager, metricClient metrics.Client) ShardManager

NewShardPersistenceClient creates a client to manage shards

type ShardOwnershipLostError

type ShardOwnershipLostError struct {
	ShardID int
	Msg     string
}

ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard

func (*ShardOwnershipLostError) Error

func (e *ShardOwnershipLostError) Error() string

type StartChildExecutionTask

type StartChildExecutionTask struct {
	TaskID           int64
	TargetDomainID   string
	TargetWorkflowID string
	InitiatedID      int64
}

StartChildExecutionTask identifies a transfer task for starting child execution

func (*StartChildExecutionTask) GetTaskID

func (u *StartChildExecutionTask) GetTaskID() int64

GetTaskID returns the sequence ID of the cancel transfer task.

func (*StartChildExecutionTask) GetType

func (u *StartChildExecutionTask) GetType() int

GetType returns the type of the cancel transfer task

func (*StartChildExecutionTask) SetTaskID

func (u *StartChildExecutionTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the cancel transfer task.

type Task

type Task interface {
	GetType() int
	GetTaskID() int64
	SetTaskID(id int64)
}

Task is the generic interface for workflow tasks

type TaskInfo

type TaskInfo struct {
	DomainID               string
	WorkflowID             string
	RunID                  string
	TaskID                 int64
	ScheduleID             int64
	ScheduleToStartTimeout int32
}

TaskInfo describes either activity or decision task

type TaskListInfo

type TaskListInfo struct {
	DomainID string
	Name     string
	TaskType int
	RangeID  int64
	AckLevel int64
}

TaskListInfo describes a state of a task list implementation.

type TaskManager

type TaskManager interface {
	LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
	UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
	CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error)
	GetTasks(request *GetTasksRequest) (*GetTasksResponse, error)
	CompleteTask(request *CompleteTaskRequest) error
}

TaskManager is used to manage tasks

func NewCassandraTaskPersistence

func NewCassandraTaskPersistence(hosts string, dc string, keyspace string, logger bark.Logger) (TaskManager, error)

NewCassandraTaskPersistence is used to create an instance of TaskManager implementation

func NewTaskPersistenceClient

func NewTaskPersistenceClient(persistence TaskManager, metricClient metrics.Client) TaskManager

NewTaskPersistenceClient creates a client to manage tasks

type TestBase

type TestBase struct {
	ShardMgr            ShardManager
	ExecutionMgrFactory ExecutionManagerFactory
	WorkflowMgr         ExecutionManager
	TaskMgr             TaskManager
	HistoryMgr          HistoryManager
	MetadataManager     MetadataManager
	VisibilityMgr       VisibilityManager
	ShardInfo           *ShardInfo
	ShardContext        *testShardContext

	CassandraTestCluster
	// contains filtered or unexported fields
}

TestBase wraps the base setup needed to create workflows over engine layer.

func (*TestBase) ClearTransferQueue

func (s *TestBase) ClearTransferQueue()

ClearTransferQueue completes all tasks in transfer queue

func (*TestBase) CompleteTask

func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID int64, ackLevel int64) error

CompleteTask is a utility method to complete a task

func (*TestBase) CompleteTransferTask

func (s *TestBase) CompleteTransferTask(taskID int64) error

CompleteTransferTask is a utility method to complete a transfer task

func (*TestBase) ContinueAsNewExecution

func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, condition int64,
	newExecution workflow.WorkflowExecution, nextEventID, decisionScheduleID int64) error

ContinueAsNewExecution is a utility method to create workflow executions

func (*TestBase) CreateActivityTasks

func (s *TestBase) CreateActivityTasks(domainID string, workflowExecution workflow.WorkflowExecution,
	activities map[int64]string) ([]int64, error)

CreateActivityTasks is a utility method to create tasks

func (*TestBase) CreateChildWorkflowExecution

func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution,
	parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string,
	decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleID int64, timerTasks []Task) (string, error)

CreateChildWorkflowExecution is a utility method to create child workflow executions

func (*TestBase) CreateDecisionTask

func (s *TestBase) CreateDecisionTask(domainID string, workflowExecution workflow.WorkflowExecution, taskList string,
	decisionScheduleID int64) (int64, error)

CreateDecisionTask is a utility method to create a task

func (*TestBase) CreateShard

func (s *TestBase) CreateShard(shardID int, owner string, rangeID int64) error

CreateShard is a utility method to create the shard using persistence layer

func (*TestBase) CreateWorkflowExecution

func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList,
	wType string, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleID int64, timerTasks []Task) (string, error)

CreateWorkflowExecution is a utility method to create workflow executions

func (*TestBase) CreateWorkflowExecutionManyTasks

func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution,
	taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
	decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error)

CreateWorkflowExecutionManyTasks is a utility method to create workflow executions

func (*TestBase) DeleteChildExecutionsState

func (s *TestBase) DeleteChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64,
	deleteChildInfo int64) error

DeleteChildExecutionsState is a utility method to delete child execution from mutable state

func (*TestBase) DeleteWorkflowExecution

func (s *TestBase) DeleteWorkflowExecution(info *WorkflowExecutionInfo) error

DeleteWorkflowExecution is a utility method to delete a workflow execution

func (*TestBase) GetCurrentWorkflow

func (s *TestBase) GetCurrentWorkflow(domainID, workflowID string) (string, error)

GetCurrentWorkflow returns the workflow state for the given params

func (*TestBase) GetNextSequenceNumber

func (s *TestBase) GetNextSequenceNumber() int64

GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId

func (*TestBase) GetReadLevel

func (s *TestBase) GetReadLevel() int64

GetReadLevel returns the current read level for shard

func (*TestBase) GetShard

func (s *TestBase) GetShard(shardID int) (*ShardInfo, error)

GetShard is a utility method to get the shard using persistence layer

func (*TestBase) GetTasks

func (s *TestBase) GetTasks(domainID, taskList string, taskType int, batchSize int) (*GetTasksResponse, error)

GetTasks is a utility method to get tasks from persistence

func (*TestBase) GetTimerIndexTasks

func (s *TestBase) GetTimerIndexTasks(minKey int64, maxKey int64) ([]*TimerTaskInfo, error)

GetTimerIndexTasks is a utility method to get tasks from transfer task queue

func (*TestBase) GetTransferTasks

func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)

GetTransferTasks is a utility method to get tasks from transfer task queue

func (*TestBase) GetWorkflowExecutionInfo

func (s *TestBase) GetWorkflowExecutionInfo(domainID string, workflowExecution workflow.WorkflowExecution) (
	*WorkflowMutableState, error)

GetWorkflowExecutionInfo is a utility method to retrieve execution info

func (*TestBase) SetupWorkflowStore

func (s *TestBase) SetupWorkflowStore()

SetupWorkflowStore to setup workflow test base

func (*TestBase) SetupWorkflowStoreWithOptions

func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions)

SetupWorkflowStoreWithOptions to setup workflow test base

func (*TestBase) TearDownWorkflowStore

func (s *TestBase) TearDownWorkflowStore()

TearDownWorkflowStore to cleanup

func (*TestBase) UpdateShard

func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) error

UpdateShard is a utility method to update the shard using persistence layer

func (*TestBase) UpdateWorkflowExecution

func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
	activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task,
	upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64,
	upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error

UpdateWorkflowExecution is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionAndDelete

func (s *TestBase) UpdateWorkflowExecutionAndDelete(updatedInfo *WorkflowExecutionInfo, condition int64) error

UpdateWorkflowExecutionAndDelete is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionWithRangeID

func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
	activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task,
	upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo,
	deleteTimerInfos []string, upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64) error

UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution

func (*TestBase) UpdateWorkflowExecutionWithTransferTasks

func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
	updatedInfo *WorkflowExecutionInfo, condition int64, transferTasks []Task, upsertActivityInfo []*ActivityInfo) error

UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution

func (*TestBase) UpsertChildExecutionsState

func (s *TestBase) UpsertChildExecutionsState(updatedInfo *WorkflowExecutionInfo, condition int64,
	upsertChildInfos []*ChildExecutionInfo) error

UpsertChildExecutionsState is a utility method to update mutable state of workflow execution

type TestBaseOptions

type TestBaseOptions struct {
	ClusterHost  string
	KeySpace     string
	Datacenter   string
	DropKeySpace bool
	SchemaDir    string
}

TestBaseOptions options to configure workflow test base.

type TimeoutError

type TimeoutError struct {
	Msg string
}

TimeoutError is returned when a write operation fails due to a timeout

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

type TimerInfo

type TimerInfo struct {
	TimerID    string
	StartedID  int64
	ExpiryTime time.Time
	TaskID     int64
}

TimerInfo details - metadata about user timer info.

type TimerTaskInfo

type TimerTaskInfo struct {
	DomainID    string
	WorkflowID  string
	RunID       string
	TaskID      int64
	TaskType    int
	TimeoutType int
	EventID     int64
}

TimerTaskInfo describes a timer task.

type TransferTaskInfo

type TransferTaskInfo struct {
	DomainID         string
	WorkflowID       string
	RunID            string
	TaskID           int64
	TargetDomainID   string
	TargetWorkflowID string
	TargetRunID      string
	TaskList         string
	TaskType         int
	ScheduleID       int64
}

TransferTaskInfo describes a transfer task

type UnknownEncodingTypeError

type UnknownEncodingTypeError struct {
	// contains filtered or unexported fields
}

UnknownEncodingTypeError is an error type that's returned when the encoding type provided as input is unknown or unsupported

func (*UnknownEncodingTypeError) Error

func (e *UnknownEncodingTypeError) Error() string

type UpdateDomainRequest

type UpdateDomainRequest struct {
	Info   *DomainInfo
	Config *DomainConfig
}

UpdateDomainRequest is used to update domain

type UpdateShardRequest

type UpdateShardRequest struct {
	ShardInfo       *ShardInfo
	PreviousRangeID int64
}

UpdateShardRequest is used to update shard information

type UpdateTaskListRequest

type UpdateTaskListRequest struct {
	TaskListInfo *TaskListInfo
}

UpdateTaskListRequest is used to update task list implementation information

type UpdateTaskListResponse

type UpdateTaskListResponse struct {
}

UpdateTaskListResponse is the response to UpdateTaskList

type UpdateWorkflowExecutionRequest

type UpdateWorkflowExecutionRequest struct {
	ExecutionInfo   *WorkflowExecutionInfo
	TransferTasks   []Task
	TimerTasks      []Task
	DeleteTimerTask Task
	Condition       int64
	RangeID         int64
	ContinueAsNew   *CreateWorkflowExecutionRequest
	CloseExecution  bool

	// Mutable state
	UpsertActivityInfos       []*ActivityInfo
	DeleteActivityInfo        *int64
	UpserTimerInfos           []*TimerInfo
	DeleteTimerInfos          []string
	UpsertChildExecutionInfos []*ChildExecutionInfo
	DeleteChildExecutionInfo  *int64
}

UpdateWorkflowExecutionRequest is used to update a workflow execution

type UserTimerTask

type UserTimerTask struct {
	TaskID  int64
	EventID int64
}

UserTimerTask identifies a timeout task.

func (*UserTimerTask) GetTaskID

func (u *UserTimerTask) GetTaskID() int64

GetTaskID returns the sequence ID of the timer task.

func (*UserTimerTask) GetType

func (u *UserTimerTask) GetType() int

GetType returns the type of the timer task

func (*UserTimerTask) SetTaskID

func (u *UserTimerTask) SetTaskID(id int64)

SetTaskID sets the sequence ID of the timer task.

type VisibilityManager

type VisibilityManager interface {
	RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
	RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error
	ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error)
	ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error)
	ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
	ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
}

VisibilityManager is used to manage the visibility store

func NewCassandraVisibilityPersistence

func NewCassandraVisibilityPersistence(
	hosts string, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error)

NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation

type WorkflowExecutionInfo

type WorkflowExecutionInfo struct {
	DomainID             string
	WorkflowID           string
	RunID                string
	ParentDomainID       string
	ParentWorkflowID     string
	ParentRunID          string
	InitiatedID          int64
	CompletionEvent      []byte
	TaskList             string
	WorkflowTypeName     string
	DecisionTimeoutValue int32
	ExecutionContext     []byte
	State                int
	CloseStatus          int
	NextEventID          int64
	LastProcessedEvent   int64
	StartTimestamp       time.Time
	LastUpdatedTimestamp time.Time
	CreateRequestID      string
	DecisionScheduleID   int64
	DecisionStartedID    int64
	DecisionRequestID    string
	DecisionTimeout      int32
}

WorkflowExecutionInfo describes a workflow execution

type WorkflowMutableState

type WorkflowMutableState struct {
	ActivitInfos        map[int64]*ActivityInfo
	TimerInfos          map[string]*TimerInfo
	ChildExecutionInfos map[int64]*ChildExecutionInfo
	ExecutionInfo       *WorkflowExecutionInfo
}

WorkflowMutableState indicates workflow related state

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL