consumer

package
v2.0.208+incompatible Latest
Warning

This package is not in the latest version of its module.

Published: Mar 21, 2019 License: MIT Imports: 33 Imported by: 0

Documentation

Overview

Code generated by mockery v1.0.0

Package consumer is a generated protocol buffer package.

It is generated from these files:
	service.proto

It has these top-level messages:
	Empty
	ConsumerState

Constants

const (
	// Peer is actively serving the Shard.
	Primary = "primary"
	// Peer is ready to immediately transition to Shard primary.
	Ready = "ready"
	// Peer is still rebuilding from the recovery log.
	Recovering = "recovering"
	// Peer is responsible for a consumer Shard it doesn't know about.
	// This typically happens when topics are removed from a consumer,
	// but remain in (and should be removed from) the consumer's Etcd directory.
	UnknownShard = "unknown-shard"
)

Variables

var (
	ErrNoSuchConsumerPartition = errors.New("no such consumer partition")
	ErrNoReadyPartitionClient  = errors.New("no ready consumer partition replica client")
)
var (
	ErrInvalidLengthService = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowService   = fmt.Errorf("proto: integer overflow")
)
var ConsumerState_Replica_Status_name = map[int32]string{
	0: "INVALID",
	1: "RECOVERING",
	2: "READY",
	3: "PRIMARY",
}
var ConsumerState_Replica_Status_value = map[string]int32{
	"INVALID":    0,
	"RECOVERING": 1,
	"READY":      2,
	"PRIMARY":    3,
}
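
The two generated tables above are inverse lookups between a replica status number and its name. As a minimal sketch of how they might be used, the program below copies both tables locally (so it compiles without importing this package) and wraps them in two hypothetical helpers, `parseStatus` and `formatStatus`, which are not part of the package.

```go
package main

import "fmt"

// Local copies of the generated lookup tables shown above, so the
// sketch compiles without importing the consumer package.
var statusName = map[int32]string{
	0: "INVALID",
	1: "RECOVERING",
	2: "READY",
	3: "PRIMARY",
}

var statusValue = map[string]int32{
	"INVALID":    0,
	"RECOVERING": 1,
	"READY":      2,
	"PRIMARY":    3,
}

// parseStatus (hypothetical helper) maps a status name to its enum number,
// reporting false for names that are not in the table.
func parseStatus(name string) (int32, bool) {
	v, ok := statusValue[name]
	return v, ok
}

// formatStatus (hypothetical helper) maps an enum number to its name,
// falling back to the decimal number for unknown values.
func formatStatus(v int32) string {
	if name, ok := statusName[v]; ok {
		return name
	}
	return fmt.Sprintf("%d", v)
}

func main() {
	v, ok := parseStatus("READY")
	fmt.Println(v, ok, formatStatus(v), formatStatus(42))
}
```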

Functions

func AppendOffsetKeyEncoding

func AppendOffsetKeyEncoding(b []byte, name journal.Name) []byte

AppendOffsetKeyEncoding encodes |name| into a database key representing a consumer journal offset checkpoint. A |name| of "" will generate a key which prefixes all other offset key encodings.

func AppendOffsetValueEncoding

func AppendOffsetValueEncoding(b []byte, offset int64) []byte

AppendOffsetValueEncoding encodes |offset| into a database value representing a consumer journal offset checkpoint.

func BlockUntilShardsAreServing

func BlockUntilShardsAreServing(inspector consensus.Inspector)

BlockUntilShardsAreServing uses |inspector| to observe the consumer allocator tree, and returns only after all shards are being served. This is a test-support function.

func EnumerateShards

func EnumerateShards(c Consumer) map[ShardID]topic.Partition

EnumerateShards returns a mapping of unique ShardIDs and their Partitions implied by the Consumer and its set of consumed Topics.

func LoadOffsetsFromDB

func LoadOffsetsFromDB(db *rocks.DB, dbRO *rocks.ReadOptions) (map[journal.Name]int64, error)

Loads from |db| offsets previously serialized by storeAndClearOffsets.

func LoadOffsetsFromEtcd

func LoadOffsetsFromEtcd(tree *etcd.Node) (map[journal.Name]int64, error)

Loads legacy offsets stored in Etcd under |tree|.

func OffsetPath

func OffsetPath(consumerPath string, name journal.Name) string

func RegisterConsumerServer

func RegisterConsumerServer(s *grpc.Server, srv ConsumerServer)

func ResetShardsToJournalHeads

func ResetShardsToJournalHeads(runner *Runner) error

Test support function. Initializes all shards of |runner| to empty databases which begin consumption from the current write-head of each topic journal.

func StoreOffsetsToEtcd deprecated

func StoreOffsetsToEtcd(rootPath string, offsets map[journal.Name]int64, keysAPI etcd.KeysAPI)

Deprecated: We are removing support for offsets written to Etcd. Rocksdb will be the sole source-of-truth for read offsets. Stores legacy |offsets| in Etcd.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client interacts with a Gazette Consumer to maintain an updated pool of GRPC client connections to live Consumer endpoints, along with a local snapshot of Consumer topology for selecting appropriate endpoints.

func NewClient

func NewClient(endpoint string) (*Client, error)

func (*Client) Close

func (c *Client) Close()

Close shuts down a Client.

func (*Client) Invalidate

func (c *Client) Invalidate()

Invalidate triggers an immediate refresh of Client state.

func (*Client) PartitionClient

func (c *Client) PartitionClient(partition journal.Name) (*grpc.ClientConn, ConsumerState_Shard, error)

PartitionClient maps |partition| to its live endpoint and ConsumerState_Shard.
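
The lookup that PartitionClient performs can be pictured with a small sketch over a ConsumerState snapshot. The types below are simplified stand-ins for ConsumerState, ConsumerState_Shard and ConsumerState_Replica, and `primaryEndpoint` is a hypothetical helper. Selecting the PRIMARY replica is an assumption made for illustration; the package's actual selection logic and connection pooling are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the generated protobuf types; only the
// fields used below are included.
type replica struct {
	Endpoint string
	Status   int32 // 1 RECOVERING, 2 READY, 3 PRIMARY, as in the generated enum.
}

type shard struct {
	ID        string
	Partition string
	Replicas  []replica
}

type state struct {
	Shards []shard
}

const statusPrimary = 3

// Local errors carrying the same messages as the package-level variables.
var (
	errNoSuchPartition = errors.New("no such consumer partition")
	errNoReadyClient   = errors.New("no ready consumer partition replica client")
)

// primaryEndpoint (hypothetical) finds the shard consuming |partition| and
// returns the endpoint of its PRIMARY replica. Choosing the primary is an
// assumption of this sketch.
func primaryEndpoint(s state, partition string) (string, shard, error) {
	for _, sh := range s.Shards {
		if sh.Partition != partition {
			continue
		}
		for _, r := range sh.Replicas {
			if r.Status == statusPrimary {
				return r.Endpoint, sh, nil
			}
		}
		return "", sh, errNoReadyClient
	}
	return "", shard{}, errNoSuchPartition
}

func main() {
	s := state{Shards: []shard{{
		ID:        "shard-my-topic-123",
		Partition: "my-topic/part-123",
		Replicas:  []replica{{"host-a:8080", 2}, {"host-b:8080", 3}},
	}}}
	ep, sh, err := primaryEndpoint(s, "my-topic/part-123")
	fmt.Println(ep, sh.ID, err)
}
```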

func (*Client) State

func (c *Client) State() ConsumerState

type Consumer

type Consumer interface {
	// Topics this Consumer is consuming.
	Topics() []*topic.Description

	// Called when a message becomes available from one of the consumer’s
	// joined topics. If the returned error is non-nil, the Shard is assumed to
	// be in an unhealthy state and will be torn down.
	Consume(topic.Envelope, Shard, *topic.Publisher) error

	// Called when a consumer transaction is about to complete. If the Shard
	// Cache() contains any modified state, it must be persisted to Transaction()
	// during this call. As in Consume(), a returned error will result in the
	// tear-down of the Shard.
	Flush(Shard, *topic.Publisher) error
}
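
The split between Consume and Flush can be illustrated with a word-counting sketch. Everything here is a simplified stand-in: `envelope` replaces topic.Envelope, `shard` replaces the Shard interface (a plain map plays the role of Transaction()), and the *topic.Publisher argument is omitted. It shows the shape of the contract only and is not a drop-in implementation of the Consumer interface.

```go
package main

import (
	"errors"
	"fmt"
)

// envelope is a stand-in for topic.Envelope.
type envelope struct{ Word string }

// shard is a stand-in for the Shard interface: an in-memory cache plus a
// map playing the role of the transaction's write batch.
type shard struct {
	cache interface{}
	txn   map[string]int
}

func (s *shard) Cache() interface{}     { return s.cache }
func (s *shard) SetCache(c interface{}) { s.cache = c }

// wordCounter mirrors the Consume/Flush split of the Consumer interface.
type wordCounter struct{}

// Consume folds each message into the shard-local cache only.
func (wordCounter) Consume(env envelope, s *shard) error {
	if env.Word == "" {
		// In the real interface, a non-nil error tears the Shard down.
		return errors.New("empty word")
	}
	counts, ok := s.Cache().(map[string]int)
	if !ok {
		counts = make(map[string]int)
		s.SetCache(counts)
	}
	counts[env.Word]++
	return nil
}

// Flush persists modified cache state into the transaction, then clears it.
func (wordCounter) Flush(s *shard) error {
	counts, _ := s.Cache().(map[string]int)
	for w, n := range counts {
		s.txn[w] += n
		delete(counts, w)
	}
	return nil
}

func main() {
	s := &shard{txn: make(map[string]int)}
	var c wordCounter
	for _, w := range []string{"a", "b", "a"} {
		if err := c.Consume(envelope{Word: w}, s); err != nil {
			panic(err)
		}
	}
	if err := c.Flush(s); err != nil {
		panic(err)
	}
	fmt.Println(s.txn["a"], s.txn["b"]) // 2 1
}
```

Keeping counts in the cache during Consume and writing them once in Flush avoids one write per message, which is the use of Cache() the Shard documentation describes.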

type ConsumerClient

type ConsumerClient interface {
	// CurrentConsumerState returns a snapshot of the current ConsumerState.
	CurrentConsumerState(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ConsumerState, error)
}

func NewConsumerClient

func NewConsumerClient(cc *grpc.ClientConn) ConsumerClient

type ConsumerServer

type ConsumerServer interface {
	// CurrentConsumerState returns a snapshot of the current ConsumerState.
	CurrentConsumerState(context.Context, *Empty) (*ConsumerState, error)
}

type ConsumerState

type ConsumerState struct {
	// Etcd path which roots this consumer.
	Root string `protobuf:"bytes,1,opt,name=root,proto3" json:"root,omitempty"`
	// Identifier which uniquely identifies this consumer instance.
	LocalRouteKey string `protobuf:"bytes,2,opt,name=local_route_key,json=localRouteKey,proto3" json:"local_route_key,omitempty"`
	// Degree of Shard stand-by replication employed by the consumer.
	ReplicaCount int32 `protobuf:"varint,3,opt,name=replica_count,json=replicaCount,proto3" json:"replica_count,omitempty"`
	// All live peer endpoints of the consumer, in sorted "host:port" network format.
	Endpoints []string `protobuf:"bytes,4,rep,name=endpoints" json:"endpoints,omitempty"`
	// All Shards of this Consumer, in sorted Shard id order.
	Shards []ConsumerState_Shard `protobuf:"bytes,5,rep,name=shards" json:"shards"`
}

ConsumerState is a snapshot of the state of a Gazette consumer, including all shards of the consumer and the current set of shard replicas.

func (*ConsumerState) Descriptor

func (*ConsumerState) Descriptor() ([]byte, []int)

func (*ConsumerState) GetEndpoints

func (m *ConsumerState) GetEndpoints() []string

func (*ConsumerState) GetLocalRouteKey

func (m *ConsumerState) GetLocalRouteKey() string

func (*ConsumerState) GetReplicaCount

func (m *ConsumerState) GetReplicaCount() int32

func (*ConsumerState) GetRoot

func (m *ConsumerState) GetRoot() string

func (*ConsumerState) GetShards

func (m *ConsumerState) GetShards() []ConsumerState_Shard

func (*ConsumerState) Marshal

func (m *ConsumerState) Marshal() (dAtA []byte, err error)

func (*ConsumerState) MarshalTo

func (m *ConsumerState) MarshalTo(dAtA []byte) (int, error)

func (*ConsumerState) ProtoMessage

func (*ConsumerState) ProtoMessage()

func (*ConsumerState) Reset

func (m *ConsumerState) Reset()

func (*ConsumerState) Size

func (m *ConsumerState) Size() (n int)

func (*ConsumerState) String

func (m *ConsumerState) String() string

func (*ConsumerState) Unmarshal

func (m *ConsumerState) Unmarshal(dAtA []byte) error

type ConsumerState_Replica

type ConsumerState_Replica struct {
	// Addressable endpoint of the replica, in "host:port" network format.
	Endpoint string                       `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
	Status   ConsumerState_Replica_Status `protobuf:"varint,2,opt,name=status,proto3,enum=consumer.ConsumerState_Replica_Status" json:"status,omitempty"`
}

func (*ConsumerState_Replica) Descriptor

func (*ConsumerState_Replica) Descriptor() ([]byte, []int)

func (*ConsumerState_Replica) GetEndpoint

func (m *ConsumerState_Replica) GetEndpoint() string

func (*ConsumerState_Replica) GetStatus

func (m *ConsumerState_Replica) GetStatus() ConsumerState_Replica_Status

func (*ConsumerState_Replica) Marshal

func (m *ConsumerState_Replica) Marshal() (dAtA []byte, err error)

func (*ConsumerState_Replica) MarshalTo

func (m *ConsumerState_Replica) MarshalTo(dAtA []byte) (int, error)

func (*ConsumerState_Replica) ProtoMessage

func (*ConsumerState_Replica) ProtoMessage()

func (*ConsumerState_Replica) Reset

func (m *ConsumerState_Replica) Reset()

func (*ConsumerState_Replica) Size

func (m *ConsumerState_Replica) Size() (n int)

func (*ConsumerState_Replica) String

func (m *ConsumerState_Replica) String() string

func (*ConsumerState_Replica) Unmarshal

func (m *ConsumerState_Replica) Unmarshal(dAtA []byte) error

type ConsumerState_Replica_Status

type ConsumerState_Replica_Status int32

Status of the replica shard.

const (
	ConsumerState_Replica_INVALID    ConsumerState_Replica_Status = 0
	ConsumerState_Replica_RECOVERING ConsumerState_Replica_Status = 1
	ConsumerState_Replica_READY      ConsumerState_Replica_Status = 2
	ConsumerState_Replica_PRIMARY    ConsumerState_Replica_Status = 3
)

func (ConsumerState_Replica_Status) EnumDescriptor

func (ConsumerState_Replica_Status) EnumDescriptor() ([]byte, []int)

func (ConsumerState_Replica_Status) String

func (x ConsumerState_Replica_Status) String() string

type ConsumerState_Shard

type ConsumerState_Shard struct {
	// The unique ID of this Shard.
	Id ShardID `protobuf:"bytes,1,opt,name=id,proto3,casttype=ShardID" json:"id,omitempty"`
	// The topic name of this Shard.
	Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
	// The journal name of this Shard's topic partition.
	Partition github_com_LiveRamp_gazette_pkg_journal.Name `protobuf:"bytes,3,opt,name=partition,proto3,casttype=github.com/LiveRamp/gazette/pkg/journal.Name" json:"partition,omitempty"`
	// Assigned replicas and their processing status.
	Replicas []ConsumerState_Replica `protobuf:"bytes,5,rep,name=replicas" json:"replicas"`
}

func (*ConsumerState_Shard) Descriptor

func (*ConsumerState_Shard) Descriptor() ([]byte, []int)

func (*ConsumerState_Shard) GetId

func (m *ConsumerState_Shard) GetId() ShardID

func (*ConsumerState_Shard) GetPartition

func (m *ConsumerState_Shard) GetPartition() github_com_LiveRamp_gazette_pkg_journal.Name

func (*ConsumerState_Shard) GetReplicas

func (m *ConsumerState_Shard) GetReplicas() []ConsumerState_Replica

func (*ConsumerState_Shard) GetTopic

func (m *ConsumerState_Shard) GetTopic() string

func (*ConsumerState_Shard) Marshal

func (m *ConsumerState_Shard) Marshal() (dAtA []byte, err error)

func (*ConsumerState_Shard) MarshalTo

func (m *ConsumerState_Shard) MarshalTo(dAtA []byte) (int, error)

func (*ConsumerState_Shard) ProtoMessage

func (*ConsumerState_Shard) ProtoMessage()

func (*ConsumerState_Shard) Reset

func (m *ConsumerState_Shard) Reset()

func (*ConsumerState_Shard) Size

func (m *ConsumerState_Shard) Size() (n int)

func (*ConsumerState_Shard) String

func (m *ConsumerState_Shard) String() string

func (*ConsumerState_Shard) Unmarshal

func (m *ConsumerState_Shard) Unmarshal(dAtA []byte) error

type Empty

type Empty struct {
}

Empty is an empty message, which exists to support RPC APIs taking no arguments.

func (*Empty) Descriptor

func (*Empty) Descriptor() ([]byte, []int)

func (*Empty) Marshal

func (m *Empty) Marshal() (dAtA []byte, err error)

func (*Empty) MarshalTo

func (m *Empty) MarshalTo(dAtA []byte) (int, error)

func (*Empty) ProtoMessage

func (*Empty) ProtoMessage()

func (*Empty) Reset

func (m *Empty) Reset()

func (*Empty) Size

func (m *Empty) Size() (n int)

func (*Empty) String

func (m *Empty) String() string

func (*Empty) Unmarshal

func (m *Empty) Unmarshal(dAtA []byte) error

type Filterer

type Filterer interface {
	Filter(level int, key, val []byte) (remove bool, newVal []byte)
}

Optional Consumer interface for implementations which prune or expire keys & values from the Shard database based on Consumer-specific criteria. This interface intentionally overlaps with `rocks.CompactionFilter`.
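
As a minimal sketch of a Filterer, the hypothetical `prefixExpirer` below removes every key under a configured prefix and keeps everything else. It assumes that a nil newVal leaves the stored value unchanged, as with gorocksdb's CompactionFilter; the key names are made up for illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// prefixExpirer (hypothetical) drops every key under a configured prefix
// and keeps all other entries.
type prefixExpirer struct {
	prefix []byte
}

// Filter has the same signature as the Filterer interface above. A nil
// newVal is assumed to mean "leave the value as it is".
func (f prefixExpirer) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
	if bytes.HasPrefix(key, f.prefix) {
		return true, nil
	}
	return false, nil
}

func main() {
	f := prefixExpirer{prefix: []byte("tmp/")}
	for _, k := range []string{"tmp/session-1", "user/42"} {
		remove, _ := f.Filter(0, []byte(k), []byte("value"))
		fmt.Println(k, remove)
	}
}
```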

type MockConsumerServer

type MockConsumerServer struct {
	mock.Mock
}

MockConsumerServer is an autogenerated mock type for the ConsumerServer type

func (*MockConsumerServer) CurrentConsumerState

func (_m *MockConsumerServer) CurrentConsumerState(_a0 context.Context, _a1 *Empty) (*ConsumerState, error)

CurrentConsumerState provides a mock function with given fields: _a0, _a1

type MockShard

type MockShard struct {
	mock.Mock
}

MockShard is an autogenerated mock type for the Shard type

func (*MockShard) Cache

func (_m *MockShard) Cache() interface{}

Cache provides a mock function with given fields:

func (*MockShard) Database

func (_m *MockShard) Database() *gorocksdb.DB

Database provides a mock function with given fields:

func (*MockShard) ID

func (_m *MockShard) ID() ShardID

ID provides a mock function with given fields:

func (*MockShard) Partition

func (_m *MockShard) Partition() topic.Partition

Partition provides a mock function with given fields:

func (*MockShard) ReadOptions

func (_m *MockShard) ReadOptions() *gorocksdb.ReadOptions

ReadOptions provides a mock function with given fields:

func (*MockShard) SetCache

func (_m *MockShard) SetCache(_a0 interface{})

SetCache provides a mock function with given fields: _a0

func (*MockShard) Transaction

func (_m *MockShard) Transaction() *gorocksdb.WriteBatch

Transaction provides a mock function with given fields:

func (*MockShard) WriteOptions

func (_m *MockShard) WriteOptions() *gorocksdb.WriteOptions

WriteOptions provides a mock function with given fields:

type OptionsIniter

type OptionsIniter interface {
	InitOptions(*rocks.Options)
}

Optional Consumer interface for customization of Shard database options prior to initial open.

type Runner

type Runner struct {
	Consumer Consumer
	// An identifier for this particular runner. Eg, the hostname.
	LocalRouteKey string
	// Base local directory into which shards should be staged.
	LocalDir string
	// Base path in Etcd via which the consumer should coordinate.
	ConsumerRoot string
	// Base journal path for recovery logs of this consumer.
	RecoveryLogRoot string
	// Required number of replicas of the consumer.
	ReplicaCount int

	Etcd    etcd.Client
	Gazette journal.Client

	// Optional hooks for notification of Shard lifecycle. These are largely
	// intended to facilitate testing cases.
	ShardPostInitHook    func(Shard)
	ShardPostConsumeHook func(topic.Envelope, Shard)
	ShardPostCommitHook  func(Shard)
	ShardPostStopHook    func(Shard)
	// contains filtered or unexported fields
}

func (*Runner) CurrentConsumerState

func (r *Runner) CurrentConsumerState(context.Context, *Empty) (*ConsumerState, error)

func (*Runner) FixedItems

func (r *Runner) FixedItems() []string

consumer.Allocator implementation.

func (*Runner) InspectChan

func (r *Runner) InspectChan() chan func(*etcd.Node)

func (*Runner) InstanceKey

func (r *Runner) InstanceKey() string

func (*Runner) ItemIsReadyForPromotion

func (r *Runner) ItemIsReadyForPromotion(item, state string) bool

func (*Runner) ItemRoute

func (r *Runner) ItemRoute(name string, rt consensus.Route, index int, tree *etcd.Node)

func (*Runner) ItemState

func (r *Runner) ItemState(name string) string

func (*Runner) KeysAPI

func (r *Runner) KeysAPI() etcd.KeysAPI

func (*Runner) PathRoot

func (r *Runner) PathRoot() string

func (*Runner) Replicas

func (r *Runner) Replicas() int

func (*Runner) Run

func (r *Runner) Run() error

type Shard

type Shard interface {
	// The concrete ID of this Shard.
	ID() ShardID
	// The consumed Partition of this Shard.
	Partition() topic.Partition

	// A consumer may wish to maintain in-memory state for
	// performance reasons. Examples could include:
	//  * Objects we’re reducing over, for which we wish to avoid
	//    excessive database writes.
	//  * An LRU of "hot" objects we expect to reference again soon.
	// However, to guarantee required transactionality properties,
	// consumers must be careful not to mix states between shards.
	// |Cache| is available to consumers for shard-level isolation
	// of a consumer-specific local memory context.
	Cache() interface{}
	SetCache(interface{})

	// Returns the database of the Shard.
	Database() *rocks.DB

	// Current Transaction of the consumer shard. All writes issued through
	// Transaction will commit atomically and be check-pointed with consumed
	// Journal offsets. This provides exactly-once processing of Journal content
	// (though note that Gazette is itself an at-least once system, and Journal
	// writes themselves could be duplicated). Writes may be done directly to
	// the database, in which case they will be applied at-least once (for
	// example, because a Shard is recovered to a state after a write was applied
	// but before corresponding Journal offsets were written).
	Transaction() *rocks.WriteBatch

	// Returns initialized read and write options for the database.
	ReadOptions() *rocks.ReadOptions
	WriteOptions() *rocks.WriteOptions
}

type ShardHalter

type ShardHalter interface {
	HaltShard(Shard)
}

Optional Consumer interface for notification that the Shard is no longer being processed by this Consumer. No further Consume or Flush calls will occur, nor will there be further writes to the recovery log. A common use case is to hint to the Consumer that external resources or connections associated with the Shard should be released.

type ShardID

type ShardID string

ShardID uniquely identifies a specific Shard. A ShardID must be consistent across processes for the entire duration of the Consumer lifetime.

func ShardName_DEPRECATED

func ShardName_DEPRECATED(p topic.Partition) ShardID

Generates a ShardID for a topic.Partition. Shards are named as "shard-{path.Base(topic.Name)}-{Base(partition)}". As a special case, if the partition was created using standard "my-topic/part-123" enumeration, the "part-" prefix of the journal base name is removed, resulting in final ShardIDs like "shard-my-topic-123".

This method is deprecated, but maintained for compatibility with existing ShardIDs used in production. TODO(johnny): Move to a globally unique ShardID, which is content-addressed from the (Consumer, Topic, Journal)-tuple names.
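
The naming rule can be sketched over plain strings. The hypothetical `shardName` below takes a topic name and a journal name rather than a topic.Partition, and it approximates the "standard enumeration" special case by trimming a literal "part-" prefix; how the real function detects that case may differ.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// shardName (hypothetical) applies the documented naming rule to plain
// strings. Trimming a literal "part-" prefix is an approximation of the
// special case for enumerated partitions.
func shardName(topicName, journal string) string {
	base := strings.TrimPrefix(path.Base(journal), "part-")
	return "shard-" + path.Base(topicName) + "-" + base
}

func main() {
	fmt.Println(shardName("my-topic", "my-topic/part-123"))        // shard-my-topic-123
	fmt.Println(shardName("examples/events", "examples/events/blue")) // shard-events-blue
}
```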

func (ShardID) String

func (id ShardID) String() string

type ShardIndex

type ShardIndex struct {
	// contains filtered or unexported fields
}

ShardIndex tracks Shard instances by ShardID. It provides for acquisition of a Shard instance by ID, and management of Shard tear-down by maintaining reference counts of currently-acquired Shards. This is useful for building APIs which query against consumer databases.

func (*ShardIndex) AcquireShard

func (i *ShardIndex) AcquireShard(id ShardID) (shard Shard, ok bool)

AcquireShard queries for a live, mastered Shard of |id|. If found, the returned |shard| is locked from tear-down (eg, due to membership change) and must be released via ReleaseShard.

func (*ShardIndex) AddShard

func (i *ShardIndex) AddShard(shard Shard)

TODO(johnny): Remove these. They're deprecated names which are being supported for the moment to avoid unrelated churning in the patch set.

func (*ShardIndex) DeindexShard

func (i *ShardIndex) DeindexShard(shard Shard)

DeindexShard drops |shard| from the index, blocking until all obtained references have been released. shard.ID() must be indexed. DeindexShard is exported to facilitate testing, but clients should generally use RegisterWithRunner and not call DeindexShard directly.

func (*ShardIndex) IndexShard

func (i *ShardIndex) IndexShard(shard Shard)

IndexShard adds |shard| to the index. shard.ID() must not already be indexed. IndexShard is exported to facilitate testing, but clients should generally use RegisterWithRunner and not call IndexShard directly.

func (*ShardIndex) RegisterWithRunner

func (i *ShardIndex) RegisterWithRunner(r *Runner)

RegisterWithRunner registers the ShardIndex to watch the Runner, indexing the live set of mastered Shards.

func (*ShardIndex) ReleaseShard

func (i *ShardIndex) ReleaseShard(shard Shard)

ReleaseShard releases a previously obtained Shard, allowing tear-down to occur if the Shard membership status has changed and all references have been released.

func (*ShardIndex) RemoveShard

func (i *ShardIndex) RemoveShard(shard Shard)
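
The acquire/release/deindex behavior described for ShardIndex is a reference-counting pattern. The sketch below shows one way such a pattern can be built from a mutex and a sync.WaitGroup per entry. The `index` and `entry` types are hypothetical and hold plain strings; this is an illustration of the technique, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// entry pairs an indexed value with a count of outstanding references.
type entry struct {
	value string
	refs  sync.WaitGroup
}

// index (hypothetical) tracks entries by ID.
type index struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func newIndex() *index { return &index{entries: make(map[string]*entry)} }

// add indexes |value| under |id|.
func (i *index) add(id, value string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.entries[id] = &entry{value: value}
}

// acquire returns the entry for |id| and takes a reference on it.
func (i *index) acquire(id string) (*entry, bool) {
	i.mu.Lock()
	defer i.mu.Unlock()
	e, ok := i.entries[id]
	if ok {
		e.refs.Add(1)
	}
	return e, ok
}

// release drops a reference previously taken by acquire.
func (i *index) release(e *entry) { e.refs.Done() }

// remove unindexes |id| so no new references can be taken, then blocks
// until every outstanding reference has been released.
func (i *index) remove(id string) {
	i.mu.Lock()
	e, ok := i.entries[id]
	delete(i.entries, id)
	i.mu.Unlock()
	if ok {
		e.refs.Wait()
	}
}

func main() {
	idx := newIndex()
	idx.add("shard-my-topic-123", "db")
	e, _ := idx.acquire("shard-my-topic-123")

	done := make(chan struct{})
	go func() {
		idx.remove("shard-my-topic-123") // Cannot finish while |e| is held.
		close(done)
	}()

	idx.release(e)
	<-done
	_, ok := idx.acquire("shard-my-topic-123")
	fmt.Println(ok) // false
}
```

Deleting the entry under the lock before waiting means no new reference can be taken once removal has started, so the wait only covers references that already exist.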

type ShardIniter

type ShardIniter interface {
	InitShard(Shard) error
}

Optional Consumer interface for notification of Shard initialization prior to an initial Consume. A common use case is to initialize the shard cache.
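
A sketch of the initialization hook, paired with the HaltShard hook documented under ShardHalter: the hypothetical `counter` consumer installs an empty per-shard cache in InitShard and drops it in HaltShard. The `shard` interface here is a stand-in exposing only the cache accessors of Shard, and clearing the cache on halt is an illustrative choice rather than documented behavior.

```go
package main

import "fmt"

// shard is a stand-in exposing only the cache accessors of the Shard interface.
type shard interface {
	Cache() interface{}
	SetCache(interface{})
}

// memShard is a trivial in-memory implementation of the stand-in.
type memShard struct{ cache interface{} }

func (s *memShard) Cache() interface{}     { return s.cache }
func (s *memShard) SetCache(c interface{}) { s.cache = c }

// counter is a hypothetical consumer with InitShard and HaltShard hooks.
type counter struct{}

// InitShard installs an empty per-shard cache before the first Consume.
func (counter) InitShard(s shard) error {
	s.SetCache(make(map[string]int))
	return nil
}

// HaltShard drops the cache once the shard is no longer processed here.
func (counter) HaltShard(s shard) {
	s.SetCache(nil)
}

func main() {
	var s shard = &memShard{}
	c := counter{}
	if err := c.InitShard(s); err != nil {
		panic(err)
	}
	counts := s.Cache().(map[string]int)
	counts["seen"]++
	fmt.Println(counts["seen"])

	c.HaltShard(s)
	fmt.Println(s.Cache() == nil)
}
```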
