eventsource

package module
v0.0.0-...-c590279
Published: Aug 30, 2021 License: Apache-2.0 Imports: 13 Imported by: 230

README

Eventsource

Eventsource provides a Go client and server for streaming data one-way over an HTTP connection using the Server-Sent Events API: http://dev.w3.org/html5/eventsource/

Installation

go get github.com/donovanhide/eventsource

License

Eventsource is available under the Apache License, Version 2.0.

Documentation

Overview

Package eventsource implements a client and server to allow streaming data one-way over an HTTP connection using the Server-Sent Events API: http://dev.w3.org/html5/eventsource/

The client and server respect the Last-Event-ID header. If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection.
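The resume handshake described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: replayHandler, resumeAfter and the history slice are hypothetical names, and the sketch assumes numeric ids and sends only events strictly after the given id. Whether a replay includes the event with the given id is up to the Repository in use.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// history is hypothetical sample data; an event's id is its 1-based position.
var history = []string{"first", "second", "third"}

// replayHandler writes every stored event whose id is greater than the
// Last-Event-ID request header, or all events when the header is absent.
func replayHandler(w http.ResponseWriter, r *http.Request) {
	// Atoi yields 0 for a missing or non-numeric header, so everything is sent.
	last, _ := strconv.Atoi(r.Header.Get("Last-Event-ID"))
	w.Header().Set("Content-Type", "text/event-stream")
	for i, data := range history {
		if id := i + 1; id > last {
			fmt.Fprintf(w, "id: %d\ndata: %s\n\n", id, data)
		}
	}
}

// resumeAfter plays the client's part: it sets Last-Event-ID on the request
// (when non-empty) and returns the raw response body.
func resumeAfter(lastEventID string) string {
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	rec := httptest.NewRecorder()
	replayHandler(rec, req)
	return rec.Body.String()
}

func main() {
	// A client that last saw event 1 receives events 2 and 3.
	fmt.Print(resumeAfter("1"))
}
```

The sketch uses an in-memory recorder instead of a real connection so it runs without opening a port.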

Types

type Decoder

type Decoder struct {
	*bufio.Reader
}

A Decoder is capable of reading Events from a stream.

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new Decoder instance that reads events with the given io.Reader.

func (*Decoder) Decode

func (dec *Decoder) Decode() (Event, error)

Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occurring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).
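The distinction between a graceful and a mid-event disconnect can be illustrated with a simplified reader. The sketch below is not the package's Decoder: readEvent and drain are hypothetical helpers, repeated fields overwrite earlier ones, and only the two end-of-stream cases described above are modeled.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// readEvent reads "name: value" lines until a blank line ends the event.
// It returns io.EOF if the stream ends between events and
// io.ErrUnexpectedEOF if it ends partway through one.
func readEvent(r *bufio.Reader) (map[string]string, error) {
	fields := map[string]string{}
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			if errors.Is(err, io.EOF) {
				if len(fields) == 0 && line == "" {
					return nil, io.EOF // clean break between events
				}
				return nil, io.ErrUnexpectedEOF // cut off mid-event
			}
			return nil, err
		}
		line = strings.TrimRight(line, "\r\n")
		if line == "" {
			if len(fields) == 0 {
				continue // ignore stray blank lines
			}
			return fields, nil
		}
		name, value, _ := strings.Cut(line, ":")
		if name == "" {
			continue // lines starting with ':' are comments
		}
		fields[name] = strings.TrimPrefix(value, " ")
	}
}

// drain reads events until the stream ends and reports how it ended.
func drain(stream string) (events int, ending string) {
	r := bufio.NewReader(strings.NewReader(stream))
	for {
		_, err := readEvent(r)
		switch {
		case err == nil:
			events++
		case errors.Is(err, io.EOF):
			return events, "graceful"
		case errors.Is(err, io.ErrUnexpectedEOF):
			return events, "mid-event"
		default:
			return events, err.Error()
		}
	}
}

func main() {
	// The second event is never terminated by a blank line.
	n, how := drain("id: 1\ndata: hello\n\nid: 2\ndata: cut")
	fmt.Println(n, how) // prints: 1 mid-event
}
```

A caller of Decode can branch the same way: treat io.EOF as a normal end of stream and anything else as a broken connection.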

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

An Encoder is capable of writing Events to a stream. Optionally, Events can be gzip-compressed in the process.

func NewEncoder

func NewEncoder(w io.Writer, compressed bool) *Encoder

NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.
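As a rough illustration of what wrapping a writer in gzip involves for a streamed response, the sketch below writes each payload through a gzip.Writer and flushes after every one so that buffered data is not held back. writeCompressed and roundTrip are hypothetical helpers; whether the package's Encoder flushes per event is an assumption, not something the documentation states.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// writeCompressed sends each payload through a gzip.Writer, flushing after
// every payload so a streaming reader is not left waiting on buffered data.
func writeCompressed(w io.Writer, payloads ...string) error {
	zw := gzip.NewWriter(w)
	for _, p := range payloads {
		if _, err := io.WriteString(zw, p); err != nil {
			return err
		}
		if err := zw.Flush(); err != nil {
			return err
		}
	}
	return zw.Close()
}

// roundTrip compresses the payloads into memory and decompresses them again.
func roundTrip(payloads ...string) (string, error) {
	var buf bytes.Buffer
	if err := writeCompressed(&buf, payloads...); err != nil {
		return "", err
	}
	zr, err := gzip.NewReader(&buf)
	if err != nil {
		return "", err
	}
	defer zr.Close()
	out, err := io.ReadAll(zr)
	return string(out), err
}

func main() {
	out, err := roundTrip("data: a\n\n", "data: b\n\n")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%q\n", out)
}
```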

func (*Encoder) Encode

func (enc *Encoder) Encode(ev Event) error

Encode writes an event in the format specified by the server-sent events protocol.
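For reference, the wire format defined by the Server-Sent Events specification looks roughly like the output of the sketch below. formatEvent is a hypothetical helper, not part of this package, and its output is not guaranteed to match Encode byte for byte; it only shows the id, event and data fields and the blank line that terminates an event.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one event in the text/event-stream format.
// Empty id and name fields are omitted, and each line of data
// gets its own "data:" field.
func formatEvent(id, name, data string) string {
	var b strings.Builder
	if id != "" {
		fmt.Fprintf(&b, "id: %s\n", id)
	}
	if name != "" {
		fmt.Fprintf(&b, "event: %s\n", name)
	}
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line ends the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent("42", "Tick", "line one\nline two"))
}
```

The three arguments correspond to the Id, Event and Data methods of the Event interface.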

type Event

type Event interface {
	// Id is an identifier that can be used to allow a client to replay
	// missed Events by returning the Last-Event-Id header.
	// Return empty string if not required.
	Id() string
	// The name of the event. Return empty string if not required.
	Event() string
	// The payload of the event.
	Data() string
}

Any event received by the client or sent by the server will implement this interface.

Example
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/donovanhide/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
	TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
	start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
	ticker := time.NewTicker(time.Second)
	// Release the ticker's resources once all ticks have been published.
	defer ticker.Stop()
	for i := 0; i < TICK_COUNT; i++ {
		<-ticker.C
		srv.Publish([]string{"time"}, TimeEvent(start))
		start = start.Add(time.Second)
	}
}

func main() {
	srv := eventsource.NewServer()
	srv.Gzip = true
	defer srv.Close()
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	http.HandleFunc("/time", srv.Handler("time"))
	go http.Serve(l, nil)
	go TimePublisher(srv)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/time", "")
	if err != nil {
		return
	}
	for i := 0; i < TICK_COUNT; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}
Output:

1356998400000000000 Tick 2013-01-01 00:00:00 +0000 UTC
1356998401000000000 Tick 2013-01-01 00:00:01 +0000 UTC
1356998402000000000 Tick 2013-01-01 00:00:02 +0000 UTC
1356998403000000000 Tick 2013-01-01 00:00:03 +0000 UTC
1356998404000000000 Tick 2013-01-01 00:00:04 +0000 UTC

type Repository

type Repository interface {
	// Gets the Events which should follow on from the specified channel and event id.
	Replay(channel, id string) chan Event
}

If history is required, implementing this interface allows clients to replay previous events through the server. Replay can be called from different goroutines concurrently, so implementations must be goroutine-safe.

Example
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"net/http"

	"github.com/donovanhide/eventsource"
)

type NewsArticle struct {
	id             string
	Title, Content string
}

func (a *NewsArticle) Id() string    { return a.id }
func (a *NewsArticle) Event() string { return "News Article" }
func (a *NewsArticle) Data() string  { b, _ := json.Marshal(a); return string(b) }

var articles = []NewsArticle{
	{"2", "Governments struggle to control global price of gas", "Hot air...."},
	{"1", "Tomorrow is another day", "And so is the day after."},
	{"3", "News for news' sake", "Nothing has happened."},
}

func buildRepo(srv *eventsource.Server) {
	repo := eventsource.NewSliceRepository()
	srv.Register("articles", repo)
	for i := range articles {
		repo.Add("articles", &articles[i])
		srv.Publish([]string{"articles"}, &articles[i])
	}
}

func main() {
	srv := eventsource.NewServer()
	defer srv.Close()
	http.HandleFunc("/articles", srv.Handler("articles"))
	l, err := net.Listen("tcp", ":8080")
	if err != nil {
		return
	}
	defer l.Close()
	go http.Serve(l, nil)
	stream, err := eventsource.Subscribe("http://127.0.0.1:8080/articles", "")
	if err != nil {
		return
	}
	go buildRepo(srv)
	// This will receive events in the order that they come
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
	stream, err = eventsource.Subscribe("http://127.0.0.1:8080/articles", "1")
	if err != nil {
		fmt.Println(err)
		return
	}
	// This will replay the events in order of id
	for i := 0; i < 3; i++ {
		ev := <-stream.Events
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}
Output:

2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
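The goroutine-safety requirement on Repository implementations can be met with a mutex. The following is a minimal sketch under stated assumptions: lockedRepo, note and replayIDs are hypothetical names, the Event interface is redeclared locally so the sketch compiles on its own, replay returns events stored after the one with the given id in insertion order, and closing the returned channel is assumed to be how the end of a replay is signalled.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Event mirrors the interface documented above so the sketch is self-contained.
type Event interface {
	Id() string
	Event() string
	Data() string
}

type note struct{ id, text string }

func (n note) Id() string    { return n.id }
func (n note) Event() string { return "note" }
func (n note) Data() string  { return n.text }

// lockedRepo stores events per channel behind a read/write mutex.
type lockedRepo struct {
	mu     sync.RWMutex
	events map[string][]Event
}

func newLockedRepo() *lockedRepo {
	return &lockedRepo{events: map[string][]Event{}}
}

// Add appends an event while holding the write lock.
func (r *lockedRepo) Add(channel string, ev Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events[channel] = append(r.events[channel], ev)
}

// Replay snapshots the matching events under the read lock, then hands them
// over on a buffered channel so no lock is held while the receiver reads.
func (r *lockedRepo) Replay(channel, id string) chan Event {
	r.mu.RLock()
	var pending []Event
	found := id == "" // an empty id replays everything
	for _, ev := range r.events[channel] {
		if found {
			pending = append(pending, ev)
		} else if ev.Id() == id {
			found = true
		}
	}
	r.mu.RUnlock()

	out := make(chan Event, len(pending))
	for _, ev := range pending {
		out <- ev
	}
	close(out)
	return out
}

// replayIDs collects the ids of a replay into a comma-separated string.
func replayIDs(r *lockedRepo, channel, id string) string {
	var ids []string
	for ev := range r.Replay(channel, id) {
		ids = append(ids, ev.Id())
	}
	return strings.Join(ids, ",")
}

func main() {
	repo := newLockedRepo()
	for _, id := range []string{"1", "2", "3"} {
		repo.Add("notes", note{id, "text " + id})
	}
	fmt.Println(replayIDs(repo, "notes", "1")) // prints: 2,3
}
```

Copying the slice before sending keeps the critical section short, at the cost of one allocation per replay.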

type Server

type Server struct {
	AllowCORS  bool        // Enable all handlers to be accessible from any origin
	ReplayAll  bool        // Replay repository even if there's no Last-Event-Id specified
	BufferSize int         // How many messages do we let the client get behind before disconnecting
	Gzip       bool        // Enable compression if client can accept it
	Logger     *log.Logger // Logger is a logger that, when set, will be used for logging debug messages
	// contains filtered or unexported fields
}
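The BufferSize comment describes dropping clients that fall too far behind. One common way to get that behavior is a buffered channel combined with a non-blocking send, sketched below. This is an illustration of the idea only: subscriber, offer and deliver are hypothetical names, and nothing here is taken from the Server's actual implementation.

```go
package main

import "fmt"

// subscriber is a hypothetical stand-in for one connected client.
type subscriber struct {
	queue   chan string
	dropped bool
}

func newSubscriber(bufferSize int) *subscriber {
	return &subscriber{queue: make(chan string, bufferSize)}
}

// offer tries to enqueue msg without blocking. If the buffer is already
// full, the subscriber is marked as dropped and its queue is closed.
func (s *subscriber) offer(msg string) bool {
	if s.dropped {
		return false
	}
	select {
	case s.queue <- msg:
		return true
	default:
		s.dropped = true
		close(s.queue)
		return false
	}
}

// deliver offers n messages to a subscriber that never reads and
// returns how many were accepted before it was dropped.
func deliver(bufferSize, n int) int {
	s := newSubscriber(bufferSize)
	accepted := 0
	for i := 0; i < n; i++ {
		if s.offer(fmt.Sprintf("msg %d", i)) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(deliver(3, 10)) // prints: 3
}
```

The non-blocking send keeps one slow client from stalling publication to the others.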

func NewServer

func NewServer() *Server

Create a new Server ready for handler creation and publishing events

func (*Server) Close

func (srv *Server) Close()

Stop handling publishing

func (*Server) Handler

func (srv *Server) Handler(channel string) http.HandlerFunc

Create a new handler for serving a specified channel

func (*Server) Publish

func (srv *Server) Publish(channels []string, ev Event)

Publish an event with the specified id to one or more channels

func (*Server) Register

func (srv *Server) Register(channel string, repo Repository)

Register the repository to be used for the specified channel

type SliceRepository

type SliceRepository struct {
	// contains filtered or unexported fields
}

Example repository that uses a slice as storage for past events.

func NewSliceRepository

func NewSliceRepository() *SliceRepository

func (*SliceRepository) Add

func (repo *SliceRepository) Add(channel string, event Event)

func (SliceRepository) Replay

func (repo SliceRepository) Replay(channel, id string) (out chan Event)

type Stream

type Stream struct {

	// Events emits the events received by the stream
	Events chan Event
	// Errors emits any errors encountered while reading events from the stream.
	// It's mainly for informative purposes - the client isn't required to take any
	// action when an error is encountered. The stream will always attempt to continue,
	// even if that involves reconnecting to the server.
	Errors chan error
	// Logger is a logger that, when set, will be used for logging debug messages
	Logger *log.Logger
	// contains filtered or unexported fields
}

Stream handles a connection for receiving Server-Sent Events. It will try to reconnect if the connection is lost, respecting both received retry delays and event IDs.

func Subscribe

func Subscribe(url, lastEventId string) (*Stream, error)

Subscribe to the Events emitted from the specified url. If lastEventId is non-empty it will be sent to the server in case it can replay missed events.

func SubscribeWith

func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error)

SubscribeWith takes an HTTP client and a request, allowing customization of the request headers and control over the HTTP client settings (timeouts, TLS, etc.).

func SubscribeWithRequest

func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error)

SubscribeWithRequest takes an http.Request to set up the stream, allowing custom headers to be specified, authentication to be configured, etc.

func (*Stream) Close

func (stream *Stream) Close()

Close will close the stream. It is safe for concurrent access and can be called multiple times.
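A Close method that tolerates concurrent and repeated calls is often built on sync.Once. The sketch below shows that general pattern; conn and closeConcurrently are hypothetical names, and this is not necessarily how Stream implements Close.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical resource whose shutdown must happen exactly once.
type conn struct {
	once sync.Once
	done chan struct{}
}

func newConn() *conn {
	return &conn{done: make(chan struct{})}
}

// Close closes the done channel on the first call; later calls are no-ops,
// so calling it from several goroutines cannot trigger a double close.
func (c *conn) Close() {
	c.once.Do(func() { close(c.done) })
}

// closeConcurrently calls Close from n goroutines and reports whether
// the connection ended up closed.
func closeConcurrently(n int) bool {
	c := newConn()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
	}
	wg.Wait()
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(closeConcurrently(8)) // prints: true
}
```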

type SubscriptionError

type SubscriptionError struct {
	Code    int
	Message string
}

func (SubscriptionError) Error

func (e SubscriptionError) Error() string
