Documentation ¶
Overview ¶
Package eventsource implements a client and server to allow streaming data one-way over a HTTP connection using the Server-Sent Events API http://dev.w3.org/html5/eventsource/
The client and server respect the Last-Event-ID header. If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Decoder ¶
A Decoder is capable of reading Events from a stream.
func NewDecoder ¶
NewDecoder returns a new Decoder instance that reads events with the given io.Reader.
func (*Decoder) Decode ¶
Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occuring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).
type Encoder ¶
type Encoder struct {
// contains filtered or unexported fields
}
An Encoder is capable of writing Events to a stream. Optionally Events can be gzip compressed in this process.
func NewEncoder ¶
NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.
type Event ¶
type Event interface { // Id is an identifier that can be used to allow a client to replay // missed Events by returning the Last-Event-Id header. // Return empty string if not required. Id() string // The name of the event. Return empty string if not required. Event() string // The payload of the event. Data() string }
Any event received by the client or sent by the server will implement this interface
Example ¶
package main import ( "fmt" "net" "net/http" "time" "github.com/donovanhide/eventsource" ) type TimeEvent time.Time func (t TimeEvent) Id() string { return fmt.Sprint(time.Time(t).UnixNano()) } func (t TimeEvent) Event() string { return "Tick" } func (t TimeEvent) Data() string { return time.Time(t).String() } const ( TICK_COUNT = 5 ) func TimePublisher(srv *eventsource.Server) { start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC) ticker := time.NewTicker(time.Second) for i := 0; i < TICK_COUNT; i++ { <-ticker.C srv.Publish([]string{"time"}, TimeEvent(start)) start = start.Add(time.Second) } } func main() { srv := eventsource.NewServer() srv.Gzip = true defer srv.Close() l, err := net.Listen("tcp", ":8080") if err != nil { return } defer l.Close() http.HandleFunc("/time", srv.Handler("time")) go http.Serve(l, nil) go TimePublisher(srv) stream, err := eventsource.Subscribe("http://127.0.0.1:8080/time", "") if err != nil { return } for i := 0; i < TICK_COUNT; i++ { ev := <-stream.Events fmt.Println(ev.Id(), ev.Event(), ev.Data()) } }
Output: 1356998400000000000 Tick 2013-01-01 00:00:00 +0000 UTC 1356998401000000000 Tick 2013-01-01 00:00:01 +0000 UTC 1356998402000000000 Tick 2013-01-01 00:00:02 +0000 UTC 1356998403000000000 Tick 2013-01-01 00:00:03 +0000 UTC 1356998404000000000 Tick 2013-01-01 00:00:04 +0000 UTC
type Repository ¶
type Repository interface { // Gets the Events which should follow on from the specified channel and event id. Replay(channel, id string) chan Event }
If history is required, this interface will allow clients to reply previous events through the server. Both methods can be called from different goroutines concurrently, so you must make sure they are go-routine safe.
Example ¶
package main import ( "encoding/json" "fmt" "github.com/donovanhide/eventsource" "net" "net/http" ) type NewsArticle struct { id string Title, Content string } func (a *NewsArticle) Id() string { return a.id } func (a *NewsArticle) Event() string { return "News Article" } func (a *NewsArticle) Data() string { b, _ := json.Marshal(a); return string(b) } var articles = []NewsArticle{ {"2", "Governments struggle to control global price of gas", "Hot air...."}, {"1", "Tomorrow is another day", "And so is the day after."}, {"3", "News for news' sake", "Nothing has happened."}, } func buildRepo(srv *eventsource.Server) { repo := eventsource.NewSliceRepository() srv.Register("articles", repo) for i := range articles { repo.Add("articles", &articles[i]) srv.Publish([]string{"articles"}, &articles[i]) } } func main() { srv := eventsource.NewServer() defer srv.Close() http.HandleFunc("/articles", srv.Handler("articles")) l, err := net.Listen("tcp", ":8080") if err != nil { return } defer l.Close() go http.Serve(l, nil) stream, err := eventsource.Subscribe("http://127.0.0.1:8080/articles", "") if err != nil { return } go buildRepo(srv) // This will receive events in the order that they come for i := 0; i < 3; i++ { ev := <-stream.Events fmt.Println(ev.Id(), ev.Event(), ev.Data()) } stream, err = eventsource.Subscribe("http://127.0.0.1:8080/articles", "1") if err != nil { fmt.Println(err) return } // This will replay the events in order of id for i := 0; i < 3; i++ { ev := <-stream.Events fmt.Println(ev.Id(), ev.Event(), ev.Data()) } }
Output: 2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."} 1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."} 3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."} 1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."} 2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."} 3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
type Server ¶
type Server struct { AllowCORS bool // Enable all handlers to be accessible from any origin ReplayAll bool // Replay repository even if there's no Last-Event-Id specified BufferSize int // How many messages do we let the client get behind before disconnecting Gzip bool // Enable compression if client can accept it Logger *log.Logger // Logger is a logger that, when set, will be used for logging debug messages // contains filtered or unexported fields }
func NewServer ¶
func NewServer() *Server
Create a new Server ready for handler creation and publishing events
func (*Server) Handler ¶
func (srv *Server) Handler(channel string) http.HandlerFunc
Create a new handler for serving a specified channel
func (*Server) Register ¶
func (srv *Server) Register(channel string, repo Repository)
Register the repository to be used for the specified channel
type SliceRepository ¶
type SliceRepository struct {
// contains filtered or unexported fields
}
Example repository that uses a slice as storage for past events.
func NewSliceRepository ¶
func NewSliceRepository() *SliceRepository
func (*SliceRepository) Add ¶
func (repo *SliceRepository) Add(channel string, event Event)
func (SliceRepository) Replay ¶
func (repo SliceRepository) Replay(channel, id string) (out chan Event)
type Stream ¶
type Stream struct { // Events emits the events received by the stream Events chan Event // Errors emits any errors encountered while reading events from the stream. // It's mainly for informative purposes - the client isn't required to take any // action when an error is encountered. The stream will always attempt to continue, // even if that involves reconnecting to the server. Errors chan error // Logger is a logger that, when set, will be used for logging debug messages Logger *log.Logger // contains filtered or unexported fields }
Stream handles a connection for receiving Server Sent Events. It will try and reconnect if the connection is lost, respecting both received retry delays and event id's.
func Subscribe ¶
Subscribe to the Events emitted from the specified url. If lastEventId is non-empty it will be sent to the server in case it can replay missed events.
func SubscribeWith ¶
SubscribeWith takes a http client and request providing customization over both headers and control over the http client settings (timeouts, tls, etc)
func SubscribeWithRequest ¶
SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers to be specified, authentication to be configured, etc.
type SubscriptionError ¶
func (SubscriptionError) Error ¶
func (e SubscriptionError) Error() string