unifrost

Version: v0.1.4
Published: Nov 23, 2020 License: Apache-2.0 Imports: 10 Imported by: 0

README

unifrost: A Go module that makes it easier to stream pubsub events to the web

⚠ This project is at an early stage; it is not ready for production yet ⚠

Previously named gochan

unifrost is a Go module for relaying pubsub messages to the web via SSE (EventSource). It is based on Twitter's implementation of real-time event streaming in their new web app.

unifrost is named after Bifrost, the rainbow bridge that connects Asgard with Midgard (Earth) in the MCU and can transport people both ways. Because unifrost sends messages only one way, from server to client, it is called unifrost. 😎

It uses the Go CDK as a broker-neutral pubsub driver that supports multiple pubsub brokers:

  • Google Cloud Pub/Sub
  • Amazon Simple Queue Service (SQS)
  • Azure Service Bus (Pending)
  • RabbitMQ
  • NATS
  • Kafka
  • In-Memory (Only for testing)

Installation

unifrost supports Go modules and is built against Go version 1.13.

go get github.com/unifrost/unifrost

Documentation

For documentation check godoc.

Usage

unifrost uses Server-Sent Events, so it does not need a standalone server; unlike WebSockets, it can be embedded in your API server. unifrost's stream handler has a ServeHTTP method, i.e. it implements the http.Handler interface, so it can be used directly or wrapped with middleware such as authentication.

// Golang (pseudo-code)

// Using stream handler directly
ctx := context.Background()
streamHandler, err := unifrost.NewStreamHandler(
  ctx,
  &memdriver.Client{},
  unifrost.ConsumerTTL(2*time.Second),
)
if err != nil {
  log.Fatal("stream handler error: ", err)
}
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", streamHandler))

// Golang (pseudo-code)

// Using stream handler by wrapping it in auth middleware
ctx := context.Background()
streamHandler, err := unifrost.NewStreamHandler(
  ctx,
  &memdriver.Client{},
  unifrost.ConsumerTTL(2*time.Second),
)
if err != nil {
  log.Fatal("stream handler error: ", err)
}

mux := http.NewServeMux()
mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    // Auth is your own authentication check; reject the request if it fails.
    err := Auth(r)
    if err != nil {
      http.Error(w, "unauthorized", http.StatusUnauthorized)
      return
    }

    streamHandler.ServeHTTP(w, r)
})
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", mux))

Message Protocol

Every message sent by the server is plain-text JSON containing the topic and the payload.

Every message is an array of length 2: the first element is the topic string and the second element is the payload, encoded as a string.

When a consumer connects, the server sends a preflight message that contains the initial server configuration and the list of topics the consumer is already subscribed to.

  1. Configuration: contains the consumer_id and the consumer TTL set by the stream handler config.
  2. Subscriptions: the topics associated with the specified consumer id.

Example first message:

[
  "/unifrost/info",

  "{\"config\":{\"consumer_id\":\"unique-id\",\"consumer_ttl_millis\":2000},\"subscriptions\":[]}"
]

Example error message:

[
  "/unifrost/error",

  "{\"error\":{\"code\":\"subscription-failure\",\"message\":\"Cannot receive message from subscription, closing subscription\",\"topic\":\"topic3\"}}"
]
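
The two-element array format described above can be decoded in two steps: first the outer array, then the payload string. The following is a minimal sketch in Go using only the standard library; the infoPayload type and the decodeMessage helper are illustrative names, not part of unifrost, and the struct fields are inferred from the example first message.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// infoPayload mirrors the fields visible in the example first message.
// It is an assumption based on that example, not a type exported by unifrost.
type infoPayload struct {
	Config struct {
		ConsumerID        string `json:"consumer_id"`
		ConsumerTTLMillis int64  `json:"consumer_ttl_millis"`
	} `json:"config"`
	Subscriptions []string `json:"subscriptions"`
}

// decodeMessage splits a raw [topic, payload] frame into its two strings.
func decodeMessage(raw []byte) (string, string, error) {
	var parts []string
	if err := json.Unmarshal(raw, &parts); err != nil {
		return "", "", err
	}
	if len(parts) != 2 {
		return "", "", fmt.Errorf("expected 2 elements, got %d", len(parts))
	}
	return parts[0], parts[1], nil
}

func main() {
	raw := []byte(`["/unifrost/info","{\"config\":{\"consumer_id\":\"unique-id\",\"consumer_ttl_millis\":2000},\"subscriptions\":[]}"]`)

	topic, payload, err := decodeMessage(raw)
	if err != nil {
		log.Fatal(err)
	}

	// The payload is itself a JSON document encoded as a string.
	var info infoPayload
	if err := json.Unmarshal([]byte(payload), &info); err != nil {
		log.Fatal(err)
	}
	fmt.Println(topic, info.Config.ConsumerID, info.Config.ConsumerTTLMillis)
	// prints: /unifrost/info unique-id 2000
}
```

Because the payload is a string rather than a nested object, a consumer can route on the topic first and only decode the payloads it cares about.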

All messages are streamed over a single channel, so with the EventSource JS API either new EventSource().onmessage or new EventSource().addEventListener('message', (e) => {}) will receive them.

All info events are streamed over the message channel, i.e. onmessage or addEventListener('message', () => {}) will receive them. Subscription events use their topic name as the event name, so to receive topic events you need to add an event listener for that topic on the EventSource object.

Example

Client example:

// TypeScript (pseudo-code)
const consumerID = 'unique-id';

const sse = new EventSource(`/events?id=${consumerID}`);
// for info events like first-message and errors
sse.addEventListener('message', (e) => {
  const message = JSON.parse(e.data);

  const topic = message[0] as string;
  const payload = message[1] as string;

  // Payload is the exact message from the pubsub broker, probably JSON.
  // Decode payload
  const data = JSON.parse(payload);
});

A new consumer is registered explicitly with streamHandler.NewConsumer(ctx), which auto-generates an id. To register a consumer with a custom id, use streamHandler.NewCustomConsumer(ctx, id).

This makes it easy to integrate authentication with unifrost.StreamHandler. One possible auth workflow is to create a new unifrost consumer after login and return the consumer id to the client, which stores it in the browser's local storage and then uses it to connect to the stream handler.

If you don't care about authentication, you can also generate a new consumer automatically every time a client connects without the id parameter, using the following middleware with the stream handler. Your client is then responsible for registering the id with your backend.

// Golang (pseudo-code)

mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    // Auto generate new consumer_id, when new consumer connects.
    q := r.URL.Query()
    if q.Get("id") == "" {
        consumer, err := streamHandler.NewConsumer(ctx)
        if err != nil {
            http.Error(w, "cannot create consumer", http.StatusInternalServerError)
            return
        }
        q.Set("id", consumer.ID)
        r.URL.RawQuery = q.Encode()
    }

    streamHandler.ServeHTTP(w, r)
})

When a consumer gets disconnected, it has a time window in which it can reconnect to the server with its state unchanged. If the consumer TTL is not specified in the stream handler config, the default TTL is one minute.

Managing subscriptions

unifrost.StreamHandler provides a simple API for subscribing to and unsubscribing from topics.

func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error


func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error

These methods can be used to add or remove subscriptions for a consumer.

If you want to give subscription control to the client look at the implementation in the example.

To know more, check out the example

Why Server-Sent Events (SSE)?

Why would you choose SSE over WebSockets?

One reason SSEs have been kept in the shadow is that later APIs like WebSockets provide a richer protocol for bi-directional, full-duplex communication. However, in some scenarios data doesn't need to be sent from the client; you simply need updates from some server action. A few examples would be status updates, tweet likes, tweet retweets, tickers, news feeds, or other automated data push mechanisms (e.g. updating a client-side Web SQL Database or IndexedDB object store). If you need to send data to a server, the Fetch API is always available.

SSEs are sent over traditional HTTP. That means they do not require a special protocol or server implementation to get working. WebSockets, on the other hand, require full-duplex connections and new WebSocket servers to handle the protocol. In addition, Server-Sent Events have a variety of features that WebSockets lack by design, such as automatic reconnection, event IDs, and the ability to send arbitrary events.

Because SSE works on top of HTTP, HTTP protocol improvements can also benefit SSE. For example, the in-development HTTP/3 protocol, built on top of QUIC, could offer additional performance improvements in the presence of packet loss due to lack of head-of-line blocking.

Community:

Join the #unifrost channel on the Gophers Slack workspace for questions and discussions.

Future Goals:

  • Standalone server that can be configured by yaml, while also staying modular.
  • Creating a website for documentation & overview, and some examples.

Users

If you are using unifrost in production, please let me know by sending an email or filing an issue.

Show some love

The best way to show some love for the project is to contribute and file issues.

If you love unifrost, you can support it by sharing the project on Twitter.

You can also support it by sponsoring the project via PayPal.

License

APACHE v2

Documentation

Overview

Package unifrost is a Go module for relaying pubsub messages to the web using SSE (EventSource). It is based on Twitter's implementation of real-time event streaming in their new web app.

Supported brokers

Google Cloud Pub/Sub
Amazon Simple Queue Service (SQS)
Azure Service Bus (Pending)
RabbitMQ
NATS
Kafka
In-memory (Only for testing)

For examples check https://github.com/unifrost/unifrost/tree/master/examples/

Constants

const (
	// ERROR .
	ERROR = iota
	// EVENT .
	EVENT
)

Variables

var (
	// ErrConsumerNotFound is returned if the consumer-id is not registered in the StreamHandler.
	ErrConsumerNotFound = errors.New("stream handler: consumer doesn't exists")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	ID string
	// contains filtered or unexported fields
}

Consumer manages all the topic subscriptions.

type Option

type Option func(*StreamHandler) error

Option is a self-referential function for configuration parameters.

func ConsumerTTL

func ConsumerTTL(t time.Duration) Option

ConsumerTTL is an option that sets the consumer's TTL. The default TTL is 1 minute.

type StreamHandler

type StreamHandler struct {
	// contains filtered or unexported fields
}

StreamHandler handles all the consumers and subscriptions. It implements the http.Handler interface for easy embedding with any API server.

func NewStreamHandler

func NewStreamHandler(ctx context.Context, subClient drivers.SubscriberClient, options ...Option) (*StreamHandler, error)

NewStreamHandler returns a *unifrost.StreamHandler, which handles all the consumers and subscriptions.

Additional configuration options can be added with unifrost.Option functions.

func (*StreamHandler) Close

func (s *StreamHandler) Close(ctx context.Context) error

Close closes the StreamHandler and also closes all the connected consumers.

func (*StreamHandler) CloseConsumer

func (s *StreamHandler) CloseConsumer(ctx context.Context, consumerID string) error

CloseConsumer closes the specified consumer and removes it.

func (*StreamHandler) GetConsumerByID

func (s *StreamHandler) GetConsumerByID(consumerID string) (*Consumer, error)

GetConsumerByID returns a pointer to the Consumer struct.

If the specified consumer id is invalid or does not exist, the error unifrost.ErrConsumerNotFound is returned.

func (*StreamHandler) GetConsumerTopics

func (s *StreamHandler) GetConsumerTopics(ctx context.Context, c *Consumer) []string

GetConsumerTopics returns a slice of all the topics the consumer is subscribed to.

func (*StreamHandler) IsConsumerConnected

func (s *StreamHandler) IsConsumerConnected(c *Consumer) bool

IsConsumerConnected reports whether consumer is connected to the server.

func (*StreamHandler) NewConsumer

func (s *StreamHandler) NewConsumer(ctx context.Context) (*Consumer, error)

NewConsumer creates a new consumer with an autogenerated consumer id.

func (*StreamHandler) NewCustomConsumer

func (s *StreamHandler) NewCustomConsumer(ctx context.Context, consumerID string) (*Consumer, error)

NewCustomConsumer creates a new consumer with the specified consumer id.

func (*StreamHandler) ServeHTTP

func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP is the HTTP handler for EventSource. To connect, the query parameter 'id' (the consumer_id) is required.

func (*StreamHandler) Subscribe

func (s *StreamHandler) Subscribe(ctx context.Context, consumerID string, topic string) error

Subscribe subscribes the specified consumer to the specified topic. If the specified consumer does not exist, ErrConsumerNotFound is returned.

func (*StreamHandler) TotalConsumerTopics

func (s *StreamHandler) TotalConsumerTopics(ctx context.Context, c *Consumer) int

TotalConsumerTopics returns the number of topics the consumer is subscribed to.

func (*StreamHandler) TotalConsumers

func (s *StreamHandler) TotalConsumers(ctx context.Context) int

TotalConsumers returns the number of consumers connected to the stream handler.

func (*StreamHandler) Unsubscribe

func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topic string) error

Unsubscribe unsubscribes the specified consumer from the specified topic and shuts down the subscription. If the specified consumer does not exist, ErrConsumerNotFound is returned.

Directories

Path            Synopsis
drivers         Package drivers contains all the drivers required to connect to different brokers, under a single easy to use interface.
gcpdriver       Package gcpdriver contains Google Cloud Pub/Sub driver for unifrost.StreamHandler
kafkadriver     Package kafkadriver contains Apache Kafka message bus driver for unifrost.StreamHandler
memdriver       Package memdriver contains In-memory testing driver for unifrost.StreamHandler
natsdriver      Package natsdriver contains NATS driver for unifrost.StreamHandler
rabbitdriver    Package rabbitdriver contains RabbitMQ driver for unifrost.StreamHandler
sqsdriver       Package sqsdriver contains Amazon SQS driver for unifrost.StreamHandler