Documentation ¶
Overview ¶
Package unifrost is a Go module for relaying pub/sub messages to the web using SSE (EventSource). It is based on Twitter's implementation of real-time event streaming in their new web app.
Supported brokers
- Google Cloud Pub/Sub
- Amazon Simple Queue Service
- Azure Service Bus (Pending)
- RabbitMQ
- NATS
- Kafka
- In-memory (Only for testing)
For examples check https://github.com/unifrost/unifrost/tree/master/examples/
Index ¶
- Constants
- Variables
- type Consumer
- type Option
- type StreamHandler
- func (s *StreamHandler) Close(ctx context.Context) error
- func (s *StreamHandler) CloseConsumer(ctx context.Context, consumerID string) error
- func (s *StreamHandler) GetConsumerByID(consumerID string) (*Consumer, error)
- func (s *StreamHandler) GetConsumerTopics(ctx context.Context, c *Consumer) []string
- func (s *StreamHandler) IsConsumerConnected(c *Consumer) bool
- func (s *StreamHandler) NewConsumer(ctx context.Context) (*Consumer, error)
- func (s *StreamHandler) NewCustomConsumer(ctx context.Context, consumerID string) (*Consumer, error)
- func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error
- func (s *StreamHandler) TotalConsumerTopics(ctx context.Context, c *Consumer) int
- func (s *StreamHandler) TotalConsumers(ctx context.Context) int
- func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error
Constants ¶
const (
	// ERROR .
	ERROR = iota
	// EVENT .
	EVENT
)
Variables ¶
var (
	// ErrConsumerNotFound is returned if the consumer-id is not registered in the StreamHandler.
	ErrConsumerNotFound = errors.New("stream handler: consumer doesn't exists")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
	ID string
	// contains filtered or unexported fields
}
Consumer manages all the topic subscriptions.
type Option ¶
type Option func(*StreamHandler) error
Option is a self-referential function for configuration parameters.
func ConsumerTTL ¶
ConsumerTTL is an option that sets the consumer's TTL. The default TTL is 1 minute.
type StreamHandler ¶
type StreamHandler struct {
// contains filtered or unexported fields
}
StreamHandler handles all the consumers and subscriptions. It implements the http.Handler interface for easy embedding with any API server.
func NewStreamHandler ¶
func NewStreamHandler(ctx context.Context, subClient drivers.SubscriberClient, options ...Option) (*StreamHandler, error)
NewStreamHandler returns a *unifrost.StreamHandler, which handles all the consumers and subscriptions.
Additional configuration options can be added with unifrost.Option functions.
func (*StreamHandler) Close ¶
func (s *StreamHandler) Close(ctx context.Context) error
Close closes the StreamHandler and also closes all the connected consumers.
func (*StreamHandler) CloseConsumer ¶
func (s *StreamHandler) CloseConsumer(ctx context.Context, consumerID string) error
CloseConsumer closes the specified consumer and removes it.
func (*StreamHandler) GetConsumerByID ¶
func (s *StreamHandler) GetConsumerByID(consumerID string) (*Consumer, error)
GetConsumerByID returns a pointer to the Consumer struct.
If the specified consumer id is invalid or doesn't exist, the error 'unifrost.ErrConsumerNotFound' is returned.
func (*StreamHandler) GetConsumerTopics ¶
func (s *StreamHandler) GetConsumerTopics(ctx context.Context, c *Consumer) []string
GetConsumerTopics returns a slice of all the topics the consumer is subscribed to.
func (*StreamHandler) IsConsumerConnected ¶
func (s *StreamHandler) IsConsumerConnected(c *Consumer) bool
IsConsumerConnected reports whether the consumer is connected to the server.
func (*StreamHandler) NewConsumer ¶
func (s *StreamHandler) NewConsumer(ctx context.Context) (*Consumer, error)
NewConsumer creates a new consumer with an autogenerated consumer id.
func (*StreamHandler) NewCustomConsumer ¶
func (s *StreamHandler) NewCustomConsumer(ctx context.Context, consumerID string) (*Consumer, error)
NewCustomConsumer creates a new consumer with the specified consumer id.
func (*StreamHandler) ServeHTTP ¶
func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP is the HTTP handler for EventSource. To connect, the query parameter 'id' (the consumer id) is required.
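As a small illustration of the required 'id' query parameter, the following sketch builds the URL a client would connect to. The helper name streamURL, the host, and the /events path are assumptions; only the parameter name 'id' comes from the description above.

```go
package main

import (
	"fmt"
	"net/url"
)

// streamURL appends the consumer id as the 'id' query parameter to the
// endpoint where the stream handler is mounted (endpoint is hypothetical).
func streamURL(endpoint, consumerID string) (string, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("id", consumerID) // query-escaped by Encode below
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	s, err := streamURL("http://localhost:3000/events", "consumer-1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```

Building the query with url.Values rather than string concatenation keeps ids containing spaces or reserved characters correctly escaped.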
func (*StreamHandler) Subscribe ¶
func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error
Subscribe subscribes the specified consumer to the specified topic. If the specified consumer doesn't exist, ErrConsumerNotFound is returned.
func (*StreamHandler) TotalConsumerTopics ¶
func (s *StreamHandler) TotalConsumerTopics(ctx context.Context, c *Consumer) int
TotalConsumerTopics returns the number of topics the consumer is subscribed to.
func (*StreamHandler) TotalConsumers ¶
func (s *StreamHandler) TotalConsumers(ctx context.Context) int
TotalConsumers returns the number of consumers connected to the stream handler.
func (*StreamHandler) Unsubscribe ¶
func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error
Unsubscribe unsubscribes the specified consumer from the specified topic and shuts down the subscription. If the specified consumer doesn't exist, ErrConsumerNotFound is returned.
Directories ¶
Path | Synopsis |
---|---|
drivers | Package drivers contains all the drivers required to connect to different brokers, under a single easy to use interface. |
gcpdriver | Package gcpdriver contains Google Cloud Pub/Sub driver for unifrost.StreamHandler |
kafkadriver | Package kafkadriver contains Apache Kafka message bus driver for unifrost.StreamHandler |
memdriver | Package memdriver contains In-memory testing driver for unifrost.StreamHandler |
natsdriver | Package natsdriver contains NATS driver for unifrost.StreamHandler |
rabbitdriver | Package rabbitdriver contains RabbitMQ driver for unifrost.StreamHandler |
sqsdriver | Package sqsdriver contains Amazon SQS driver for unifrost.StreamHandler |