pubsub

package
v1.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package pubsub implements an event dispatching server with a single publisher and multiple subscriber clients. Multiple goroutines can safely publish to a single Server instance.

Clients register subscriptions with a query to select which messages they wish to receive. When messages are published, they are broadcast to all clients whose subscription query matches that message. Queries are constructed using the github.com/tendermint/tendermint/internal/pubsub/query package.

Example:

q, err := query.New(`account.name='John'`)
if err != nil {
    return err
}
// srv is assumed to be a running *pubsub.Server; SubscribeWithArgs is a method on Server.
sub, err := srv.SubscribeWithArgs(ctx, pubsub.SubscribeArgs{
    ClientID: "johns-transactions",
    Query:    q,
})
if err != nil {
    return err
}

for {
    next, err := sub.Next(ctx)
    if errors.Is(err, pubsub.ErrTerminated) {
        return err // terminated by publisher
    } else if err != nil {
        return err // timed out, client unsubscribed, etc.
    }
    process(next)
}

Constants

This section is empty.

Variables

var (
	// ErrSubscriptionNotFound is returned when a client tries to unsubscribe
	// from a subscription that does not exist.
	ErrSubscriptionNotFound = errors.New("subscription not found")

	// ErrAlreadySubscribed is returned when a client tries to subscribe twice or
	// more using the same query.
	ErrAlreadySubscribed = errors.New("already subscribed")

	// ErrServerStopped is returned when attempting to publish or subscribe to a
	// server that has been stopped.
	ErrServerStopped = errors.New("pubsub server is stopped")
)
var (
	// ErrUnsubscribed is returned by Next when the client has unsubscribed.
	ErrUnsubscribed = errors.New("subscription removed by client")

	// ErrTerminated is returned by Next when the subscription was terminated by
	// the publisher.
	ErrTerminated = errors.New("subscription terminated by publisher")
)

Functions

This section is empty.

Types

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message glues data and events together.

func (Message) Data

func (msg Message) Data() types.EventData

Data returns the data that was originally published.

func (Message) Events

func (msg Message) Events() []abci.Event

Events returns the events that matched the client's query.

func (Message) SubscriptionID

func (msg Message) SubscriptionID() string

SubscriptionID returns the unique identifier for the subscription that produced this message.

type Option

type Option func(*Server)

Option sets a parameter for the server.

func BufferCapacity

func BufferCapacity(cap int) Option

BufferCapacity sets the capacity of the publisher's queue, that is, the number of messages that can be published without blocking. If no buffer is specified, publishing is synchronous with delivery. This function panics if cap < 0.
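
The difference between a buffered and an unbuffered queue can be illustrated with plain Go channels and the same functional-option shape as Option. This is a standalone sketch, not the package's implementation: miniServer, option, bufferCapacity, newMiniServer, and tryPublish are hypothetical names, and the real Server may manage its queue differently.

```go
package main

import "fmt"

// miniServer is a hypothetical stand-in for illustration only.
type miniServer struct {
	queue chan string
}

// option mirrors the shape of the documented Option type.
type option func(*miniServer)

// bufferCapacity sets the queue size and panics on a negative value,
// matching the documented behavior of BufferCapacity.
func bufferCapacity(n int) option {
	if n < 0 {
		panic("negative buffer capacity")
	}
	return func(s *miniServer) { s.queue = make(chan string, n) }
}

// newMiniServer applies the options over an unbuffered default queue.
func newMiniServer(opts ...option) *miniServer {
	s := &miniServer{queue: make(chan string)}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// tryPublish reports whether msg could be queued without blocking.
func (s *miniServer) tryPublish(msg string) bool {
	select {
	case s.queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := newMiniServer()
	buffered := newMiniServer(bufferCapacity(2))

	// With no receiver running, an unbuffered queue accepts nothing.
	fmt.Println(cap(unbuffered.queue), unbuffered.tryPublish("a")) // 0 false

	// A queue of capacity 2 accepts two messages, then would block.
	fmt.Println(cap(buffered.queue),
		buffered.tryPublish("a"), buffered.tryPublish("b"), buffered.tryPublish("c")) // 2 true true false
}
```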

type Server

type Server struct {
	service.BaseService
	// contains filtered or unexported fields
}

Server allows clients to subscribe to and unsubscribe from messages, publishes messages with or without events, and manages internal state.

func NewServer

func NewServer(logger log.Logger, options ...Option) *Server

NewServer returns a new server. See the commentary on the Option functions for a detailed description of how to configure buffering. If no options are provided, the resulting server's queue is unbuffered.

func (*Server) BufferCapacity

func (s *Server) BufferCapacity() int

BufferCapacity returns the capacity of the publication queue.

func (*Server) NumClientSubscriptions

func (s *Server) NumClientSubscriptions(clientID string) int

NumClientSubscriptions returns the number of subscriptions the client has.

func (*Server) NumClients

func (s *Server) NumClients() int

NumClients returns the number of clients.

func (*Server) Observe

func (s *Server) Observe(_ctx context.Context, observe func(Message) error, queries ...*query.Query) error

Observe registers an observer function that will be called synchronously with each published message matching any of the given queries, prior to it being forwarded to any subscriber. If no queries are specified, all messages will be observed. An error is reported if an observer is already registered.

func (*Server) OnStart

func (s *Server) OnStart(ctx context.Context) error

OnStart implements Service.OnStart by starting the server.

func (*Server) OnStop

func (s *Server) OnStop()

OnStop implements part of the Service interface. It is a no-op.

func (*Server) Publish

func (s *Server) Publish(msg types.EventData) error

Publish publishes the given message. An error will be returned to the caller if the pubsub server has shut down.

func (*Server) PublishWithEvents

func (s *Server) PublishWithEvents(msg types.EventData, events []abci.Event) error

PublishWithEvents publishes the given message with the set of events. The set is matched against each client's queries. If there is a match, the message is sent to that client.

func (*Server) SubscribeWithArgs

func (s *Server) SubscribeWithArgs(_ctx context.Context, args SubscribeArgs) (*Subscription, error)

SubscribeWithArgs creates a subscription for the given arguments. It is an error if the query is nil, a subscription already exists for the specified client ID and query, or if the capacity arguments are invalid.

func (*Server) Unsubscribe

func (s *Server) Unsubscribe(_ctx context.Context, args UnsubscribeArgs) error

Unsubscribe removes the subscription for the given client and/or query. It returns ErrSubscriptionNotFound if no such subscription exists.

func (*Server) UnsubscribeAll

func (s *Server) UnsubscribeAll(_ctx context.Context, clientID string) error

UnsubscribeAll removes all subscriptions for the given client ID. It returns ErrSubscriptionNotFound if no subscriptions exist for that client.

func (*Server) Wait

func (s *Server) Wait()

Wait implements Service.Wait by blocking until the server has exited, then yielding to the base service wait.

type SubscribeArgs

type SubscribeArgs struct {
	ClientID string       // Client ID
	Query    *query.Query // filter query for events (required)
	Limit    int          // subscription queue capacity limit (0 means 1)
	Quota    int          // subscription queue soft quota (0 uses Limit)
}

SubscribeArgs are the parameters to create a new subscription.
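
The zero-value defaults noted in the field comments (a Limit of 0 means 1, a Quota of 0 uses Limit) can be written out as a small standalone sketch. The subscribeArgs type and effectiveCapacity helper are hypothetical; the checks that make capacity arguments invalid are not described here, so none are shown.

```go
package main

import "fmt"

// subscribeArgs is a local stand-in carrying only the capacity fields.
type subscribeArgs struct {
	ClientID string
	Limit    int
	Quota    int
}

// effectiveCapacity applies the documented defaults: a zero Limit is
// treated as 1, and a zero Quota falls back to the effective Limit.
func effectiveCapacity(args subscribeArgs) (limit, quota int) {
	limit = args.Limit
	if limit == 0 {
		limit = 1
	}
	quota = args.Quota
	if quota == 0 {
		quota = limit
	}
	return limit, quota
}

func main() {
	l, q := effectiveCapacity(subscribeArgs{ClientID: "a"})
	fmt.Println(l, q) // 1 1

	l, q = effectiveCapacity(subscribeArgs{ClientID: "b", Limit: 10})
	fmt.Println(l, q) // 10 10

	l, q = effectiveCapacity(subscribeArgs{ClientID: "c", Limit: 10, Quota: 4})
	fmt.Println(l, q) // 10 4
}
```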

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

A Subscription represents a client subscription for a particular query.

func (*Subscription) ID

func (s *Subscription) ID() string

ID returns the unique subscription identifier for s.

func (*Subscription) Next

func (s *Subscription) Next(ctx context.Context) (Message, error)

Next blocks until a message is available, ctx ends, or the subscription ends. Next returns ErrUnsubscribed if s was unsubscribed, ErrTerminated if s was terminated by the publisher, or a context error if ctx ended without a message being available.

type UnsubscribeArgs

type UnsubscribeArgs struct {
	Subscriber string       // subscriber ID chosen by the client (required)
	ID         string       // subscription ID (assigned by the server)
	Query      *query.Query // the query registered with the subscription
}

UnsubscribeArgs are the parameters to remove a subscription. The subscriber ID must be populated, along with at least one of the subscription ID or the registered query.

func (UnsubscribeArgs) Validate

func (args UnsubscribeArgs) Validate() error

Validate returns nil if args are valid to identify a subscription to remove. Otherwise, it reports an error.
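
The rule stated for UnsubscribeArgs (a subscriber is required, plus a subscription ID or a query) can be sketched as follows. This is an illustration of the rule only: unsubscribeArgs, filterQuery, and the error messages are hypothetical, and the real Validate may perform additional checks.

```go
package main

import (
	"errors"
	"fmt"
)

// filterQuery is a placeholder for *query.Query.
type filterQuery struct{}

// unsubscribeArgs is a local stand-in with the same three fields.
type unsubscribeArgs struct {
	Subscriber string
	ID         string
	Query      *filterQuery
}

// validate enforces the documented rule: Subscriber must be set, and at
// least one of ID or Query must identify the subscription.
func (a unsubscribeArgs) validate() error {
	if a.Subscriber == "" {
		return errors.New("must specify a subscriber")
	}
	if a.ID == "" && a.Query == nil {
		return errors.New("must specify a subscription ID or a query")
	}
	return nil
}

func main() {
	fmt.Println(unsubscribeArgs{ID: "sub-1"}.validate())                                // missing subscriber
	fmt.Println(unsubscribeArgs{Subscriber: "client-1"}.validate())                     // missing ID and query
	fmt.Println(unsubscribeArgs{Subscriber: "client-1", ID: "sub-1"}.validate())        // <nil>
	fmt.Println(unsubscribeArgs{Subscriber: "client-1", Query: &filterQuery{}}.validate()) // <nil>
}
```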

Directories

Path            Synopsis
query           Package query implements the custom query format used to filter event subscriptions in Tendermint.
query/syntax    Package syntax defines a scanner and parser for the Tendermint event filter query language.
