Documentation ¶
Index ¶
- Constants
- Variables
- func CheckCompatibleVersion(cfg config.Cassandra, r resolver.ServiceResolver, expectedVersion string) error
- func CreateCassandraKeyspace(s gocql.Session, keyspace string, replicas int, overwrite bool, ...) (err error)
- func DropCassandraKeyspace(s gocql.Session, keyspace string, logger log.Logger) (err error)
- func GetTaskTTL(expireTime *timestamppb.Timestamp) int64
- func NewClusterMetadataStore(session gocql.Session, logger log.Logger) (p.ClusterMetadataStore, error)
- func NewMetadataStore(currentClusterName string, session gocql.Session, logger log.Logger) (p.MetadataStore, error)
- func NewNexusEndpointStore(session gocql.Session, logger log.Logger) p.NexusEndpointStore
- func NewQueueStore(queueType persistence.QueueType, session gocql.Session, logger log.Logger) (persistence.Queue, error)
- func NewQueueV2Store(session gocql.Session, logger log.Logger) persistence.QueueV2
- func VerifyCompatibleVersion(cfg config.Persistence, r resolver.ServiceResolver) error
- type ClusterMetadataStore
- func (m *ClusterMetadataStore) Close()
- func (m *ClusterMetadataStore) DeleteClusterMetadata(ctx context.Context, request *p.InternalDeleteClusterMetadataRequest) error
- func (m *ClusterMetadataStore) GetClusterMembers(ctx context.Context, request *p.GetClusterMembersRequest) (*p.GetClusterMembersResponse, error)
- func (m *ClusterMetadataStore) GetClusterMetadata(ctx context.Context, request *p.InternalGetClusterMetadataRequest) (*p.InternalGetClusterMetadataResponse, error)
- func (m *ClusterMetadataStore) GetName() string
- func (m *ClusterMetadataStore) ListClusterMetadata(ctx context.Context, request *p.InternalListClusterMetadataRequest) (*p.InternalListClusterMetadataResponse, error)
- func (m *ClusterMetadataStore) PruneClusterMembership(_ context.Context, request *p.PruneClusterMembershipRequest) error
- func (m *ClusterMetadataStore) SaveClusterMetadata(ctx context.Context, request *p.InternalSaveClusterMetadataRequest) (bool, error)
- func (m *ClusterMetadataStore) UpsertClusterMembership(ctx context.Context, request *p.UpsertClusterMembershipRequest) error
- type ExecutionStore
- func (d *ExecutionStore) Close()
- func (d *ExecutionStore) ConflictResolveWorkflowExecution(ctx context.Context, ...) error
- func (d *ExecutionStore) CreateWorkflowExecution(ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest) (*p.InternalCreateWorkflowExecutionResponse, error)
- func (d *ExecutionStore) GetName() string
- func (d *ExecutionStore) UpdateWorkflowExecution(ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest) error
- type Factory
- func (f *Factory) Close()
- func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
- func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
- func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
- func (f *Factory) NewNexusEndpointStore() (p.NexusEndpointStore, error)
- func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error)
- func (f *Factory) NewQueueV2() (p.QueueV2, error)
- func (f *Factory) NewShardStore() (p.ShardStore, error)
- func (f *Factory) NewTaskStore() (p.TaskStore, error)
- type FieldNotFoundError
- type HistoryStore
- func (h *HistoryStore) AppendHistoryNodes(ctx context.Context, request *p.InternalAppendHistoryNodesRequest) error
- func (h *HistoryStore) DeleteHistoryBranch(ctx context.Context, request *p.InternalDeleteHistoryBranchRequest) error
- func (h *HistoryStore) DeleteHistoryNodes(ctx context.Context, request *p.InternalDeleteHistoryNodesRequest) error
- func (h *HistoryStore) ForkHistoryBranch(ctx context.Context, request *p.InternalForkHistoryBranchRequest) error
- func (h *HistoryStore) GetAllHistoryTreeBranches(ctx context.Context, request *p.GetAllHistoryTreeBranchesRequest) (*p.InternalGetAllHistoryTreeBranchesResponse, error)
- func (h *HistoryStore) GetHistoryTreeContainingBranch(ctx context.Context, request *p.InternalGetHistoryTreeContainingBranchRequest) (*p.InternalGetHistoryTreeContainingBranchResponse, error)
- func (h *HistoryStore) ReadHistoryBranch(ctx context.Context, request *p.InternalReadHistoryBranchRequest) (*p.InternalReadHistoryBranchResponse, error)
- type MatchingTaskStore
- func (d *MatchingTaskStore) Close()
- func (d *MatchingTaskStore) CompleteTasksLessThan(ctx context.Context, request *p.CompleteTasksLessThanRequest) (int, error)
- func (d *MatchingTaskStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (int, error)
- func (d *MatchingTaskStore) CreateTaskQueue(ctx context.Context, request *p.InternalCreateTaskQueueRequest) error
- func (d *MatchingTaskStore) CreateTasks(ctx context.Context, request *p.InternalCreateTasksRequest) (*p.CreateTasksResponse, error)
- func (d *MatchingTaskStore) DeleteTaskQueue(ctx context.Context, request *p.DeleteTaskQueueRequest) error
- func (d *MatchingTaskStore) GetName() string
- func (d *MatchingTaskStore) GetTaskQueue(ctx context.Context, request *p.InternalGetTaskQueueRequest) (*p.InternalGetTaskQueueResponse, error)
- func (d *MatchingTaskStore) GetTaskQueueUserData(ctx context.Context, request *p.GetTaskQueueUserDataRequest) (*p.InternalGetTaskQueueUserDataResponse, error)
- func (d *MatchingTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) ([]string, error)
- func (d *MatchingTaskStore) GetTasks(ctx context.Context, request *p.GetTasksRequest) (*p.InternalGetTasksResponse, error)
- func (d *MatchingTaskStore) ListTaskQueue(_ context.Context, _ *p.ListTaskQueueRequest) (*p.InternalListTaskQueueResponse, error)
- func (d *MatchingTaskStore) ListTaskQueueUserDataEntries(ctx context.Context, request *p.ListTaskQueueUserDataEntriesRequest) (*p.InternalListTaskQueueUserDataEntriesResponse, error)
- func (d *MatchingTaskStore) UpdateTaskQueue(ctx context.Context, request *p.InternalUpdateTaskQueueRequest) (*p.UpdateTaskQueueResponse, error)
- func (d *MatchingTaskStore) UpdateTaskQueueUserData(ctx context.Context, request *p.InternalUpdateTaskQueueUserDataRequest) error
- type MetadataStore
- func (m *MetadataStore) Close()
- func (m *MetadataStore) CreateNamespace(ctx context.Context, request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
- func (m *MetadataStore) CreateNamespaceInV2Table(ctx context.Context, request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
- func (m *MetadataStore) DeleteNamespace(ctx context.Context, request *p.DeleteNamespaceRequest) error
- func (m *MetadataStore) DeleteNamespaceByName(ctx context.Context, request *p.DeleteNamespaceByNameRequest) error
- func (m *MetadataStore) GetMetadata(ctx context.Context) (*p.GetMetadataResponse, error)
- func (m *MetadataStore) GetName() string
- func (m *MetadataStore) GetNamespace(ctx context.Context, request *p.GetNamespaceRequest) (*p.InternalGetNamespaceResponse, error)
- func (m *MetadataStore) ListNamespaces(ctx context.Context, request *p.InternalListNamespacesRequest) (*p.InternalListNamespacesResponse, error)
- func (m *MetadataStore) RenameNamespace(ctx context.Context, request *p.InternalRenameNamespaceRequest) error
- func (m *MetadataStore) UpdateNamespace(ctx context.Context, request *p.InternalUpdateNamespaceRequest) error
- type MutableStateStore
- func (d *MutableStateStore) ConflictResolveWorkflowExecution(ctx context.Context, ...) error
- func (d *MutableStateStore) CreateWorkflowExecution(ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest) (*p.InternalCreateWorkflowExecutionResponse, error)
- func (d *MutableStateStore) DeleteCurrentWorkflowExecution(ctx context.Context, request *p.DeleteCurrentWorkflowExecutionRequest) error
- func (d *MutableStateStore) DeleteWorkflowExecution(ctx context.Context, request *p.DeleteWorkflowExecutionRequest) error
- func (d *MutableStateStore) GetCurrentExecution(ctx context.Context, request *p.GetCurrentExecutionRequest) (*p.InternalGetCurrentExecutionResponse, error)
- func (d *MutableStateStore) GetWorkflowExecution(ctx context.Context, request *p.GetWorkflowExecutionRequest) (*p.InternalGetWorkflowExecutionResponse, error)
- func (d *MutableStateStore) ListConcreteExecutions(ctx context.Context, request *p.ListConcreteExecutionsRequest) (*p.InternalListConcreteExecutionsResponse, error)
- func (d *MutableStateStore) SetWorkflowExecution(ctx context.Context, request *p.InternalSetWorkflowExecutionRequest) error
- func (d *MutableStateStore) UpdateWorkflowExecution(ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest) error
- type MutableStateTaskStore
- func (d *MutableStateTaskStore) AddHistoryTasks(ctx context.Context, request *p.InternalAddHistoryTasksRequest) error
- func (d *MutableStateTaskStore) CompleteHistoryTask(ctx context.Context, request *p.CompleteHistoryTaskRequest) error
- func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ(ctx context.Context, request *p.DeleteReplicationTaskFromDLQRequest) error
- func (d *MutableStateTaskStore) GetHistoryTasks(ctx context.Context, request *p.GetHistoryTasksRequest) (*p.InternalGetHistoryTasksResponse, error)
- func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ(ctx context.Context, request *p.GetReplicationTasksFromDLQRequest) (*p.InternalGetHistoryTasksResponse, error)
- func (d *MutableStateTaskStore) IsReplicationDLQEmpty(ctx context.Context, request *p.GetReplicationTasksFromDLQRequest) (bool, error)
- func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(ctx context.Context, request *p.PutReplicationTaskToDLQRequest) error
- func (d *MutableStateTaskStore) RangeCompleteHistoryTasks(ctx context.Context, request *p.RangeCompleteHistoryTasksRequest) error
- func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *p.RangeDeleteReplicationTaskFromDLQRequest) error
- type NexusEndpointStore
- func (s *NexusEndpointStore) Close()
- func (s *NexusEndpointStore) CreateOrUpdateNexusEndpoint(ctx context.Context, request *p.InternalCreateOrUpdateNexusEndpointRequest) error
- func (s *NexusEndpointStore) DeleteNexusEndpoint(ctx context.Context, request *p.DeleteNexusEndpointRequest) error
- func (s *NexusEndpointStore) GetName() string
- func (s *NexusEndpointStore) GetNexusEndpoint(ctx context.Context, request *p.GetNexusEndpointRequest) (*p.InternalNexusEndpoint, error)
- func (s *NexusEndpointStore) ListNexusEndpoints(ctx context.Context, request *p.ListNexusEndpointsRequest) (*p.InternalListNexusEndpointsResponse, error)
- type PersistedTypeMismatchError
- type Queue
- type QueueStore
- func (q *QueueStore) Close()
- func (q *QueueStore) DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
- func (q *QueueStore) DeleteMessagesBefore(ctx context.Context, messageID int64) error
- func (q *QueueStore) EnqueueMessage(ctx context.Context, blob *commonpb.DataBlob) error
- func (q *QueueStore) EnqueueMessageToDLQ(ctx context.Context, blob *commonpb.DataBlob) (int64, error)
- func (q *QueueStore) GetAckLevels(ctx context.Context) (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) GetDLQAckLevels(ctx context.Context) (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) Init(ctx context.Context, blob *commonpb.DataBlob) error
- func (q *QueueStore) RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
- func (q *QueueStore) ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*persistence.QueueMessage, error)
- func (q *QueueStore) ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, ...) ([]*persistence.QueueMessage, []byte, error)
- func (q *QueueStore) UpdateAckLevel(ctx context.Context, metadata *persistence.InternalQueueMetadata) error
- func (q *QueueStore) UpdateDLQAckLevel(ctx context.Context, metadata *persistence.InternalQueueMetadata) error
- type SchemaVersionReader
- type ShardStore
- func (d *ShardStore) AssertShardOwnership(ctx context.Context, request *p.AssertShardOwnershipRequest) error
- func (d *ShardStore) Close()
- func (d *ShardStore) GetClusterName() string
- func (d *ShardStore) GetName() string
- func (d *ShardStore) GetOrCreateShard(ctx context.Context, request *p.InternalGetOrCreateShardRequest) (*p.InternalGetOrCreateShardResponse, error)
- func (d *ShardStore) UpdateShard(ctx context.Context, request *p.InternalUpdateShardRequest) error
- type TestCluster
- func (s *TestCluster) Config() config.Persistence
- func (s *TestCluster) CreateDatabase()
- func (s *TestCluster) CreateSession(keyspace string)
- func (s *TestCluster) DatabaseName() string
- func (s *TestCluster) DropDatabase()
- func (s *TestCluster) GetSession() commongocql.Session
- func (s *TestCluster) LoadSchema(schemaFile string)
- func (s *TestCluster) SetupTestDatabase()
- func (s *TestCluster) TearDownTestDatabase()
Constants ¶
const ( TemplateEnqueueMessageQuery = `` /* 155-byte string literal not displayed */ TemplateGetMessagesQuery = `` /* 188-byte string literal not displayed */ TemplateGetMaxMessageIDQuery = `` /* 134-byte string literal not displayed */ TemplateCreateQueueQuery = `` /* 126-byte string literal not displayed */ TemplateGetQueueQuery = `SELECT metadata_payload, metadata_encoding, version FROM queues WHERE queue_type = ? AND queue_name = ?` TemplateRangeDeleteMessagesQuery = `` /* 130-byte string literal not displayed */ TemplateUpdateQueueMetadataQuery = `` /* 129-byte string literal not displayed */ )
Variables ¶
var ( // ErrEnqueueMessageConflict is returned when a message with the same ID already exists in the queue. This is // possible when there are concurrent writes to the queue because we enqueue a message using two queries: // // 1. SELECT MAX(ID) to get the next message ID (for a given queue partition) // 2. INSERT (ID, message) with IF NOT EXISTS // // See the following example: // // Client A Client B Cassandra DB // | | | // |--1. SELECT MAX(ID) FROM queue_messages----------------------->| // | | | // |<-2. Return X--------------------------------------------------| // | | | // | |--3. SELECT MAX(ID) FROM queue_messages---->| // | | | // | |<-4. Return X-------------------------------| // | | | // |--5. INSERT INTO queue_messages (ID = X)---------------------->| // | | | // |<-6. Acknowledge-----------------------------------------------| // | | | // | |--7. INSERT INTO queue_messages (ID = X)--->| // | | | // | |<-8. Conflict/Error-------------------------| // | | | ErrEnqueueMessageConflict = &persistence.ConditionFailedError{ Msg: "conflict inserting queue message, likely due to concurrent writes", } // ErrUpdateQueueConflict is returned when a queue is updated with the wrong version. This happens when there are // concurrent writes to the queue because we update a queue using two queries, similar to the enqueue message query. // // 1. SELECT (queue, version) FROM queues // 2. UPDATE queue, version IF version = version from step 1 // // See the following example: // // Client A Client B Cassandra DB // | | | // |--1. SELECT (queue, version) FROM queues---------------------->| // | | | // |<-2. Return (queue, v1)----------------------------------------| // | | | // | |--3. SELECT (queue, version) FROM queues--->| // | | | // | |<-4. Return (queue, v1)---------------------| // | | | // |--5. UPDATE queue, version IF version = v1-------------------->| // | | | // |<-6. Acknowledge-----------------------------------------------| // | | | // | |--7. UPDATE queue, version IF version = v1->| // | | | // | |<-8. Conflict/Error-------------------------| // | | | ErrUpdateQueueConflict = &persistence.ConditionFailedError{ Msg: "conflict updating queue, likely due to concurrent writes", } )
Functions ¶
func CheckCompatibleVersion ¶ added in v1.12.0
func CheckCompatibleVersion( cfg config.Cassandra, r resolver.ServiceResolver, expectedVersion string, ) error
CheckCompatibleVersion check the version compatibility
func CreateCassandraKeyspace ¶
func CreateCassandraKeyspace(s gocql.Session, keyspace string, replicas int, overwrite bool, logger log.Logger) (err error)
CreateCassandraKeyspace creates the keyspace using this session for given replica count
func DropCassandraKeyspace ¶
DropCassandraKeyspace drops the given keyspace, if it exists
func GetTaskTTL ¶ added in v0.27.0
func GetTaskTTL(expireTime *timestamppb.Timestamp) int64
func NewClusterMetadataStore ¶ added in v1.13.0
func NewClusterMetadataStore( session gocql.Session, logger log.Logger, ) (p.ClusterMetadataStore, error)
NewClusterMetadataStore is used to create an instance of ClusterMetadataStore implementation
func NewMetadataStore ¶ added in v1.13.0
func NewMetadataStore( currentClusterName string, session gocql.Session, logger log.Logger, ) (p.MetadataStore, error)
NewMetadataStore is used to create an instance of the Namespace MetadataStore implementation
func NewNexusEndpointStore ¶ added in v1.25.0
func NewQueueStore ¶ added in v1.13.0
func NewQueueStore( queueType persistence.QueueType, session gocql.Session, logger log.Logger, ) (persistence.Queue, error)
func NewQueueV2Store ¶ added in v1.23.0
func VerifyCompatibleVersion ¶ added in v1.5.7
func VerifyCompatibleVersion( cfg config.Persistence, r resolver.ServiceResolver, ) error
VerifyCompatibleVersion ensures that the installed version of temporal and visibility keyspaces is greater than or equal to the expected version. In most cases, the versions should match. However if after a schema upgrade there is a code rollback, the code version (expected version) would fall lower than the actual version in cassandra.
Types ¶
type ClusterMetadataStore ¶ added in v1.13.0
type ClusterMetadataStore struct {
// contains filtered or unexported fields
}
func (*ClusterMetadataStore) Close ¶ added in v1.13.0
func (m *ClusterMetadataStore) Close()
func (*ClusterMetadataStore) DeleteClusterMetadata ¶ added in v1.14.0
func (m *ClusterMetadataStore) DeleteClusterMetadata( ctx context.Context, request *p.InternalDeleteClusterMetadataRequest, ) error
func (*ClusterMetadataStore) GetClusterMembers ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetClusterMembers( ctx context.Context, request *p.GetClusterMembersRequest, ) (*p.GetClusterMembersResponse, error)
func (*ClusterMetadataStore) GetClusterMetadata ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetClusterMetadata( ctx context.Context, request *p.InternalGetClusterMetadataRequest, ) (*p.InternalGetClusterMetadataResponse, error)
func (*ClusterMetadataStore) GetName ¶ added in v1.13.0
func (m *ClusterMetadataStore) GetName() string
func (*ClusterMetadataStore) ListClusterMetadata ¶ added in v1.14.0
func (m *ClusterMetadataStore) ListClusterMetadata( ctx context.Context, request *p.InternalListClusterMetadataRequest, ) (*p.InternalListClusterMetadataResponse, error)
func (*ClusterMetadataStore) PruneClusterMembership ¶ added in v1.13.0
func (m *ClusterMetadataStore) PruneClusterMembership( _ context.Context, request *p.PruneClusterMembershipRequest, ) error
func (*ClusterMetadataStore) SaveClusterMetadata ¶ added in v1.13.0
func (m *ClusterMetadataStore) SaveClusterMetadata( ctx context.Context, request *p.InternalSaveClusterMetadataRequest, ) (bool, error)
func (*ClusterMetadataStore) UpsertClusterMembership ¶ added in v1.13.0
func (m *ClusterMetadataStore) UpsertClusterMembership( ctx context.Context, request *p.UpsertClusterMembershipRequest, ) error
type ExecutionStore ¶ added in v1.13.0
type ExecutionStore struct { *HistoryStore *MutableStateStore *MutableStateTaskStore }
func NewExecutionStore ¶ added in v1.12.0
func NewExecutionStore(session gocql.Session) *ExecutionStore
func (*ExecutionStore) Close ¶ added in v1.13.0
func (d *ExecutionStore) Close()
func (*ExecutionStore) ConflictResolveWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) ConflictResolveWorkflowExecution( ctx context.Context, request *p.InternalConflictResolveWorkflowExecutionRequest, ) error
func (*ExecutionStore) CreateWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) CreateWorkflowExecution( ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest, ) (*p.InternalCreateWorkflowExecutionResponse, error)
func (*ExecutionStore) GetName ¶ added in v1.13.0
func (d *ExecutionStore) GetName() string
func (*ExecutionStore) UpdateWorkflowExecution ¶ added in v1.13.0
func (d *ExecutionStore) UpdateWorkflowExecution( ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest, ) error
type Factory ¶
Factory vends datastore implementations backed by cassandra
func NewFactory ¶
func NewFactory( cfg config.Cassandra, r resolver.ServiceResolver, clusterName string, logger log.Logger, metricsHandler metrics.Handler, ) *Factory
NewFactory returns an instance of a factory object which can be used to create data stores that are backed by cassandra
func NewFactoryFromSession ¶ added in v1.12.0
func NewFactoryFromSession( cfg config.Cassandra, clusterName string, logger log.Logger, session commongocql.Session, ) *Factory
NewFactoryFromSession returns an instance of a factory object from the given session.
func (*Factory) NewClusterMetadataStore ¶ added in v0.27.0
func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
NewClusterMetadataStore returns a metadata store
func (*Factory) NewExecutionStore ¶
func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
NewExecutionStore returns a new ExecutionStore.
func (*Factory) NewMetadataStore ¶
func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
NewMetadataStore returns a metadata store
func (*Factory) NewNexusEndpointStore ¶ added in v1.25.0
func (f *Factory) NewNexusEndpointStore() (p.NexusEndpointStore, error)
NewNexusEndpointStore returns a new NexusEndpointStore
func (*Factory) NewQueueV2 ¶ added in v1.23.0
NewQueueV2 returns a new data-access object for queues and messages stored in Cassandra. It will never return an error.
func (*Factory) NewShardStore ¶
func (f *Factory) NewShardStore() (p.ShardStore, error)
NewShardStore returns a new shard store
type FieldNotFoundError ¶ added in v0.27.0
type FieldNotFoundError struct {
Msg string
}
FieldNotFoundError is an error type returned when an untyped query return does not contain the expected fields.
func (FieldNotFoundError) Error ¶ added in v0.27.0
func (f FieldNotFoundError) Error() string
type HistoryStore ¶ added in v1.13.0
type HistoryStore struct { Session gocql.Session p.HistoryBranchUtilImpl }
func NewHistoryStore ¶ added in v1.13.0
func NewHistoryStore(session gocql.Session) *HistoryStore
func (*HistoryStore) AppendHistoryNodes ¶ added in v1.13.0
func (h *HistoryStore) AppendHistoryNodes( ctx context.Context, request *p.InternalAppendHistoryNodesRequest, ) error
AppendHistoryNodes upsert a batch of events as a single node to a history branch Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID
func (*HistoryStore) DeleteHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) DeleteHistoryBranch( ctx context.Context, request *p.InternalDeleteHistoryBranchRequest, ) error
DeleteHistoryBranch removes a branch
func (*HistoryStore) DeleteHistoryNodes ¶ added in v1.13.0
func (h *HistoryStore) DeleteHistoryNodes( ctx context.Context, request *p.InternalDeleteHistoryNodesRequest, ) error
DeleteHistoryNodes delete a history node
func (*HistoryStore) ForkHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) ForkHistoryBranch( ctx context.Context, request *p.InternalForkHistoryBranchRequest, ) error
ForkHistoryBranch forks a new branch from an existing branch Note that application must provide a void forking nodeID, it must be a valid nodeID in that branch. A valid forking nodeID can be an ancestor from the existing branch. For example, we have branch B1 with three nodes(1[1,2], 3[3,4,5] and 6[6,7,8]. 1, 3 and 6 are nodeIDs (first eventID of the batch). So B1 looks like this:
1[1,2] / 3[3,4,5] / 6[6,7,8]
Assuming we have branch B2 which contains one ancestor B1 stopping at 6 (exclusive). So B2 inherit nodeID 1 and 3 from B1, and have its own nodeID 6 and 8. Branch B2 looks like this:
1[1,2] / 3[3,4,5] \ 6[6,7] \ 8[8]
Now we want to fork a new branch B3 from B2. The only valid forking nodeIDs are 3,6 or 8. 1 is not valid because we can't fork from first node. 2/4/5 is NOT valid either because they are inside a batch.
Case #1: If we fork from nodeID 6, then B3 will have an ancestor B1 which stops at 6(exclusive). As we append a batch of events[6,7,8,9] to B3, it will look like :
1[1,2] / 3[3,4,5] \ 6[6,7,8,9]
Case #2: If we fork from node 8, then B3 will have two ancestors: B1 stops at 6(exclusive) and ancestor B2 stops at 8(exclusive) As we append a batch of events[8,9] to B3, it will look like:
1[1,2] / 3[3,4,5] / 6[6,7] \ 8[8,9]
func (*HistoryStore) GetAllHistoryTreeBranches ¶ added in v1.13.0
func (h *HistoryStore) GetAllHistoryTreeBranches( ctx context.Context, request *p.GetAllHistoryTreeBranchesRequest, ) (*p.InternalGetAllHistoryTreeBranchesResponse, error)
func (*HistoryStore) GetHistoryTreeContainingBranch ¶ added in v1.24.0
func (h *HistoryStore) GetHistoryTreeContainingBranch( ctx context.Context, request *p.InternalGetHistoryTreeContainingBranchRequest, ) (*p.InternalGetHistoryTreeContainingBranchResponse, error)
GetHistoryTreeContainingBranch returns all branch information of a tree
func (*HistoryStore) ReadHistoryBranch ¶ added in v1.13.0
func (h *HistoryStore) ReadHistoryBranch( ctx context.Context, request *p.InternalReadHistoryBranchRequest, ) (*p.InternalReadHistoryBranchResponse, error)
ReadHistoryBranch returns history node data for a branch NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
type MatchingTaskStore ¶ added in v1.13.0
func NewMatchingTaskStore ¶ added in v1.13.0
func NewMatchingTaskStore( session gocql.Session, logger log.Logger, ) *MatchingTaskStore
func (*MatchingTaskStore) Close ¶ added in v1.13.0
func (d *MatchingTaskStore) Close()
func (*MatchingTaskStore) CompleteTasksLessThan ¶ added in v1.13.0
func (d *MatchingTaskStore) CompleteTasksLessThan( ctx context.Context, request *p.CompleteTasksLessThanRequest, ) (int, error)
CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will be returned to the caller
func (*MatchingTaskStore) CountTaskQueuesByBuildId ¶ added in v1.21.0
func (d *MatchingTaskStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (int, error)
func (*MatchingTaskStore) CreateTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) CreateTaskQueue( ctx context.Context, request *p.InternalCreateTaskQueueRequest, ) error
func (*MatchingTaskStore) CreateTasks ¶ added in v1.13.0
func (d *MatchingTaskStore) CreateTasks( ctx context.Context, request *p.InternalCreateTasksRequest, ) (*p.CreateTasksResponse, error)
CreateTasks add tasks
func (*MatchingTaskStore) DeleteTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) DeleteTaskQueue( ctx context.Context, request *p.DeleteTaskQueueRequest, ) error
func (*MatchingTaskStore) GetName ¶ added in v1.13.0
func (d *MatchingTaskStore) GetName() string
func (*MatchingTaskStore) GetTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) GetTaskQueue( ctx context.Context, request *p.InternalGetTaskQueueRequest, ) (*p.InternalGetTaskQueueResponse, error)
func (*MatchingTaskStore) GetTaskQueueUserData ¶ added in v1.21.0
func (d *MatchingTaskStore) GetTaskQueueUserData( ctx context.Context, request *p.GetTaskQueueUserDataRequest, ) (*p.InternalGetTaskQueueUserDataResponse, error)
func (*MatchingTaskStore) GetTaskQueuesByBuildId ¶ added in v1.21.0
func (d *MatchingTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) ([]string, error)
func (*MatchingTaskStore) GetTasks ¶ added in v1.13.0
func (d *MatchingTaskStore) GetTasks( ctx context.Context, request *p.GetTasksRequest, ) (*p.InternalGetTasksResponse, error)
GetTasks get a task
func (*MatchingTaskStore) ListTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) ListTaskQueue( _ context.Context, _ *p.ListTaskQueueRequest, ) (*p.InternalListTaskQueueResponse, error)
func (*MatchingTaskStore) ListTaskQueueUserDataEntries ¶ added in v1.21.0
func (d *MatchingTaskStore) ListTaskQueueUserDataEntries(ctx context.Context, request *p.ListTaskQueueUserDataEntriesRequest) (*p.InternalListTaskQueueUserDataEntriesResponse, error)
func (*MatchingTaskStore) UpdateTaskQueue ¶ added in v1.13.0
func (d *MatchingTaskStore) UpdateTaskQueue( ctx context.Context, request *p.InternalUpdateTaskQueueRequest, ) (*p.UpdateTaskQueueResponse, error)
UpdateTaskQueue update task queue
func (*MatchingTaskStore) UpdateTaskQueueUserData ¶ added in v1.21.0
func (d *MatchingTaskStore) UpdateTaskQueueUserData( ctx context.Context, request *p.InternalUpdateTaskQueueUserDataRequest, ) error
type MetadataStore ¶ added in v1.13.0
type MetadataStore struct {
// contains filtered or unexported fields
}
func (*MetadataStore) Close ¶ added in v1.13.0
func (m *MetadataStore) Close()
func (*MetadataStore) CreateNamespace ¶ added in v1.13.0
func (m *MetadataStore) CreateNamespace( ctx context.Context, request *p.InternalCreateNamespaceRequest, ) (*p.CreateNamespaceResponse, error)
CreateNamespace create a namespace Cassandra does not support conditional updates across multiple tables. For this reason we have to first insert into 'Namespaces' table and then do a conditional insert into namespaces_by_name table. If the conditional write fails we delete the orphaned entry from namespaces table. There is a chance delete entry could fail and we never delete the orphaned entry from namespaces table. We might need a background job to delete those orphaned record.
func (*MetadataStore) CreateNamespaceInV2Table ¶ added in v1.13.0
func (m *MetadataStore) CreateNamespaceInV2Table( ctx context.Context, request *p.InternalCreateNamespaceRequest, ) (*p.CreateNamespaceResponse, error)
CreateNamespaceInV2Table is the temporary function used by namespace v1 -> v2 migration
func (*MetadataStore) DeleteNamespace ¶ added in v1.13.0
func (m *MetadataStore) DeleteNamespace( ctx context.Context, request *p.DeleteNamespaceRequest, ) error
func (*MetadataStore) DeleteNamespaceByName ¶ added in v1.13.0
func (m *MetadataStore) DeleteNamespaceByName( ctx context.Context, request *p.DeleteNamespaceByNameRequest, ) error
func (*MetadataStore) GetMetadata ¶ added in v1.13.0
func (m *MetadataStore) GetMetadata( ctx context.Context, ) (*p.GetMetadataResponse, error)
func (*MetadataStore) GetName ¶ added in v1.13.0
func (m *MetadataStore) GetName() string
func (*MetadataStore) GetNamespace ¶ added in v1.13.0
func (m *MetadataStore) GetNamespace( ctx context.Context, request *p.GetNamespaceRequest, ) (*p.InternalGetNamespaceResponse, error)
func (*MetadataStore) ListNamespaces ¶ added in v1.13.0
func (m *MetadataStore) ListNamespaces( ctx context.Context, request *p.InternalListNamespacesRequest, ) (*p.InternalListNamespacesResponse, error)
func (*MetadataStore) RenameNamespace ¶ added in v1.16.0
func (m *MetadataStore) RenameNamespace( ctx context.Context, request *p.InternalRenameNamespaceRequest, ) error
RenameNamespace should be used with caution. Not every namespace can be renamed because namespace name are stored in the database. It may leave database in inconsistent state and must be retried until success. Step 1. Update row in `namespaces_by_id` table with the new name. Step 2. Batch of:
Insert row into `namespaces` table with new name and new `notification_version`. Delete row from `namespaces` table with old name. Update `notification_version` in metadata row.
NOTE: `namespaces_by_id` is currently used only for `DescribeNamespace` API and namespace Id collision check.
func (*MetadataStore) UpdateNamespace ¶ added in v1.13.0
func (m *MetadataStore) UpdateNamespace( ctx context.Context, request *p.InternalUpdateNamespaceRequest, ) error
type MutableStateStore ¶ added in v1.13.0
func NewMutableStateStore ¶ added in v1.13.0
func NewMutableStateStore(session gocql.Session) *MutableStateStore
func (*MutableStateStore) ConflictResolveWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) ConflictResolveWorkflowExecution( ctx context.Context, request *p.InternalConflictResolveWorkflowExecutionRequest, ) error
func (*MutableStateStore) CreateWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) CreateWorkflowExecution( ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest, ) (*p.InternalCreateWorkflowExecutionResponse, error)
func (*MutableStateStore) DeleteCurrentWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) DeleteCurrentWorkflowExecution( ctx context.Context, request *p.DeleteCurrentWorkflowExecutionRequest, ) error
func (*MutableStateStore) DeleteWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) DeleteWorkflowExecution( ctx context.Context, request *p.DeleteWorkflowExecutionRequest, ) error
func (*MutableStateStore) GetCurrentExecution ¶ added in v1.13.0
func (d *MutableStateStore) GetCurrentExecution( ctx context.Context, request *p.GetCurrentExecutionRequest, ) (*p.InternalGetCurrentExecutionResponse, error)
func (*MutableStateStore) GetWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) GetWorkflowExecution( ctx context.Context, request *p.GetWorkflowExecutionRequest, ) (*p.InternalGetWorkflowExecutionResponse, error)
func (*MutableStateStore) ListConcreteExecutions ¶ added in v1.13.0
func (d *MutableStateStore) ListConcreteExecutions( ctx context.Context, request *p.ListConcreteExecutionsRequest, ) (*p.InternalListConcreteExecutionsResponse, error)
func (*MutableStateStore) SetWorkflowExecution ¶ added in v1.16.0
func (d *MutableStateStore) SetWorkflowExecution( ctx context.Context, request *p.InternalSetWorkflowExecutionRequest, ) error
func (*MutableStateStore) UpdateWorkflowExecution ¶ added in v1.13.0
func (d *MutableStateStore) UpdateWorkflowExecution( ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest, ) error
type MutableStateTaskStore ¶ added in v1.13.0
func NewMutableStateTaskStore ¶ added in v1.13.0
func NewMutableStateTaskStore(session gocql.Session) *MutableStateTaskStore
func (*MutableStateTaskStore) AddHistoryTasks ¶ added in v1.16.0
func (d *MutableStateTaskStore) AddHistoryTasks( ctx context.Context, request *p.InternalAddHistoryTasksRequest, ) error
func (*MutableStateTaskStore) CompleteHistoryTask ¶ added in v1.16.0
func (d *MutableStateTaskStore) CompleteHistoryTask( ctx context.Context, request *p.CompleteHistoryTaskRequest, ) error
func (*MutableStateTaskStore) DeleteReplicationTaskFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ( ctx context.Context, request *p.DeleteReplicationTaskFromDLQRequest, ) error
func (*MutableStateTaskStore) GetHistoryTasks ¶ added in v1.16.0
func (d *MutableStateTaskStore) GetHistoryTasks( ctx context.Context, request *p.GetHistoryTasksRequest, ) (*p.InternalGetHistoryTasksResponse, error)
func (*MutableStateTaskStore) GetReplicationTasksFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ( ctx context.Context, request *p.GetReplicationTasksFromDLQRequest, ) (*p.InternalGetHistoryTasksResponse, error)
func (*MutableStateTaskStore) IsReplicationDLQEmpty ¶ added in v1.21.0
func (d *MutableStateTaskStore) IsReplicationDLQEmpty( ctx context.Context, request *p.GetReplicationTasksFromDLQRequest, ) (bool, error)
func (*MutableStateTaskStore) PutReplicationTaskToDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) PutReplicationTaskToDLQ( ctx context.Context, request *p.PutReplicationTaskToDLQRequest, ) error
func (*MutableStateTaskStore) RangeCompleteHistoryTasks ¶ added in v1.16.0
func (d *MutableStateTaskStore) RangeCompleteHistoryTasks( ctx context.Context, request *p.RangeCompleteHistoryTasksRequest, ) error
func (*MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ ¶ added in v1.13.0
func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *p.RangeDeleteReplicationTaskFromDLQRequest, ) error
type NexusEndpointStore ¶ added in v1.25.0
type NexusEndpointStore struct {
// contains filtered or unexported fields
}
func (*NexusEndpointStore) Close ¶ added in v1.25.0
func (s *NexusEndpointStore) Close()
func (*NexusEndpointStore) CreateOrUpdateNexusEndpoint ¶ added in v1.25.0
func (s *NexusEndpointStore) CreateOrUpdateNexusEndpoint( ctx context.Context, request *p.InternalCreateOrUpdateNexusEndpointRequest, ) error
func (*NexusEndpointStore) DeleteNexusEndpoint ¶ added in v1.25.0
func (s *NexusEndpointStore) DeleteNexusEndpoint( ctx context.Context, request *p.DeleteNexusEndpointRequest, ) error
func (*NexusEndpointStore) GetName ¶ added in v1.25.0
func (s *NexusEndpointStore) GetName() string
func (*NexusEndpointStore) GetNexusEndpoint ¶ added in v1.25.0
func (s *NexusEndpointStore) GetNexusEndpoint( ctx context.Context, request *p.GetNexusEndpointRequest, ) (*p.InternalNexusEndpoint, error)
func (*NexusEndpointStore) ListNexusEndpoints ¶ added in v1.25.0
func (s *NexusEndpointStore) ListNexusEndpoints( ctx context.Context, request *p.ListNexusEndpointsRequest, ) (*p.InternalListNexusEndpointsResponse, error)
type PersistedTypeMismatchError ¶ added in v0.27.0
type PersistedTypeMismatchError struct {
Msg string
}
PersistedTypeMismatchError is an error type returned when a persisted cassandra value does not match the expected type.
func (PersistedTypeMismatchError) Error ¶ added in v0.27.0
func (f PersistedTypeMismatchError) Error() string
type Queue ¶ added in v1.23.0
type Queue struct { Metadata *persistencespb.Queue Version int64 }
type QueueStore ¶ added in v1.13.0
type QueueStore struct {
// contains filtered or unexported fields
}
func (*QueueStore) Close ¶ added in v1.13.0
func (q *QueueStore) Close()
func (*QueueStore) DeleteMessageFromDLQ ¶ added in v1.13.0
func (q *QueueStore) DeleteMessageFromDLQ( ctx context.Context, messageID int64, ) error
func (*QueueStore) DeleteMessagesBefore ¶ added in v1.13.0
func (q *QueueStore) DeleteMessagesBefore( ctx context.Context, messageID int64, ) error
func (*QueueStore) EnqueueMessage ¶ added in v1.13.0
func (*QueueStore) EnqueueMessageToDLQ ¶ added in v1.13.0
func (*QueueStore) GetAckLevels ¶ added in v1.13.0
func (q *QueueStore) GetAckLevels( ctx context.Context, ) (*persistence.InternalQueueMetadata, error)
func (*QueueStore) GetDLQAckLevels ¶ added in v1.13.0
func (q *QueueStore) GetDLQAckLevels( ctx context.Context, ) (*persistence.InternalQueueMetadata, error)
func (*QueueStore) RangeDeleteMessagesFromDLQ ¶ added in v1.13.0
func (*QueueStore) ReadMessages ¶ added in v1.13.0
func (q *QueueStore) ReadMessages( ctx context.Context, lastMessageID int64, maxCount int, ) ([]*persistence.QueueMessage, error)
func (*QueueStore) ReadMessagesFromDLQ ¶ added in v1.13.0
func (q *QueueStore) ReadMessagesFromDLQ( ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte, ) ([]*persistence.QueueMessage, []byte, error)
func (*QueueStore) UpdateAckLevel ¶ added in v1.13.0
func (q *QueueStore) UpdateAckLevel( ctx context.Context, metadata *persistence.InternalQueueMetadata, ) error
func (*QueueStore) UpdateDLQAckLevel ¶ added in v1.13.0
func (q *QueueStore) UpdateDLQAckLevel( ctx context.Context, metadata *persistence.InternalQueueMetadata, ) error
type SchemaVersionReader ¶ added in v1.5.7
type SchemaVersionReader struct {
// contains filtered or unexported fields
}
func NewSchemaVersionReader ¶ added in v1.5.7
func NewSchemaVersionReader(session gocql.Session) *SchemaVersionReader
func (*SchemaVersionReader) ReadSchemaVersion ¶ added in v1.5.7
func (svr *SchemaVersionReader) ReadSchemaVersion(keyspace string) (string, error)
ReadSchemaVersion returns the current schema version for the Keyspace
type ShardStore ¶ added in v1.13.0
func NewShardStore ¶ added in v1.13.0
func (*ShardStore) AssertShardOwnership ¶ added in v1.17.0
func (d *ShardStore) AssertShardOwnership( ctx context.Context, request *p.AssertShardOwnershipRequest, ) error
func (*ShardStore) Close ¶ added in v1.13.0
func (d *ShardStore) Close()
func (*ShardStore) GetClusterName ¶ added in v1.13.0
func (d *ShardStore) GetClusterName() string
func (*ShardStore) GetName ¶ added in v1.13.0
func (d *ShardStore) GetName() string
func (*ShardStore) GetOrCreateShard ¶ added in v1.14.0
func (d *ShardStore) GetOrCreateShard( ctx context.Context, request *p.InternalGetOrCreateShardRequest, ) (*p.InternalGetOrCreateShardResponse, error)
func (*ShardStore) UpdateShard ¶ added in v1.13.0
func (d *ShardStore) UpdateShard( ctx context.Context, request *p.InternalUpdateShardRequest, ) error
type TestCluster ¶
type TestCluster struct {
// contains filtered or unexported fields
}
TestCluster allows executing cassandra operations in testing.
func NewTestCluster ¶
func NewTestCluster(keyspace, username, password, host string, port int, schemaDir string, faultInjection *config.FaultInjection, logger log.Logger) *TestCluster
NewTestCluster returns a new cassandra test cluster
func (*TestCluster) Config ¶
func (s *TestCluster) Config() config.Persistence
Config returns the persistence config for connecting to this test cluster
func (*TestCluster) CreateDatabase ¶
func (s *TestCluster) CreateDatabase()
CreateDatabase from PersistenceTestCluster interface
func (*TestCluster) CreateSession ¶
func (s *TestCluster) CreateSession( keyspace string, )
CreateSession from PersistenceTestCluster interface
func (*TestCluster) DatabaseName ¶
func (s *TestCluster) DatabaseName() string
DatabaseName from PersistenceTestCluster interface
func (*TestCluster) DropDatabase ¶
func (s *TestCluster) DropDatabase()
DropDatabase from PersistenceTestCluster interface
func (*TestCluster) GetSession ¶ added in v1.21.0
func (s *TestCluster) GetSession() commongocql.Session
func (*TestCluster) LoadSchema ¶
func (s *TestCluster) LoadSchema(schemaFile string)
LoadSchema from PersistenceTestCluster interface
func (*TestCluster) SetupTestDatabase ¶
func (s *TestCluster) SetupTestDatabase()
SetupTestDatabase from PersistenceTestCluster interface
func (*TestCluster) TearDownTestDatabase ¶
func (s *TestCluster) TearDownTestDatabase()
TearDownTestDatabase from PersistenceTestCluster interface
Source Files ¶
- cluster_metadata_store.go
- common.go
- errors.go
- execution_store.go
- factory.go
- helpers.go
- history_store.go
- matching_task_store.go
- metadata_store.go
- mutable_state_store.go
- mutable_state_task_store.go
- nexus_endpoint_store.go
- queue_store.go
- queue_v2_store.go
- schema_version_reader.go
- shard_store.go
- test.go
- util.go
- version_checker.go