Documentation
¶
Overview ¶
Package message provides all aspects of the arrebato server for managing messages. Primarily the production and consumption of messages.
Index ¶
- Variables
- type ACL
- type BoltStore
- func (bs *BoltStore) Counts(ctx context.Context) (map[string]uint64, error)
- func (bs *BoltStore) Create(_ context.Context, m *message.Message) (uint64, error)
- func (bs *BoltStore) Indexes(ctx context.Context) (map[string]uint64, error)
- func (bs *BoltStore) Prune(ctx context.Context, topicName string, before time.Time) (uint64, error)
- func (bs *BoltStore) Read(ctx context.Context, topic string, startIndex uint64, fn ReadFunc) error
- type Creator
- type Executor
- type GRPC
- type Handler
- type MetricExporter
- type MetricSource
- type PublicKeyGetter
- type ReadFunc
- type Reader
- type TopicGetter
- type TopicIndexGetter
- type TopicOwnerGetter
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTopic is the error given when attempting to read/write messages on a topic that does not exist. ErrNoTopic = errors.New("no topic") // ErrNoMessages is the error given when attempting to read messages from a topic that contains none. ErrNoMessages = errors.New("no messages") // ErrNoTopicInfo is the error given when a topic is found without any data regarding its retention // period. A topic should never be in this state and probably needs recreating. ErrNoTopicInfo = errors.New("no topic info") )
Functions ¶
This section is empty.
Types ¶
type ACL ¶
type ACL interface { // Allowed should return true if the client has the given permission on a topic. If no ACL has been set up // within the server then it should always return true. Allowed(ctx context.Context, topic, client string, permission acl.Permission) (bool, error) }
The ACL interface describes types that act as an access-control list to determine what clients are permitted to do on certain topics.
type BoltStore ¶
type BoltStore struct {
// contains filtered or unexported fields
}
The BoltStore type is responsible for persisting/querying message data from a boltdb instance.
func NewBoltStore ¶
NewBoltStore returns a new instance of the BoltStore type that uses the provided bbolt.DB instance for persistence.
type Creator ¶
type Creator interface { // Create should create a new message in the store and return the index of the message in the log. Create(ctx context.Context, m *message.Message) (uint64, error) }
The Creator interface describes types that can create messages within a store.
type Executor ¶
type Executor interface { // Execute should perform actions corresponding to the provided command. The returned error should correspond // to the issue relevant to the command. For example, a command that creates a new Message should return // ErrNoTopic if the topic does not exist. Execute(ctx context.Context, cmd command.Command) error }
The Executor interface describes types that execute commands related to Message data.
type GRPC ¶
type GRPC struct {
// contains filtered or unexported fields
}
The GRPC type is a messagesvc.MessageServiceServer implementation that handles inbound gRPC requests to manage and query Messages.
func NewGRPC ¶
func NewGRPC( nodeName string, executor Executor, reader Reader, consumers TopicIndexGetter, acl ACL, publicKeys PublicKeyGetter, topics TopicGetter, owners TopicOwnerGetter, ) *GRPC
NewGRPC returns a new instance of the GRPC type that will modify Message data via commands sent to the Executor and read messages via the Reader implementation. The index of consumers will be obtained via the TopicIndexGetter implementation, permissions will be checked using the ACL implementation, topic details will be obtained via the TopicGetter implementation and client's public signing keys are obtained via the PublicKeyGetter implementation.
func (*GRPC) Consume ¶
func (svr *GRPC) Consume(request *messagesvc.ConsumeRequest, server messagesvc.MessageService_ConsumeServer) error
Consume messages from a topic. Returns a codes.NotFound code if the topic does not exist. In order to be allowed to consume the desired topic, the client must be given permission by the ACL, and must be calling the server that is assigned the desired topic.
func (*GRPC) Produce ¶
func (svr *GRPC) Produce(ctx context.Context, request *messagesvc.ProduceRequest) (*messagesvc.ProduceResponse, error)
Produce a new message for a topic. Returns a codes.NotFound code if the topic does not exist.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
The Handler type is responsible for handling message commands and modifying the message state appropriately.
func NewHandler ¶
NewHandler returns a new instance of the Handler type that will store messages in the provided Creator implementation.
func (*Handler) Create ¶
func (h *Handler) Create(ctx context.Context, cmd *messagecmd.CreateMessage) error
Create handles the messagecmd.CreateMessage command and creates a new message within the message store.
type MetricExporter ¶
type MetricExporter struct {
// contains filtered or unexported fields
}
The MetricExporter type is used to export metrics regarding the state of messages within the data store.
func NewMetricExporter ¶
func NewMetricExporter(source MetricSource) *MetricExporter
NewMetricExporter returns a new instance of the MetricExporter type that will obtain statistics to export as metrics from the MetricSource implementation.
func (*MetricExporter) Export ¶
func (m *MetricExporter) Export(ctx context.Context) error
Export metrics regarding messages. This method exports the current index of all topics as well as the count of messages within those topics. Since topics can be pruned, the current index does not always reflect the total number of messages that can be consumed.
type MetricSource ¶
type MetricSource interface { Indexes(ctx context.Context) (map[string]uint64, error) Counts(ctx context.Context) (map[string]uint64, error) }
The MetricSource interface describes types that can obtain statistics about the current state of messages that can be exported as metrics.
type PublicKeyGetter ¶
type PublicKeyGetter interface { // Get should return the client's public signing key. If there is no key then it should return signing.ErrNoPublicKey. Get(ctx context.Context, clientID string) ([]byte, error) }
The PublicKeyGetter interface describes types that can obtain a client's public signing key.
type Reader ¶
type Reader interface { // Read should start reading messages within a topic starting from a given index, invoking the ReadFunc for // each message. Read(ctx context.Context, topic string, startIndex uint64, fn ReadFunc) error }
The Reader interface describes types that can read messages from a topic starting from a given index.
type TopicGetter ¶
type TopicGetter interface { // Get should return the named topic. It should return topic.ErrNoTopic if the topic does not exist. Get(ctx context.Context, name string) (*topicpb.Topic, error) }
The TopicGetter interface describes types that can obtain details on a specific topic.
type TopicIndexGetter ¶
type TopicIndexGetter interface { // GetTopicIndex should return the number that describes the current index in the topic that a consumer has // read to. It should return ErrNoTopic if the topic does not exist. GetTopicIndex(ctx context.Context, topic string, consumerID string) (*consumerpb.TopicIndex, error) }
The TopicIndexGetter interface describes types that can retrieve the current index on a topic for a consumer.
type TopicOwnerGetter ¶
type TopicOwnerGetter interface { // GetTopicOwner should return a node that is assigned to a given topic. Or return node.ErrNoNode if an owner // cannot be found for the given topic. GetTopicOwner(ctx context.Context, topicName string) (*nodepb.Node, error) }
The TopicOwnerGetter interface describes types that can query which node owns a topic.