cluster

package
v0.1.6
Published: Aug 16, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

const (
	// SystemSchemaShardID is a shard for storing system tables. Note that this actually writes metadata to
	// the first data shard.
	SystemSchemaShardID uint64 = 1000

	// DataShardIDBase is the lowest value of a data shard id
	DataShardIDBase uint64 = 1000
)

Variables

This section is empty.

Functions

func DoDedup

func DoDedup(shardID uint64, dedupKey []byte, dedupMap map[string]uint64) (bool, error)

DoDedup checks for a duplicate key.

We use duplicate detection in PranaDB for two things:

*Screening out duplicate Kafka messages ingested in sources*

When we ingest a batch of messages from Kafka, we shard each message to find its destination shard, then for each destination shard we write a batch into the receiver table for that shard. A failure can occur after we have successfully written to some but not all of the destination shards. We acknowledge the Kafka batch only after successfully writing to all shards, so on recovery a new batch of messages is consumed again from the last committed offset, sharded, and forward batches are written to the destination shards. We therefore need to ignore any messages that have already been written.

*Screening out duplicate rows which are moved from one shard to another*

Sometimes in PranaDB, when processing rows through a materialized view, we need to forward data from one shard to another. A classic example is calculating an aggregation. Imagine a materialized view with a simple aggregation: data is processed on a shard, and when a batch of rows reaches an aggregator, the rows are forwarded to the shards that own the data for the aggregation key (defined by the group by columns).

*How the mechanism works*

In both the above cases we need a way of writing rows to another shard such that rows will be ignored if they have been seen before.

When we provide rows for writing into the shard state machine for the receiver table, we also provide a dedup key for each row, which is composed of:

originator_id: 16 bytes
sequence: 8 bytes

originator_id uniquely defines where the row came from - in the case of ingesting rows from Kafka it is made up of:

source_id: 8 bytes
partition_id: 8 bytes

When receiving a row from another shard (e.g. in the case of forwarding a partial aggregation), it is made up of:

internal aggregation table id: 8 bytes
shard id (being forwarded from): 8 bytes
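
As an illustration, the dedup key for a Kafka-ingested row could be assembled as follows. This is a hypothetical helper, not part of the package API; the field order follows the layout above, and the big-endian encoding is an assumption.

import "encoding/binary"

// kafkaDedupKey assembles the 24-byte dedup key for a Kafka-ingested row:
// originator_id (source_id + partition_id) followed by the sequence (the
// Kafka offset). Hypothetical helper; big-endian encoding is assumed.
func kafkaDedupKey(sourceID, partitionID, offset uint64) []byte {
	key := make([]byte, 0, 24)
	key = binary.BigEndian.AppendUint64(key, sourceID)    // originator_id: first 8 bytes
	key = binary.BigEndian.AppendUint64(key, partitionID) // originator_id: last 8 bytes
	key = binary.BigEndian.AppendUint64(key, offset)      // sequence: 8 bytes
	return key
}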

When the shard state machine receives a row with a particular dedup key, it looks up the last sequence value it received for the same originator_id. If the previous sequence value is >= the current one, the row is a duplicate and is ignored.
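
A minimal sketch of that check, assuming the 24-byte key layout above; the map mirrors the shape of DoDedup's dedupMap argument, but this is not the actual implementation (the real state machine persists this state):

import (
	"encoding/binary"
	"fmt"
)

// isDuplicate reports whether a row with the given dedup key has already
// been seen, recording its sequence as the last seen when it has not.
func isDuplicate(dedupKey []byte, dedupMap map[string]uint64) (bool, error) {
	if len(dedupKey) != 24 {
		return false, fmt.Errorf("unexpected dedup key length %d", len(dedupKey))
	}
	originatorID := string(dedupKey[:16])
	sequence := binary.BigEndian.Uint64(dedupKey[16:])
	if last, ok := dedupMap[originatorID]; ok && last >= sequence {
		return true, nil // this sequence (or a later one) was already seen
	}
	dedupMap[originatorID] = sequence
	return false, nil
}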

When rows are actually written into the receiver table, the shard state machine writes the key as:

|shard_id|receiver_table_id|receiver_sequence|remote_consumer_id
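
For illustration, that key could be encoded like this; a hypothetical helper, assuming each field is a big-endian uint64:

import "encoding/binary"

// receiverTableKey encodes the receiver-table key layout shown above.
func receiverTableKey(shardID, receiverTableID, receiverSequence, remoteConsumerID uint64) []byte {
	key := make([]byte, 0, 32)
	for _, field := range []uint64{shardID, receiverTableID, receiverSequence, remoteConsumerID} {
		key = binary.BigEndian.AppendUint64(key, field)
	}
	return key
}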

Where do we get the sequence field in the dedup key from?

In the case of forwarding rows we have just consumed from Kafka, sequence is simply the offset in the Kafka partition.

In the case of forwarding rows in an aggregation to the owner of the key, the receiver sequence is used.
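
Putting this together, a caller screening Kafka-ingested rows might use DoDedup like the sketch below. The semantics of the returned bool are an assumption (true meaning the row is new and should be processed), and kafkaDedupKey is the hypothetical helper from above.

dedupMap := make(map[string]uint64)
key := kafkaDedupKey(sourceID, partitionID, offset)
process, err := DoDedup(shardID, key, dedupMap)
if err != nil {
	return err
}
if process {
	// row has not been seen before - write it to the receiver table
}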

Types

type Cluster

type Cluster interface {
	ExecuteForwardBatch(shardID uint64, batch []byte) error

	// WriteBatch writes a batch reliably to storage
	WriteBatch(batch *WriteBatch, localOnly bool) error

	// WriteForwardBatch writes a batch reliably for forwarding to another shard
	WriteForwardBatch(batch *WriteBatch, localOnly bool) error

	// WriteBatchLocally writes a batch directly using the KV store without going through Raft
	WriteBatchLocally(batch *WriteBatch) error

	LocalGet(key []byte) ([]byte, error)

	LinearizableGet(shardID uint64, key []byte) ([]byte, error)

	// LocalScan scans the local store
	// endKeyPrefix is exclusive
	LocalScan(startKeyPrefix []byte, endKeyPrefix []byte, limit int) ([]KVPair, error)

	CreateSnapshot() (Snapshot, error)

	LocalScanWithSnapshot(snapshot Snapshot, startKeyPrefix []byte, endKeyPrefix []byte, limit int) ([]KVPair, error)

	GetNodeID() int

	GetAllShardIDs() []uint64

	// GetLocalShardIDs returns the ids of the shards on the local node - this includes replicas
	GetLocalShardIDs() []uint64

	// GenerateClusterSequence generates a cluster wide unique sequence number
	GenerateClusterSequence(sequenceName string) (uint64, error)

	SetRemoteQueryExecutionCallback(callback RemoteQueryExecutionCallback)

	RegisterShardListenerFactory(factory ShardListenerFactory)

	ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo, rowsFactory *common.RowsFactory) (*common.Rows, error)

	DeleteAllDataInRangeForAllShardsLocally(startPrefix []byte, endPrefix []byte) error

	DeleteAllDataInRangeForShardLocally(shardID uint64, startPrefix []byte, endPrefix []byte) error

	GetLock(prefix string) (bool, error)

	ReleaseLock(prefix string) (bool, error)

	AddToDeleteBatch(batch *ToDeleteBatch) error

	RemoveToDeleteBatch(batch *ToDeleteBatch) error

	Start() error

	Stop() error

	PostStartChecks(queryExec common.SimpleQueryExec) error

	SyncStore() error

	GetLeadersMap() (map[uint64]uint64, error)

	RegisterStartFill(expectedLeaders map[uint64]uint64, interruptor *interruptor.Interruptor) error

	RegisterEndFill()
}
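
As a usage sketch, a reliable single-put write through a Cluster could look like the following. Assumptions: c is a started Cluster implementation, and localOnly=false requests a replicated write.

// writeReliably builds a batch, writes it reliably through the Cluster,
// then runs any committed callbacks. Illustrative helper only.
func writeReliably(c Cluster, shardID uint64, key, value []byte) error {
	wb := NewWriteBatch(shardID)
	wb.AddPut(key, value)
	if err := c.WriteBatch(wb, false); err != nil {
		return err
	}
	return wb.AfterCommit()
}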

type CommittedCallback

type CommittedCallback func() error

type DummyRemoteQueryExecutionCallback

type DummyRemoteQueryExecutionCallback struct {
}

func (*DummyRemoteQueryExecutionCallback) ExecuteRemotePullQuery

func (d *DummyRemoteQueryExecutionCallback) ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo) (*common.Rows, error)

type DummyShardListenerFactory

type DummyShardListenerFactory struct {
}

func (*DummyShardListenerFactory) CreateShardListener

func (d *DummyShardListenerFactory) CreateShardListener(shardID uint64) ShardListener

type ForwardRow added in v0.1.6

type ForwardRow struct {
	ReceiverSequence uint64
	RemoteConsumerID uint64
	KeyBytes         []byte
	RowBytes         []byte
	WriteTime        uint64
}

type ForwardWriteHandler added in v0.1.6

type ForwardWriteHandler interface {
	HandleForwardWrite(shardID uint64, writeBatch []byte) error
}

type KReceiver

type KReceiver func([]byte) error

type KVPair

type KVPair struct {
	Key   []byte
	Value []byte
}

type KVReceiver

type KVReceiver func([]byte, []byte) error

type LeaderChangeCallback

type LeaderChangeCallback interface {
	LeaderChanged(shardID uint64, added bool)
}

type QueryExecutionInfo

type QueryExecutionInfo struct {
	ExecutionID       string
	SchemaName        string
	Query             string
	Limit             uint32
	ShardID           uint64
	SystemQuery       bool
	PsArgTypes        []common.ColumnType
	PsArgs            []interface{}
	PreparedStatement bool
}

func (*QueryExecutionInfo) Deserialize

func (q *QueryExecutionInfo) Deserialize(buff []byte) error

func (*QueryExecutionInfo) Serialize

func (q *QueryExecutionInfo) Serialize(buff []byte) ([]byte, error)

type RemoteQueryExecutionCallback

type RemoteQueryExecutionCallback interface {
	ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo) (*common.Rows, error)
}

type RemoteQueryResult

type RemoteQueryResult struct {
	Rows *common.Rows
	Err  error
}

type RemoteWriteHandler

type RemoteWriteHandler interface {
	RemoteWriteOccurred(shardID uint64)
}

RemoteWriteHandler will be called when a remote write is done to a shard

type ShardCallback

type ShardCallback interface {
	Write(batch WriteBatch) error
}

type ShardListener

type ShardListener interface {
	RemoteWriteOccurred(forwardRows []ForwardRow)
	Close()
}

type ShardListenerFactory

type ShardListenerFactory interface {
	CreateShardListener(shardID uint64) ShardListener
}

type Snapshot

type Snapshot interface {
	Close()
}

type ToDeleteBatch

type ToDeleteBatch struct {
	ConditionalTableID uint64
	Prefixes           [][]byte
}

type WriteBatch

type WriteBatch struct {
	ShardID    uint64
	Puts       []byte
	Deletes    []byte
	NumPuts    int
	NumDeletes int
	// contains filtered or unexported fields
}

WriteBatch represents a set of puts and deletes that will be written atomically by the underlying storage implementation

func NewWriteBatch

func NewWriteBatch(shardID uint64) *WriteBatch

func (*WriteBatch) AddCommittedCallback

func (wb *WriteBatch) AddCommittedCallback(callback CommittedCallback)

AddCommittedCallback adds a callback which will be called when the batch is committed
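
For example (illustrative only):

wb := NewWriteBatch(shardID)
wb.AddPut(key, value)
wb.AddCommittedCallback(func() error {
	// runs once the batch has committed, when AfterCommit is called
	return nil
})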

func (*WriteBatch) AddDelete

func (wb *WriteBatch) AddDelete(k []byte)

func (*WriteBatch) AddPut

func (wb *WriteBatch) AddPut(k []byte, v []byte)

func (*WriteBatch) AfterCommit

func (wb *WriteBatch) AfterCommit() error

AfterCommit should be called after committing the batch; it causes any registered committed callbacks to be run

func (*WriteBatch) ForEachDelete

func (wb *WriteBatch) ForEachDelete(kReceiver KReceiver) error

func (*WriteBatch) ForEachPut

func (wb *WriteBatch) ForEachPut(kvReceiver KVReceiver) error
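
Illustrative usage, iterating the puts in a batch with a KVReceiver closure:

err := wb.ForEachPut(func(k, v []byte) error {
	fmt.Printf("put key=%x value=%x\n", k, v)
	return nil
})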

func (*WriteBatch) HasPuts added in v0.1.6

func (wb *WriteBatch) HasPuts() bool

func (*WriteBatch) HasWrites

func (wb *WriteBatch) HasWrites() bool

func (*WriteBatch) Serialize

func (wb *WriteBatch) Serialize(buff []byte) []byte

