Documentation ¶
Overview ¶
Package ensign implements the Ensign single node server.
Index ¶
- Constants
- type PublisherHandler
- func (p *PublisherHandler) Ack(eventID []byte, committed time.Time) error
- func (p PublisherHandler) CloseStream(publisherID string, events, topics uint64) error
- func (p *PublisherHandler) Nack(eventID []byte, code api.Nack_Code, message string) error
- func (p PublisherHandler) Publisher() *api.Publisher
- func (p *PublisherHandler) Reply(msg broker.PublishResult) error
- type Server
- func (s *Server) CreateTopic(ctx context.Context, in *api.Topic) (_ *api.Topic, err error)
- func (s *Server) DeleteTopic(ctx context.Context, in *api.TopicMod) (out *api.TopicStatus, err error)
- func (s *Server) EnSQL(in *api.Query, stream api.Ensign_EnSQLServer) (err error)
- func (s *Server) Explain(ctx context.Context, in *api.Query) (out *api.QueryExplanation, err error)
- func (s *Server) Info(ctx context.Context, in *api.InfoRequest) (out *api.ProjectInfo, err error)
- func (s *Server) ListTopics(ctx context.Context, in *api.PageInfo) (out *api.TopicsPage, err error)
- func (s *Server) Publish(stream api.Ensign_PublishServer) (err error)
- func (s *Server) Rehash(ctx context.Context, topicID ulid.ULID, policy *api.Deduplication) (err error)
- func (s *Server) RetrieveTopic(ctx context.Context, in *api.Topic) (out *api.Topic, err error)
- func (s *Server) Run(sock net.Listener)
- func (s *Server) RunBroker()
- func (s *Server) Serve() (err error)
- func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out *api.TopicStatus, err error)
- func (s *Server) Shutdown() (err error)
- func (s *Server) Status(ctx context.Context, in *api.HealthCheck) (out *api.ServiceState, err error)
- func (s *Server) StoreMock() *mock.Store
- func (s *Server) StreamInterceptors() []grpc.StreamServerInterceptor
- func (s *Server) Subscribe(stream api.Ensign_SubscribeServer) (err error)
- func (s *Server) TopicExists(ctx context.Context, in *api.TopicName) (out *api.TopicExistsInfo, err error)
- func (s *Server) TopicFilter(topicID ulid.ULID) (_ *bloom.BloomFilter, err error)
- func (s *Server) TopicNames(ctx context.Context, in *api.PageInfo) (out *api.TopicNamesPage, err error)
- func (s *Server) UnaryInterceptors() []grpc.UnaryServerInterceptor
- func (s *Server) WaitForQuarterdeck() (err error)
- type StreamHandler
- type StreamType
- type SubscriberHandler
Constants ¶
const EventMaxDataSize int = 5.243e6
Cannot publish events > 5MiB long
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PublisherHandler ¶ added in v0.7.0
type PublisherHandler struct { StreamHandler // contains filtered or unexported fields }
func NewPublisherHandler ¶ added in v0.9.0
func NewPublisherHandler(stream api.Ensign_PublishServer, meta store.MetaStore) *PublisherHandler
func (*PublisherHandler) Ack ¶ added in v0.7.0
func (p *PublisherHandler) Ack(eventID []byte, committed time.Time) error
Sends an ack back to the client, logging any send errors that occur.
func (PublisherHandler) CloseStream ¶ added in v0.9.0
func (p PublisherHandler) CloseStream(publisherID string, events, topics uint64) error
Sends close stream message and logs stream closed along with any send errors.
func (*PublisherHandler) Nack ¶ added in v0.7.0
Sends a nack back to the client, logging any send errors that occur.
func (PublisherHandler) Publisher ¶ added in v0.9.0
func (p PublisherHandler) Publisher() *api.Publisher
Publisher gathers publisher info from the claims and from the request. NOTE: authorize must be called before this method can be called.
func (*PublisherHandler) Reply ¶ added in v0.9.0
func (p *PublisherHandler) Reply(msg broker.PublishResult) error
Handles the publisher reply from the broker
type Server ¶
type Server struct { health.ProbeServer api.UnimplementedEnsignServer // contains filtered or unexported fields }
An Ensign server implements the Ensign service as defined by the wire protocol.
func New ¶
New creates a new ensign server with the given configuration. Most server setup is conducted in this method including setting up logging, connecting to databases, etc. If this method succeeds without an error, the server is ready to be served, but it will not listen to or handle requests until the Serve() method is called.
func (*Server) CreateTopic ¶ added in v0.5.0
CreateTopic is a user-facing request to create a Topic. Ensign first verifies that the topic is eligible to be created, then stores the topic in a pending state to disk and returns success to the user. Afterwards, Ensign sends a notification to the placement service in order to figure out where the topic should be assigned so that it can start receiving events.
Permissions: topics:create
func (*Server) DeleteTopic ¶ added in v0.5.0
func (s *Server) DeleteTopic(ctx context.Context, in *api.TopicMod) (out *api.TopicStatus, err error)
DeleteTopic is a user-facing request to modify a topic and either archive it, which will make it read-only permanently or to destroy it, which will also have the effect of removing all of the data in the topic. This method is a stateful method, e.g. the topic will be updated to the current status then the Ensign placement server will take action from there.
Permissions: topics:edit (archive) or topics:destroy (destroy)
func (*Server) EnSQL ¶ added in v0.9.0
EnSQL parses an incoming query and executes the query request, sending all results onto the query stream. The EnSQL query is guaranteed to terminate, e.g. it is not a long running query that waits for subscription events to come from publishers. Once the query has been completed the stream will close. Errors are returned for standard SQL operations errors - for example if the query cannot be parsed or no results would be returned from the query.
Permissions: subscriber
func (*Server) Explain ¶ added in v0.9.0
Explain parses the input query and returns an explanation consisting of the query plan and approximate number of results any any possible errors.
Permissions: subscriber TODO: implement explanation
func (*Server) Info ¶ added in v0.7.0
func (s *Server) Info(ctx context.Context, in *api.InfoRequest) (out *api.ProjectInfo, err error)
The Info RPC returns a summary of the current state of the project retrieved from the claims of the request. This RPC requires the ReadTopics permission in order to return any information. The status request can be filtered by a list of topics to specify exactly what statistics are returned.
Permissions: topics:read and metrics:read
func (*Server) ListTopics ¶ added in v0.5.0
ListTopics associated with the project ID in the claims of the request. This unary request is paginated to prevent a huge amount of data transfer.
Permissions: topics:read
func (*Server) Publish ¶
func (s *Server) Publish(stream api.Ensign_PublishServer) (err error)
Publish implements a streaming endpoint that allows users to publish events into a topic or topics that are managed by the current broker.
The Publish stream has two phases and operates three go routines. The first phase is the initialization phase where the stream is opened and authorized, then the topics are loaded from the database. The handler waits for an OpenStream message to be received from the client before proceeding, and sends either an error or a StreamReady message if successful. Clients should ensure that they send an OpenStream message and then recv StreamReady before publishing any events to a topic.
The second phase initializes two go routines: the primary go routine receives events from the client and extracts them. It then sends them via a channel to a second go routine that handles pre-broker processing and any acks/nacks received from the broker. The handler go routine waits for these routines to complete before returning any error from the recv routine.
Permissions: publisher
func (*Server) Rehash ¶ added in v0.12.0
func (s *Server) Rehash(ctx context.Context, topicID ulid.ULID, policy *api.Deduplication) (err error)
Rehash clears the old event hashes and recomputes the hashes with the new policy. TODO: this method operates on a snapshot of the database and is not concurrency safe. TODO: rehash does not take offset policies into account.
func (*Server) RetrieveTopic ¶ added in v0.5.1
RetrieveTopic is a user-face request to fetch a single Topic and is typically used for existence checks; e.g. does this topic exist or not. The user only has to specify a TopicID in the request and then a complete topic is returned. If the topic is not found a status error with codes.NotFound is returned.
Permissions: topics:read
func (*Server) Run ¶
Run the gRPC server on the specified socket. This method can be used to serve TCP requests or to connect to a bufconn for testing purposes. This method blocks while the server is running so it should be run in a go routine.
func (*Server) RunBroker ¶ added in v0.9.0
func (s *Server) RunBroker()
RunBroker runs the internal broker for testing purposes.
func (*Server) SetTopicPolicy ¶ added in v0.11.0
func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out *api.TopicStatus, err error)
SetTopicPolicy allows the user to specify topic management policies such as deduplication or sharding. If the topic is already in the policies specified, then READY is returned. Otherwise a job is queued to modify the topic policy and PENDING is returned. This is a patch endpoint, so if any policy is set to UNKNOWN, then it is ignored; only named policies initiate changes on the topic.
Permissions: topics:edit
func (*Server) Shutdown ¶
Shutdown stops the ensign server and all long running processes gracefully. May return a multierror if there were multiple problems during shutdown but it will attempt to close all open services and processes.
func (*Server) Status ¶
func (s *Server) Status(ctx context.Context, in *api.HealthCheck) (out *api.ServiceState, err error)
Status implements a simple heartbeat mechanism for checking on the state of the Ensign server and making sure that the node is up and responding.
Permissions: None
func (*Server) StoreMock ¶ added in v0.5.0
StoreMock returns the underlying store for testing purposes. Can only be accessed in storage testing mode otherwise the method panics.
func (*Server) StreamInterceptors ¶
func (s *Server) StreamInterceptors() []grpc.StreamServerInterceptor
Prepares the interceptors (middleware) for the unary RPC endpoints of the server. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call. All stream interceptors returned by this method should be chained using grpc.ChainStreamInterceptor().
func (*Server) Subscribe ¶
func (s *Server) Subscribe(stream api.Ensign_SubscribeServer) (err error)
Subscribe implements the streaming endpoint for the API
Permissions: subscriber
func (*Server) TopicExists ¶ added in v0.5.1
func (s *Server) TopicExists(ctx context.Context, in *api.TopicName) (out *api.TopicExistsInfo, err error)
TopicExists does a quick check to see if the topic ID or name exists in the project and returns a simple yes or no bool with the original query. If both ID and name are specified then this method checks if a topic with the specified name has the specified ID (e.g. it is a more strict existence check). The claims must have any of the topics:read, publisher, or subscriber permissions in order to access this endpoint.
Permissions: topics:read OR publisher OR subscriber
func (*Server) TopicFilter ¶ added in v0.12.0
TopicFilter loads a bloom filter with all of the event hashes for the events in the specified topic. The TopicInfo for the event is read to determine how many events are in the topic. The bloom filter is constructed as the larger of either 10k events or twice the number of events in the topic and with a false positive rate of 1%. The filter can be tested and modified as needed to detect duplicates.
func (*Server) TopicNames ¶ added in v0.5.1
func (s *Server) TopicNames(ctx context.Context, in *api.PageInfo) (out *api.TopicNamesPage, err error)
TopicNames returns a paginated response that maps topic names to IDs for all topics in the project. The claims must have any of the topics:read, publisher, or subscriber permissions in order to access this endpoint.
Permissions: topics:read OR publisher OR subscriber
func (*Server) UnaryInterceptors ¶
func (s *Server) UnaryInterceptors() []grpc.UnaryServerInterceptor
Prepares the interceptors (middleware) for the unary RPC endpoings of the server. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call. All unary interceptors returned by this method should be chained using grpc.ChainUnaryInterceptor().
func (*Server) WaitForQuarterdeck ¶ added in v0.5.0
Creates a quarterdeck client connected to the same host as the Auth KeysURL and waits until Quarterdeck returns a healthy response or the exponential timeout backoff limit is reached before returning.
type StreamHandler ¶ added in v0.7.0
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler provides some common functionality to both the Publisher and Subscriber stream handlers, for example providing authentication and collecting allowed topics.
func NewStreamHandler ¶ added in v0.9.0
func NewStreamHandler(stype StreamType, stream grpc.ServerStream, meta store.MetaStore) *StreamHandler
Creates a new StreamHandler -- primarily used for testing.
func (*StreamHandler) AllowedTopics ¶ added in v0.7.0
func (s *StreamHandler) AllowedTopics() (group *topics.NameGroup, err error)
AllowedTopics returns a set of topic IDs and hashed topic names that are allowed to be accessed by the given claims. This set can be filtered to further restrict the stream based on user input. A specialized data structure is used to make it easy to perform content filtering based on name and ID.
func (*StreamHandler) Authorize ¶ added in v0.7.0
func (s *StreamHandler) Authorize(permission string) (_ *tokens.Claims, err error)
Authorize fetches the claims from the stream context, returning an error if the user is not authorized. The claims are cached on the stream handler and returned without error after the first time they are correctly fetched. Fetching claims requires a permission (e.g. subscribe or publish). If not authorized a status error is returned. Authorize MUST be called before projectID or projectTopics is called.
func (*StreamHandler) ProjectID ¶ added in v0.7.0
func (s *StreamHandler) ProjectID() (ulid.ULID, error)
Returns the ProjectID associated with the claims. Authorize must be called first or this method will error. Returns a status error if no project ID is on the claims. The projectID is cached on the stream handler and will be returned without error.
type StreamType ¶ added in v0.9.0
type StreamType uint8
const ( UnknownStream StreamType = iota PublisherStream SubscriberStream )
func (StreamType) String ¶ added in v0.9.0
func (s StreamType) String() string
type SubscriberHandler ¶ added in v0.7.0
type SubscriberHandler struct { StreamHandler // contains filtered or unexported fields }
func NewSubscribeHandler ¶ added in v0.9.0
func NewSubscribeHandler(stream api.Ensign_SubscribeServer, meta store.MetaStore) *SubscriberHandler
func (SubscriberHandler) Send ¶ added in v0.7.0
func (s SubscriberHandler) Send(event *api.EventWrapper) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Implements a go routine that collects topic info periodically outside of the broker.
|
Implements a go routine that collects topic info periodically outside of the broker. |
Package mock implements an in-memory gRPC mock Ensign server that can be connected to using a bufconn.
|
Package mock implements an in-memory gRPC mock Ensign server that can be connected to using a bufconn. |
Package o11y (a numeronym for "observability") exports server-specific metric collectors to Prometheus for monitoring of the service and Ensign nodes.
|
Package o11y (a numeronym for "observability") exports server-specific metric collectors to Prometheus for monitoring of the service and Ensign nodes. |
Ensign maintains two separate storage locations on disk: the event store which is intended to be an append-only fast disk write for incoming events and a meta store which is used to persist operational metadata such as topic and placement information.
|
Ensign maintains two separate storage locations on disk: the event store which is intended to be an append-only fast disk write for incoming events and a meta store which is used to persist operational metadata such as topic and placement information. |
errors
Package errors implements standard database read/write errors for the store package.
|
Package errors implements standard database read/write errors for the store package. |
Package topics provides some helpers for managing topics in memory.
|
Package topics provides some helpers for managing topics in memory. |