consumer

package
v0.83.1

Warning: This package is not in the latest version of its module.
Published: Oct 4, 2019 License: MIT Imports: 32 Imported by: 9

Documentation

Overview

Package consumer is a library for distributed, stateful consumption of Gazette journals.

Constants

This section is empty.

Variables

var ErrResolverStopped = errors.New("resolver has stopped serving local shards")

ErrResolverStopped is returned by Resolver if a ShardID resolves to a local shard, but the Resolver has already been asked to stop serving local shards. This happens when the consumer is shutting down abnormally (eg, due to Etcd lease keep-alive failure) but its local KeySpace doesn't reflect a re-assignment of the Shard, and we therefore cannot hope to proxy. Resolve fails in this case to encourage the immediate completion of related RPCs, as we're probably also trying to drain the gRPC server.

Functions

func ApplyShards

func ApplyShards(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

ApplyShards invokes the Apply RPC.

func ApplyShardsInBatches

func ApplyShardsInBatches(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest, size int) (*pc.ApplyResponse, error)

ApplyShardsInBatches applies changes to shards which may be larger than the configured etcd transaction size. The changes in |req| will be sent serially in batches of size |size|. If |size| is 0, all changes will be attempted as part of a single transaction. This function returns the response of the final ShardClient.Apply call. A response validation error or !OK status from the Apply RPC is mapped to an error.
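The batching rule described above can be illustrated with a minimal, self-contained sketch. It does not call this package: splitBatches and its string "changes" are hypothetical stand-ins, and the real function additionally issues one Apply RPC per batch.

```go
package main

import "fmt"

// splitBatches is a hypothetical helper: it partitions changes into
// consecutive batches of at most size elements. A size of 0 (or less)
// yields a single batch holding every change, mirroring the documented
// "single transaction" behaviour.
func splitBatches(changes []string, size int) [][]string {
	if size <= 0 || size >= len(changes) {
		return [][]string{changes}
	}
	var out [][]string
	for len(changes) > size {
		out = append(out, changes[:size])
		changes = changes[size:]
	}
	return append(out, changes)
}

func main() {
	changes := []string{"a", "b", "c", "d", "e"}
	for i, b := range splitBatches(changes, 2) {
		fmt.Printf("batch %d: %v\n", i, b)
	}
}
```

Because batches are sent serially and only the final response is returned, a failure part-way through leaves earlier batches applied; callers may want to keep this in mind when choosing |size|.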

func FetchHints

func FetchHints(ctx context.Context, sc pc.ShardClient, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

FetchHints invokes the Hints RPC, and maps a validation or !OK status to an error.

func ListShards

func ListShards(ctx context.Context, sc pc.ShardClient, req *pc.ListRequest) (*pc.ListResponse, error)

ListShards invokes the List RPC, and maps a validation or !OK status to an error.

func NewKeySpace

func NewKeySpace(prefix string) *keyspace.KeySpace

NewKeySpace returns a KeySpace suitable for use with an Allocator. It decodes allocator Items as ShardSpec messages, Members as ConsumerSpecs, and Assignments as RecoveryStatus enums.

func StatShard

func StatShard(ctx context.Context, rc pc.RoutedShardClient, req *pc.StatRequest) (*pc.StatResponse, error)

StatShard invokes the Stat RPC, and maps a validation or !OK status to an error.

Types

type Application

type Application interface {
	// NewStore constructs a Store for the Shard around the initialized
	// file-system *Recorder. If the ShardSpec does not have a configured
	// recovery log, then *Recorder will be nil.
	NewStore(Shard, *recoverylog.Recorder) (Store, error)
	// NewMessage returns a zero-valued Message of an appropriate representation
	// for the JournalSpec.
	NewMessage(*pb.JournalSpec) (message.Message, error)
	// ConsumeMessage consumes a read-committed message within the scope of a
	// transaction. It should use the provided Publisher to PublishUncommitted
	// messages to other journals. Doing so enables Gazette to properly sequence
	// messages and ensure they are either acknowledged or rolled-back atomically
	// with this consumer transaction.
	ConsumeMessage(Shard, Store, message.Envelope, *message.Publisher) error
	// FinalizeTxn indicates a consumer transaction is ending, and that the
	// Application must flush any in-memory transaction state or caches, and
	// issue any relevant journal writes. At completion all messages must have
	// been published to the provided Publisher, any raw []byte content must
	// have been written to the shard JournalClient(), and all state must be
	// captured by the Store.
	FinalizeTxn(Shard, Store, *message.Publisher) error
}

Application is the interface provided by user applications running as Gazette consumers. Only unrecoverable errors should be returned by Application. A returned error will abort processing of an assigned Shard, and will update the assignment's ReplicaStatus to FAILED.

Gazette consumers process messages within pipelined transactions. A transaction begins upon the first call to ConsumeMessage(), which is invoked for each read-committed message of a source journal. In the course of the transaction many more messages may be passed to ConsumeMessage(). When consuming a message the Application is free to:

1) Begin or continue a transaction with its Store.
2) Publish exactly-once Messages to other journals via the provided Publisher.
3) Append raw at-least-once []bytes via the shard's JournalClient().
4) Keep in-memory-only aggregates such as counts.

Messages published via PublishUncommitted will be visible to read-committed readers once the consumer transaction completes. Read-uncommitted readers will see them while the transaction continues to run. Similarly writes issued directly through the shard JournalClient() are also readable while the transaction runs.

Eventually, either because of a read stall or because the maximum duration is reached, the transaction is closed to new messages and FinalizeTxn() is called. At this point the Application must publish any remaining messages or begin related journal writes, and must flush any in-memory caches or aggregates into its Store transaction (the Application does _not_ need to wait for journal writes to complete).

StartCommit() of the Store is then called with a Checkpoint. For correct exactly-once processing semantics, the Checkpoint must be written in a single transaction alongside all other Store mutations made within the scope of this consumer transaction:

  • Eg for RocksDB, this means all mutations and the Checkpoint must be written within a single "write batch".
  • For a SQL store, verification of the write fence, INSERTS, UPDATES, and the Checkpoint itself are written within a single BEGIN/COMMIT transaction.

Note that other, non-transactional Store mutations are permitted, but will have a weaker at-least-once processing guarantee with respect to Store state. This can make sense for applications filling caches in BigTable, Memcache, or other systems which support transactions only over a single key (ie, check-and-set). In this case the Store should apply all mutations, followed by a fenced CAS Checkpoint update. So long as the Checkpoint itself is fenced properly, messages will continue to have exactly-once semantics (though it gets murky if published messages are conditioned on Store states that update at-least-once).

Once the commit completes, acknowledgements of messages published during the transaction are written to applicable journals, which informs downstream readers that those messages have committed and may now be consumed.

Transactions are fully pipelined: once StartCommit() has returned, the next consumer transaction immediately begins processing even though the prior transaction continues to commit in the background (and could even fail). In fact, at this point there's no guarantee that any journal writes of the previous transaction have even _started_, including those to the recovery log.

To make this work safely and correctly, transactions use barriers to ensure that background operations are started and complete in the correct order: for example, that the prior transaction doesn't commit until all its writes to other journals have also completed, and that writes of message acknowledgements don't start until mutations & the checkpoint have committed to the Store.

While those operations complete in the background, the next transaction will consume new messages concurrently. Its one constraint is that it may not itself start to commit until its predecessor transaction has fully completed. This means that transactions will almost always exhibit some degree of batching, which will depend on the rate of incoming messages, the latency to Gazette, and the latency and speed of a utilized external store. If the prior commit takes so long that the successor transaction reaches its maximum duration, then that successor will stall without processing further messages until its predecessor commits. This is the _only_ case under which a consumer can be blocked from processing ready messages.
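The ordering constraint described above (a transaction may consume while its predecessor commits, but may not itself commit until that predecessor has completed) can be simulated in a few lines. This is a hypothetical sketch using plain channels as futures; runPipeline is not part of this package and omits journal writes, acknowledgements, and failure handling.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline is a hypothetical simulation, not the package's scheduler.
// Each transaction "consumes" on the calling goroutine, then commits in a
// background goroutine which first waits for its predecessor's commit.
// It returns the order in which commits completed.
func runPipeline(txns int) []int {
	var (
		mu        sync.Mutex
		committed []int
		wg        sync.WaitGroup
	)
	prior := make(chan struct{})
	close(prior) // The first transaction has no predecessor to await.

	for i := 0; i < txns; i++ {
		// Consuming messages of transaction i would happen here,
		// concurrently with any still-running commit of transaction i-1.
		done := make(chan struct{})
		wg.Add(1)
		go func(id int, prior <-chan struct{}, done chan<- struct{}) {
			defer wg.Done()
			<-prior // Barrier: do not commit until the predecessor has.
			mu.Lock()
			committed = append(committed, id)
			mu.Unlock()
			close(done)
		}(i, prior, done)
		prior = done
	}
	wg.Wait()
	return committed
}

func main() {
	fmt.Println(runPipeline(4)) // prints [0 1 2 3]
}
```

The loop never blocks on a commit, which corresponds to the "fully pipelined" property; only the background goroutines wait on one another.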

type BeginFinisher

type BeginFinisher interface {
	// BeginTxn is called to notify the Application that a transaction is beginning
	// (and a call to ConsumeMessage will be immediately forthcoming), allowing
	// the Application to perform any preparatory work. For consumers doing
	// extensive aggregation, it may be beneficial to focus available compute
	// resource on a small number of transactions while completely stalling
	// others: this can be accomplished by blocking in BeginTxn until a semaphore
	// is acquired. A call to BeginTxn is always paired with a call to FinishedTxn.
	BeginTxn(Shard, Store) error
	// FinishedTxn is notified that a previously begun transaction has started to
	// commit, or has errored. It allows the Application to perform related cleanup
	// (eg, releasing a previously acquired semaphore). Note transactions are
	// pipelined, and commit operations of this transaction may still be ongoing.
	// FinishedTxn can await the provided OpFuture for its final status.
	FinishedTxn(Shard, Store, OpFuture)
}

BeginFinisher is an optional interface of Application which is informed when consumer transactions begin or finish.
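The semaphore pattern mentioned in the BeginTxn comment might look like the following sketch. The limiter type is hypothetical, and the Shard, Store, and OpFuture arguments of the real interface are omitted so that the example stays self-contained.

```go
package main

import "fmt"

// limiter is a hypothetical stand-in for an Application implementing the
// BeginFinisher methods. The real methods also accept Shard, Store and
// OpFuture arguments, which are omitted here.
type limiter struct {
	sem chan struct{} // Buffered channel used as a counting semaphore.
}

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// BeginTxn blocks until a slot is free, stalling further transactions.
func (l *limiter) BeginTxn() error {
	l.sem <- struct{}{}
	return nil
}

// FinishedTxn releases the slot acquired by the paired BeginTxn.
func (l *limiter) FinishedTxn() { <-l.sem }

// inFlight reports how many transactions currently hold a slot.
func (l *limiter) inFlight() int { return len(l.sem) }

func main() {
	l := newLimiter(2)
	_ = l.BeginTxn()
	_ = l.BeginTxn()
	fmt.Println("in flight:", l.inFlight())
	l.FinishedTxn()
	fmt.Println("in flight:", l.inFlight())
}
```

Since every BeginTxn is paired with a FinishedTxn, the slot is released even when the transaction errors.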

type JSONFileStore

type JSONFileStore struct {
	// State is a user-provided instance which is un/marshal-able to JSON.
	State interface{}
	// contains filtered or unexported fields
}

JSONFileStore is a simple Store which materializes itself as a JSON-encoded file. The store is careful to flush to a new temporary file which is then moved to the well-known location: eg, a process failure cannot result in a recovery of a partially written JSON file.

func NewJSONFileStore

func NewJSONFileStore(rec *recoverylog.Recorder, state interface{}) (*JSONFileStore, error)

NewJSONFileStore returns a new JSONFileStore. |state| is the runtime instance of the Store's state, which is decoded into, encoded from, and retained as JSONFileStore.State.

func (*JSONFileStore) Destroy

func (s *JSONFileStore) Destroy()

Destroy the JSONFileStore directory and state file.

func (*JSONFileStore) RestoreCheckpoint added in v0.83.1

func (s *JSONFileStore) RestoreCheckpoint(Shard) (pc.Checkpoint, error)

func (*JSONFileStore) StartCommit added in v0.83.1

func (s *JSONFileStore) StartCommit(_ Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

type OpFuture added in v0.83.1

type OpFuture = client.OpFuture

OpFuture represents an operation which is executing in the background. Aliased for brevity from `client` package.

type OpFutures added in v0.83.1

type OpFutures = client.OpFutures

type Resolution

type Resolution struct {
	Status pc.Status
	// Header captures the resolved consumer ProcessId, effective Etcd Revision,
	// and Route of the shard resolution.
	Header pb.Header
	// Spec of the Shard at the current Etcd revision.
	Spec *pc.ShardSpec
	// Shard processing context, or nil if this process is not primary for the ShardID.
	Shard Shard
	// Store of the Shard, or nil if this process is not primary for the ShardID.
	Store Store
	// Done releases Shard & Store, and must be called when no longer needed.
	// Iff Shard & Store are nil, so is Done.
	Done func()
}

Resolution is the result of resolving a ShardID.

type ResolveArgs

type ResolveArgs struct {
	Context context.Context
	// ShardID to be resolved.
	ShardID pc.ShardID
	// Whether we may resolve to another consumer peer.
	MayProxy bool
	// Optional Header attached to the request from a proxy-ing peer.
	ProxyHeader *pb.Header
	// ReadThrough is journals and offsets which must be reflected in a
	// completed transaction before Resolve returns, blocking if required.
	// Offsets of journals not read by this shard are ignored.
	ReadThrough pb.Offsets
}

ResolveArgs parametrize the resolution of ShardIDs.
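The ReadThrough condition can be illustrated as a simple comparison of offset maps. The offsets type and readThroughMet function below are hypothetical stand-ins written for this sketch; the real Resolve additionally blocks until the condition holds.

```go
package main

import "fmt"

// offsets is a hypothetical stand-in for a journal-to-offset map.
type offsets map[string]int64

// readThroughMet reports whether progress covers every required offset.
// Journals absent from progress are treated as not read by the shard and
// are ignored, as documented for ResolveArgs.ReadThrough.
func readThroughMet(progress, required offsets) bool {
	for journal, want := range required {
		if got, ok := progress[journal]; ok && got < want {
			return false
		}
	}
	return true
}

func main() {
	progress := offsets{"events/part-000": 1024}
	fmt.Println(readThroughMet(progress, offsets{"events/part-000": 512}))
	fmt.Println(readThroughMet(progress, offsets{"events/part-000": 2048}))
	fmt.Println(readThroughMet(progress, offsets{"other/journal": 9999}))
}
```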

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

Resolver maps ShardIDs to responsible consumer processes, and manages the set of local shards which are assigned to this process.

func NewResolver

func NewResolver(state *allocator.State, newShard func(keyspace.KeyValue) *shard) *Resolver

NewResolver returns a Resolver derived from the allocator.State, which will use the |newShard| closure to instantiate a *shard for each local Assignment of the local ConsumerSpec.

func (*Resolver) Resolve

func (r *Resolver) Resolve(args ResolveArgs) (res Resolution, err error)

Resolve a ShardID to its Resolution.

type SQLStore added in v0.83.1

type SQLStore struct {
	DB *sql.DB

	// Cache is provided for application use in the temporary storage of
	// in-memory state associated with a SQLStore. Eg, Cache might store records
	// which have been read and modified this transaction, and which will be
	// written out during FinalizeTxn.
	//
	// The representation of Cache is up to the application; it is not directly
	// used by SQLStore.
	Cache interface{}
	// contains filtered or unexported fields
}

SQLStore is a Store which utilizes an external relational database having a database/sql compatible driver. For each consumer transaction, SQLStore begins a corresponding SQL transaction which may be obtained via Transaction, through which reads and writes to the store should be issued. A table "gazette_checkpoints" is also required, to which consumer checkpoints are persisted, and having a schema like:

CREATE TABLE gazette_checkpoints (
  shard_fqn  TEXT    PRIMARY KEY NOT NULL,
  fence      INTEGER NOT NULL,
  checkpoint BLOB    NOT NULL
);

The user is responsible for creating this table. Exact data types may vary with the store dialect, but "shard_fqn" must be its PRIMARY KEY. StartCommit writes the checkpoint to this table before committing the transaction.

func NewSQLStore added in v0.83.1

func NewSQLStore(db *sql.DB) *SQLStore

NewSQLStore returns a new SQLStore using the *DB.

func (*SQLStore) Destroy added in v0.83.1

func (s *SQLStore) Destroy()

Destroy rolls back an existing transaction.

func (*SQLStore) RestoreCheckpoint added in v0.83.1

func (s *SQLStore) RestoreCheckpoint(shard Shard) (cp pc.Checkpoint, _ error)

RestoreCheckpoint issues a SQL transaction which SELECTS the most recent Checkpoint of this shard FQN and also increments its fence.

func (*SQLStore) StartCommit added in v0.83.1

func (s *SQLStore) StartCommit(shard Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

StartCommit starts a background commit of the current transaction. While it runs, a new transaction may be started. The returned OpFuture will fail if the "fence" column has been modified from the result previously read by RestoreCheckpoint.

func (*SQLStore) Transaction added in v0.83.1

func (s *SQLStore) Transaction(ctx context.Context, txOpts *sql.TxOptions) (_ *sql.Tx, err error)

Transaction returns or (if not already begun) begins a SQL transaction. Optional *TxOptions have an effect only if Transaction begins a new SQL transaction.

type Service

type Service struct {
	// Application served by the Service.
	App Application
	// Resolver of Service shards.
	Resolver *Resolver
	// Distributed allocator state of the service.
	State *allocator.State
	// Loopback connection which defaults to the local server, but is wired with
	// a pb.DispatchBalancer. Consumer applications may use Loopback to proxy
	// application-specific RPCs to peer consumer instances, after performing
	// shard resolution.
	Loopback *grpc.ClientConn
	// Journal client for use by consumer applications.
	Journals pb.RoutedJournalClient
	// Etcd client for use by consumer applications.
	Etcd *clientv3.Client
	// contains filtered or unexported fields
}

Service is the top-level runtime concern of a Gazette Consumer process. It drives local shard processing in response to allocator.State, powers shard resolution, and is also an implementation of ShardServer.

func NewService

func NewService(app Application, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service

NewService constructs a new Service of the Application, driven by allocator.State.

func (*Service) Apply

func (srv *Service) Apply(ctx context.Context, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

Apply dispatches the ShardServer.Apply API.

func (*Service) GetHints

func (srv *Service) GetHints(ctx context.Context, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

GetHints dispatches the ShardServer.Hints API.

func (*Service) List

func (srv *Service) List(ctx context.Context, req *pc.ListRequest) (*pc.ListResponse, error)

List dispatches the ShardServer.List API.

func (*Service) QueueTasks

func (svc *Service) QueueTasks(tasks *task.Group, server *server.Server)

QueueTasks queues tasks to the Group which watch the Service KeySpace and serve any local assignments reflected therein, until the Context is cancelled or an error occurs. All local shards are shut down prior to return, regardless of error status.

func (*Service) Stat

func (srv *Service) Stat(ctx context.Context, req *pc.StatRequest) (*pc.StatResponse, error)

Stat dispatches the ShardServer.Stat API.

func (*Service) Stopping

func (svc *Service) Stopping() <-chan struct{}

Stopping returns a channel which signals when the Service is in the process of shutting down. Consumer applications with long-lived RPCs should use this signal to begin graceful cleanup of outstanding RPCs.
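A long-lived handler might select on such a channel as in the sketch below. serveUntilStopping and its request channel are hypothetical; in a real application the stopping argument would be the channel returned by Service.Stopping().

```go
package main

import "fmt"

// serveUntilStopping is a hypothetical long-lived RPC loop. It handles
// requests until the stopping channel is closed, then returns the number
// of requests handled so the caller can finish its cleanup.
func serveUntilStopping(stopping <-chan struct{}, requests <-chan string) int {
	handled := 0
	for {
		select {
		case <-stopping:
			return handled
		case <-requests:
			handled++
		}
	}
}

func main() {
	stopping := make(chan struct{})
	requests := make(chan string) // Unbuffered: each send awaits a receive.

	go func() {
		for _, r := range []string{"a", "b", "c"} {
			requests <- r
		}
		close(stopping) // Signal shutdown only after all sends completed.
	}()
	fmt.Println(serveUntilStopping(stopping, requests)) // prints 3
}
```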

type Shard

type Shard interface {
	// Context of this shard. Notably, the Context will be cancelled when this
	// process is no longer responsible for the shard.
	Context() context.Context
	// Fully-qualified name of this shard, which composes the consumer application
	// prefix with this shard's ID. Specifically, this is the Etcd item key of the
	// ShardSpec, which ensures a Shard FQN can only conflict if another consumer
	// deployment, backed by a different Etcd cluster, reuses the same application
	// root and ShardID.
	FQN() string
	// Spec of the shard.
	Spec() *pc.ShardSpec
	// Assignment of the shard to this process.
	Assignment() keyspace.KeyValue
	// JournalClient to be used for raw journal []byte appends made on behalf
	// of this Shard. Consistent use of this client enables Gazette to ensure
	// that all writes issued within a consumer transaction have completed prior
	// to that transaction being committed. Put differently, if a consumed message
	// triggers a raw append to a journal, Gazette can guarantee that append will
	// occur at-least-once no matter how this Shard or brokers may fail.
	JournalClient() client.AsyncJournalClient
	// Progress of the Shard as-of its most recent completed transaction.
	// |readThrough| is offsets of source journals which have been read
	// through. |publishAt| is journals and offsets this Shard has published
	// through, including acknowledgements. If a read message A results in this
	// Shard publishing messages B, and A falls within |readThrough|, then all
	// messages B (& their acknowledgements) fall within |publishAt|.
	//
	// While |readThrough| is comprehensive and persists across Shard faults,
	// note that |publishAt| is *advisory* and not necessarily complete: it
	// includes only journals written to since this Shard was assigned to
	// this process.
	Progress() (readThrough, publishAt pb.Offsets)
}

Shard is the processing context of a ShardSpec which is assigned to the local consumer process.

type Store

type Store interface {
	// StartCommit starts a background, atomic "commit" to the Store of state
	// updates from this transaction along with the Checkpoint. If Store uses an
	// external transactional system, then StartCommit() must fail if another
	// process has invoked RestoreCheckpoint() after *this* Store instance did
	// so. Put differently, Store cannot commit a Checkpoint that another Store
	// may never see (because it continues an earlier Checkpoint that it restored).
	//
	// In general terms, this means StartCommit() must verify a "write fence"
	// previously installed by RestoreCheckpoint() as part of its transaction.
	// Embedded Stores which use recovery logs can rely on the write fence of
	// the log itself.
	//
	// StartCommit may immediately begin a transaction with the external system
	// (if one hasn't already been started from ConsumeMessage()), but cannot
	// allow it to commit until all |waitFor| OpFutures have completed
	// successfully. A failure of one of these OpFutures must also fail this
	// commit.
	//
	// StartCommit() will never be called if the OpFuture returned by a previous
	// StartCommit() hasn't yet completed. However, ConsumeMessage() *will* be
	// called while a previous StartCommit() continues to run. Stores should
	// account for this, typically by starting a new transaction which runs
	// alongside the old.
	StartCommit(_ Shard, _ pc.Checkpoint, waitFor OpFutures) OpFuture
	// RestoreCheckpoint recovers the most recent Checkpoint previously committed
	// to the Store. It is called just once, at Shard start-up. If an external
	// system is used, it should install a transactional "write fence" to ensure
	// that an older Store instance of another process cannot successfully
	// StartCommit() after this RestoreCheckpoint() returns.
	RestoreCheckpoint(Shard) (pc.Checkpoint, error)
	// Destroy releases all resources associated with the Store (eg, local files).
	// It is guaranteed that the Store is no longer being used or referenced at
	// the onset of this call.
	Destroy()
}

Store is a stateful storage backend. Often Stores are implemented as embedded databases which record their file operations into a provided Recorder. Stores which instead utilize an external transactional system (eg, an RDBMS) are also supported. Application implementations control the selection, initialization, and usage of an appropriate Store backend for their use case.

Directories

Path Synopsis
Package protocol defines the consumer datamodel, validation behaviors, and gRPC APIs which are shared across clients and consumer application servers.
Package recoverylog specifies a finite state machine for recording and replaying observed filesystem operations into a Gazette journal.
Package shardspace provides mechanisms for mapping a collection of ShardSpecs into a minimally-described, semi hierarchical structure, and for mapping back again.
Package store_rocksdb implements the consumer.Store interface via an embedded RocksDB instance.
