consumer

package
v0.1.0-alpha1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package consumer provides all aspects of the arrebato server for managing consumers. Primarily the storage of consumer indexes. Consumer indexes indicate the location within a topic for a given consumer identifier.

Index

Constants

This section is empty.

Variables

View Source
var ErrNoTopic = errors.New("no topic")

ErrNoTopic is the error given when attempting to perform an action against a topic that does not exist.

Functions

This section is empty.

Types

type BoltStore

type BoltStore struct {
	// contains filtered or unexported fields
}

The BoltStore type is responsible for maintaining the state of consumers within a boltdb database.

func NewBoltStore

func NewBoltStore(db *bbolt.DB) *BoltStore

NewBoltStore returns a new instance of the BoltStore type that will persist and query consumer state from a boltdb instance.

func (*BoltStore) GetTopicIndex

func (bs *BoltStore) GetTopicIndex(_ context.Context, topic, consumerID string) (*consumer.TopicIndex, error)

GetTopicIndex returns the current index in a topic for a consumer identifier. Returns a zero index if the consumer does not exist. Or ErrNoTopic if the topic does not exist.

func (*BoltStore) Indexes

func (bs *BoltStore) Indexes(ctx context.Context) (map[string]map[string]uint64, error)

Indexes returns the current consumer index of every consumer for every topic. The initial map is keyed by topic name, the nested map is keyed by consumer identifier.

func (*BoltStore) Prune

func (bs *BoltStore) Prune(ctx context.Context, topicName string, before time.Time) ([]string, error)

Prune all consumer indexes for a topic where the last timestamp is before the provided time, effectively resetting that consumer's index to zero.

func (*BoltStore) SetTopicIndex

func (bs *BoltStore) SetTopicIndex(_ context.Context, c *consumer.TopicIndex) error

SetTopicIndex sets the current index on a topic for a consumer. Returns ErrNoTopic if the topic does not exist.

type Executor

type Executor interface {
	// Execute should perform actions corresponding to the provided command. The returned error should correspond
	// to the issue relevant to the command.
	Execute(ctx context.Context, cmd command.Command) error
}

The Executor interface describes types that execute commands related to Topic data.

type GRPC

type GRPC struct {
	// contains filtered or unexported fields
}

The GRPC type is a consumersvc.ConsumerServiceServer implementation that handles inbound gRPC requests to manage consumer state.

func NewGRPC

func NewGRPC(executor Executor) *GRPC

NewGRPC returns a new instance of the GRPC type that will modify consumer data via commands sent to the Executor.

func (*GRPC) Register

func (svr *GRPC) Register(registrar grpc.ServiceRegistrar, healthServer *health.Server)

Register the GRPC service onto the grpc.ServiceRegistrar.

func (*GRPC) SetTopicIndex

SetTopicIndex handles an inbound gRPC request to set the current position of a consumer on a topic. Returns a codes.FailedPrecondition error code if this server is not the leader, or a codes.NotFound error code if the presented topic does not exist.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

The Handler type is responsible for handling commands sent to the server regarding consumer state.

func NewHandler

func NewHandler(manager Manager, logger hclog.Logger) *Handler

NewHandler returns a new instance of the Handler type that will handle inbound commands regarding consumers.

func (*Handler) SetTopicIndex

func (h *Handler) SetTopicIndex(ctx context.Context, cmd *consumercmd.SetTopicIndex) error

SetTopicIndex handles a command that modifies the current index for a consumer on a topic.

type Manager

type Manager interface {
	SetTopicIndex(ctx context.Context, c *consumer.TopicIndex) error
}

The Manager interface describes types that can manage consumer state.

type MetricExporter

type MetricExporter struct {
	// contains filtered or unexported fields
}

The MetricExporter type is used to export metrics regarding the state of consumers within the data store.

func NewMetricExporter

func NewMetricExporter(source MetricSource) *MetricExporter

NewMetricExporter returns a new instance of the MetricExporter type that will obtain statistics to export as metrics from the MetricSource implementation.

func (*MetricExporter) Export

func (m *MetricExporter) Export(ctx context.Context) error

Export metrics regarding consumers. This method exports the current index of all consumers on all topics.

type MetricSource

type MetricSource interface {
	Indexes(ctx context.Context) (map[string]map[string]uint64, error)
}

The MetricSource interface describes types that can obtain statistics about the current state of consumers that can be exported as metrics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL