Documentation ¶
Overview ¶
Code generated by mockery v1.0.0
Code generated by mockery v1.0.0
Package consumer is a generated protocol buffer package. It is generated from these files: service.proto. It has these top-level messages: Empty, ConsumerState.
Index ¶
- Constants
- Variables
- func AppendOffsetKeyEncoding(b []byte, name journal.Name) []byte
- func AppendOffsetValueEncoding(b []byte, offset int64) []byte
- func BlockUntilShardsAreServing(inspector consensus.Inspector)
- func EnumerateShards(c Consumer) map[ShardID]topic.Partition
- func LoadOffsetsFromDB(db *rocks.DB, dbRO *rocks.ReadOptions) (map[journal.Name]int64, error)
- func LoadOffsetsFromEtcd(tree *etcd.Node) (map[journal.Name]int64, error)
- func OffsetPath(consumerPath string, name journal.Name) string
- func RegisterConsumerServer(s *grpc.Server, srv ConsumerServer)
- func ResetShardsToJournalHeads(runner *Runner) error
- func StoreOffsetsToEtcd(rootPath string, offsets map[journal.Name]int64, keysAPI etcd.KeysAPI) deprecated
- type Client
- type Consumer
- type ConsumerClient
- type ConsumerServer
- type ConsumerState
- func (*ConsumerState) Descriptor() ([]byte, []int)
- func (m *ConsumerState) GetEndpoints() []string
- func (m *ConsumerState) GetLocalRouteKey() string
- func (m *ConsumerState) GetReplicaCount() int32
- func (m *ConsumerState) GetRoot() string
- func (m *ConsumerState) GetShards() []ConsumerState_Shard
- func (m *ConsumerState) Marshal() (dAtA []byte, err error)
- func (m *ConsumerState) MarshalTo(dAtA []byte) (int, error)
- func (*ConsumerState) ProtoMessage()
- func (m *ConsumerState) Reset()
- func (m *ConsumerState) Size() (n int)
- func (m *ConsumerState) String() string
- func (m *ConsumerState) Unmarshal(dAtA []byte) error
- type ConsumerState_Replica
- func (*ConsumerState_Replica) Descriptor() ([]byte, []int)
- func (m *ConsumerState_Replica) GetEndpoint() string
- func (m *ConsumerState_Replica) GetStatus() ConsumerState_Replica_Status
- func (m *ConsumerState_Replica) Marshal() (dAtA []byte, err error)
- func (m *ConsumerState_Replica) MarshalTo(dAtA []byte) (int, error)
- func (*ConsumerState_Replica) ProtoMessage()
- func (m *ConsumerState_Replica) Reset()
- func (m *ConsumerState_Replica) Size() (n int)
- func (m *ConsumerState_Replica) String() string
- func (m *ConsumerState_Replica) Unmarshal(dAtA []byte) error
- type ConsumerState_Replica_Status
- type ConsumerState_Shard
- func (*ConsumerState_Shard) Descriptor() ([]byte, []int)
- func (m *ConsumerState_Shard) GetId() ShardID
- func (m *ConsumerState_Shard) GetPartition() github_com_LiveRamp_gazette_pkg_journal.Name
- func (m *ConsumerState_Shard) GetReplicas() []ConsumerState_Replica
- func (m *ConsumerState_Shard) GetTopic() string
- func (m *ConsumerState_Shard) Marshal() (dAtA []byte, err error)
- func (m *ConsumerState_Shard) MarshalTo(dAtA []byte) (int, error)
- func (*ConsumerState_Shard) ProtoMessage()
- func (m *ConsumerState_Shard) Reset()
- func (m *ConsumerState_Shard) Size() (n int)
- func (m *ConsumerState_Shard) String() string
- func (m *ConsumerState_Shard) Unmarshal(dAtA []byte) error
- type Empty
- func (*Empty) Descriptor() ([]byte, []int)
- func (m *Empty) Marshal() (dAtA []byte, err error)
- func (m *Empty) MarshalTo(dAtA []byte) (int, error)
- func (*Empty) ProtoMessage()
- func (m *Empty) Reset()
- func (m *Empty) Size() (n int)
- func (m *Empty) String() string
- func (m *Empty) Unmarshal(dAtA []byte) error
- type Filterer
- type MockConsumerServer
- type MockShard
- func (_m *MockShard) Cache() interface{}
- func (_m *MockShard) Database() *gorocksdb.DB
- func (_m *MockShard) ID() ShardID
- func (_m *MockShard) Partition() topic.Partition
- func (_m *MockShard) ReadOptions() *gorocksdb.ReadOptions
- func (_m *MockShard) SetCache(_a0 interface{})
- func (_m *MockShard) Transaction() *gorocksdb.WriteBatch
- func (_m *MockShard) WriteOptions() *gorocksdb.WriteOptions
- type OptionsIniter
- type Runner
- func (r *Runner) CurrentConsumerState(context.Context, *Empty) (*ConsumerState, error)
- func (r *Runner) FixedItems() []string
- func (r *Runner) InspectChan() chan func(*etcd.Node)
- func (r *Runner) InstanceKey() string
- func (r *Runner) ItemIsReadyForPromotion(item, state string) bool
- func (r *Runner) ItemRoute(name string, rt consensus.Route, index int, tree *etcd.Node)
- func (r *Runner) ItemState(name string) string
- func (r *Runner) KeysAPI() etcd.KeysAPI
- func (r *Runner) PathRoot() string
- func (r *Runner) Replicas() int
- func (r *Runner) Run() error
- type Shard
- type ShardHalter
- type ShardID
- type ShardIndex
- func (i *ShardIndex) AcquireShard(id ShardID) (shard Shard, ok bool)
- func (i *ShardIndex) AddShard(shard Shard)
- func (i *ShardIndex) DeindexShard(shard Shard)
- func (i *ShardIndex) IndexShard(shard Shard)
- func (i *ShardIndex) RegisterWithRunner(r *Runner)
- func (i *ShardIndex) ReleaseShard(shard Shard)
- func (i *ShardIndex) RemoveShard(shard Shard)
- type ShardIniter
Constants ¶
const ( // Peer is actively serving the Shard. Primary = "primary" // Peer is ready to immediately transition to Shard primary. Ready = "ready" // Peer is still rebuilding from the recovery log. Recovering = "recovering" // Peer is responsible for a consumer Shard it doesn't know about. // This typically happens when topics are removed from a consumer, // but remain in (and should be removed from) the consumer's Etcd directory. UnknownShard = "unknown-shard" )
Variables ¶
var ( ErrNoSuchConsumerPartition = errors.New("no such consumer partition") ErrNoReadyPartitionClient = errors.New("no ready consumer partition replica client") )
var ( ErrInvalidLengthService = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowService = fmt.Errorf("proto: integer overflow") )
var ConsumerState_Replica_Status_name = map[int32]string{
0: "INVALID",
1: "RECOVERING",
2: "READY",
3: "PRIMARY",
}
var ConsumerState_Replica_Status_value = map[string]int32{
"INVALID": 0,
"RECOVERING": 1,
"READY": 2,
"PRIMARY": 3,
}
Functions ¶
func AppendOffsetKeyEncoding ¶
AppendOffsetKeyEncoding encodes |name| into a database key representing a consumer journal offset checkpoint. A |name| of "" will generate a key which prefixes all other offset key encodings.
func AppendOffsetValueEncoding ¶
AppendOffsetValueEncoding encodes |offset| into a database value representing a consumer journal offset checkpoint.
func BlockUntilShardsAreServing ¶
BlockUntilShardsAreServing uses |inspector| to observe the consumer allocator tree, and returns only after all shards are being served. This is a test-support function.
func EnumerateShards ¶
EnumerateShards returns a mapping of unique ShardIDs and their Partitions implied by the Consumer and its set of consumed Topics.
func LoadOffsetsFromDB ¶
Loads from |db| offsets previously serialized by storeAndClearOffsets.
func LoadOffsetsFromEtcd ¶
Loads legacy offsets stored in Etcd under |tree|.
func RegisterConsumerServer ¶
func RegisterConsumerServer(s *grpc.Server, srv ConsumerServer)
func ResetShardsToJournalHeads ¶
Test support function. Initializes all shards of |runner| to empty databases which begin consumption from the current write-head of each topic journal.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client interacts with a Gazette Consumer to maintain an updated pool of GRPC client connections to live Consumer endpoints, along with a local snapshot of Consumer topology for selecting appropriate endpoints.
func (*Client) Invalidate ¶
func (c *Client) Invalidate()
Invalidate triggers an immediate refresh of Client state.
func (*Client) PartitionClient ¶
func (c *Client) PartitionClient(partition journal.Name) (*grpc.ClientConn, ConsumerState_Shard, error)
PartitionClient maps |partition| to its live endpoint and ConsumerState_Shard.
func (*Client) State ¶
func (c *Client) State() ConsumerState
type Consumer ¶
type Consumer interface { // Topics this Consumer is consuming. Topics() []*topic.Description // Called when a message becomes available from one of the consumer’s // joined topics. If the returned error is non-nil, the Shard is assumed to // be in an unhealthy state and will be torn down. Consume(topic.Envelope, Shard, *topic.Publisher) error // Called when a consumer transaction is about to complete. If the Shard // Cache() contains any modified state, it must be persisted to Transaction() // during this call. As in Consume(), a returned error will result in the // tear-down of the Shard. Flush(Shard, *topic.Publisher) error }
type ConsumerClient ¶
type ConsumerClient interface { // CurrentConsumerState returns a snapshot of the current ConsumerState. CurrentConsumerState(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*ConsumerState, error) }
func NewConsumerClient ¶
func NewConsumerClient(cc *grpc.ClientConn) ConsumerClient
type ConsumerServer ¶
type ConsumerServer interface { // CurrentConsumerState returns a snapshot of the current ConsumerState. CurrentConsumerState(context.Context, *Empty) (*ConsumerState, error) }
type ConsumerState ¶
type ConsumerState struct { // Etcd path which roots this consumer. Root string `protobuf:"bytes,1,opt,name=root,proto3" json:"root,omitempty"` // Identifier which uniquely identifies this consumer instance. LocalRouteKey string `protobuf:"bytes,2,opt,name=local_route_key,json=localRouteKey,proto3" json:"local_route_key,omitempty"` // Degree of Shard stand-by replication employed by the consumer. ReplicaCount int32 `protobuf:"varint,3,opt,name=replica_count,json=replicaCount,proto3" json:"replica_count,omitempty"` // All live peer endpoints of the consumer, in sorted "host:port" network format. Endpoints []string `protobuf:"bytes,4,rep,name=endpoints" json:"endpoints,omitempty"` // All Shards of this Consumer, in sorted Shard id order. Shards []ConsumerState_Shard `protobuf:"bytes,5,rep,name=shards" json:"shards"` }
ConsumerState is a snapshot of the state of a Gazette consumer, including all shards of the consumer and the current set of shard replicas.
func (*ConsumerState) Descriptor ¶
func (*ConsumerState) Descriptor() ([]byte, []int)
func (*ConsumerState) GetEndpoints ¶
func (m *ConsumerState) GetEndpoints() []string
func (*ConsumerState) GetLocalRouteKey ¶
func (m *ConsumerState) GetLocalRouteKey() string
func (*ConsumerState) GetReplicaCount ¶
func (m *ConsumerState) GetReplicaCount() int32
func (*ConsumerState) GetRoot ¶
func (m *ConsumerState) GetRoot() string
func (*ConsumerState) GetShards ¶
func (m *ConsumerState) GetShards() []ConsumerState_Shard
func (*ConsumerState) Marshal ¶
func (m *ConsumerState) Marshal() (dAtA []byte, err error)
func (*ConsumerState) ProtoMessage ¶
func (*ConsumerState) ProtoMessage()
func (*ConsumerState) Reset ¶
func (m *ConsumerState) Reset()
func (*ConsumerState) Size ¶
func (m *ConsumerState) Size() (n int)
func (*ConsumerState) String ¶
func (m *ConsumerState) String() string
func (*ConsumerState) Unmarshal ¶
func (m *ConsumerState) Unmarshal(dAtA []byte) error
type ConsumerState_Replica ¶
type ConsumerState_Replica struct { // Addressable endpoint of the replica, in "host:port" network format. Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` Status ConsumerState_Replica_Status `protobuf:"varint,2,opt,name=status,proto3,enum=consumer.ConsumerState_Replica_Status" json:"status,omitempty"` }
func (*ConsumerState_Replica) Descriptor ¶
func (*ConsumerState_Replica) Descriptor() ([]byte, []int)
func (*ConsumerState_Replica) GetEndpoint ¶
func (m *ConsumerState_Replica) GetEndpoint() string
func (*ConsumerState_Replica) GetStatus ¶
func (m *ConsumerState_Replica) GetStatus() ConsumerState_Replica_Status
func (*ConsumerState_Replica) Marshal ¶
func (m *ConsumerState_Replica) Marshal() (dAtA []byte, err error)
func (*ConsumerState_Replica) MarshalTo ¶
func (m *ConsumerState_Replica) MarshalTo(dAtA []byte) (int, error)
func (*ConsumerState_Replica) ProtoMessage ¶
func (*ConsumerState_Replica) ProtoMessage()
func (*ConsumerState_Replica) Reset ¶
func (m *ConsumerState_Replica) Reset()
func (*ConsumerState_Replica) Size ¶
func (m *ConsumerState_Replica) Size() (n int)
func (*ConsumerState_Replica) String ¶
func (m *ConsumerState_Replica) String() string
func (*ConsumerState_Replica) Unmarshal ¶
func (m *ConsumerState_Replica) Unmarshal(dAtA []byte) error
type ConsumerState_Replica_Status ¶
type ConsumerState_Replica_Status int32
Status of the replica shard.
const ( ConsumerState_Replica_INVALID ConsumerState_Replica_Status = 0 ConsumerState_Replica_RECOVERING ConsumerState_Replica_Status = 1 ConsumerState_Replica_READY ConsumerState_Replica_Status = 2 ConsumerState_Replica_PRIMARY ConsumerState_Replica_Status = 3 )
func (ConsumerState_Replica_Status) EnumDescriptor ¶
func (ConsumerState_Replica_Status) EnumDescriptor() ([]byte, []int)
func (ConsumerState_Replica_Status) String ¶
func (x ConsumerState_Replica_Status) String() string
type ConsumerState_Shard ¶
type ConsumerState_Shard struct { // The unique ID of this Shard. Id ShardID `protobuf:"bytes,1,opt,name=id,proto3,casttype=ShardID" json:"id,omitempty"` // The topic name of this Shard. Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` // The journal name of this Shard's topic partition. Partition github_com_LiveRamp_gazette_pkg_journal.Name `protobuf:"bytes,3,opt,name=partition,proto3,casttype=github.com/LiveRamp/gazette/pkg/journal.Name" json:"partition,omitempty"` // Assigned replicas and their processing status. Replicas []ConsumerState_Replica `protobuf:"bytes,5,rep,name=replicas" json:"replicas"` }
func (*ConsumerState_Shard) Descriptor ¶
func (*ConsumerState_Shard) Descriptor() ([]byte, []int)
func (*ConsumerState_Shard) GetId ¶
func (m *ConsumerState_Shard) GetId() ShardID
func (*ConsumerState_Shard) GetPartition ¶
func (m *ConsumerState_Shard) GetPartition() github_com_LiveRamp_gazette_pkg_journal.Name
func (*ConsumerState_Shard) GetReplicas ¶
func (m *ConsumerState_Shard) GetReplicas() []ConsumerState_Replica
func (*ConsumerState_Shard) GetTopic ¶
func (m *ConsumerState_Shard) GetTopic() string
func (*ConsumerState_Shard) Marshal ¶
func (m *ConsumerState_Shard) Marshal() (dAtA []byte, err error)
func (*ConsumerState_Shard) MarshalTo ¶
func (m *ConsumerState_Shard) MarshalTo(dAtA []byte) (int, error)
func (*ConsumerState_Shard) ProtoMessage ¶
func (*ConsumerState_Shard) ProtoMessage()
func (*ConsumerState_Shard) Reset ¶
func (m *ConsumerState_Shard) Reset()
func (*ConsumerState_Shard) Size ¶
func (m *ConsumerState_Shard) Size() (n int)
func (*ConsumerState_Shard) String ¶
func (m *ConsumerState_Shard) String() string
func (*ConsumerState_Shard) Unmarshal ¶
func (m *ConsumerState_Shard) Unmarshal(dAtA []byte) error
type Empty ¶
type Empty struct { }
Empty is an empty message, which exists to support RPC APIs taking no arguments.
func (*Empty) Descriptor ¶
func (*Empty) ProtoMessage ¶
func (*Empty) ProtoMessage()
type Filterer ¶
Optional Consumer interface for implementations which prune or expire keys & values from the Shard database based on Consumer-specific criteria. This interface intentionally overlaps with `rocks.CompactionFilter`.
type MockConsumerServer ¶
MockConsumerServer is an autogenerated mock type for the ConsumerServer type
func (*MockConsumerServer) CurrentConsumerState ¶
func (_m *MockConsumerServer) CurrentConsumerState(_a0 context.Context, _a1 *Empty) (*ConsumerState, error)
CurrentConsumerState provides a mock function with given fields: _a0, _a1
type MockShard ¶
MockShard is an autogenerated mock type for the Shard type
func (*MockShard) Cache ¶
func (_m *MockShard) Cache() interface{}
Cache provides a mock function with given fields:
func (*MockShard) ReadOptions ¶
func (_m *MockShard) ReadOptions() *gorocksdb.ReadOptions
ReadOptions provides a mock function with given fields:
func (*MockShard) SetCache ¶
func (_m *MockShard) SetCache(_a0 interface{})
SetCache provides a mock function with given fields: _a0
func (*MockShard) Transaction ¶
func (_m *MockShard) Transaction() *gorocksdb.WriteBatch
Transaction provides a mock function with given fields:
func (*MockShard) WriteOptions ¶
func (_m *MockShard) WriteOptions() *gorocksdb.WriteOptions
WriteOptions provides a mock function with given fields:
type OptionsIniter ¶
Optional Consumer interface for customization of Shard database options prior to initial open.
type Runner ¶
type Runner struct { Consumer Consumer // An identifier for this particular runner. Eg, the hostname. LocalRouteKey string // Base local directory into which shards should be staged. LocalDir string // Base path in Etcd via which the consumer should coordinate. ConsumerRoot string // Base journal path for recovery logs of this consumer. RecoveryLogRoot string // Required number of replicas of the consumer. ReplicaCount int Etcd etcd.Client Gazette journal.Client // Optional hooks for notification of Shard lifecycle. These are largely // intended to facilitate testing cases. ShardPostInitHook func(Shard) ShardPostConsumeHook func(topic.Envelope, Shard) ShardPostCommitHook func(Shard) ShardPostStopHook func(Shard) // contains filtered or unexported fields }
func (*Runner) CurrentConsumerState ¶
func (*Runner) FixedItems ¶
consensus.Allocator implementation.
func (*Runner) InspectChan ¶
func (*Runner) InstanceKey ¶
func (*Runner) ItemIsReadyForPromotion ¶
type Shard ¶
type Shard interface { // The concrete ID of this Shard. ID() ShardID // The consumed Partition of this Shard. Partition() topic.Partition // A consumer may wish to maintain in-memory state for // performance reasons. Examples could include: // * Objects we’re reducing over, for which we wish to avoid // excessive database writes. // * An LRU of "hot" objects we expect to reference again soon. // However, to guarantee required transactionality properties, // consumers must be careful not to mix states between shards. // |Cache| is available to consumers for shard-level isolation // of a consumer-specific local memory context. Cache() interface{} SetCache(interface{}) // Returns the database of the Shard. Database() *rocks.DB // Current Transaction of the consumer shard. All writes issued through // Transaction will commit atomically and be check-pointed with consumed // Journal offsets. This provides exactly-once processing of Journal content // (though note that Gazette is itself an at-least once system, and Journal // writes themselves could be duplicated). Writes may be done directly to // the database, in which case they will be applied at-least once (for // example, because a Shard is recovered to a state after a write was applied // but before corresponding Journal offsets were written). Transaction() *rocks.WriteBatch // Returns initialized read and write options for the database. ReadOptions() *rocks.ReadOptions WriteOptions() *rocks.WriteOptions }
type ShardHalter ¶
type ShardHalter interface {
HaltShard(Shard)
}
Optional Consumer interface for notification that the Shard is no longer being processed by this Consumer. No further Consume or Flush calls will occur, nor will further writes to the recovery log. A common use case is to hint to the Consumer that external resources or connections associated with the Shard should be released.
type ShardID ¶
type ShardID string
ShardID uniquely identifies a specific Shard. A ShardID must be consistent across processes for the entire duration of the Consumer lifetime.
func ShardName_DEPRECATED ¶
Generates a ShardID for a topic.Partition. Shards are named as:
"shard-{path.Base(topic.Name)}-{Base(partition)}". As a special case, if the
partition was created using standard "my-topic/part-123" enumeration, the "part-" prefix of the journal base name is removed, resulting in final ShardIDs like "shard-my-topic-123".
This method is deprecated, but maintained for compatibility with existing ShardIDs used in production. TODO(johnny): Move to a globally unique ShardID, which is content-addressed from the (Consumer, Topic, Journal)-tuple names.
type ShardIndex ¶
type ShardIndex struct {
// contains filtered or unexported fields
}
ShardIndex tracks Shard instances by ShardID. It provides for acquisition of a Shard instance by ID, and management of Shard tear-down by maintaining reference counts of currently-acquired Shards. This is useful for building APIs which query against consumer databases.
func (*ShardIndex) AcquireShard ¶
func (i *ShardIndex) AcquireShard(id ShardID) (shard Shard, ok bool)
AcquireShard queries for a live, mastered Shard of |id|. If found, the returned |shard| is locked from tear-down (eg, due to membership change) and must be released via ReleaseShard.
func (*ShardIndex) AddShard ¶
func (i *ShardIndex) AddShard(shard Shard)
TODO(johnny): Remove these. They're deprecated names which are being supported for the moment to avoid unrelated churning in the patch set.
func (*ShardIndex) DeindexShard ¶
func (i *ShardIndex) DeindexShard(shard Shard)
DeindexShard drops |shard| from the index, blocking until all obtained references have been released. shard.ID() must be indexed. DeindexShard is exported to facilitate testing, but clients should generally use RegisterWithRunner and not call DeindexShard directly.
func (*ShardIndex) IndexShard ¶
func (i *ShardIndex) IndexShard(shard Shard)
IndexShard adds |shard| to the index. shard.ID() must not already be indexed. IndexShard is exported to facilitate testing, but clients should generally use RegisterWithRunner and not call IndexShard directly.
func (*ShardIndex) RegisterWithRunner ¶
func (i *ShardIndex) RegisterWithRunner(r *Runner)
RegisterWithRunner registers the ShardIndex to watch the Runner, indexing the live set of mastered Shards.
func (*ShardIndex) ReleaseShard ¶
func (i *ShardIndex) ReleaseShard(shard Shard)
ReleaseShard releases a previously obtained Shard, allowing tear-down to occur if the Shard membership status has changed and all references have been released.
func (*ShardIndex) RemoveShard ¶
func (i *ShardIndex) RemoveShard(shard Shard)
type ShardIniter ¶
Optional Consumer interface for notification of Shard initialization prior to an initial Consume. A common use case is to initialize the shard cache.