ingestion

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidCommit indicates that the result could not be committed,
	// either due to the block being older than the latest committed block,
	// or the parent of the block does not match the latest committed block.
	//
	// Both cases should be resolvable by restarting the Ingestor from the
	// latest committed block.
	ErrInvalidCommit = errors.New("invalid commit")
)

Functions

func GetEventsIngestorName

func GetEventsIngestorName(version model.KinVersion) string

GetEventsIngestorName returns the events ingestor name for the specified version.

func GetHistoryIngestorName

func GetHistoryIngestorName(version model.KinVersion) string

GetHistoryIngestorName returns the history ingestor name for the specified version.

func Run

Run runs an ingestion flow in a blocking fashion. Run only returns if the context is cancelled.

The ingestion will only occur when the lock has been acquired. The specified lock can be scoped to the individual ingestor, or for a set of ingestors.

Types

type Committer

type Committer interface {
	// Commit commits the specified block as committed, if 'parent' was the
	// value of the previously committed block.
	//
	// If no previous commit exists, the commit is considered valid.
	Commit(ctx context.Context, ingestor string, parent, block Pointer) error

	// Latest returns the latest committed block pointer.
	//
	// Nil is returned with a nil error if no previous commit exists.
	Latest(ctx context.Context, ingestor string) (Pointer, error)
}

Committer marks processed blocks as committed, with the following assumptions:

  1. The block must be successfully written to history via a history.Writer.
  2. The block must be committed in the same order as the block chain.

Implementations should enforce (2) by ensuring that for each commit C[i], C[i].parent is the same as C[i-1].block (should a previous commit exist), and that C[i].block is greater than C[i].parent (and therefore greater than C[i-1].block).

type DistributedLock

type DistributedLock interface {
	// Lock blocks until the lock is acquired, the context is cancelled, or an
	// unexpected error occured. In the latter two cases, an error is returned.
	Lock(ctx context.Context) error

	// Unlock releases the lock, if acquired. Unlock() is idempotent.
	Unlock() error
}

DistributedLock is a distributed lock used for coordinating which node should be actively ingesting. While the Commit() ensures that no bad commits can be made (by the invariants described on the Committer interface), the DistributedLock helps avoid double processing of blocks, and churn from ErrInvalidCommit.

type Ingestor

type Ingestor interface {
	// Name is the name of the ingestor that will be used as the key when
	// committing blocks. As a result, it should be some unique identifier
	// referencing the blockchain being processed.
	Name() string

	// Ingest ingests blocks starting at the immediate block after the parent pointer.
	Ingest(ctx context.Context, w history.Writer, parent Pointer) (ResultQueue, error)
}

Ingestor ingests blocks from a blockchain, writing them to a history.Writer.

type Pointer

type Pointer []byte

Pointer is an opaque value that points to a 'block' or 'ledger' for a given blockchain. The value should be lexicographically comparable to entries produced by the respective blockchain's model.Entry's ordering keys.

For example, all transactions produced in blocks 0 -> N should be lexicographically less than the pointer to block N+1.

type Result

type Result struct {
	// Err is a non-retriable error that occurred when processing
	// the block.
	Err error

	// Parent points to the parent block that was processed.
	Parent Pointer

	// Block points to the block that was processed.
	Block Pointer
}

Result is the result of processing a block.

type ResultQueue

type ResultQueue <-chan <-chan Result

ResultQueue behaves as a Queue<Future<Result>>, where the order of the queue is that of the order of blocks being produced.

This structure allows for async processing of blocks, while ensuring that the commit pointer is advanced sequentially.

Directories

Path Synopsis
dynamodb
committer
Package dynamodb implements a dynamodb backed ingestion.Committer.
Package dynamodb implements a dynamodb backed ingestion.Committer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL