cassandra

package
v1.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2022 License: MIT Imports: 36 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrGetSchemaVersion = errors.New("failed to get current schema version from cassandra")
)

Functions

func CheckCompatibleVersion added in v1.12.0

func CheckCompatibleVersion(
	cfg config.Cassandra,
	r resolver.ServiceResolver,
	expectedVersion string,
) error

CheckCompatibleVersion checks the version compatibility

func CreateCassandraKeyspace

func CreateCassandraKeyspace(s gocql.Session, keyspace string, replicas int, overwrite bool, logger log.Logger) (err error)

CreateCassandraKeyspace creates the keyspace using this session for the given replica count

func DropCassandraKeyspace

func DropCassandraKeyspace(s gocql.Session, keyspace string, logger log.Logger) (err error)

DropCassandraKeyspace drops the given keyspace, if it exists

func GetTaskTTL added in v0.27.0

func GetTaskTTL(expireTime *time.Time) int64

func NewClusterMetadataStore added in v1.13.0

func NewClusterMetadataStore(
	session gocql.Session,
	logger log.Logger,
) (p.ClusterMetadataStore, error)

NewClusterMetadataStore is used to create an instance of ClusterMetadataStore implementation

func NewMetadataStore added in v1.13.0

func NewMetadataStore(
	currentClusterName string,
	session gocql.Session,
	logger log.Logger,
) (p.MetadataStore, error)

NewMetadataStore is used to create an instance of the Namespace MetadataStore implementation

func NewQueueStore added in v1.13.0

func NewQueueStore(
	queueType persistence.QueueType,
	session gocql.Session,
	logger log.Logger,
) (persistence.Queue, error)

func VerifyCompatibleVersion added in v1.5.7

func VerifyCompatibleVersion(
	cfg config.Persistence,
	r resolver.ServiceResolver,
) error

VerifyCompatibleVersion ensures that the installed version of temporal and visibility keyspaces is greater than or equal to the expected version. In most cases, the versions should match. However, if after a schema upgrade there is a code rollback, the code version (expected version) would fall lower than the actual version in cassandra.

Types

type ClusterMetadataStore added in v1.13.0

type ClusterMetadataStore struct {
	// contains filtered or unexported fields
}

func (*ClusterMetadataStore) Close added in v1.13.0

func (m *ClusterMetadataStore) Close()

func (*ClusterMetadataStore) DeleteClusterMetadata added in v1.14.0

func (m *ClusterMetadataStore) DeleteClusterMetadata(
	_ context.Context,
	request *p.InternalDeleteClusterMetadataRequest,
) error

func (*ClusterMetadataStore) GetClusterMembers added in v1.13.0

func (*ClusterMetadataStore) GetClusterMetadata added in v1.13.0

func (*ClusterMetadataStore) GetName added in v1.13.0

func (m *ClusterMetadataStore) GetName() string

func (*ClusterMetadataStore) ListClusterMetadata added in v1.14.0

func (*ClusterMetadataStore) PruneClusterMembership added in v1.13.0

func (m *ClusterMetadataStore) PruneClusterMembership(
	_ context.Context,
	request *p.PruneClusterMembershipRequest,
) error

func (*ClusterMetadataStore) SaveClusterMetadata added in v1.13.0

func (m *ClusterMetadataStore) SaveClusterMetadata(
	_ context.Context,
	request *p.InternalSaveClusterMetadataRequest,
) (bool, error)

func (*ClusterMetadataStore) UpsertClusterMembership added in v1.13.0

func (m *ClusterMetadataStore) UpsertClusterMembership(
	_ context.Context,
	request *p.UpsertClusterMembershipRequest,
) error

type ExecutionStore added in v1.13.0

type ExecutionStore struct {
	*HistoryStore
	*MutableStateStore
	*MutableStateTaskStore
}

func NewExecutionStore added in v1.12.0

func NewExecutionStore(
	session gocql.Session,
	logger log.Logger,
) *ExecutionStore

func (*ExecutionStore) Close added in v1.13.0

func (d *ExecutionStore) Close()

func (*ExecutionStore) ConflictResolveWorkflowExecution added in v1.13.0

func (d *ExecutionStore) ConflictResolveWorkflowExecution(
	ctx context.Context,
	request *p.InternalConflictResolveWorkflowExecutionRequest,
) error

func (*ExecutionStore) CreateWorkflowExecution added in v1.13.0

func (*ExecutionStore) GetName added in v1.13.0

func (d *ExecutionStore) GetName() string

func (*ExecutionStore) UpdateWorkflowExecution added in v1.13.0

func (d *ExecutionStore) UpdateWorkflowExecution(
	ctx context.Context,
	request *p.InternalUpdateWorkflowExecutionRequest,
) error

type Factory

type Factory struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Factory vends datastore implementations backed by cassandra

func NewFactory

func NewFactory(
	cfg config.Cassandra,
	r resolver.ServiceResolver,
	clusterName string,
	logger log.Logger,
) *Factory

NewFactory returns an instance of a factory object which can be used to create data stores that are backed by cassandra

func NewFactoryFromSession added in v1.12.0

func NewFactoryFromSession(
	cfg config.Cassandra,
	clusterName string,
	logger log.Logger,
	session gocql.Session,
) *Factory

NewFactoryFromSession returns an instance of a factory object from the given session.

func (*Factory) Close

func (f *Factory) Close()

Close closes the factory

func (*Factory) NewClusterMetadataStore added in v0.27.0

func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)

NewClusterMetadataStore returns a metadata store

func (*Factory) NewExecutionStore

func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)

NewExecutionStore returns a new ExecutionStore.

func (*Factory) NewMetadataStore

func (f *Factory) NewMetadataStore() (p.MetadataStore, error)

NewMetadataStore returns a metadata store

func (*Factory) NewQueue added in v0.27.0

func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error)

NewQueue returns a new queue backed by cassandra

func (*Factory) NewShardStore

func (f *Factory) NewShardStore() (p.ShardStore, error)

NewShardStore returns a new shard store

func (*Factory) NewTaskStore

func (f *Factory) NewTaskStore() (p.TaskStore, error)

NewTaskStore returns a new task store

type FieldNotFoundError added in v0.27.0

type FieldNotFoundError struct {
	Msg string
}

FieldNotFoundError is an error type returned when an untyped query return does not contain the expected fields.

func (FieldNotFoundError) Error added in v0.27.0

func (f FieldNotFoundError) Error() string

type HistoryStore added in v1.13.0

type HistoryStore struct {
	Session gocql.Session
	Logger  log.Logger
}

func NewHistoryStore added in v1.13.0

func NewHistoryStore(
	session gocql.Session,
	logger log.Logger,
) *HistoryStore

func (*HistoryStore) AppendHistoryNodes added in v1.13.0

func (h *HistoryStore) AppendHistoryNodes(
	_ context.Context,
	request *p.InternalAppendHistoryNodesRequest,
) error

AppendHistoryNodes upserts a batch of events as a single node to a history branch. Note that it's not allowed to append above the branch's ancestors' nodes, which means nodeID >= ForkNodeID.

func (*HistoryStore) DeleteHistoryBranch added in v1.13.0

func (h *HistoryStore) DeleteHistoryBranch(
	_ context.Context,
	request *p.InternalDeleteHistoryBranchRequest,
) error

DeleteHistoryBranch removes a branch

func (*HistoryStore) DeleteHistoryNodes added in v1.13.0

func (h *HistoryStore) DeleteHistoryNodes(
	_ context.Context,
	request *p.InternalDeleteHistoryNodesRequest,
) error

DeleteHistoryNodes deletes a history node

func (*HistoryStore) ForkHistoryBranch added in v1.13.0

func (h *HistoryStore) ForkHistoryBranch(
	_ context.Context,
	request *p.InternalForkHistoryBranchRequest,
) error

ForkHistoryBranch forks a new branch from an existing branch. Note that the application must provide a valid forking nodeID, i.e. it must be a valid nodeID in that branch. A valid forking nodeID can be an ancestor from the existing branch. For example, we have branch B1 with three nodes (1[1,2], 3[3,4,5] and 6[6,7,8]). 1, 3 and 6 are nodeIDs (first eventID of the batch). So B1 looks like this:

     1[1,2]
     /
   3[3,4,5]
  /
6[6,7,8]

Assume we have branch B2, which contains one ancestor B1 stopping at 6 (exclusive). So B2 inherits nodeIDs 1 and 3 from B1, and has its own nodeIDs 6 and 8. Branch B2 looks like this:

  1[1,2]
  /
3[3,4,5]
 \
  6[6,7]
  \
   8[8]

Now we want to fork a new branch B3 from B2. The only valid forking nodeIDs are 3, 6 or 8. 1 is not valid because we can't fork from the first node. 2, 4 and 5 are NOT valid either because they are inside a batch.

Case #1: If we fork from nodeID 6, then B3 will have an ancestor B1 which stops at 6 (exclusive). As we append a batch of events[6,7,8,9] to B3, it will look like:

  1[1,2]
  /
3[3,4,5]
 \
6[6,7,8,9]

Case #2: If we fork from node 8, then B3 will have two ancestors: B1 stops at 6 (exclusive) and ancestor B2 stops at 8 (exclusive). As we append a batch of events[8,9] to B3, it will look like:

     1[1,2]
     /
   3[3,4,5]
  /
6[6,7]
 \
 8[8,9]

func (*HistoryStore) GetAllHistoryTreeBranches added in v1.13.0

func (*HistoryStore) GetHistoryTree added in v1.13.0

GetHistoryTree returns all branch information of a tree

func (*HistoryStore) ReadHistoryBranch added in v1.13.0

ReadHistoryBranch returns history node data for a branch. NOTE: For a branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support the OR/UNION operator.

type MatchingTaskStore added in v1.13.0

type MatchingTaskStore struct {
	Session gocql.Session
	Logger  log.Logger
}

func NewMatchingTaskStore added in v1.13.0

func NewMatchingTaskStore(
	session gocql.Session,
	logger log.Logger,
) *MatchingTaskStore

func (*MatchingTaskStore) Close added in v1.13.0

func (d *MatchingTaskStore) Close()

func (*MatchingTaskStore) CompleteTask added in v1.13.0

func (d *MatchingTaskStore) CompleteTask(
	_ context.Context,
	request *p.CompleteTaskRequest,
) error

CompleteTask deletes a task

func (*MatchingTaskStore) CompleteTasksLessThan added in v1.13.0

func (d *MatchingTaskStore) CompleteTasksLessThan(
	_ context.Context,
	request *p.CompleteTasksLessThanRequest,
) (int, error)

CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will be returned to the caller

func (*MatchingTaskStore) CreateTaskQueue added in v1.13.0

func (d *MatchingTaskStore) CreateTaskQueue(
	_ context.Context,
	request *p.InternalCreateTaskQueueRequest,
) error

func (*MatchingTaskStore) CreateTasks added in v1.13.0

CreateTasks adds tasks

func (*MatchingTaskStore) DeleteTaskQueue added in v1.13.0

func (d *MatchingTaskStore) DeleteTaskQueue(
	_ context.Context,
	request *p.DeleteTaskQueueRequest,
) error

func (*MatchingTaskStore) GetName added in v1.13.0

func (d *MatchingTaskStore) GetName() string

func (*MatchingTaskStore) GetTaskQueue added in v1.13.0

func (*MatchingTaskStore) GetTasks added in v1.13.0

GetTasks gets a task

func (*MatchingTaskStore) ListTaskQueue added in v1.13.0

func (*MatchingTaskStore) UpdateTaskQueue added in v1.13.0

UpdateTaskQueue updates the task queue

type MetadataStore added in v1.13.0

type MetadataStore struct {
	// contains filtered or unexported fields
}

func (*MetadataStore) Close added in v1.13.0

func (m *MetadataStore) Close()

func (*MetadataStore) CreateNamespace added in v1.13.0

CreateNamespace creates a namespace. Cassandra does not support conditional updates across multiple tables. For this reason we have to first insert into the 'Namespaces' table and then do a conditional insert into the namespaces_by_name table. If the conditional write fails we delete the orphaned entry from the namespaces table. There is a chance the delete could fail and we never delete the orphaned entry from the namespaces table. We might need a background job to delete those orphaned records.

func (*MetadataStore) CreateNamespaceInV2Table added in v1.13.0

func (m *MetadataStore) CreateNamespaceInV2Table(
	_ context.Context,
	request *p.InternalCreateNamespaceRequest,
) (*p.CreateNamespaceResponse, error)

CreateNamespaceInV2Table is the temporary function used by namespace v1 -> v2 migration

func (*MetadataStore) DeleteNamespace added in v1.13.0

func (m *MetadataStore) DeleteNamespace(
	_ context.Context,
	request *p.DeleteNamespaceRequest,
) error

func (*MetadataStore) DeleteNamespaceByName added in v1.13.0

func (m *MetadataStore) DeleteNamespaceByName(
	_ context.Context,
	request *p.DeleteNamespaceByNameRequest,
) error

func (*MetadataStore) GetMetadata added in v1.13.0

func (m *MetadataStore) GetMetadata(
	_ context.Context,
) (*p.GetMetadataResponse, error)

func (*MetadataStore) GetName added in v1.13.0

func (m *MetadataStore) GetName() string

func (*MetadataStore) GetNamespace added in v1.13.0

func (*MetadataStore) ListNamespaces added in v1.13.0

func (*MetadataStore) RenameNamespace added in v1.16.0

func (m *MetadataStore) RenameNamespace(
	_ context.Context,
	request *p.InternalRenameNamespaceRequest,
) error

RenameNamespace should be used with caution. Not every namespace can be renamed because namespace names are stored in the database. It may leave the database in an inconsistent state and must be retried until success. Step 1. Update row in `namespaces_by_id` table with the new name. Step 2. Batch of:

Insert row into `namespaces` table with new name and new `notification_version`.
Delete row from `namespaces` table with old name.
Update `notification_version` in metadata row.

NOTE: `namespaces_by_id` is currently used only for `DescribeNamespace` API and namespace Id collision check.

func (*MetadataStore) UpdateNamespace added in v1.13.0

func (m *MetadataStore) UpdateNamespace(
	_ context.Context,
	request *p.InternalUpdateNamespaceRequest,
) error

type MutableStateStore added in v1.13.0

type MutableStateStore struct {
	Session gocql.Session
	Logger  log.Logger
}

func NewMutableStateStore added in v1.13.0

func NewMutableStateStore(
	session gocql.Session,
	logger log.Logger,
) *MutableStateStore

func (*MutableStateStore) ConflictResolveWorkflowExecution added in v1.13.0

func (d *MutableStateStore) ConflictResolveWorkflowExecution(
	_ context.Context,
	request *p.InternalConflictResolveWorkflowExecutionRequest,
) error

func (*MutableStateStore) CreateWorkflowExecution added in v1.13.0

func (*MutableStateStore) DeleteCurrentWorkflowExecution added in v1.13.0

func (d *MutableStateStore) DeleteCurrentWorkflowExecution(
	_ context.Context,
	request *p.DeleteCurrentWorkflowExecutionRequest,
) error

func (*MutableStateStore) DeleteWorkflowExecution added in v1.13.0

func (d *MutableStateStore) DeleteWorkflowExecution(
	_ context.Context,
	request *p.DeleteWorkflowExecutionRequest,
) error

func (*MutableStateStore) GetCurrentExecution added in v1.13.0

func (*MutableStateStore) GetWorkflowExecution added in v1.13.0

func (*MutableStateStore) ListConcreteExecutions added in v1.13.0

func (*MutableStateStore) SetWorkflowExecution added in v1.16.0

func (d *MutableStateStore) SetWorkflowExecution(
	_ context.Context,
	request *p.InternalSetWorkflowExecutionRequest,
) error

func (*MutableStateStore) UpdateWorkflowExecution added in v1.13.0

func (d *MutableStateStore) UpdateWorkflowExecution(
	_ context.Context,
	request *p.InternalUpdateWorkflowExecutionRequest,
) error

type MutableStateTaskStore added in v1.13.0

type MutableStateTaskStore struct {
	Session gocql.Session
	Logger  log.Logger
}

func NewMutableStateTaskStore added in v1.13.0

func NewMutableStateTaskStore(
	session gocql.Session,
	logger log.Logger,
) *MutableStateTaskStore

func (*MutableStateTaskStore) AddHistoryTasks added in v1.16.0

func (d *MutableStateTaskStore) AddHistoryTasks(
	_ context.Context,
	request *p.InternalAddHistoryTasksRequest,
) error

func (*MutableStateTaskStore) CompleteHistoryTask added in v1.16.0

func (d *MutableStateTaskStore) CompleteHistoryTask(
	_ context.Context,
	request *p.CompleteHistoryTaskRequest,
) error

func (*MutableStateTaskStore) DeleteReplicationTaskFromDLQ added in v1.13.0

func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ(
	_ context.Context,
	request *p.DeleteReplicationTaskFromDLQRequest,
) error

func (*MutableStateTaskStore) GetHistoryTask added in v1.16.0

func (*MutableStateTaskStore) GetHistoryTasks added in v1.16.0

func (*MutableStateTaskStore) GetReplicationTasksFromDLQ added in v1.13.0

func (*MutableStateTaskStore) PutReplicationTaskToDLQ added in v1.13.0

func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(
	_ context.Context,
	request *p.PutReplicationTaskToDLQRequest,
) error

func (*MutableStateTaskStore) RangeCompleteHistoryTasks added in v1.16.0

func (d *MutableStateTaskStore) RangeCompleteHistoryTasks(
	_ context.Context,
	request *p.RangeCompleteHistoryTasksRequest,
) error

func (*MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ added in v1.13.0

func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(
	_ context.Context,
	request *p.RangeDeleteReplicationTaskFromDLQRequest,
) error

type PersistedTypeMismatchError added in v0.27.0

type PersistedTypeMismatchError struct {
	Msg string
}

PersistedTypeMismatchError is an error type returned when a persisted cassandra value does not match the expected type.

func (PersistedTypeMismatchError) Error added in v0.27.0

type QueueStore added in v1.13.0

type QueueStore struct {
	// contains filtered or unexported fields
}

func (*QueueStore) Close added in v1.13.0

func (q *QueueStore) Close()

func (*QueueStore) DeleteMessageFromDLQ added in v1.13.0

func (q *QueueStore) DeleteMessageFromDLQ(
	_ context.Context,
	messageID int64,
) error

func (*QueueStore) DeleteMessagesBefore added in v1.13.0

func (q *QueueStore) DeleteMessagesBefore(
	_ context.Context,
	messageID int64,
) error

func (*QueueStore) EnqueueMessage added in v1.13.0

func (q *QueueStore) EnqueueMessage(
	_ context.Context,
	blob commonpb.DataBlob,
) error

func (*QueueStore) EnqueueMessageToDLQ added in v1.13.0

func (q *QueueStore) EnqueueMessageToDLQ(
	_ context.Context,
	blob commonpb.DataBlob,
) (int64, error)

func (*QueueStore) GetAckLevels added in v1.13.0

func (*QueueStore) GetDLQAckLevels added in v1.13.0

func (q *QueueStore) GetDLQAckLevels(
	_ context.Context,
) (*persistence.InternalQueueMetadata, error)

func (*QueueStore) Init added in v1.13.0

func (q *QueueStore) Init(
	_ context.Context,
	blob *commonpb.DataBlob,
) error

func (*QueueStore) RangeDeleteMessagesFromDLQ added in v1.13.0

func (q *QueueStore) RangeDeleteMessagesFromDLQ(
	_ context.Context,
	firstMessageID int64,
	lastMessageID int64,
) error

func (*QueueStore) ReadMessages added in v1.13.0

func (q *QueueStore) ReadMessages(
	_ context.Context,
	lastMessageID int64,
	maxCount int,
) ([]*persistence.QueueMessage, error)

func (*QueueStore) ReadMessagesFromDLQ added in v1.13.0

func (q *QueueStore) ReadMessagesFromDLQ(
	_ context.Context,
	firstMessageID int64,
	lastMessageID int64,
	pageSize int,
	pageToken []byte,
) ([]*persistence.QueueMessage, []byte, error)

func (*QueueStore) UpdateAckLevel added in v1.13.0

func (q *QueueStore) UpdateAckLevel(
	_ context.Context,
	metadata *persistence.InternalQueueMetadata,
) error

func (*QueueStore) UpdateDLQAckLevel added in v1.13.0

func (q *QueueStore) UpdateDLQAckLevel(
	_ context.Context,
	metadata *persistence.InternalQueueMetadata,
) error

type SchemaVersionReader added in v1.5.7

type SchemaVersionReader struct {
	// contains filtered or unexported fields
}

func NewSchemaVersionReader added in v1.5.7

func NewSchemaVersionReader(session gocql.Session) *SchemaVersionReader

func (*SchemaVersionReader) ReadSchemaVersion added in v1.5.7

func (svr *SchemaVersionReader) ReadSchemaVersion(keyspace string) (string, error)

ReadSchemaVersion returns the current schema version for the Keyspace

type ShardStore added in v1.13.0

type ShardStore struct {
	ClusterName string
	Session     gocql.Session
	Logger      log.Logger
}

func NewShardStore added in v1.13.0

func NewShardStore(
	clusterName string,
	session gocql.Session,
	logger log.Logger,
) *ShardStore

func (*ShardStore) Close added in v1.13.0

func (d *ShardStore) Close()

func (*ShardStore) GetClusterName added in v1.13.0

func (d *ShardStore) GetClusterName() string

func (*ShardStore) GetName added in v1.13.0

func (d *ShardStore) GetName() string

func (*ShardStore) GetOrCreateShard added in v1.14.0

func (*ShardStore) UpdateShard added in v1.13.0

func (d *ShardStore) UpdateShard(
	_ context.Context,
	request *p.InternalUpdateShardRequest,
) error

type TestCluster

type TestCluster struct {
	// contains filtered or unexported fields
}

TestCluster allows executing cassandra operations in testing.

func NewTestCluster

func NewTestCluster(keyspace, username, password, host string, port int, schemaDir string, faultInjection *config.FaultInjection, logger log.Logger) *TestCluster

NewTestCluster returns a new cassandra test cluster

func (*TestCluster) Config

func (s *TestCluster) Config() config.Persistence

Config returns the persistence config for connecting to this test cluster

func (*TestCluster) CreateDatabase

func (s *TestCluster) CreateDatabase()

CreateDatabase from PersistenceTestCluster interface

func (*TestCluster) CreateSession

func (s *TestCluster) CreateSession(
	keyspace string,
)

CreateSession from PersistenceTestCluster interface

func (*TestCluster) DatabaseName

func (s *TestCluster) DatabaseName() string

DatabaseName from PersistenceTestCluster interface

func (*TestCluster) DropDatabase

func (s *TestCluster) DropDatabase()

DropDatabase from PersistenceTestCluster interface

func (*TestCluster) LoadSchema

func (s *TestCluster) LoadSchema(schemaFile string)

LoadSchema from PersistenceTestCluster interface

func (*TestCluster) SetupTestDatabase

func (s *TestCluster) SetupTestDatabase()

SetupTestDatabase from PersistenceTestCluster interface

func (*TestCluster) TearDownTestDatabase

func (s *TestCluster) TearDownTestDatabase()

TearDownTestDatabase from PersistenceTestCluster interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL