Documentation ¶
Index ¶
- Constants
- Variables
- func FromYDBDateTime(t time.Time) time.Time
- func GetYDBAddress() string
- func GetYDBPort() int
- func NewClusterMetadataStore(client *xydb.Client, logger log.Logger) (p.ClusterMetadataStore, error)
- func NewMetadataStore(currentClusterName string, client *xydb.Client, logger log.Logger) (p.MetadataStore, error)
- func NewQueueStore(queueType persistence.QueueType, client *xydb.Client, logger log.Logger) (persistence.Queue, error)
- func NewYDBAbstractDataStoreFactory() client.AbstractDataStoreFactory
- func NewYDBClientFromConfig(cfg config.CustomDatastoreConfig, r resolver.ServiceResolver, ...) (*xydb.Client, error)
- func OptionsToYDBConfig(options map[string]any) (xydb.Config, error)
- func ToShardIDColumnValue(shardID int32) uint32
- func ToYDBDateTime(t time.Time) time.Time
- type ClusterMetadataStore
- func (m *ClusterMetadataStore) Close()
- func (m *ClusterMetadataStore) DeleteClusterMetadata(ctx context.Context, request *p.InternalDeleteClusterMetadataRequest) error
- func (m *ClusterMetadataStore) GetClusterMembers(ctx context.Context, request *p.GetClusterMembersRequest) (resp *p.GetClusterMembersResponse, err error)
- func (m *ClusterMetadataStore) GetClusterMetadata(ctx context.Context, request *p.InternalGetClusterMetadataRequest) (resp *p.InternalGetClusterMetadataResponse, err error)
- func (m *ClusterMetadataStore) GetName() string
- func (m *ClusterMetadataStore) ListClusterMetadata(ctx context.Context, request *p.InternalListClusterMetadataRequest) (resp *p.InternalListClusterMetadataResponse, err error)
- func (m *ClusterMetadataStore) PruneClusterMembership(ctx context.Context, request *p.PruneClusterMembershipRequest) error
- func (m *ClusterMetadataStore) SaveClusterMetadata(ctx context.Context, request *p.InternalSaveClusterMetadataRequest) (rv bool, err error)
- func (m *ClusterMetadataStore) UpsertClusterMembership(ctx context.Context, request *p.UpsertClusterMembershipRequest) error
- type ExecutionStore
- func (d *ExecutionStore) Close()
- func (d *ExecutionStore) ConflictResolveWorkflowExecution(ctx context.Context, ...) error
- func (d *ExecutionStore) CreateWorkflowExecution(ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest) (*p.InternalCreateWorkflowExecutionResponse, error)
- func (d *ExecutionStore) GetName() string
- func (d *ExecutionStore) UpdateWorkflowExecution(ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest) error
- type Factory
- func (f *Factory) Close()
- func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
- func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
- func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
- func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error)
- func (f *Factory) NewQueueV2() (p.QueueV2, error)
- func (f *Factory) NewShardStore() (p.ShardStore, error)
- func (f *Factory) NewTaskStore() (p.TaskStore, error)
- type HistoryStore
- func (h *HistoryStore) AppendHistoryNodes(ctx context.Context, request *p.InternalAppendHistoryNodesRequest) error
- func (h *HistoryStore) DeleteHistoryBranch(ctx context.Context, request *p.InternalDeleteHistoryBranchRequest) error
- func (h *HistoryStore) DeleteHistoryNodes(ctx context.Context, request *p.InternalDeleteHistoryNodesRequest) (err error)
- func (h *HistoryStore) ForkHistoryBranch(ctx context.Context, request *p.InternalForkHistoryBranchRequest) (err error)
- func (h *HistoryStore) GetAllHistoryTreeBranches(ctx context.Context, request *p.GetAllHistoryTreeBranchesRequest) (resp *p.InternalGetAllHistoryTreeBranchesResponse, err error)
- func (h *HistoryStore) GetHistoryTree(ctx context.Context, request *p.GetHistoryTreeRequest) (resp *p.InternalGetHistoryTreeResponse, err error)
- func (h *HistoryStore) ReadHistoryBranch(ctx context.Context, request *p.InternalReadHistoryBranchRequest) (resp *p.InternalReadHistoryBranchResponse, err error)
- type MatchingTaskStore
- func (d *MatchingTaskStore) Close()
- func (d *MatchingTaskStore) CompleteTask(ctx context.Context, request *p.CompleteTaskRequest) error
- func (d *MatchingTaskStore) CompleteTasksLessThan(ctx context.Context, request *p.CompleteTasksLessThanRequest) (int, error)
- func (d *MatchingTaskStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (count int, err error)
- func (d *MatchingTaskStore) CreateTaskQueue(ctx context.Context, request *p.InternalCreateTaskQueueRequest) error
- func (d *MatchingTaskStore) CreateTasks(ctx context.Context, request *p.InternalCreateTasksRequest) (*p.CreateTasksResponse, error)
- func (d *MatchingTaskStore) DeleteTaskQueue(ctx context.Context, request *p.DeleteTaskQueueRequest) error
- func (d *MatchingTaskStore) GetName() string
- func (d *MatchingTaskStore) GetTaskQueue(ctx context.Context, request *p.InternalGetTaskQueueRequest) (resp *p.InternalGetTaskQueueResponse, err error)
- func (d *MatchingTaskStore) GetTaskQueueUserData(ctx context.Context, request *p.GetTaskQueueUserDataRequest) (resp *p.InternalGetTaskQueueUserDataResponse, err error)
- func (d *MatchingTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) (rv []string, err error)
- func (d *MatchingTaskStore) GetTasks(ctx context.Context, request *p.GetTasksRequest) (resp *p.InternalGetTasksResponse, err error)
- func (d *MatchingTaskStore) ListTaskQueue(_ context.Context, _ *p.ListTaskQueueRequest) (*p.InternalListTaskQueueResponse, error)
- func (d *MatchingTaskStore) ListTaskQueueUserDataEntries(ctx context.Context, request *p.ListTaskQueueUserDataEntriesRequest) (resp *p.InternalListTaskQueueUserDataEntriesResponse, err error)
- func (d *MatchingTaskStore) UpdateTaskQueue(ctx context.Context, request *p.InternalUpdateTaskQueueRequest) (*p.UpdateTaskQueueResponse, error)
- func (d *MatchingTaskStore) UpdateTaskQueueUserData(ctx context.Context, request *p.InternalUpdateTaskQueueUserDataRequest) (err error)
- type MetadataStore
- func (m *MetadataStore) Close()
- func (m *MetadataStore) CreateNamespace(ctx context.Context, request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
- func (m *MetadataStore) DeleteNamespace(ctx context.Context, request *p.DeleteNamespaceRequest) (err error)
- func (m *MetadataStore) DeleteNamespaceByName(ctx context.Context, request *p.DeleteNamespaceByNameRequest) error
- func (m *MetadataStore) GetMetadata(ctx context.Context) (resp *p.GetMetadataResponse, err error)
- func (m *MetadataStore) GetName() string
- func (m *MetadataStore) GetNamespace(ctx context.Context, request *p.GetNamespaceRequest) (resp *p.InternalGetNamespaceResponse, err error)
- func (m *MetadataStore) ListNamespaces(ctx context.Context, request *p.InternalListNamespacesRequest) (resp *p.InternalListNamespacesResponse, err error)
- func (m *MetadataStore) RenameNamespace(ctx context.Context, request *p.InternalRenameNamespaceRequest) error
- func (m *MetadataStore) UpdateNamespace(ctx context.Context, request *p.InternalUpdateNamespaceRequest) error
- type MutableStateStore
- func (d *MutableStateStore) DeleteCurrentWorkflowExecution(ctx context.Context, request *p.DeleteCurrentWorkflowExecutionRequest) error
- func (d *MutableStateStore) DeleteWorkflowExecution(ctx context.Context, request *p.DeleteWorkflowExecutionRequest) error
- func (d *MutableStateStore) GetCurrentExecution(ctx context.Context, request *p.GetCurrentExecutionRequest) (resp *p.InternalGetCurrentExecutionResponse, err error)
- func (d *MutableStateStore) GetWorkflowExecution(ctx context.Context, request *p.GetWorkflowExecutionRequest) (resp *p.InternalGetWorkflowExecutionResponse, err error)
- func (d *MutableStateStore) ListConcreteExecutions(ctx context.Context, request *p.ListConcreteExecutionsRequest) (*p.InternalListConcreteExecutionsResponse, error)
- func (d *MutableStateStore) SetWorkflowExecution(ctx context.Context, request *p.InternalSetWorkflowExecutionRequest) (err error)
- type MutableStateTaskStore
- func (d *MutableStateTaskStore) AddHistoryTasks(ctx context.Context, request *p.InternalAddHistoryTasksRequest) error
- func (d *MutableStateTaskStore) CompleteHistoryTask(ctx context.Context, request *p.CompleteHistoryTaskRequest) (err error)
- func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ(ctx context.Context, request *p.DeleteReplicationTaskFromDLQRequest) error
- func (d *MutableStateTaskStore) GetHistoryTasks(ctx context.Context, request *p.GetHistoryTasksRequest) (resp *p.InternalGetHistoryTasksResponse, err error)
- func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ(ctx context.Context, request *p.GetReplicationTasksFromDLQRequest) (resp *p.InternalGetHistoryTasksResponse, err error)
- func (d *MutableStateTaskStore) IsReplicationDLQEmpty(ctx context.Context, request *p.GetReplicationTasksFromDLQRequest) (empty bool, err error)
- func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(ctx context.Context, request *p.PutReplicationTaskToDLQRequest) (err error)
- func (d *MutableStateTaskStore) RangeCompleteHistoryTasks(ctx context.Context, request *p.RangeCompleteHistoryTasksRequest) (err error)
- func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *p.RangeDeleteReplicationTaskFromDLQRequest) error
- func (d *MutableStateTaskStore) RegisterHistoryTaskReader(_ context.Context, _ *p.RegisterHistoryTaskReaderRequest) error
- func (d *MutableStateTaskStore) UnregisterHistoryTaskReader(_ context.Context, _ *p.UnregisterHistoryTaskReaderRequest)
- func (d *MutableStateTaskStore) UpdateHistoryTaskReaderProgress(_ context.Context, _ *p.UpdateHistoryTaskReaderProgressRequest)
- type QueueStore
- func (q *QueueStore) Close()
- func (q *QueueStore) DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
- func (q *QueueStore) DeleteMessagesBefore(ctx context.Context, messageID int64) error
- func (q *QueueStore) EnqueueMessage(ctx context.Context, blob *commonpb.DataBlob) error
- func (q *QueueStore) EnqueueMessageToDLQ(ctx context.Context, blob *commonpb.DataBlob) (int64, error)
- func (q *QueueStore) GetAckLevels(ctx context.Context) (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) GetDLQAckLevels(ctx context.Context) (*persistence.InternalQueueMetadata, error)
- func (q *QueueStore) Init(ctx context.Context, blob *commonpb.DataBlob) error
- func (q *QueueStore) RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
- func (q *QueueStore) ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) (rv []*persistence.QueueMessage, err error)
- func (q *QueueStore) ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, ...) (m []*persistence.QueueMessage, nextPageToken []byte, err error)
- func (q *QueueStore) UpdateAckLevel(ctx context.Context, metadata *persistence.InternalQueueMetadata) (err error)
- func (q *QueueStore) UpdateDLQAckLevel(ctx context.Context, metadata *persistence.InternalQueueMetadata) (err error)
- type ShardStore
- func (d *ShardStore) AssertShardOwnership(ctx context.Context, request *p.AssertShardOwnershipRequest) error
- func (d *ShardStore) Close()
- func (d *ShardStore) GetClusterName() string
- func (d *ShardStore) GetName() string
- func (d *ShardStore) GetOrCreateShard(ctx context.Context, request *p.InternalGetOrCreateShardRequest) (resp *p.InternalGetOrCreateShardResponse, err error)
- func (d *ShardStore) UpdateShard(ctx context.Context, request *p.InternalUpdateShardRequest) (err error)
- type TestCluster
Constants ¶
const (
	// NOTE: transaction ID is *= -1 in DB
	MinTxnID int64 = math.MaxInt64
	MaxTxnID int64 = math.MinInt64 + 1 // int overflow
)
const (
	// YDBSeeds env
	YDBSeeds = "YDB_SEEDS"
	// YDBPort env
	YDBPort = "YDB_PORT"
	// YDBDefaultPort YDB default port
	YDBDefaultPort = 2136
	Localhost      = "127.0.0.1"
)
Variables ¶
var (
NumHistoryShards = 1024
)
Functions ¶
func FromYDBDateTime ¶
FromYDBDateTime converts a YDB datetime value to a Go time.Time.
func NewClusterMetadataStore ¶
func NewClusterMetadataStore( client *xydb.Client, logger log.Logger, ) (p.ClusterMetadataStore, error)
NewClusterMetadataStore is used to create an instance of ClusterMetadataStore implementation
func NewMetadataStore ¶
func NewMetadataStore( currentClusterName string, client *xydb.Client, logger log.Logger, ) (p.MetadataStore, error)
NewMetadataStore is used to create an instance of the Namespace MetadataStore implementation
func NewQueueStore ¶
func NewQueueStore( queueType persistence.QueueType, client *xydb.Client, logger log.Logger, ) (persistence.Queue, error)
func NewYDBAbstractDataStoreFactory ¶
func NewYDBAbstractDataStoreFactory() client.AbstractDataStoreFactory
func NewYDBClientFromConfig ¶
func NewYDBClientFromConfig(cfg config.CustomDatastoreConfig, r resolver.ServiceResolver, logger log.Logger) (*xydb.Client, error)
func ToShardIDColumnValue ¶
Types ¶
type ClusterMetadataStore ¶
type ClusterMetadataStore struct {
// contains filtered or unexported fields
}
func (*ClusterMetadataStore) Close ¶
func (m *ClusterMetadataStore) Close()
func (*ClusterMetadataStore) DeleteClusterMetadata ¶
func (m *ClusterMetadataStore) DeleteClusterMetadata( ctx context.Context, request *p.InternalDeleteClusterMetadataRequest, ) error
func (*ClusterMetadataStore) GetClusterMembers ¶
func (m *ClusterMetadataStore) GetClusterMembers( ctx context.Context, request *p.GetClusterMembersRequest, ) (resp *p.GetClusterMembersResponse, err error)
func (*ClusterMetadataStore) GetClusterMetadata ¶
func (m *ClusterMetadataStore) GetClusterMetadata( ctx context.Context, request *p.InternalGetClusterMetadataRequest, ) (resp *p.InternalGetClusterMetadataResponse, err error)
func (*ClusterMetadataStore) GetName ¶
func (m *ClusterMetadataStore) GetName() string
func (*ClusterMetadataStore) ListClusterMetadata ¶
func (m *ClusterMetadataStore) ListClusterMetadata( ctx context.Context, request *p.InternalListClusterMetadataRequest, ) (resp *p.InternalListClusterMetadataResponse, err error)
func (*ClusterMetadataStore) PruneClusterMembership ¶
func (m *ClusterMetadataStore) PruneClusterMembership( ctx context.Context, request *p.PruneClusterMembershipRequest, ) error
func (*ClusterMetadataStore) SaveClusterMetadata ¶
func (m *ClusterMetadataStore) SaveClusterMetadata( ctx context.Context, request *p.InternalSaveClusterMetadataRequest, ) (rv bool, err error)
func (*ClusterMetadataStore) UpsertClusterMembership ¶
func (m *ClusterMetadataStore) UpsertClusterMembership( ctx context.Context, request *p.UpsertClusterMembershipRequest, ) error
type ExecutionStore ¶
type ExecutionStore struct {
	*HistoryStore
	*MutableStateStore
	*MutableStateTaskStore
	// contains filtered or unexported fields
}
func NewExecutionStore ¶
func (*ExecutionStore) Close ¶
func (d *ExecutionStore) Close()
func (*ExecutionStore) ConflictResolveWorkflowExecution ¶
func (d *ExecutionStore) ConflictResolveWorkflowExecution( ctx context.Context, request *p.InternalConflictResolveWorkflowExecutionRequest, ) error
func (*ExecutionStore) CreateWorkflowExecution ¶
func (d *ExecutionStore) CreateWorkflowExecution( ctx context.Context, request *p.InternalCreateWorkflowExecutionRequest, ) (*p.InternalCreateWorkflowExecutionResponse, error)
func (*ExecutionStore) GetName ¶
func (d *ExecutionStore) GetName() string
func (*ExecutionStore) UpdateWorkflowExecution ¶
func (d *ExecutionStore) UpdateWorkflowExecution( ctx context.Context, request *p.InternalUpdateWorkflowExecutionRequest, ) error
type Factory ¶
Factory vends datastore implementations backed by YDB
func NewFactory ¶
func NewFactory( cfg config.CustomDatastoreConfig, r resolver.ServiceResolver, clusterName string, logger log.Logger, metricsHandler metrics.Handler, ) *Factory
NewFactory returns an instance of a factory object which can be used to create data stores that are backed by YDB
func NewFactoryFromYDBConfig ¶
func (*Factory) NewClusterMetadataStore ¶
func (f *Factory) NewClusterMetadataStore() (p.ClusterMetadataStore, error)
NewClusterMetadataStore returns a cluster metadata store
func (*Factory) NewExecutionStore ¶
func (f *Factory) NewExecutionStore() (p.ExecutionStore, error)
NewExecutionStore returns a new ExecutionStore.
func (*Factory) NewMetadataStore ¶
func (f *Factory) NewMetadataStore() (p.MetadataStore, error)
NewMetadataStore returns a metadata store
func (*Factory) NewShardStore ¶
func (f *Factory) NewShardStore() (p.ShardStore, error)
NewShardStore returns a new shard store
type HistoryStore ¶
func NewHistoryStore ¶
func NewHistoryStore( client *xydb.Client, logger log.Logger, ) *HistoryStore
func (*HistoryStore) AppendHistoryNodes ¶
func (h *HistoryStore) AppendHistoryNodes( ctx context.Context, request *p.InternalAppendHistoryNodesRequest, ) error
AppendHistoryNodes upserts a batch of events as a single node to a history branch.
func (*HistoryStore) DeleteHistoryBranch ¶
func (h *HistoryStore) DeleteHistoryBranch( ctx context.Context, request *p.InternalDeleteHistoryBranchRequest, ) error
DeleteHistoryBranch removes a branch
func (*HistoryStore) DeleteHistoryNodes ¶
func (h *HistoryStore) DeleteHistoryNodes( ctx context.Context, request *p.InternalDeleteHistoryNodesRequest, ) (err error)
DeleteHistoryNodes deletes a history node.
func (*HistoryStore) ForkHistoryBranch ¶
func (h *HistoryStore) ForkHistoryBranch( ctx context.Context, request *p.InternalForkHistoryBranchRequest, ) (err error)
ForkHistoryBranch forks a new branch from an existing branch
func (*HistoryStore) GetAllHistoryTreeBranches ¶
func (h *HistoryStore) GetAllHistoryTreeBranches( ctx context.Context, request *p.GetAllHistoryTreeBranchesRequest, ) (resp *p.InternalGetAllHistoryTreeBranchesResponse, err error)
func (*HistoryStore) GetHistoryTree ¶
func (h *HistoryStore) GetHistoryTree( ctx context.Context, request *p.GetHistoryTreeRequest, ) (resp *p.InternalGetHistoryTreeResponse, err error)
GetHistoryTree returns all branch information of a tree
func (*HistoryStore) ReadHistoryBranch ¶
func (h *HistoryStore) ReadHistoryBranch( ctx context.Context, request *p.InternalReadHistoryBranchRequest, ) (resp *p.InternalReadHistoryBranchResponse, err error)
ReadHistoryBranch returns history node data for a branch
type MatchingTaskStore ¶
type MatchingTaskStore struct {
// contains filtered or unexported fields
}
func NewMatchingTaskStore ¶
func NewMatchingTaskStore( client *xydb.Client, logger log.Logger, ) *MatchingTaskStore
func (*MatchingTaskStore) Close ¶
func (d *MatchingTaskStore) Close()
func (*MatchingTaskStore) CompleteTask ¶
func (d *MatchingTaskStore) CompleteTask( ctx context.Context, request *p.CompleteTaskRequest, ) error
CompleteTask deletes a task.
func (*MatchingTaskStore) CompleteTasksLessThan ¶
func (d *MatchingTaskStore) CompleteTasksLessThan( ctx context.Context, request *p.CompleteTasksLessThanRequest, ) (int, error)
CompleteTasksLessThan deletes all tasks less than the given task id. This API ignores the Limit request parameter, i.e., either all tasks leq the task_id will be deleted or an error will be returned to the caller.
func (*MatchingTaskStore) CountTaskQueuesByBuildId ¶
func (d *MatchingTaskStore) CountTaskQueuesByBuildId(ctx context.Context, request *p.CountTaskQueuesByBuildIdRequest) (count int, err error)
func (*MatchingTaskStore) CreateTaskQueue ¶
func (d *MatchingTaskStore) CreateTaskQueue( ctx context.Context, request *p.InternalCreateTaskQueueRequest, ) error
func (*MatchingTaskStore) CreateTasks ¶
func (d *MatchingTaskStore) CreateTasks( ctx context.Context, request *p.InternalCreateTasksRequest, ) (*p.CreateTasksResponse, error)
CreateTasks adds tasks.
func (*MatchingTaskStore) DeleteTaskQueue ¶
func (d *MatchingTaskStore) DeleteTaskQueue( ctx context.Context, request *p.DeleteTaskQueueRequest, ) error
func (*MatchingTaskStore) GetName ¶
func (d *MatchingTaskStore) GetName() string
func (*MatchingTaskStore) GetTaskQueue ¶
func (d *MatchingTaskStore) GetTaskQueue( ctx context.Context, request *p.InternalGetTaskQueueRequest, ) (resp *p.InternalGetTaskQueueResponse, err error)
func (*MatchingTaskStore) GetTaskQueueUserData ¶
func (d *MatchingTaskStore) GetTaskQueueUserData( ctx context.Context, request *p.GetTaskQueueUserDataRequest, ) (resp *p.InternalGetTaskQueueUserDataResponse, err error)
func (*MatchingTaskStore) GetTaskQueuesByBuildId ¶
func (d *MatchingTaskStore) GetTaskQueuesByBuildId(ctx context.Context, request *p.GetTaskQueuesByBuildIdRequest) (rv []string, err error)
func (*MatchingTaskStore) GetTasks ¶
func (d *MatchingTaskStore) GetTasks( ctx context.Context, request *p.GetTasksRequest, ) (resp *p.InternalGetTasksResponse, err error)
GetTasks gets a task.
func (*MatchingTaskStore) ListTaskQueue ¶
func (d *MatchingTaskStore) ListTaskQueue( _ context.Context, _ *p.ListTaskQueueRequest, ) (*p.InternalListTaskQueueResponse, error)
func (*MatchingTaskStore) ListTaskQueueUserDataEntries ¶
func (d *MatchingTaskStore) ListTaskQueueUserDataEntries(ctx context.Context, request *p.ListTaskQueueUserDataEntriesRequest) (resp *p.InternalListTaskQueueUserDataEntriesResponse, err error)
func (*MatchingTaskStore) UpdateTaskQueue ¶
func (d *MatchingTaskStore) UpdateTaskQueue( ctx context.Context, request *p.InternalUpdateTaskQueueRequest, ) (*p.UpdateTaskQueueResponse, error)
UpdateTaskQueue updates a task queue.
func (*MatchingTaskStore) UpdateTaskQueueUserData ¶
func (d *MatchingTaskStore) UpdateTaskQueueUserData( ctx context.Context, request *p.InternalUpdateTaskQueueUserDataRequest, ) (err error)
type MetadataStore ¶
type MetadataStore struct {
// contains filtered or unexported fields
}
func (*MetadataStore) Close ¶
func (m *MetadataStore) Close()
func (*MetadataStore) CreateNamespace ¶
func (m *MetadataStore) CreateNamespace(ctx context.Context, request *p.InternalCreateNamespaceRequest) (*p.CreateNamespaceResponse, error)
CreateNamespace creates a namespace.
func (*MetadataStore) DeleteNamespace ¶
func (m *MetadataStore) DeleteNamespace(ctx context.Context, request *p.DeleteNamespaceRequest) (err error)
func (*MetadataStore) DeleteNamespaceByName ¶
func (m *MetadataStore) DeleteNamespaceByName(ctx context.Context, request *p.DeleteNamespaceByNameRequest) error
func (*MetadataStore) GetMetadata ¶
func (m *MetadataStore) GetMetadata(ctx context.Context) (resp *p.GetMetadataResponse, err error)
func (*MetadataStore) GetName ¶
func (m *MetadataStore) GetName() string
func (*MetadataStore) GetNamespace ¶
func (m *MetadataStore) GetNamespace(ctx context.Context, request *p.GetNamespaceRequest) (resp *p.InternalGetNamespaceResponse, err error)
func (*MetadataStore) ListNamespaces ¶
func (m *MetadataStore) ListNamespaces(ctx context.Context, request *p.InternalListNamespacesRequest) (resp *p.InternalListNamespacesResponse, err error)
func (*MetadataStore) RenameNamespace ¶
func (m *MetadataStore) RenameNamespace(ctx context.Context, request *p.InternalRenameNamespaceRequest) error
func (*MetadataStore) UpdateNamespace ¶
func (m *MetadataStore) UpdateNamespace( ctx context.Context, request *p.InternalUpdateNamespaceRequest, ) error
type MutableStateStore ¶
type MutableStateStore struct {
// contains filtered or unexported fields
}
func NewMutableStateStore ¶
func NewMutableStateStore( client *xydb.Client, logger log.Logger, ) *MutableStateStore
func (*MutableStateStore) DeleteCurrentWorkflowExecution ¶
func (d *MutableStateStore) DeleteCurrentWorkflowExecution( ctx context.Context, request *p.DeleteCurrentWorkflowExecutionRequest, ) error
func (*MutableStateStore) DeleteWorkflowExecution ¶
func (d *MutableStateStore) DeleteWorkflowExecution( ctx context.Context, request *p.DeleteWorkflowExecutionRequest, ) error
func (*MutableStateStore) GetCurrentExecution ¶
func (d *MutableStateStore) GetCurrentExecution( ctx context.Context, request *p.GetCurrentExecutionRequest, ) (resp *p.InternalGetCurrentExecutionResponse, err error)
func (*MutableStateStore) GetWorkflowExecution ¶
func (d *MutableStateStore) GetWorkflowExecution( ctx context.Context, request *p.GetWorkflowExecutionRequest, ) (resp *p.InternalGetWorkflowExecutionResponse, err error)
func (*MutableStateStore) ListConcreteExecutions ¶
func (d *MutableStateStore) ListConcreteExecutions( ctx context.Context, request *p.ListConcreteExecutionsRequest, ) (*p.InternalListConcreteExecutionsResponse, error)
func (*MutableStateStore) SetWorkflowExecution ¶
func (d *MutableStateStore) SetWorkflowExecution(ctx context.Context, request *p.InternalSetWorkflowExecutionRequest) (err error)
type MutableStateTaskStore ¶
type MutableStateTaskStore struct {
// contains filtered or unexported fields
}
func NewMutableStateTaskStore ¶
func NewMutableStateTaskStore( client *xydb.Client, logger log.Logger, ) *MutableStateTaskStore
func (*MutableStateTaskStore) AddHistoryTasks ¶
func (d *MutableStateTaskStore) AddHistoryTasks( ctx context.Context, request *p.InternalAddHistoryTasksRequest, ) error
func (*MutableStateTaskStore) CompleteHistoryTask ¶
func (d *MutableStateTaskStore) CompleteHistoryTask( ctx context.Context, request *p.CompleteHistoryTaskRequest, ) (err error)
func (*MutableStateTaskStore) DeleteReplicationTaskFromDLQ ¶
func (d *MutableStateTaskStore) DeleteReplicationTaskFromDLQ( ctx context.Context, request *p.DeleteReplicationTaskFromDLQRequest, ) error
func (*MutableStateTaskStore) GetHistoryTasks ¶
func (d *MutableStateTaskStore) GetHistoryTasks( ctx context.Context, request *p.GetHistoryTasksRequest, ) (resp *p.InternalGetHistoryTasksResponse, err error)
func (*MutableStateTaskStore) GetReplicationTasksFromDLQ ¶
func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ( ctx context.Context, request *p.GetReplicationTasksFromDLQRequest, ) (resp *p.InternalGetHistoryTasksResponse, err error)
func (*MutableStateTaskStore) IsReplicationDLQEmpty ¶
func (d *MutableStateTaskStore) IsReplicationDLQEmpty( ctx context.Context, request *p.GetReplicationTasksFromDLQRequest, ) (empty bool, err error)
func (*MutableStateTaskStore) PutReplicationTaskToDLQ ¶
func (d *MutableStateTaskStore) PutReplicationTaskToDLQ( ctx context.Context, request *p.PutReplicationTaskToDLQRequest, ) (err error)
func (*MutableStateTaskStore) RangeCompleteHistoryTasks ¶
func (d *MutableStateTaskStore) RangeCompleteHistoryTasks( ctx context.Context, request *p.RangeCompleteHistoryTasksRequest, ) (err error)
func (*MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ ¶
func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *p.RangeDeleteReplicationTaskFromDLQRequest, ) error
func (*MutableStateTaskStore) RegisterHistoryTaskReader ¶
func (d *MutableStateTaskStore) RegisterHistoryTaskReader( _ context.Context, _ *p.RegisterHistoryTaskReaderRequest, ) error
func (*MutableStateTaskStore) UnregisterHistoryTaskReader ¶
func (d *MutableStateTaskStore) UnregisterHistoryTaskReader( _ context.Context, _ *p.UnregisterHistoryTaskReaderRequest, )
func (*MutableStateTaskStore) UpdateHistoryTaskReaderProgress ¶
func (d *MutableStateTaskStore) UpdateHistoryTaskReaderProgress( _ context.Context, _ *p.UpdateHistoryTaskReaderProgressRequest, )
type QueueStore ¶
type QueueStore struct {
// contains filtered or unexported fields
}
func (*QueueStore) Close ¶
func (q *QueueStore) Close()
func (*QueueStore) DeleteMessageFromDLQ ¶
func (q *QueueStore) DeleteMessageFromDLQ( ctx context.Context, messageID int64, ) error
func (*QueueStore) DeleteMessagesBefore ¶
func (q *QueueStore) DeleteMessagesBefore( ctx context.Context, messageID int64, ) error
func (*QueueStore) EnqueueMessage ¶
func (*QueueStore) EnqueueMessageToDLQ ¶
func (*QueueStore) GetAckLevels ¶
func (q *QueueStore) GetAckLevels( ctx context.Context, ) (*persistence.InternalQueueMetadata, error)
func (*QueueStore) GetDLQAckLevels ¶
func (q *QueueStore) GetDLQAckLevels( ctx context.Context, ) (*persistence.InternalQueueMetadata, error)
func (*QueueStore) RangeDeleteMessagesFromDLQ ¶
func (*QueueStore) ReadMessages ¶
func (q *QueueStore) ReadMessages( ctx context.Context, lastMessageID int64, maxCount int, ) (rv []*persistence.QueueMessage, err error)
func (*QueueStore) ReadMessagesFromDLQ ¶
func (q *QueueStore) ReadMessagesFromDLQ( ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte, ) (m []*persistence.QueueMessage, nextPageToken []byte, err error)
func (*QueueStore) UpdateAckLevel ¶
func (q *QueueStore) UpdateAckLevel( ctx context.Context, metadata *persistence.InternalQueueMetadata, ) (err error)
func (*QueueStore) UpdateDLQAckLevel ¶
func (q *QueueStore) UpdateDLQAckLevel( ctx context.Context, metadata *persistence.InternalQueueMetadata, ) (err error)
type ShardStore ¶
type ShardStore struct {
// contains filtered or unexported fields
}
func NewShardStore ¶
func (*ShardStore) AssertShardOwnership ¶
func (d *ShardStore) AssertShardOwnership(ctx context.Context, request *p.AssertShardOwnershipRequest) error
func (*ShardStore) Close ¶
func (d *ShardStore) Close()
func (*ShardStore) GetClusterName ¶
func (d *ShardStore) GetClusterName() string
func (*ShardStore) GetName ¶
func (d *ShardStore) GetName() string
func (*ShardStore) GetOrCreateShard ¶
func (d *ShardStore) GetOrCreateShard( ctx context.Context, request *p.InternalGetOrCreateShardRequest, ) (resp *p.InternalGetOrCreateShardResponse, err error)
func (*ShardStore) UpdateShard ¶
func (d *ShardStore) UpdateShard(ctx context.Context, request *p.InternalUpdateShardRequest) (err error)
type TestCluster ¶
type TestCluster struct {
	Cfg    xydb.Config
	Logger log.Logger
	Client *xydb.Client
	// contains filtered or unexported fields
}
TestCluster allows executing YDB operations in testing.
func NewTestCluster ¶
func NewTestCluster(database, username, password, host string, port int, schemaDir string, faultInjection *config.FaultInjection, logger log.Logger) *TestCluster
NewTestCluster returns a new YDB test cluster
func (*TestCluster) Config ¶
func (s *TestCluster) Config() config.Persistence
Config returns the persistence config for connecting to this test cluster
func (*TestCluster) DatabaseName ¶
func (s *TestCluster) DatabaseName() string
func (*TestCluster) SetupTestDatabase ¶
func (s *TestCluster) SetupTestDatabase()
func (*TestCluster) TearDownTestDatabase ¶
func (s *TestCluster) TearDownTestDatabase()