heavyclient

package
v0.8.2

This package is not in the latest version of its module.

Published: Feb 21, 2019 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package heavyclient contains heavy replication client code.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type JetClient

type JetClient struct {
	// contains filtered or unexported fields
}

JetClient is a heavy replication client. It replicates records for one jet.

func NewJetClient

func NewJetClient(
	replicaStorage storage.ReplicaStorage,
	mb core.MessageBus,
	pulseStorage core.PulseStorage,
	pulseTracker storage.PulseTracker,
	cleaner storage.Cleaner,
	db storage.DBContext,
	jetID core.RecordID,
	opts Options,
) *JetClient

NewJetClient constructs a heavy replication client.

The jetID argument defines which jet the client serves.

func (*JetClient) HeavySync

func (c *JetClient) HeavySync(
	ctx context.Context,
	pn core.PulseNumber,
	retry bool,
) error

HeavySync syncs records from a light node to a heavy node and returns an error if the sync fails.

It syncs the records for the provided pulse number pn.

func (*JetClient) Stop

func (c *JetClient) Stop(ctx context.Context)

Stop stops heavy client replication.

type Options

type Options struct {
	SyncMessageLimit int
	PulsesDeltaLimit int
	BackoffConf      configuration.Backoff
}

Options contains heavy client configuration params.

type Pool

type Pool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Pool manages state of heavy sync clients (one client per jet id).

func NewPool

func NewPool(
	bus core.MessageBus,
	pulseStorage core.PulseStorage,
	tracker storage.PulseTracker,
	replicaStorage storage.ReplicaStorage,
	cleaner storage.Cleaner,
	db storage.DBContext,
	clientDefaults Options,
) *Pool

NewPool constructs a new Pool.

func (*Pool) AddPulsesToSyncClient

func (scp *Pool) AddPulsesToSyncClient(
	ctx context.Context,
	jetID core.RecordID,
	shouldrun bool,
	pns ...core.PulseNumber,
) *JetClient

AddPulsesToSyncClient adds pulse numbers to the end of the jet's heavy client queue.

The bool flag 'shouldrun' controls whether the heavy client should be started (if it is not already running).
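
The following is a minimal sketch of the "one client per jet, append to its queue" pattern described above. It is not the package's implementation: the types PulseNumber, JetID, client and pool, and the method addPulses, are hypothetical stand-ins chosen for illustration, and marking a client as running stands in for starting a real sync loop.

```go
package main

import (
	"fmt"
	"sync"
)

// PulseNumber and JetID are hypothetical stand-ins for core.PulseNumber and
// core.RecordID; they are not the real insolar types.
type PulseNumber uint32
type JetID string

// client is a hypothetical per-jet client holding a queue of pulses to sync.
type client struct {
	mu      sync.Mutex
	queue   []PulseNumber
	running bool
}

// pool keeps one client per jet ID, guarded by an embedded mutex.
type pool struct {
	sync.Mutex
	clients map[JetID]*client
}

func newPool() *pool {
	return &pool{clients: make(map[JetID]*client)}
}

// addPulses appends pns to the jet's queue, creating the client on first use.
// When shouldRun is true and the client is not running yet, it is marked as
// running (a real client would start its sync loop here).
func (p *pool) addPulses(jet JetID, shouldRun bool, pns ...PulseNumber) *client {
	p.Lock()
	c, ok := p.clients[jet]
	if !ok {
		c = &client{}
		p.clients[jet] = c
	}
	p.Unlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	c.queue = append(c.queue, pns...)
	if shouldRun && !c.running {
		c.running = true
	}
	return c
}

func main() {
	p := newPool()
	p.addPulses("jet-a", false, 10, 20)  // queued, not started
	c := p.addPulses("jet-a", true, 30)  // same client, now started
	fmt.Println(len(c.queue), c.running) // prints "3 true"
}
```

Calling addPulses with shouldRun set to false lets a caller accumulate pulses without triggering replication, which mirrors the role the 'shouldrun' flag plays in the documented signature.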

func (*Pool) AllClients added in v0.8.0

func (scp *Pool) AllClients(ctx context.Context) []*JetClient

AllClients returns slice with all clients in Pool.

func (*Pool) LightCleanup added in v0.8.0

func (scp *Pool) LightCleanup(
	ctx context.Context,
	untilPN core.PulseNumber,
	rsp recentstorage.Provider,
	jetIndexesRemoved map[core.RecordID][]core.RecordID,
) error

LightCleanup starts an asynchronous cleanup on all heavy synchronization clients (one cleanup per jet).

It waits until all cleanups are done and measures the elapsed time.

Under the hood, it uses singleflight on the jet prefix to avoid clashing on the same key space.
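
As a rough illustration of that pattern (fan out per jet, coalesce concurrent work on the same prefix, wait, and time the whole run), here is a sketch that uses only the standard library. The flightGroup type is a simplified, hand-rolled stand-in for golang.org/x/sync/singleflight, and cleanupAll, prefixOf and the jet names are hypothetical; none of this is taken from the package's source.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// call tracks one in-flight cleanup for a key.
type call struct {
	done chan struct{}
	err  error
}

// flightGroup is a simplified stand-in for singleflight: concurrent calls
// that share a key run fn once and all receive the same error.
type flightGroup struct {
	mu    sync.Mutex
	calls map[string]*call
}

func (g *flightGroup) do(key string, fn func() error) error {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		g.mu.Unlock()
		<-c.done // another caller is already cleaning this key space
		return c.err
	}
	c := &call{done: make(chan struct{})}
	g.calls[key] = c
	g.mu.Unlock()

	c.err = fn()

	g.mu.Lock()
	delete(g.calls, key)
	g.mu.Unlock()
	close(c.done)
	return c.err
}

// cleanupAll starts one goroutine per jet, waits for all of them, and returns
// the elapsed time plus the first error seen (if any). prefixOf maps a jet to
// its hypothetical key-space prefix.
func cleanupAll(jets []string, prefixOf func(string) string, cleanup func(prefix string) error) (time.Duration, error) {
	var (
		g        flightGroup
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)
	start := time.Now()
	for _, jet := range jets {
		wg.Add(1)
		go func(prefix string) {
			defer wg.Done()
			if err := g.do(prefix, func() error { return cleanup(prefix) }); err != nil {
				errOnce.Do(func() { firstErr = err })
			}
		}(prefixOf(jet))
	}
	wg.Wait()
	return time.Since(start), firstErr
}

func main() {
	ran := make(chan string, 3) // records each cleanup that actually executed
	jets := []string{"00-a", "00-b", "01-a"}
	elapsed, err := cleanupAll(jets,
		func(jet string) string { return jet[:2] },
		func(prefix string) error { ran <- prefix; return nil },
	)
	fmt.Println("cleanups run:", len(ran), "elapsed:", elapsed, "err:", err)
}
```

Only calls that overlap in time are coalesced, so the number of cleanups that actually run for a shared prefix depends on scheduling; sequential calls with the same key each run their own cleanup.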

func (*Pool) Stop

func (scp *Pool) Stop(ctx context.Context)

Stop sends stop signals to all managed heavy clients and waits until all of them have stopped.
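
A signal-then-wait shutdown of this kind can be sketched with channels and a sync.WaitGroup. The worker type and the startWorker and stopAll functions below are hypothetical and only illustrate the general technique; the actual Pool may coordinate shutdown differently.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a per-jet client with a background loop.
type worker struct {
	stop chan struct{} // closed to request shutdown
	done chan struct{} // closed by the loop once it has exited
}

func startWorker() *worker {
	w := &worker{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		<-w.stop // a real loop would also select on incoming work here
	}()
	return w
}

// stopAll signals every worker concurrently and blocks until each has exited.
// It must be called at most once per worker, since closing a closed channel panics.
func stopAll(workers []*worker) {
	var wg sync.WaitGroup
	for _, w := range workers {
		wg.Add(1)
		go func(w *worker) {
			defer wg.Done()
			close(w.stop)
			<-w.done
		}(w)
	}
	wg.Wait()
}

func main() {
	workers := []*worker{startWorker(), startWorker(), startWorker()}
	stopAll(workers)
	fmt.Println("all workers stopped") // prints "all workers stopped"
}
```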
