Documentation ¶
Overview ¶
Package replication is a generated GoMock package.
Index ¶
- Variables
- type Cache
- type DLQHandler
- type DynamicTaskReader
- type MetricsEmitterImpl
- type MockTaskExecutor
- type MockTaskExecutorMockRecorder
- type MockTaskFetcher
- func (m *MockTaskFetcher) EXPECT() *MockTaskFetcherMockRecorder
- func (m *MockTaskFetcher) GetRateLimiter() *quotas.DynamicRateLimiter
- func (m *MockTaskFetcher) GetRequestChan() chan<- *request
- func (m *MockTaskFetcher) GetSourceCluster() string
- func (m *MockTaskFetcher) Start()
- func (m *MockTaskFetcher) Stop()
- type MockTaskFetcherMockRecorder
- func (mr *MockTaskFetcherMockRecorder) GetRateLimiter() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) GetRequestChan() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) Start() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) Stop() *gomock.Call
- type MockTaskFetchers
- type MockTaskFetchersMockRecorder
- type TaskAckManager
- type TaskExecutor
- type TaskFetcher
- type TaskFetchers
- type TaskHydrator
- type TaskProcessor
- type TaskStore
Constants ¶
This section is empty.
Variables ¶
var ErrUnknownCluster = errors.New("unknown cluster")
ErrUnknownCluster is returned when the given cluster is not defined in the cluster metadata.
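As a minimal sketch of how a caller might test for this sentinel, the snippet below uses the standard errors.Is. The variable errUnknownCluster is a local stand-in for ErrUnknownCluster so the example compiles on its own, and checkCluster and isUnknownCluster are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnknownCluster is a local stand-in for replication.ErrUnknownCluster.
var errUnknownCluster = errors.New("unknown cluster")

// checkCluster is a hypothetical lookup that wraps the sentinel with context.
func checkCluster(known map[string]bool, name string) error {
	if !known[name] {
		return fmt.Errorf("cluster %q: %w", name, errUnknownCluster)
	}
	return nil
}

// isUnknownCluster reports whether err is, or wraps, the sentinel error.
func isUnknownCluster(err error) bool {
	return errors.Is(err, errUnknownCluster)
}
```

Because errors.Is unwraps the error chain, the check keeps working when intermediate layers add context with %w.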
var (
	// ErrUnknownReplicationTask is the error to indicate unknown replication task type
	ErrUnknownReplicationTask = &types.BadRequestError{Message: "unknown replication task"}
)
Functions ¶
This section is empty.
Types ¶
type Cache ¶ added in v0.25.0
type Cache struct {
// contains filtered or unexported fields
}
Cache is an in-memory implementation of a cache for storing hydrated replication messages. Messages can arrive out of order as long as their task ID is higher than that of the last acknowledged message. Out-of-order arrival is expected, as different source clusters share hydrated replication messages.
Cache uses a heap to keep replication messages in order. This is needed for efficient acknowledgements in O(log N).
Cache capacity can be increased dynamically. Decreasing it requires a restart: new tasks will not be accepted, but memory will not be reclaimed either.
Cache methods are thread safe. Writers and readers are expected to run in different goroutines.
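The behaviour described above can be illustrated with a small sketch built on container/heap. This is not the package's implementation: the task type, sketchCache, its fixed int capacity, and the Put method with a bool result are all assumptions made for illustration (the real Cache takes a dynamicconfig.IntPropertyFn and stores *types.ReplicationTask).

```go
package main

import (
	"container/heap"
	"sync"
)

// task is a hypothetical stand-in for *types.ReplicationTask.
type task struct{ ID int64 }

// taskHeap is a min-heap of tasks ordered by task ID.
type taskHeap []*task

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].ID < h[j].ID }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(*task)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	n := len(old)
	t := old[n-1]
	old[n-1] = nil // drop the reference so the task can be collected
	*h = old[:n-1]
	return t
}

// sketchCache is an illustrative, simplified cache; names are hypothetical.
type sketchCache struct {
	mu       sync.RWMutex
	capacity int
	lastAck  int64
	byID     map[int64]*task
	order    taskHeap
}

func newSketchCache(capacity int) *sketchCache {
	return &sketchCache{capacity: capacity, byID: make(map[int64]*task)}
}

// Put accepts tasks in any order, but rejects tasks that are already
// acknowledged, already present, or that would exceed the capacity.
func (c *sketchCache) Put(t *task) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if t.ID <= c.lastAck || len(c.byID) >= c.capacity {
		return false
	}
	if _, ok := c.byID[t.ID]; ok {
		return false
	}
	c.byID[t.ID] = t
	heap.Push(&c.order, t)
	return true
}

// Get returns the task with the given ID, or nil if it is not cached.
func (c *sketchCache) Get(id int64) *task {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.byID[id]
}

// Ack removes every task with ID <= level; each removal costs O(log N).
func (c *sketchCache) Ack(level int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.order.Len() > 0 && c.order[0].ID <= level {
		t := heap.Pop(&c.order).(*task)
		delete(c.byID, t.ID)
	}
	if level > c.lastAck {
		c.lastAck = level
	}
}
```

The heap keeps the lowest task ID at the root, so an acknowledgement only pops from the top until it reaches a task above the acknowledged level, while the map gives constant-time lookups for Get.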
func NewCache ¶ added in v0.25.0
func NewCache(capacity dynamicconfig.IntPropertyFn) *Cache
NewCache creates a new instance of the replication cache.
func (*Cache) Ack ¶ added in v0.25.0
Ack is used to acknowledge replication messages, meaning they will be removed from the cache.
func (*Cache) Get ¶ added in v0.25.0
func (c *Cache) Get(taskID int64) *types.ReplicationTask
Get returns the stored task with the given taskID. If the task is not in the cache, nil is returned.
type DLQHandler ¶
type DLQHandler interface {
	common.Daemon

	GetMessageCount(
		ctx context.Context,
		forceFetch bool,
	) (map[string]int64, error)
	ReadMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error)
	PurgeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
	) error
	MergeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]byte, error)
}
DLQHandler is the interface that handles replication DLQ messages.
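ReadMessages and MergeMessages are paginated through pageSize and pageToken. The sketch below shows one way a caller might drain all pages. It is written against a hypothetical, narrowed pageReader interface and an in-memory fakeReader so that it compiles without the Cadence packages, and it assumes that an empty returned token marks the last page, which the documentation above does not state.

```go
package main

import (
	"context"
	"errors"
	"strconv"
)

// message is a hypothetical stand-in for *types.ReplicationTask.
type message struct{ ID int64 }

// pageReader is a hypothetical, narrowed version of DLQHandler.ReadMessages
// that returns a single slice of messages instead of two.
type pageReader interface {
	ReadMessages(ctx context.Context, sourceCluster string, lastMessageID int64,
		pageSize int, pageToken []byte) ([]*message, []byte, error)
}

// readAll follows page tokens until the reader returns an empty one.
// Treating an empty token as "last page" is an assumption of this sketch.
func readAll(ctx context.Context, r pageReader, sourceCluster string,
	lastMessageID int64, pageSize int) ([]*message, error) {
	var all []*message
	var token []byte
	for {
		page, next, err := r.ReadMessages(ctx, sourceCluster, lastMessageID, pageSize, token)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(next) == 0 {
			return all, nil
		}
		token = next
	}
}

// fakeReader serves messages from memory; its token is a decimal offset.
type fakeReader struct{ msgs []*message }

func (f *fakeReader) ReadMessages(_ context.Context, _ string, _ int64,
	pageSize int, pageToken []byte) ([]*message, []byte, error) {
	if pageSize <= 0 {
		return nil, nil, errors.New("page size must be positive")
	}
	start := 0
	if len(pageToken) > 0 {
		n, err := strconv.Atoi(string(pageToken))
		if err != nil {
			return nil, nil, err
		}
		start = n
	}
	if start < 0 || start > len(f.msgs) {
		return nil, nil, errors.New("invalid page token")
	}
	end := start + pageSize
	if end >= len(f.msgs) {
		return f.msgs[start:], nil, nil // last page: no further token
	}
	return f.msgs[start:end], []byte(strconv.Itoa(end)), nil
}

// countAll drains a fake queue holding n messages and reports how many were read.
func countAll(n, pageSize int) (int, error) {
	f := &fakeReader{}
	for i := 0; i < n; i++ {
		f.msgs = append(f.msgs, &message{ID: int64(i)})
	}
	all, err := readAll(context.Background(), f, "cluster-a", int64(n), pageSize)
	return len(all), err
}
```

Keeping the token opaque to the caller, as readAll does, means the loop does not depend on how a particular handler encodes its position.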
func NewDLQHandler ¶
func NewDLQHandler(
	shard shard.Context,
	taskExecutors map[string]TaskExecutor,
) DLQHandler
NewDLQHandler initializes the replication message DLQ handler.
type DynamicTaskReader ¶ added in v0.25.0
type DynamicTaskReader struct {
// contains filtered or unexported fields
}
DynamicTaskReader will read replication tasks from database using dynamic batch sizing depending on replication lag.
func NewDynamicTaskReader ¶ added in v0.25.0
func NewDynamicTaskReader(
	shardID int,
	executionManager persistence.ExecutionManager,
	timeSource clock.TimeSource,
	config *config.Config,
) *DynamicTaskReader
NewDynamicTaskReader creates a new DynamicTaskReader.
func (*DynamicTaskReader) Read ¶ added in v0.25.0
func (r *DynamicTaskReader) Read(ctx context.Context, readLevel int64, maxReadLevel int64) ([]*persistence.ReplicationTaskInfo, bool, error)
Read reads and returns replication tasks from readLevel to maxReadLevel. Batch size is determined dynamically: if replication lag is less than config.ReplicatorUpperLatency, the batch is proportionally smaller; otherwise the default batch size of config.ReplicatorProcessorFetchTasksBatchSize is used.
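The proportional sizing rule can be sketched as a small pure function. The function name batchSize, its parameters, and the floor of one task per batch are assumptions for illustration; the actual reader may round or bound the result differently.

```go
package main

import "time"

// batchSize is a hypothetical helper illustrating proportional batch sizing.
// upperLatency plays the role of config.ReplicatorUpperLatency and defaultSize
// the role of config.ReplicatorProcessorFetchTasksBatchSize.
func batchSize(lag, upperLatency time.Duration, defaultSize int) int {
	// At or above the upper latency the full default batch is used.
	if upperLatency <= 0 || lag >= upperLatency {
		return defaultSize
	}
	if lag < 0 {
		lag = 0
	}
	// Below it, scale the batch linearly with the observed lag.
	size := int(float64(defaultSize) * float64(lag) / float64(upperLatency))
	if size < 1 {
		size = 1 // assumption: always read at least one task
	}
	return size
}
```

For example, with an upper latency of 60 seconds and a default batch of 100, a lag of 15 seconds yields a batch of 25, so a shard that is nearly caught up issues smaller reads.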
type MetricsEmitterImpl ¶ added in v0.25.0
type MetricsEmitterImpl struct {
// contains filtered or unexported fields
}
MetricsEmitterImpl is responsible for emitting source side replication metrics occasionally.
func NewMetricsEmitter ¶ added in v0.25.0
func NewMetricsEmitter(
	shardID int,
	shardData metricsEmitterShardData,
	reader taskReader,
	metricsClient metrics.Client,
) *MetricsEmitterImpl
NewMetricsEmitter creates a new metrics emitter, which starts a goroutine to emit replication metrics occasionally.
func (*MetricsEmitterImpl) Start ¶ added in v0.25.0
func (m *MetricsEmitterImpl) Start()
func (*MetricsEmitterImpl) Stop ¶ added in v0.25.0
func (m *MetricsEmitterImpl) Stop()
type MockTaskExecutor ¶
type MockTaskExecutor struct {
// contains filtered or unexported fields
}
MockTaskExecutor is a mock of TaskExecutor interface.
func NewMockTaskExecutor ¶
func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor
NewMockTaskExecutor creates a new mock instance.
func (*MockTaskExecutor) EXPECT ¶
func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockTaskExecutorMockRecorder ¶
type MockTaskExecutorMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor.
type MockTaskFetcher ¶
type MockTaskFetcher struct {
// contains filtered or unexported fields
}
MockTaskFetcher is a mock of TaskFetcher interface.
func NewMockTaskFetcher ¶
func NewMockTaskFetcher(ctrl *gomock.Controller) *MockTaskFetcher
NewMockTaskFetcher creates a new mock instance.
func (*MockTaskFetcher) EXPECT ¶
func (m *MockTaskFetcher) EXPECT() *MockTaskFetcherMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockTaskFetcher) GetRateLimiter ¶ added in v0.14.0
func (m *MockTaskFetcher) GetRateLimiter() *quotas.DynamicRateLimiter
GetRateLimiter mocks base method.
func (*MockTaskFetcher) GetRequestChan ¶
func (m *MockTaskFetcher) GetRequestChan() chan<- *request
GetRequestChan mocks base method.
func (*MockTaskFetcher) GetSourceCluster ¶
func (m *MockTaskFetcher) GetSourceCluster() string
GetSourceCluster mocks base method.
type MockTaskFetcherMockRecorder ¶
type MockTaskFetcherMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskFetcherMockRecorder is the mock recorder for MockTaskFetcher.
func (*MockTaskFetcherMockRecorder) GetRateLimiter ¶ added in v0.14.0
func (mr *MockTaskFetcherMockRecorder) GetRateLimiter() *gomock.Call
GetRateLimiter indicates an expected call of GetRateLimiter.
func (*MockTaskFetcherMockRecorder) GetRequestChan ¶
func (mr *MockTaskFetcherMockRecorder) GetRequestChan() *gomock.Call
GetRequestChan indicates an expected call of GetRequestChan.
func (*MockTaskFetcherMockRecorder) GetSourceCluster ¶
func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call
GetSourceCluster indicates an expected call of GetSourceCluster.
func (*MockTaskFetcherMockRecorder) Start ¶
func (mr *MockTaskFetcherMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockTaskFetcherMockRecorder) Stop ¶
func (mr *MockTaskFetcherMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type MockTaskFetchers ¶
type MockTaskFetchers struct {
// contains filtered or unexported fields
}
MockTaskFetchers is a mock of TaskFetchers interface.
func NewMockTaskFetchers ¶
func NewMockTaskFetchers(ctrl *gomock.Controller) *MockTaskFetchers
NewMockTaskFetchers creates a new mock instance.
func (*MockTaskFetchers) EXPECT ¶
func (m *MockTaskFetchers) EXPECT() *MockTaskFetchersMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockTaskFetchers) GetFetchers ¶
func (m *MockTaskFetchers) GetFetchers() []TaskFetcher
GetFetchers mocks base method.
type MockTaskFetchersMockRecorder ¶
type MockTaskFetchersMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskFetchersMockRecorder is the mock recorder for MockTaskFetchers.
func (*MockTaskFetchersMockRecorder) GetFetchers ¶
func (mr *MockTaskFetchersMockRecorder) GetFetchers() *gomock.Call
GetFetchers indicates an expected call of GetFetchers.
func (*MockTaskFetchersMockRecorder) Start ¶
func (mr *MockTaskFetchersMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockTaskFetchersMockRecorder) Stop ¶
func (mr *MockTaskFetchersMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type TaskAckManager ¶ added in v0.15.0
type TaskAckManager struct {
// contains filtered or unexported fields
}
TaskAckManager is the ack manager for replication tasks
func NewTaskAckManager ¶ added in v0.15.0
func NewTaskAckManager(
	shardID int,
	ackLevels ackLevelStore,
	metricsClient metrics.Client,
	logger log.Logger,
	reader taskReader,
	store *TaskStore,
) TaskAckManager
NewTaskAckManager initializes a new replication task ack manager
func (*TaskAckManager) GetTasks ¶ added in v0.15.0
func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, lastReadTaskID int64) (*types.ReplicationMessages, error)
type TaskExecutor ¶
type TaskExecutor interface {
// contains filtered or unexported methods
}
TaskExecutor is the executor for replication task
func NewTaskExecutor ¶
func NewTaskExecutor(
	shard shard.Context,
	domainCache cache.DomainCache,
	historyResender ndc.HistoryResender,
	historyEngine engine.Engine,
	metricsClient metrics.Client,
	logger log.Logger,
) TaskExecutor
NewTaskExecutor creates a replication task executor. The executor is used by 1) the DLQ replication task handler and 2) the history replication task processor.
type TaskFetcher ¶
type TaskFetcher interface {
	common.Daemon

	GetSourceCluster() string
	GetRequestChan() chan<- *request
	GetRateLimiter() *quotas.DynamicRateLimiter
}
TaskFetcher is responsible for fetching replication messages from remote DC.
type TaskFetchers ¶
type TaskFetchers interface {
	common.Daemon

	GetFetchers() []TaskFetcher
}
TaskFetchers is a group of fetchers, one per source DC.
type TaskHydrator ¶ added in v0.25.0
type TaskHydrator struct {
// contains filtered or unexported fields
}
TaskHydrator enriches a replication task with additional information from mutable state and history events. Mutable state and history providers can be either in-memory or persistence-based implementations, depending on whether the data is already available or needs to be loaded.
func NewDeferredTaskHydrator ¶ added in v0.25.0
func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache *execution.Cache, domains domainCache) TaskHydrator
NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, but is rather loaded in a deferred way later from a database and cache.
func NewImmediateTaskHydrator ¶ added in v0.25.0
func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, blob, nextBlob *persistence.DataBlob) TaskHydrator
NewImmediateTaskHydrator will enrich replication tasks with additional information that is immediately available.
func (TaskHydrator) Hydrate ¶ added in v0.25.0
func (h TaskHydrator) Hydrate(ctx context.Context, task persistence.ReplicationTaskInfo) (retTask *types.ReplicationTask, retErr error)
Hydrate will enrich replication task with additional information from mutable state and history events.
type TaskProcessor ¶
TaskProcessor is responsible for processing replication tasks for a shard.
func NewTaskProcessor ¶
func NewTaskProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	config *config.Config,
	metricsClient metrics.Client,
	taskFetcher TaskFetcher,
	taskExecutor TaskExecutor,
) TaskProcessor
NewTaskProcessor creates a new replication task processor.
type TaskStore ¶ added in v0.25.0
type TaskStore struct {
// contains filtered or unexported fields
}
TaskStore is a component that hydrates and caches replication messages so that they can be reused across several polling source clusters. It also exposes a public Put method. This allows already hydrated messages to be pre-stored at the end of a successful transaction, saving a DB call to fetch history events.
TaskStore uses a separate cache for each source cluster, allowing messages to be fetched at different rates. Once a cache becomes full, it will not accept further messages for that cluster. Those messages will later be fetched from the DB and hydrated again. A cache stores only a pointer to the message. The message is hydrated once and shared across caches. A cluster acknowledging the message removes it from the corresponding cache. Once all clusters have acknowledged it, no more references are held, and the GC will eventually pick it up.
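The sharing scheme described above can be sketched as follows. Everything here is a simplified assumption for illustration: task stands in for *types.ReplicationTask, the hydrate callback replaces the real hydrator, clusterCache is a plain map rather than the heap-backed Cache, and domain filtering is left out.

```go
package main

import (
	"errors"
	"sync"
)

// errUnknownCluster is a local stand-in for replication.ErrUnknownCluster.
var errUnknownCluster = errors.New("unknown cluster")

// task is a hypothetical stand-in for *types.ReplicationTask.
type task struct{ ID int64 }

// clusterCache is a hypothetical bounded cache for a single cluster.
type clusterCache struct {
	capacity int
	lastAck  int64
	tasks    map[int64]*task
}

// sketchStore keeps one cache per cluster; all caches share task pointers.
type sketchStore struct {
	mu      sync.Mutex
	caches  map[string]*clusterCache
	hydrate func(id int64) *task
}

func newSketchStore(clusters []string, capacity int, hydrate func(id int64) *task) *sketchStore {
	s := &sketchStore{caches: make(map[string]*clusterCache), hydrate: hydrate}
	for _, name := range clusters {
		s.caches[name] = &clusterCache{capacity: capacity, tasks: make(map[int64]*task)}
	}
	return s
}

// putLocked stores the same pointer in every cache that can still take it.
// Full caches and caches that already acknowledged the task are skipped.
func (s *sketchStore) putLocked(t *task) {
	for _, c := range s.caches {
		if t.ID <= c.lastAck || len(c.tasks) >= c.capacity {
			continue
		}
		c.tasks[t.ID] = t
	}
}

// Put pre-stores an already hydrated task in all cluster caches.
func (s *sketchStore) Put(t *task) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.putLocked(t)
}

// Get returns the cached task, or hydrates it once and shares it.
func (s *sketchStore) Get(cluster string, id int64) (*task, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	c, ok := s.caches[cluster]
	if !ok {
		return nil, errUnknownCluster
	}
	if t, ok := c.tasks[id]; ok {
		return t, nil
	}
	t := s.hydrate(id)
	if t != nil {
		s.putLocked(t)
	}
	return t, nil
}

// Ack drops tasks up to lastTaskID from the given cluster's cache only.
func (s *sketchStore) Ack(cluster string, lastTaskID int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	c, ok := s.caches[cluster]
	if !ok {
		return errUnknownCluster
	}
	for id := range c.tasks {
		if id <= lastTaskID {
			delete(c.tasks, id)
		}
	}
	if lastTaskID > c.lastAck {
		c.lastAck = lastTaskID
	}
	return nil
}
```

Since each cache holds only a pointer, a task requested by several clusters is hydrated once, and it becomes eligible for garbage collection as soon as the last cluster acknowledges it.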
func NewTaskStore ¶ added in v0.25.0
func NewTaskStore(
	config *config.Config,
	clusterMetadata cluster.Metadata,
	domains domainCache,
	metricsClient metrics.Client,
	logger log.Logger,
	hydrator taskHydrator,
) *TaskStore
NewTaskStore creates a new instance of TaskStore.
func (*TaskStore) Ack ¶ added in v0.25.0
Ack acknowledges replication messages for a given cluster. All messages up to the given lastTaskID are removed from that cluster's cache.
func (*TaskStore) Get ¶ added in v0.25.0
func (m *TaskStore) Get(ctx context.Context, cluster string, info persistence.ReplicationTaskInfo) (*types.ReplicationTask, error)
Get returns a hydrated replication message for a given cluster based on raw task info. It either returns the message immediately from the cache, or hydrates it, stores it in the cache, and then returns it.
The returned task may be nil. This may be because the domain does not exist in the given cluster or because the replication message is no longer relevant. Either case is valid; such a replication message should be ignored and not returned in the response.
func (*TaskStore) Put ¶ added in v0.25.0
func (m *TaskStore) Put(task *types.ReplicationTask)
Put tries to store a hydrated replication task in all cluster caches. A task may not be relevant to a cluster because the domain is not enabled there; the task is ignored for that cluster. Some clusters may already have a full cache; the task is ignored and will be fetched and hydrated again later. Some clusters may have already acknowledged the task; the task is ignored, as it is no longer relevant for such a cluster.