Documentation ¶
Index ¶
- Variables
- func CheckCompatibleVersion(cfg config.Cassandra, r resolver.ServiceResolver, expectedVersion string) error
- func CreateCassandraKeyspace(s gocql.Session, keyspace string, replicas int, overwrite bool, ...) (err error)
- func DropCassandraKeyspace(s gocql.Session, keyspace string, logger log.Logger) (err error)
- func GetTaskTTL(expireTime *time.Time) int64
- func NewClusterMetadataStore(session gocql.Session, logger log.Logger) (p.ClusterMetadataStore, error)
- func NewMetadataStore(currentClusterName string, session gocql.Session, logger log.Logger) (p.MetadataStore, error)
- func NewQueueStore(queueType persistence.QueueType, session gocql.Session, logger log.Logger) (persistence.Queue, error)
- func VerifyCompatibleVersion(cfg config.Persistence, r resolver.ServiceResolver) error
- type ClusterMetadataStore
- func (m *ClusterMetadataStore) Close()
- func (m *ClusterMetadataStore) GetClusterMembers(request *p.GetClusterMembersRequest) (*p.GetClusterMembersResponse, error)
- func (m *ClusterMetadataStore) GetClusterMetadata() (*p.InternalGetClusterMetadataResponse, error)
- func (m *ClusterMetadataStore) GetName() string
- func (m *ClusterMetadataStore) PruneClusterMembership(request *p.PruneClusterMembershipRequest) error
- func (m *ClusterMetadataStore) SaveClusterMetadata(request *p.InternalSaveClusterMetadataRequest) (bool, error)
- func (m *ClusterMetadataStore) UpsertClusterMembership(request *p.UpsertClusterMembershipRequest) error
- type ExecutionStore
- func (d *ExecutionStore) Close()
- func (d *ExecutionStore) ConflictResolveWorkflowExecution(request *p.InternalConflictResolveWorkflowExecutionRequest) error
- func (d *ExecutionStore) CreateWorkflowExecution(request *p.InternalCreateWorkflowExecutionRequest) (*p.InternalCreateWorkflowExecutionResponse, error)
- func (d *ExecutionStore) GetName() string
- func (d *ExecutionStore) UpdateWorkflowExecution(request *p.InternalUpdateWorkflowExecutionRequest) error
- type Factory
- func (f *Factory) Close()
- func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
- func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
- func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
- func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error)
- func (f *Factory) NewShardStore() (p.ShardStore, error)
- func (f *Factory) NewTaskStore() (p.TaskStore, error)
- type FieldNotFoundError
- type HistoryStore
- func (h *HistoryStore) AppendHistoryNodes(request *p.InternalAppendHistoryNodesRequest) error
- func (h *HistoryStore) DeleteHistoryBranch(request *p.InternalDeleteHistoryBranchRequest) error
- func (h *HistoryStore) DeleteHistoryNodes(request *p.InternalDeleteHistoryNodesRequest) error
- func (h *HistoryStore) ForkHistoryBranch(request *p.InternalForkHistoryBranchRequest) error
- func (h *HistoryStore) GetAllHistoryTreeBranches(request *p.GetAllHistoryTreeBranchesRequest) (*p.InternalGetAllHistoryTreeBranchesResponse, error)
- func (h *HistoryStore) GetHistoryTree(request *p.GetHistoryTreeRequest) (*p.InternalGetHistoryTreeResponse, error)
- func (h *HistoryStore) ReadHistoryBranch(request *p.InternalReadHistoryBranchRequest) (*p.InternalReadHistoryBranchResponse, error)
- type MatchingTaskStore
- func (d *MatchingTaskStore) Close()
- func (d *MatchingTaskStore) CompleteTask(request *p.CompleteTaskRequest) error
- func (d *MatchingTaskStore) CompleteTasksLessThan(request *p.CompleteTasksLessThanRequest) (int, error)
- func (d *MatchingTaskStore) CreateTaskQueue(request *p.InternalCreateTaskQueueRequest) error
- func (d *MatchingTaskStore) CreateTasks(request *p.InternalCreateTasksRequest) (*p.CreateTasksResponse, error)
- func (d *MatchingTaskStore) DeleteTaskQueue(request *p.DeleteTaskQueueRequest) error
- func (d *MatchingTaskStore) ExtendLease(request *p.InternalExtendLeaseRequest) error
- func (d *MatchingTaskStore) GetName() string
- func (d *MatchingTaskStore) GetTaskQueue(request *p.InternalGetTaskQueueRequest) (*p.InternalGetTaskQueueResponse, error)
- func (d *MatchingTaskStore) GetTasks(request *p.GetTasksRequest) (*p.InternalGetTasksResponse, error)
- func (d *MatchingTaskStore) ListTaskQueue(_ *p.ListTaskQueueRequest) (*p.InternalListTaskQueueResponse, error)
- func (d *MatchingTaskStore) UpdateTaskQueue(request *p.InternalUpdateTaskQueueRequest) (*p.UpdateTaskQueueResponse, error)
- type MetadataStore
- func (m *MetadataStore) Close()
- func (m *MetadataStore) CreateNamespace(request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
- func (m *MetadataStore) CreateNamespaceInV2Table(request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
- func (m *MetadataStore) DeleteNamespace(request *p.DeleteNamespaceRequest) error
- func (m *MetadataStore) DeleteNamespaceByName(request *p.DeleteNamespaceByNameRequest) error
- func (m *MetadataStore) GetMetadata() (*p.GetMetadataResponse, error)
- func (m *MetadataStore) GetName() string
- func (m *MetadataStore) GetNamespace(request *p.GetNamespaceRequest) (*p.InternalGetNamespaceResponse, error)
- func (m *MetadataStore) ListNamespaces(request *p.ListNamespacesRequest) (*p.InternalListNamespacesResponse, error)
- func (m *MetadataStore) UpdateNamespace(request *p.InternalUpdateNamespaceRequest) error
- type MutableStateStore
- func (d *MutableStateStore) ConflictResolveWorkflowExecution(request *p.InternalConflictResolveWorkflowExecutionRequest) error
- func (d *MutableStateStore) CreateWorkflowExecution(request *p.InternalCreateWorkflowExecutionRequest) (*p.InternalCreateWorkflowExecutionResponse, error)
- func (d *MutableStateStore) DeleteCurrentWorkflowExecution(request *p.DeleteCurrentWorkflowExecutionRequest) error
- func (d *MutableStateStore) DeleteWorkflowExecution(request *p.DeleteWorkflowExecutionRequest) error
- func (d *MutableStateStore) GetCurrentExecution(request *p.GetCurrentExecutionRequest) (*p.InternalGetCurrentExecutionResponse, error)
- func (d *MutableStateStore) GetWorkflowExecution(request *p.GetWorkflowExecutionRequest) (*p.InternalGetWorkflowExecutionResponse, error)
- func (d *MutableStateStore) ListConcreteExecutions(request *p.ListConcreteExecutionsRequest) (*p.InternalListConcreteExecutionsResponse, error)
- func (d *MutableStateStore) UpdateWorkflowExecution(request *p.InternalUpdateWorkflowExecutionRequest) error
- type MutableStateTaskStore
- func (d *MutableStateTaskStore) AddTasks(request *p.InternalAddTasksRequest) error
- func (d *MutableStateTaskStore) CompleteReplicationTask(request *p.CompleteReplicationTaskRequest) error
- func (d *MutableStateTaskStore) CompleteTimerTask(request *p.CompleteTimerTaskRequest) error
- func (d *MutableStateTaskStore) CompleteTransferTask(request *p.CompleteTransferTaskRequest) error
- func (d *MutableStateTaskStore) CompleteVisibilityTask(request *p.CompleteVisibilityTaskRequest) error
- func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ(request *p.DeleteReplicationTaskFromDLQRequest) error
- func (d *MutableStateTaskStore) GetReplicationTask(request *p.GetReplicationTaskRequest) (*p.GetReplicationTaskResponse, error)
- func (d *MutableStateTaskStore) GetReplicationTasks(request *p.GetReplicationTasksRequest) (*p.GetReplicationTasksResponse, error)
- func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ(request *p.GetReplicationTasksFromDLQRequest) (*p.GetReplicationTasksFromDLQResponse, error)
- func (d *MutableStateTaskStore) GetTimerIndexTasks(request *p.GetTimerIndexTasksRequest) (*p.GetTimerIndexTasksResponse, error)
- func (d *MutableStateTaskStore) GetTimerTask(request *p.GetTimerTaskRequest) (*p.GetTimerTaskResponse, error)
- func (d *MutableStateTaskStore) GetTransferTask(request *p.GetTransferTaskRequest) (*p.GetTransferTaskResponse, error)
- func (d *MutableStateTaskStore) GetTransferTasks(request *p.GetTransferTasksRequest) (*p.GetTransferTasksResponse, error)
- func (d *MutableStateTaskStore) GetVisibilityTask(request *p.GetVisibilityTaskRequest) (*p.GetVisibilityTaskResponse, error)
- func (d *MutableStateTaskStore) GetVisibilityTasks(request *p.GetVisibilityTasksRequest) (*p.GetVisibilityTasksResponse, error)
- func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(request *p.PutReplicationTaskToDLQRequest) error
- func (d *MutableStateTaskStore) RangeCompleteReplicationTask(request *p.RangeCompleteReplicationTaskRequest) error
- func (d *MutableStateTaskStore) RangeCompleteTimerTask(request *p.RangeCompleteTimerTaskRequest) error
- func (d *MutableStateTaskStore) RangeCompleteTransferTask(request *p.RangeCompleteTransferTaskRequest) error
- func (d *MutableStateTaskStore) RangeCompleteVisibilityTask(request *p.RangeCompleteVisibilityTaskRequest) error
- func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(request *p.RangeDeleteReplicationTaskFromDLQRequest) error
- type PersistedTypeMismatchError
- type QueueStore
- func (q *QueueStore) Close()
- func (q *QueueStore) DeleteMessageFromDLQ(messageID int64) error
- func (q *QueueStore) DeleteMessagesBefore(messageID int64) error
- func (q *QueueStore) EnqueueMessage(blob commonpb.DataBlob) error
- func (q *QueueStore) EnqueueMessageToDLQ(blob commonpb.DataBlob) (int64, error)
- func (q *QueueStore) GetAckLevels() (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) GetDLQAckLevels() (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) Init(blob *commonpb.DataBlob) error
- func (q *QueueStore) RangeDeleteMessagesFromDLQ(firstMessageID int64, lastMessageID int64) error
- func (q *QueueStore) ReadMessages(lastMessageID int64, maxCount int) ([]*persistence.QueueMessage, error)
- func (q *QueueStore) ReadMessagesFromDLQ(firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*persistence.QueueMessage, []byte, error)
- func (q *QueueStore) UpdateAckLevel(metadata *persistence.InternalQueueMetadata) error
- func (q *QueueStore) UpdateDLQAckLevel(metadata *persistence.InternalQueueMetadata) error
- type SchemaVersionReader
- type ShardStore
- func (d *ShardStore) Close()
- func (d *ShardStore) CreateShard(request *p.InternalCreateShardRequest) error
- func (d *ShardStore) GetClusterName() string
- func (d *ShardStore) GetName() string
- func (d *ShardStore) GetShard(request *p.InternalGetShardRequest) (*p.InternalGetShardResponse, error)
- func (d *ShardStore) UpdateShard(request *p.InternalUpdateShardRequest) error
- type TestCluster
- func (s *TestCluster) Config() config.Persistence
- func (s *TestCluster) CreateDatabase()
- func (s *TestCluster) CreateSession(keyspace string)
- func (s *TestCluster) DatabaseName() string
- func (s *TestCluster) DropDatabase()
- func (s *TestCluster) LoadSchema(schemaFile string)
- func (s *TestCluster) SetupTestDatabase()
- func (s *TestCluster) TearDownTestDatabase()
Constants ¶
This section is empty.
Variables ¶
var (
ErrGetSchemaVersion = errors.New("failed to get current schema version from cassandra")
)
Functions ¶
func CheckCompatibleVersion ¶ added in v1.12.0
func CheckCompatibleVersion( cfg config.Cassandra, r resolver.ServiceResolver, expectedVersion string, ) error
CheckCompatibleVersion check the version compatibility
func CreateCassandraKeyspace ¶
func CreateCassandraKeyspace(s gocql.Session, keyspace string, replicas int, overwrite bool, logger log.Logger) (err error)
CreateCassandraKeyspace creates the keyspace using this session for given replica count
func DropCassandraKeyspace ¶
DropCassandraKeyspace drops the given keyspace, if it exists
func GetTaskTTL ¶ added in v0.27.0
func NewClusterMetadataStore ¶ added in v1.13.0
func NewClusterMetadataStore( session gocql.Session, logger log.Logger, ) (p.ClusterMetadataStore, error)
NewClusterMetadataStore is used to create an instance of ClusterMetadataStore implementation
func NewMetadataStore ¶ added in v1.13.0
func NewMetadataStore( currentClusterName string, session gocql.Session, logger log.Logger, ) (p.MetadataStore, error)
NewMetadataStore is used to create an instance of the Namespace MetadataStore implementation
func NewQueueStore ¶ added in v1.13.0
func NewQueueStore( queueType persistence.QueueType, session gocql.Session, logger log.Logger, ) (persistence.Queue, error)
func VerifyCompatibleVersion ¶ added in v1.5.7
func VerifyCompatibleVersion( cfg config.Persistence, r resolver.ServiceResolver, ) error
VerifyCompatibleVersion ensures that the installed version of temporal and visibility keyspaces is greater than or equal to the expected version. In most cases, the versions should match. However if after a schema upgrade there is a code rollback, the code version (expected version) would fall lower than the actual version in cassandra.
Types ¶
type ClusterMetadataStore ¶ added in v1.13.0
type ClusterMetadataStore struct {
// contains filtered or unexported fields
}
func (*ClusterMetadataStore) Close ¶ added in v1.13.0
func (m *ClusterMetadataStore) Close()
func (*ClusterMetadataStore) GetClusterMembers ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetClusterMembers(request *p.GetClusterMembersRequest) (*p.GetClusterMembersResponse, error)
func (*ClusterMetadataStore) GetClusterMetadata ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetClusterMetadata() (*p.InternalGetClusterMetadataResponse, error)
func (*ClusterMetadataStore) GetName ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetName() string
func (*ClusterMetadataStore) PruneClusterMembership ¶ added in v1.13.0
func (m *ClusterMetadataStore) PruneClusterMembership(request *p.PruneClusterMembershipRequest) error
func (*ClusterMetadataStore) SaveClusterMetadata ¶ added in v1.13.0
func (m *ClusterMetadataStore) SaveClusterMetadata(request *p.InternalSaveClusterMetadataRequest) (bool, error)
func (*ClusterMetadataStore) UpsertClusterMembership ¶ added in v1.13.0
func (m *ClusterMetadataStore) UpsertClusterMembership(request *p.UpsertClusterMembershipRequest) error
type ExecutionStore ¶ added in v1.13.0
type ExecutionStore struct { *HistoryStore *MutableStateStore *MutableStateTaskStore }
func NewExecutionStore ¶ added in v1.12.0
func NewExecutionStore( session gocql.Session, logger log.Logger, ) *ExecutionStore
func (*ExecutionStore) Close ¶ added in v1.13.0
func (d *ExecutionStore) Close()
func (*ExecutionStore) ConflictResolveWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) ConflictResolveWorkflowExecution( request *p.InternalConflictResolveWorkflowExecutionRequest, ) error
func (*ExecutionStore) CreateWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) CreateWorkflowExecution( request *p.InternalCreateWorkflowExecutionRequest, ) (*p.InternalCreateWorkflowExecutionResponse, error)
func (*ExecutionStore) GetName ¶ added in v1.13.0
func (d *ExecutionStore) GetName() string
func (*ExecutionStore) UpdateWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) UpdateWorkflowExecution( request *p.InternalUpdateWorkflowExecutionRequest, ) error
type Factory ¶
Factory vends datastore implementations backed by cassandra
func NewFactory ¶
func NewFactory( cfg config.Cassandra, r resolver.ServiceResolver, clusterName string, logger log.Logger, ) *Factory
NewFactory returns an instance of a factory object which can be used to create data stores that are backed by cassandra
func NewFactoryFromSession ¶ added in v1.12.0
func NewFactoryFromSession( cfg config.Cassandra, clusterName string, logger log.Logger, session gocql.Session, ) *Factory
NewFactoryFromSession returns an instance of a factory object from the given session.
func (*Factory) NewClusterMetadataStore ¶ added in v0.27.0
func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
NewClusterMetadataStore returns a metadata store
func (*Factory) NewExecutionStore ¶
func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
NewExecutionStore returns a new ExecutionStore.
func (*Factory) NewMetadataStore ¶
func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
NewMetadataStore returns a metadata store
func (*Factory) NewShardStore ¶
func (f *Factory) NewShardStore() (p.ShardStore, error)
NewShardStore returns a new shard store
type FieldNotFoundError ¶ added in v0.27.0
type FieldNotFoundError struct {
Msg string
}
FieldNotFoundError is an error type returned when an untyped query return does not contain the expected fields.
func (FieldNotFoundError) Error ¶ added in v0.27.0
func (f FieldNotFoundError) Error() string
type HistoryStore ¶ added in v1.13.0
func NewHistoryStore ¶ added in v1.13.0
func NewHistoryStore( session gocql.Session, logger log.Logger, ) *HistoryStore
func (*HistoryStore) AppendHistoryNodes ¶ added in v1.13.0
func (h *HistoryStore) AppendHistoryNodes( request *p.InternalAppendHistoryNodesRequest, ) error
AppendHistoryNodes upsert a batch of events as a single node to a history branch Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (*HistoryStore) DeleteHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) DeleteHistoryBranch( request *p.InternalDeleteHistoryBranchRequest, ) error
DeleteHistoryBranch removes a branch
func (*HistoryStore) DeleteHistoryNodes ¶ added in v1.13.0
func (h *HistoryStore) DeleteHistoryNodes( request *p.InternalDeleteHistoryNodesRequest, ) error
DeleteHistoryNodes delete a history node
func (*HistoryStore) ForkHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) ForkHistoryBranch( request *p.InternalForkHistoryBranchRequest, ) error
ForkHistoryBranch forks a new branch from an existing branch Note that application must provide a void forking nodeID, it must be a valid nodeID in that branch. A valid forking nodeID can be an ancestor from the existing branch. For example, we have branch B1 with three nodes(1[1,2], 3[3,4,5] and 6[6,7,8]. 1, 3 and 6 are nodeIDs (first eventID of the batch). So B1 looks like this:
1[1,2] / 3[3,4,5] / 6[6,7,8]
Assuming we have branch B2 which contains one ancestor B1 stopping at 6 (exclusive). So B2 inherit nodeID 1 and 3 from B1, and have its own nodeID 6 and 8. Branch B2 looks like this:
1[1,2] / 3[3,4,5] \ 6[6,7] \ 8[8]
Now we want to fork a new branch B3 from B2. The only valid forking nodeIDs are 3,6 or 8. 1 is not valid because we can't fork from first node. 2/4/5 is NOT valid either because they are inside a batch.
Case #1: If we fork from nodeID 6, then B3 will have an ancestor B1 which stops at 6(exclusive). As we append a batch of events[6,7,8,9] to B3, it will look like :
1[1,2] / 3[3,4,5] \ 6[6,7,8,9]
Case #2: If we fork from node 8, then B3 will have two ancestors: B1 stops at 6(exclusive) and ancestor B2 stops at 8(exclusive) As we append a batch of events[8,9] to B3, it will look like:
1[1,2] / 3[3,4,5] / 6[6,7] \ 8[8,9]
func (*HistoryStore) GetAllHistoryTreeBranches ¶ added in v1.13.0
func (h *HistoryStore) GetAllHistoryTreeBranches( request *p.GetAllHistoryTreeBranchesRequest, ) (*p.InternalGetAllHistoryTreeBranchesResponse, error)
func (*HistoryStore) GetHistoryTree ¶ added in v1.13.0
func (h *HistoryStore) GetHistoryTree( request *p.GetHistoryTreeRequest, ) (*p.InternalGetHistoryTreeResponse, error)
GetHistoryTree returns all branch information of a tree
func (*HistoryStore) ReadHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) ReadHistoryBranch( request *p.InternalReadHistoryBranchRequest, ) (*p.InternalReadHistoryBranchResponse, error)
ReadHistoryBranch returns history node data for a branch NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
type MatchingTaskStore ¶ added in v1.13.0
func NewMatchingTaskStore ¶ added in v1.13.0
func NewMatchingTaskStore( session gocql.Session, logger log.Logger, ) *MatchingTaskStore
func (*MatchingTaskStore) Close ¶ added in v1.13.0
func (d *MatchingTaskStore) Close()
func (*MatchingTaskStore) CompleteTask ¶ added in v1.13.0
func (d *MatchingTaskStore) CompleteTask( request *p.CompleteTaskRequest, ) error
CompleteTask delete a task
func (*MatchingTaskStore) CompleteTasksLessThan ¶ added in v1.13.0
func (d *MatchingTaskStore) CompleteTasksLessThan( request *p.CompleteTasksLessThanRequest, ) (int, error)
CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will be returned to the caller
func (*MatchingTaskStore) CreateTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) CreateTaskQueue( request *p.InternalCreateTaskQueueRequest, ) error
func (*MatchingTaskStore) CreateTasks ¶ added in v1.13.0
func (d *MatchingTaskStore) CreateTasks( request *p.InternalCreateTasksRequest, ) (*p.CreateTasksResponse, error)
CreateTasks add tasks
func (*MatchingTaskStore) DeleteTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) DeleteTaskQueue( request *p.DeleteTaskQueueRequest, ) error
func (*MatchingTaskStore) ExtendLease ¶ added in v1.13.0
func (d *MatchingTaskStore) ExtendLease( request *p.InternalExtendLeaseRequest, ) error
func (*MatchingTaskStore) GetName ¶ added in v1.13.0
func (d *MatchingTaskStore) GetName() string
func (*MatchingTaskStore) GetTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) GetTaskQueue( request *p.InternalGetTaskQueueRequest, ) (*p.InternalGetTaskQueueResponse, error)
func (*MatchingTaskStore) GetTasks ¶ added in v1.13.0
func (d *MatchingTaskStore) GetTasks( request *p.GetTasksRequest, ) (*p.InternalGetTasksResponse, error)
GetTasks get a task
func (*MatchingTaskStore) ListTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) ListTaskQueue( _ *p.ListTaskQueueRequest, ) (*p.InternalListTaskQueueResponse, error)
func (*MatchingTaskStore) UpdateTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) UpdateTaskQueue( request *p.InternalUpdateTaskQueueRequest, ) (*p.UpdateTaskQueueResponse, error)
UpdateTaskQueue update task queue
type MetadataStore ¶ added in v1.13.0
type MetadataStore struct {
// contains filtered or unexported fields
}
func (*MetadataStore) Close ¶ added in v1.13.0
func (m *MetadataStore) Close()
func (*MetadataStore) CreateNamespace ¶ added in v1.13.0
func (m *MetadataStore) CreateNamespace(request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
CreateNamespace create a namespace Cassandra does not support conditional updates across multiple tables. For this reason we have to first insert into 'Namespaces' table and then do a conditional insert into namespaces_by_name table. If the conditional write fails we delete the orphaned entry from namespaces table. There is a chance delete entry could fail and we never delete the orphaned entry from namespaces table. We might need a background job to delete those orphaned record.
func (*MetadataStore) CreateNamespaceInV2Table ¶ added in v1.13.0
func (m *MetadataStore) CreateNamespaceInV2Table(request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
CreateNamespaceInV2Table is the temporary function used by namespace v1 -> v2 migration
func (*MetadataStore) DeleteNamespace ¶ added in v1.13.0
func (m *MetadataStore) DeleteNamespace(request *p.DeleteNamespaceRequest) error
func (*MetadataStore) DeleteNamespaceByName ¶ added in v1.13.0
func (m *MetadataStore) DeleteNamespaceByName(request *p.DeleteNamespaceByNameRequest) error
func (*MetadataStore) GetMetadata ¶ added in v1.13.0
func (m *MetadataStore) GetMetadata() (*p.GetMetadataResponse, error)
func (*MetadataStore) GetName ¶ added in v1.13.0
func (m *MetadataStore) GetName() string
func (*MetadataStore) GetNamespace ¶ added in v1.13.0
func (m *MetadataStore) GetNamespace(request *p.GetNamespaceRequest) (*p.InternalGetNamespaceResponse, error)
func (*MetadataStore) ListNamespaces ¶ added in v1.13.0
func (m *MetadataStore) ListNamespaces(request *p.ListNamespacesRequest) (*p.InternalListNamespacesResponse, error)
func (*MetadataStore) UpdateNamespace ¶ added in v1.13.0
func (m *MetadataStore) UpdateNamespace(request *p.InternalUpdateNamespaceRequest) error
type MutableStateStore ¶ added in v1.13.0
func NewMutableStateStore ¶ added in v1.13.0
func NewMutableStateStore( session gocql.Session, logger log.Logger, ) *MutableStateStore
func (*MutableStateStore) ConflictResolveWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) ConflictResolveWorkflowExecution( request *p.InternalConflictResolveWorkflowExecutionRequest, ) error
func (*MutableStateStore) CreateWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) CreateWorkflowExecution( request *p.InternalCreateWorkflowExecutionRequest, ) (*p.InternalCreateWorkflowExecutionResponse, error)
func (*MutableStateStore) DeleteCurrentWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) DeleteCurrentWorkflowExecution( request *p.DeleteCurrentWorkflowExecutionRequest, ) error
func (*MutableStateStore) DeleteWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) DeleteWorkflowExecution( request *p.DeleteWorkflowExecutionRequest, ) error
func (*MutableStateStore) GetCurrentExecution ¶ added in v1.13.0
func (d *MutableStateStore) GetCurrentExecution( request *p.GetCurrentExecutionRequest, ) (*p.InternalGetCurrentExecutionResponse, error)
func (*MutableStateStore) GetWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) GetWorkflowExecution( request *p.GetWorkflowExecutionRequest, ) (*p.InternalGetWorkflowExecutionResponse, error)
func (*MutableStateStore) ListConcreteExecutions ¶ added in v1.13.0
func (d *MutableStateStore) ListConcreteExecutions( request *p.ListConcreteExecutionsRequest, ) (*p.InternalListConcreteExecutionsResponse, error)
func (*MutableStateStore) UpdateWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) UpdateWorkflowExecution( request *p.InternalUpdateWorkflowExecutionRequest, ) error
type MutableStateTaskStore ¶ added in v1.13.0
func NewMutableStateTaskStore ¶ added in v1.13.0
func NewMutableStateTaskStore( session gocql.Session, logger log.Logger, ) *MutableStateTaskStore
func (*MutableStateTaskStore) AddTasks ¶ added in v1.13.0
func (d *MutableStateTaskStore) AddTasks( request *p.InternalAddTasksRequest, ) error
func (*MutableStateTaskStore) CompleteReplicationTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) CompleteReplicationTask( request *p.CompleteReplicationTaskRequest, ) error
func (*MutableStateTaskStore) CompleteTimerTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) CompleteTimerTask( request *p.CompleteTimerTaskRequest, ) error
func (*MutableStateTaskStore) CompleteTransferTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) CompleteTransferTask( request *p.CompleteTransferTaskRequest, ) error
func (*MutableStateTaskStore) CompleteVisibilityTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) CompleteVisibilityTask( request *p.CompleteVisibilityTaskRequest, ) error
func (*MutableStateTaskStore) DeleteReplicationTaskFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ( request *p.DeleteReplicationTaskFromDLQRequest, ) error
func (*MutableStateTaskStore) GetReplicationTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetReplicationTask( request *p.GetReplicationTaskRequest, ) (*p.GetReplicationTaskResponse, error)
func (*MutableStateTaskStore) GetReplicationTasks ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetReplicationTasks( request *p.GetReplicationTasksRequest, ) (*p.GetReplicationTasksResponse, error)
func (*MutableStateTaskStore) GetReplicationTasksFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ( request *p.GetReplicationTasksFromDLQRequest, ) (*p.GetReplicationTasksFromDLQResponse, error)
func (*MutableStateTaskStore) GetTimerIndexTasks ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetTimerIndexTasks( request *p.GetTimerIndexTasksRequest, ) (*p.GetTimerIndexTasksResponse, error)
func (*MutableStateTaskStore) GetTimerTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetTimerTask( request *p.GetTimerTaskRequest, ) (*p.GetTimerTaskResponse, error)
func (*MutableStateTaskStore) GetTransferTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetTransferTask( request *p.GetTransferTaskRequest, ) (*p.GetTransferTaskResponse, error)
func (*MutableStateTaskStore) GetTransferTasks ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetTransferTasks( request *p.GetTransferTasksRequest, ) (*p.GetTransferTasksResponse, error)
func (*MutableStateTaskStore) GetVisibilityTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetVisibilityTask( request *p.GetVisibilityTaskRequest, ) (*p.GetVisibilityTaskResponse, error)
func (*MutableStateTaskStore) GetVisibilityTasks ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetVisibilityTasks( request *p.GetVisibilityTasksRequest, ) (*p.GetVisibilityTasksResponse, error)
func (*MutableStateTaskStore) PutReplicationTaskToDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) PutReplicationTaskToDLQ( request *p.PutReplicationTaskToDLQRequest, ) error
func (*MutableStateTaskStore) RangeCompleteReplicationTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeCompleteReplicationTask( request *p.RangeCompleteReplicationTaskRequest, ) error
func (*MutableStateTaskStore) RangeCompleteTimerTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeCompleteTimerTask( request *p.RangeCompleteTimerTaskRequest, ) error
func (*MutableStateTaskStore) RangeCompleteTransferTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeCompleteTransferTask( request *p.RangeCompleteTransferTaskRequest, ) error
func (*MutableStateTaskStore) RangeCompleteVisibilityTask ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeCompleteVisibilityTask( request *p.RangeCompleteVisibilityTaskRequest, ) error
func (*MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ( request *p.RangeDeleteReplicationTaskFromDLQRequest, ) error
type PersistedTypeMismatchError ¶ added in v0.27.0
type PersistedTypeMismatchError struct {
Msg string
}
PersistedTypeMismatchError is an error type returned when a persisted cassandra value does not match the expected type.
func (PersistedTypeMismatchError) Error ¶ added in v0.27.0
func (f PersistedTypeMismatchError) Error() string
type QueueStore ¶ added in v1.13.0
type QueueStore struct {
// contains filtered or unexported fields
}
func (*QueueStore) Close ¶ added in v1.13.0
func (q *QueueStore) Close()
func (*QueueStore) DeleteMessageFromDLQ ¶ added in v1.13.0
func (q *QueueStore) DeleteMessageFromDLQ( messageID int64, ) error
func (*QueueStore) DeleteMessagesBefore ¶ added in v1.13.0
func (q *QueueStore) DeleteMessagesBefore( messageID int64, ) error
func (*QueueStore) EnqueueMessage ¶ added in v1.13.0
func (q *QueueStore) EnqueueMessage( blob commonpb.DataBlob, ) error
func (*QueueStore) EnqueueMessageToDLQ ¶ added in v1.13.0
func (q *QueueStore) EnqueueMessageToDLQ( blob commonpb.DataBlob, ) (int64, error)
func (*QueueStore) GetAckLevels ¶ added in v1.13.0
func (q *QueueStore) GetAckLevels() (*persistence.InternalQueueMetadata, error)
func (*QueueStore) GetDLQAckLevels ¶ added in v1.13.0
func (q *QueueStore) GetDLQAckLevels() (*persistence.InternalQueueMetadata, error)
func (*QueueStore) RangeDeleteMessagesFromDLQ ¶ added in v1.13.0
func (q *QueueStore) RangeDeleteMessagesFromDLQ( firstMessageID int64, lastMessageID int64, ) error
func (*QueueStore) ReadMessages ¶ added in v1.13.0
func (q *QueueStore) ReadMessages( lastMessageID int64, maxCount int, ) ([]*persistence.QueueMessage, error)
func (*QueueStore) ReadMessagesFromDLQ ¶ added in v1.13.0
func (q *QueueStore) ReadMessagesFromDLQ( firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte, ) ([]*persistence.QueueMessage, []byte, error)
func (*QueueStore) UpdateAckLevel ¶ added in v1.13.0
func (q *QueueStore) UpdateAckLevel(metadata *persistence.InternalQueueMetadata) error
func (*QueueStore) UpdateDLQAckLevel ¶ added in v1.13.0
func (q *QueueStore) UpdateDLQAckLevel(metadata *persistence.InternalQueueMetadata) error
type SchemaVersionReader ¶ added in v1.5.7
type SchemaVersionReader struct {
// contains filtered or unexported fields
}
func NewSchemaVersionReader ¶ added in v1.5.7
func NewSchemaVersionReader(session gocql.Session) *SchemaVersionReader
func (*SchemaVersionReader) ReadSchemaVersion ¶ added in v1.5.7
func (svr *SchemaVersionReader) ReadSchemaVersion(keyspace string) (string, error)
ReadSchemaVersion returns the current schema version for the Keyspace
type ShardStore ¶ added in v1.13.0
func NewShardStore ¶ added in v1.13.0
func (*ShardStore) Close ¶ added in v1.13.0
func (d *ShardStore) Close()
func (*ShardStore) CreateShard ¶ added in v1.13.0
func (d *ShardStore) CreateShard( request *p.InternalCreateShardRequest, ) error
func (*ShardStore) GetClusterName ¶ added in v1.13.0
func (d *ShardStore) GetClusterName() string
func (*ShardStore) GetName ¶ added in v1.13.0
func (d *ShardStore) GetName() string
func (*ShardStore) GetShard ¶ added in v1.13.0
func (d *ShardStore) GetShard( request *p.InternalGetShardRequest, ) (*p.InternalGetShardResponse, error)
func (*ShardStore) UpdateShard ¶ added in v1.13.0
func (d *ShardStore) UpdateShard( request *p.InternalUpdateShardRequest, ) error
type TestCluster ¶
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster allows executing cassandra operations in testing.
func NewTestCluster ¶
func NewTestCluster(keyspace, username, password, host string, port int, schemaDir string, faultInjection *config.FaultInjection, logger log.Logger) *TestCluster
NewTestCluster returns a new cassandra test cluster
func (*TestCluster) Config ¶
func (s *TestCluster) Config() config.Persistence
Config returns the persistence config for connecting to this test cluster
func (*TestCluster) CreateDatabase ¶
func (s *TestCluster) CreateDatabase()
CreateDatabase from PersistenceTestCluster interface
func (*TestCluster) CreateSession ¶
func (s *TestCluster) CreateSession( keyspace string, )
CreateSession from PersistenceTestCluster interface
func (*TestCluster) DatabaseName ¶
func (s *TestCluster) DatabaseName() string
DatabaseName from PersistenceTestCluster interface
func (*TestCluster) DropDatabase ¶
func (s *TestCluster) DropDatabase()
DropDatabase from PersistenceTestCluster interface
func (*TestCluster) LoadSchema ¶
func (s *TestCluster) LoadSchema(schemaFile string)
LoadSchema from PersistenceTestCluster interface
func (*TestCluster) SetupTestDatabase ¶
func (s *TestCluster) SetupTestDatabase()
SetupTestDatabase from PersistenceTestCluster interface
func (*TestCluster) TearDownTestDatabase ¶
func (s *TestCluster) TearDownTestDatabase()
TearDownTestDatabase from PersistenceTestCluster interface