message

package
v0.1.0-alpha4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package message provides all aspects of the arrebato server for managing messages. Primarily the production and consumption of messages.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoTopic is the error given when attempting to read/write messages on a topic that does not exist.
	ErrNoTopic = errors.New("no topic")

	// ErrNoMessages is the error given when attempting to read messages from a topic that contains none.
	ErrNoMessages = errors.New("no messages")

	// ErrNoTopicInfo is the error given when a topic is found without any data regarding its retention
	// period. A topic should never be in this state and probably needs recreating.
	ErrNoTopicInfo = errors.New("no topic info")
)

Functions

This section is empty.

Types

type ACL

type ACL interface {
	// Allowed should return true if the client has the given permission on a topic. If no ACL has been set up
	// within the server then it should always return true.
	Allowed(ctx context.Context, topic, client string, permission acl.Permission) (bool, error)
}

The ACL interface describes types that act as an access-control list to determine what clients are permitted to do on certain topics.

type BoltStore

type BoltStore struct {
	// contains filtered or unexported fields
}

The BoltStore type is responsible for persisting/querying message data from a boltdb instance.

func NewBoltStore

func NewBoltStore(db *bbolt.DB) *BoltStore

NewBoltStore returns a new instance of the BoltStore type that uses the provided bbolt.DB instance for persistence.

func (*BoltStore) Counts

func (bs *BoltStore) Counts(ctx context.Context) (map[string]uint64, error)

Counts returns the number of messages available within each topic.

func (*BoltStore) Create

func (bs *BoltStore) Create(_ context.Context, m *message.Message) (uint64, error)

Create a new Message within the store, returning its index.

func (*BoltStore) Indexes

func (bs *BoltStore) Indexes(ctx context.Context) (map[string]uint64, error)

Indexes returns the current message index of every known topic.

func (*BoltStore) Prune

func (bs *BoltStore) Prune(ctx context.Context, topicName string, before time.Time) (uint64, error)

Prune all messages on a topic created before the given timestamp.

func (*BoltStore) Read

func (bs *BoltStore) Read(ctx context.Context, topic string, startIndex uint64, fn ReadFunc) error

Read messages from the desired topic, starting at the desired index. Each message triggers an invocation of the provided ReadFunc. This method blocks until the end of the messages is reached, the provided context is cancelled or the ReadFunc returns an error.

type Creator

type Creator interface {
	// Create should create a new message in the store and return the index of the message in the log.
	Create(ctx context.Context, m *message.Message) (uint64, error)
}

The Creator interface describes types that can create messages within a store.

type Executor

type Executor interface {
	// Execute should perform actions corresponding to the provided command. The returned error should correspond
	// to the issue relevant to the command. For example, a command that creates a new Message should return
	// ErrNoTopic if the topic does not exist.
	Execute(ctx context.Context, cmd command.Command) error
}

The Executor interface describes types that execute commands related to Message data.

type GRPC

type GRPC struct {
	// contains filtered or unexported fields
}

The GRPC type is a messagesvc.MessageServiceServer implementation that handles inbound gRPC requests to manage and query Messages.

func NewGRPC

func NewGRPC(
	nodeName string,
	executor Executor,
	reader Reader,
	consumers TopicIndexGetter,
	acl ACL,
	publicKeys PublicKeyGetter,
	topics TopicGetter,
	owners TopicOwnerGetter,
) *GRPC

NewGRPC returns a new instance of the GRPC type that will modify Message data via commands sent to the Executor and read messages via the Reader implementation. The index of consumers will be obtained via the TopicIndexGetter implementation, permissions will be checked using the ACL implementation, topic details will be obtained via the TopicGetter implementation and client's public signing keys are obtained via the PublicKeyGetter implementation.

func (*GRPC) Consume

Consume messages from a topic. Returns a codes.NotFound code if the topic does not exist. In order to be allowed to consume the desired topic, the client must be given permission by the ACL, and must be calling the server that is assigned the desired topic.

func (*GRPC) Produce

func (svr *GRPC) Produce(ctx context.Context, request *messagesvc.ProduceRequest) (*messagesvc.ProduceResponse, error)

Produce a new message for a topic. Returns a codes.NotFound code if the topic does not exist.

func (*GRPC) Register

func (svr *GRPC) Register(registrar grpc.ServiceRegistrar, health *health.Server)

Register the GRPC service onto the grpc.ServiceRegistrar.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

The Handler type is responsible for handling message commands and modifying the message state appropriately.

func NewHandler

func NewHandler(messages Creator, logger hclog.Logger) *Handler

NewHandler returns a new instance of the Handler type that will store messages in the provided Creator implementation.

func (*Handler) Create

func (h *Handler) Create(ctx context.Context, cmd *messagecmd.CreateMessage) error

Create handles the messagecmd.CreateMessage command and creates a new message within the message store.

type MetricExporter

type MetricExporter struct {
	// contains filtered or unexported fields
}

The MetricExporter type is used to export metrics regarding the state of messages within the data store.

func NewMetricExporter

func NewMetricExporter(source MetricSource) *MetricExporter

NewMetricExporter returns a new instance of the MetricExporter type that will obtain statistics to export as metrics from the MetricSource implementation.

func (*MetricExporter) Export

func (m *MetricExporter) Export(ctx context.Context) error

Export metrics regarding messages. This method exports the current index of all topics as well as the count of messages within those topics. Since topics can be pruned, the current index does not always reflect the total number of messages that can be consumed.

type MetricSource

type MetricSource interface {
	Indexes(ctx context.Context) (map[string]uint64, error)
	Counts(ctx context.Context) (map[string]uint64, error)
}

The MetricSource interface describes types that can obtain statistics about the current state of messages that can be exported as metrics.

type PublicKeyGetter

type PublicKeyGetter interface {
	// Get should return the client's public signing key. If there is no key then it should return signing.ErrNoPublicKey.
	Get(ctx context.Context, clientID string) ([]byte, error)
}

The PublicKeyGetter interface describes types that can obtain a client's public signing key.

type ReadFunc

type ReadFunc func(ctx context.Context, m *message.Message) error

ReadFunc is a function that is invoked whenever a message is read from a topic.

type Reader

type Reader interface {
	// Read should start reading messages within a topic starting from a given index, invoking the ReadFunc for
	// each message.
	Read(ctx context.Context, topic string, startIndex uint64, fn ReadFunc) error
}

The Reader interface describes types that can read messages from a topic starting from a given index.

type TopicGetter

type TopicGetter interface {
	// Get should return the named topic. It should return topic.ErrNoTopic if the topic does not exist.
	Get(ctx context.Context, name string) (*topicpb.Topic, error)
}

The TopicGetter interface describes types that can obtain details on a specific topic.

type TopicIndexGetter

type TopicIndexGetter interface {
	// GetTopicIndex should return the number that describes the current index in the topic that a consumer has
	// read to. It should return ErrNoTopic if the topic does not exist.
	GetTopicIndex(ctx context.Context, topic string, consumerID string) (*consumerpb.TopicIndex, error)
}

The TopicIndexGetter interface describes types that can retrieve the current index on a topic for a consumer.

type TopicOwnerGetter

type TopicOwnerGetter interface {
	// GetTopicOwner should return a node that is assigned to a given topic. Or return node.ErrNoNode if an owner
	// cannot be found for the given topic.
	GetTopicOwner(ctx context.Context, topicName string) (*nodepb.Node, error)
}

The TopicOwnerGetter interface describes types that can query which node owns a topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL