Documentation ¶
Index ¶
- Constants
- func DoDedup(shardID uint64, dedupKey []byte, dedupMap map[string]uint64) (bool, error)
- type Cluster
- type CommittedCallback
- type DummyRemoteQueryExecutionCallback
- type DummyShardListenerFactory
- type ForwardRow
- type ForwardWriteHandler
- type KReceiver
- type KVPair
- type KVReceiver
- type LeaderChangeCallback
- type QueryExecutionInfo
- type RemoteQueryExecutionCallback
- type RemoteQueryResult
- type RemoteWriteHandler
- type ShardCallback
- type ShardListener
- type ShardListenerFactory
- type Snapshot
- type ToDeleteBatch
- type WriteBatch
- func (wb *WriteBatch) AddCommittedCallback(callback CommittedCallback)
- func (wb *WriteBatch) AddDelete(k []byte)
- func (wb *WriteBatch) AddPut(k []byte, v []byte)
- func (wb *WriteBatch) AfterCommit() error
- func (wb *WriteBatch) ForEachDelete(kReceiver KReceiver) error
- func (wb *WriteBatch) ForEachPut(kvReceiver KVReceiver) error
- func (wb *WriteBatch) HasPuts() bool
- func (wb *WriteBatch) HasWrites() bool
- func (wb *WriteBatch) Serialize(buff []byte) []byte
Constants ¶
const (
	// SystemSchemaShardID is a shard for storing system tables. Note that this actually writes metadata to
	// the first data shard.
	SystemSchemaShardID uint64 = 1000

	// DataShardIDBase is the lowest value of a data shard id
	DataShardIDBase uint64 = 1000
)
Variables ¶
This section is empty.
Functions ¶
func DoDedup ¶
DoDedup checks for a duplicate key.
We use duplicate detection in PranaDB for two things:
*Screening out duplicate Kafka messages ingested in sources*
When we ingest a batch of messages from Kafka, we shard each message to find its destination shard, then for each destination shard we write a batch into the receiver table for that shard. A failure can occur after we have successfully written to some destination shards but not all of them. We acknowledge the Kafka batch only after successfully writing to all shards. On recovery, a new batch of messages from the last committed offset is consumed again, sharded, and forward batches are written to the destination shards. We need to ignore any messages that have already been written.
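The flow can be illustrated with a short sketch. kafkaMessage, hashKey, numShards and the acknowledgement step are hypothetical stand-ins, not part of this package:

import "hash/fnv"

// kafkaMessage is a hypothetical message type - not part of this package.
type kafkaMessage struct {
	Key, Value []byte
	Offset     uint64
}

// hashKey is a hypothetical hash used to shard message keys.
func hashKey(k []byte) uint64 {
	h := fnv.New64a()
	h.Write(k)
	return h.Sum64()
}

// ingestBatch forwards a consumed batch to its destination shards. If it
// fails part way through, the offset is not committed and the same messages
// will be forwarded again on recovery - duplicates are screened out by DoDedup.
func ingestBatch(cluster Cluster, msgs []kafkaMessage, numShards uint64) error {
	batches := make(map[uint64]*WriteBatch)
	for _, msg := range msgs {
		shardID := DataShardIDBase + hashKey(msg.Key)%numShards
		wb, ok := batches[shardID]
		if !ok {
			wb = NewWriteBatch(shardID)
			batches[shardID] = wb
		}
		wb.AddPut(msg.Key, msg.Value)
	}
	for _, wb := range batches {
		if err := cluster.WriteForwardBatch(wb, false); err != nil {
			return err
		}
	}
	// Only now is it safe to acknowledge the Kafka batch (not shown).
	return nil
}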
*Screening out duplicate rows which are moved from one shard to another*
Sometimes in PranaDB, when processing rows through a materialized view, we need to forward data from one shard to another. A classic example is calculating an aggregation. Imagine a materialized view with a simple aggregation: data is processed on a shard, and when the batch of rows reaches an aggregator, the rows are forwarded to the shards that own the data for their aggregation key (defined by the group by columns).
*How the mechanism works*
In both the above cases we need a way of writing rows to another shard such that rows will be ignored if they have been seen before.
When we provide rows for writing into the shard state machine for the receiver table, we also provide a dedup key for each row. It is composed of:
originator_id: 16 bytes
sequence:       8 bytes
originator_id uniquely defines where the row came from - in the case of ingesting rows from Kafka it is made up of:
source_id:    8 bytes
partition_id: 8 bytes
When receiving a row forwarded from another shard (e.g. in the case of forwarding a partial aggregation), it is made up of:
internal aggregation table id:   8 bytes
shard id (being forwarded from): 8 bytes
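A sketch of assembling the 24-byte dedup key for both cases, assuming big-endian encoding; the helper names are illustrative, not part of this package:

import "encoding/binary"

// Dedup key for a row ingested from Kafka: originator_id is
// source_id + partition_id, sequence is the offset in the partition.
func kafkaDedupKey(sourceID, partitionID, offset uint64) []byte {
	key := make([]byte, 24)
	binary.BigEndian.PutUint64(key[0:8], sourceID)
	binary.BigEndian.PutUint64(key[8:16], partitionID)
	binary.BigEndian.PutUint64(key[16:24], offset)
	return key
}

// Dedup key for a row forwarded from another shard: originator_id is
// aggregation table id + forwarding shard id, sequence is the receiver sequence.
func forwardDedupKey(aggTableID, fromShardID, receiverSequence uint64) []byte {
	key := make([]byte, 24)
	binary.BigEndian.PutUint64(key[0:8], aggTableID)
	binary.BigEndian.PutUint64(key[8:16], fromShardID)
	binary.BigEndian.PutUint64(key[16:24], receiverSequence)
	return key
}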
When the shard state machine receives a row with a particular dedup key, it looks up the last sequence value it received for the same originator_id. If the previous sequence value is >= the current one, the row is a duplicate and is ignored.
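The duplicate check itself might look like the following sketch, per the description above (the real DoDedup also takes a shard id and can return an error):

import "encoding/binary"

func isDuplicate(dedupKey []byte, dedupMap map[string]uint64) bool {
	originatorID := string(dedupKey[:16])              // first 16 bytes
	sequence := binary.BigEndian.Uint64(dedupKey[16:]) // last 8 bytes
	if lastSequence, ok := dedupMap[originatorID]; ok && lastSequence >= sequence {
		return true // already seen - the row is ignored
	}
	dedupMap[originatorID] = sequence
	return false
}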
When rows are actually written into the receiver table, the shard state machine writes the key as:
|shard_id|receiver_table_id|receiver_sequence|remote_consumer_id
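A sketch of composing that key, assuming each field is encoded as a big-endian uint64:

import "encoding/binary"

func receiverTableKey(shardID, receiverTableID, receiverSequence, remoteConsumerID uint64) []byte {
	key := make([]byte, 32)
	binary.BigEndian.PutUint64(key[0:8], shardID)
	binary.BigEndian.PutUint64(key[8:16], receiverTableID)
	binary.BigEndian.PutUint64(key[16:24], receiverSequence)
	binary.BigEndian.PutUint64(key[24:32], remoteConsumerID)
	return key
}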
Where do we get the sequence field in the dedup key from?
In the case of forwarding rows we have just consumed from Kafka, sequence is simply the offset in the Kafka partition.
In the case of forwarding rows in an aggregation to the owner of the key, the receiver sequence is used.
Types ¶
type Cluster ¶
type Cluster interface {
	ExecuteForwardBatch(shardID uint64, batch []byte) error

	// WriteBatch writes a batch reliably to storage
	WriteBatch(batch *WriteBatch, localOnly bool) error

	// WriteForwardBatch writes a batch reliably for forwarding to another shard
	WriteForwardBatch(batch *WriteBatch, localOnly bool) error

	// WriteBatchLocally writes a batch directly using the KV store without going through Raft
	WriteBatchLocally(batch *WriteBatch) error

	LocalGet(key []byte) ([]byte, error)
	LinearizableGet(shardID uint64, key []byte) ([]byte, error)

	// LocalScan scans the local store
	// endKeyPrefix is exclusive
	LocalScan(startKeyPrefix []byte, endKeyPrefix []byte, limit int) ([]KVPair, error)

	CreateSnapshot() (Snapshot, error)
	LocalScanWithSnapshot(snapshot Snapshot, startKeyPrefix []byte, endKeyPrefix []byte, limit int) ([]KVPair, error)
	GetNodeID() int
	GetAllShardIDs() []uint64

	// GetLocalShardIDs returns the ids of the shards on the local node - this includes replicas
	GetLocalShardIDs() []uint64

	// GenerateClusterSequence generates a cluster wide unique sequence number
	GenerateClusterSequence(sequenceName string) (uint64, error)

	SetRemoteQueryExecutionCallback(callback RemoteQueryExecutionCallback)
	RegisterShardListenerFactory(factory ShardListenerFactory)
	ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo, rowsFactory *common.RowsFactory) (*common.Rows, error)
	DeleteAllDataInRangeForAllShardsLocally(startPrefix []byte, endPrefix []byte) error
	DeleteAllDataInRangeForShardLocally(shardID uint64, startPrefix []byte, endPrefix []byte) error
	GetLock(prefix string) (bool, error)
	ReleaseLock(prefix string) (bool, error)
	AddToDeleteBatch(batch *ToDeleteBatch) error
	RemoveToDeleteBatch(batch *ToDeleteBatch) error
	Start() error
	Stop() error
	PostStartChecks(queryExec common.SimpleQueryExec) error
	SyncStore() error
	GetLeadersMap() (map[uint64]uint64, error)
	RegisterStartFill(expectedLeaders map[uint64]uint64, interruptor *interruptor.Interruptor) error
	RegisterEndFill()
}
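A minimal usage sketch, assuming cluster is a started Cluster implementation (the key and value are illustrative):

func putAndGet(cluster Cluster, shardID uint64) ([]byte, error) {
	wb := NewWriteBatch(shardID)
	wb.AddPut([]byte("some-key"), []byte("some-value"))
	// localOnly = false: write the batch reliably through the cluster.
	if err := cluster.WriteBatch(wb, false); err != nil {
		return nil, err
	}
	// Read the value back with a linearizable get against the same shard.
	return cluster.LinearizableGet(shardID, []byte("some-key"))
}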
type CommittedCallback ¶
type CommittedCallback func() error
type DummyRemoteQueryExecutionCallback ¶
type DummyRemoteQueryExecutionCallback struct { }
func (*DummyRemoteQueryExecutionCallback) ExecuteRemotePullQuery ¶
func (d *DummyRemoteQueryExecutionCallback) ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo) (*common.Rows, error)
type DummyShardListenerFactory ¶
type DummyShardListenerFactory struct { }
func (*DummyShardListenerFactory) CreateShardListener ¶
func (d *DummyShardListenerFactory) CreateShardListener(shardID uint64) ShardListener
type ForwardRow ¶ added in v0.1.6
type ForwardWriteHandler ¶ added in v0.1.6
type KVReceiver ¶
type LeaderChangeCallback ¶
type QueryExecutionInfo ¶
type QueryExecutionInfo struct {
	ExecutionID       string
	SchemaName        string
	Query             string
	Limit             uint32
	ShardID           uint64
	SystemQuery       bool
	PsArgTypes        []common.ColumnType
	PsArgs            []interface{}
	PreparedStatement bool
}
func (*QueryExecutionInfo) Deserialize ¶
func (q *QueryExecutionInfo) Deserialize(buff []byte) error
type RemoteQueryExecutionCallback ¶
type RemoteQueryExecutionCallback interface {
ExecuteRemotePullQuery(queryInfo *QueryExecutionInfo) (*common.Rows, error)
}
type RemoteQueryResult ¶
type RemoteWriteHandler ¶
type RemoteWriteHandler interface {
RemoteWriteOccurred(shardID uint64)
}
RemoteWriteHandler will be called when a remote write is done to a shard
type ShardCallback ¶
type ShardCallback interface {
Write(batch WriteBatch) error
}
type ShardListener ¶
type ShardListener interface {
	RemoteWriteOccurred(forwardRows []ForwardRow)
	Close()
}
type ShardListenerFactory ¶
type ShardListenerFactory interface {
CreateShardListener(shardID uint64) ShardListener
}
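A hedged sketch of implementing these two interfaces - a listener that just logs forwarded writes. The types here are illustrative; a real factory would be registered via Cluster.RegisterShardListenerFactory.

import "log"

type loggingShardListener struct {
	shardID uint64
}

func (l *loggingShardListener) RemoteWriteOccurred(forwardRows []ForwardRow) {
	log.Printf("shard %d received %d forwarded rows", l.shardID, len(forwardRows))
}

func (l *loggingShardListener) Close() {}

type loggingShardListenerFactory struct{}

func (f *loggingShardListenerFactory) CreateShardListener(shardID uint64) ShardListener {
	return &loggingShardListener{shardID: shardID}
}

// Registration (sketch): cluster.RegisterShardListenerFactory(&loggingShardListenerFactory{})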
type ToDeleteBatch ¶
type WriteBatch ¶
type WriteBatch struct {
	ShardID    uint64
	Puts       []byte
	Deletes    []byte
	NumPuts    int
	NumDeletes int
	// contains filtered or unexported fields
}
WriteBatch represents a set of puts and deletes that will be written atomically by the underlying storage implementation
func NewWriteBatch ¶
func NewWriteBatch(shardID uint64) *WriteBatch
func (*WriteBatch) AddCommittedCallback ¶
func (wb *WriteBatch) AddCommittedCallback(callback CommittedCallback)
AddCommittedCallback adds a callback which will be called when the batch is committed
func (*WriteBatch) AddDelete ¶
func (wb *WriteBatch) AddDelete(k []byte)
func (*WriteBatch) AddPut ¶
func (wb *WriteBatch) AddPut(k []byte, v []byte)
func (*WriteBatch) AfterCommit ¶
func (wb *WriteBatch) AfterCommit() error
AfterCommit should be called after the batch has been committed - it causes any committed callbacks to be run
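Putting the lifecycle together - a sketch, assuming cluster is a Cluster implementation; callbacks registered before the write run only once AfterCommit is invoked:

func writeWithCallback(cluster Cluster, shardID uint64, k, v []byte) error {
	wb := NewWriteBatch(shardID)
	wb.AddPut(k, v)
	wb.AddCommittedCallback(func() error {
		// Runs when AfterCommit is called, after the batch has been committed.
		return nil
	})
	if err := cluster.WriteBatch(wb, false); err != nil {
		return err
	}
	return wb.AfterCommit() // runs the committed callbacks
}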
func (*WriteBatch) ForEachDelete ¶
func (wb *WriteBatch) ForEachDelete(kReceiver KReceiver) error
func (*WriteBatch) ForEachPut ¶
func (wb *WriteBatch) ForEachPut(kvReceiver KVReceiver) error
func (*WriteBatch) HasPuts ¶ added in v0.1.6
func (wb *WriteBatch) HasPuts() bool
func (*WriteBatch) HasWrites ¶
func (wb *WriteBatch) HasWrites() bool
func (*WriteBatch) Serialize ¶
func (wb *WriteBatch) Serialize(buff []byte) []byte