sse

package module
v0.0.0-...-c298a35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2020 License: MPL-2.0 Imports: 12 Imported by: 0

README

SSE - Server Side Events Client/Server Library for Go

Synopsis

SSE is a client/server implementation for Server Side Events for Golang. Right now functionality is basic, but there is support for multiple separate streams with many clients connected to each. The client is able to specify the stream it wants to connect to by the URL parameter stream.

Build status

  • Master: CircleCI  Master

Quick start

To install:

$ go get github.com/r3labs/sse

To Test:

$ make deps
$ make test
Example Server

There are two parts of the server. It is comprised of the message scheduler and a http handler function. The messaging system is started when running:

func main() {
	server := sse.New()
}

To add a stream to this handler:

func main() {
	server := sse.New()
	server.CreateStream("messages")
}

This creates a new stream inside of the scheduler. Seeing as there are no consumers, publishing a message to this channel will do nothing. Clients can connect to this stream once the http handler is started by specifying stream as a url parameter, like so:

http://server/events?stream=messages

In order to start the http server:

func main() {
	server := sse.New()

	// Create a new Mux and set the handler
	mux := http.NewServeMux()
	mux.HandleFunc("/events", server.HTTPHandler)

	http.ListenAndServe(":8080", mux)
}

To publish messages to a stream:

func main() {
	server := sse.New()

	// Publish a payload to the stream
	server.Publish("messages", &sse.Event{
		Data: []byte("ping"),
	})
}

Please note there must be a stream with the name you specify and there must be subscribers to that stream

Example Client

The client exposes a way to connect to an SSE server. The client can also handle multiple events under the same url.

To create a new client:

func main() {
	client := sse.NewClient("http://server/events")
}

To subscribe to an event stream, please use the Subscribe function. This accepts the name of the stream and a handler function:

func main() {
	client := sse.NewClient("http://server/events")

	client.Subscribe("messages", func(msg *sse.Event) {
		// Got some data!
		fmt.Println(msg.Data)
	})
}

Please note that this function will block the current thread. You can run this function in a go routine.

If you wish to have events sent to a channel, you can use SubscribeChan:

func main() {
	events := make(chan *sse.Event)

	client := sse.NewClient("http://server/events")
	client.SubscribeChan("messages", events)
}
HTTP client parameters

To add additional parameters to the http client, such as disabling ssl verification for self signed certs, you can override the http client or update its options:

func main() {
	client := sse.NewClient("http://server/events")
	client.Connection.Transport =  &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
}
URL query parameters

To set custom query parameters on the client or disable the stream parameter altogether:

func main() {
	client := sse.NewClient("http://server/events?search=example")

	client.SubscribeRaw(func(msg *sse.Event) {
		// Got some data!
		fmt.Println(msg.Data)
	})
}

Contributing

Please read through our contributing guidelines. Included are directions for opening issues, coding standards, and notes on development.

Moreover, if your pull request contains patches or features, you must include relevant unit tests.

Versioning

For transparency into our release cycle and in striving to maintain backward compatibility, this project is maintained under the Semantic Versioning guidelines.

Code and documentation copyright since 2015 r3labs.io authors.

Code released under the Mozilla Public License Version 2.0.

Documentation

Index

Constants

View Source
const DefaultBufferSize = 1024

DefaultBufferSize size of the queue that holds the streams messages.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	URL        string
	Connection *http.Client
	Retry      time.Time

	Headers        map[string]string
	EncodingBase64 bool
	EventID        string

	ReconnectStrategy backoff.BackOff
	// contains filtered or unexported fields
}

Client handles an incoming server stream

func NewClient

func NewClient(url string) *Client

NewClient creates a new client

func (*Client) OnDisconnect

func (c *Client) OnDisconnect(fn ConnCallback)

OnDisconnect specifies the function to run when the connection disconnects

func (*Client) Subscribe

func (c *Client) Subscribe(stream string, handler func(msg *Event)) error

Subscribe to a data stream

func (*Client) SubscribeChan

func (c *Client) SubscribeChan(stream string, ch chan *Event) error

SubscribeChan sends all events to the provided channel

func (*Client) SubscribeChanRaw

func (c *Client) SubscribeChanRaw(ch chan *Event) error

SubscribeChanRaw sends all events to the provided channel

func (*Client) SubscribeChanRawWithContext

func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error

SubscribeChanRawWithContext sends all events to the provided channel with context

func (*Client) SubscribeChanWithContext

func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error

SubscribeChanWithContext sends all events to the provided channel with context

func (*Client) SubscribeRaw

func (c *Client) SubscribeRaw(handler func(msg *Event)) error

SubscribeRaw to an sse endpoint

func (*Client) SubscribeRawWithContext

func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error

SubscribeRawWithContext to an sse endpoint with context

func (*Client) SubscribeWithContext

func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error

SubscribeWithContext to a data stream with context

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ch chan *Event)

Unsubscribe unsubscribes a channel

type ConnCallback

type ConnCallback func(c *Client)

ConnCallback defines a function to be called on a particular connection event

type Event

type Event struct {
	ID    []byte
	Data  []byte
	Event []byte
	Retry []byte
	// contains filtered or unexported fields
}

Event holds all of the event source fields

type EventLog

type EventLog []*Event

EventLog holds all of previous events

func (*EventLog) Add

func (e *EventLog) Add(ev *Event)

Add event to eventlog

func (*EventLog) Clear

func (e *EventLog) Clear()

Clear events from eventlog

func (*EventLog) Replay

func (e *EventLog) Replay(s *Subscriber)

Replay events to a subscriber

type EventStreamReader

type EventStreamReader struct {
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func NewEventStreamReader

func NewEventStreamReader(eventStream io.Reader) *EventStreamReader

NewEventStreamReader creates an instance of EventStreamReader.

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() ([]byte, error)

ReadEvent scans the EventStream for events.

type Server

type Server struct {
	// Specifies the size of the message buffer for each stream
	BufferSize int
	// Enables creation of a stream when a client connects
	AutoStream bool
	// Enables automatic replay for each new subscriber that connects
	AutoReplay bool
	// Encodes all data as base64
	EncodeBase64 bool
	// Sets a ttl that prevents old events from being transmitted
	EventTTL time.Duration
	Streams  map[string]*Stream
	Headers  map[string]string
	// contains filtered or unexported fields
}

Server Is our main struct

func New

func New() *Server

New will create a server and setup defaults

func (*Server) Close

func (s *Server) Close()

Close shuts down the server, closes all of the streams and connections

func (*Server) CreateStream

func (s *Server) CreateStream(id string) *Stream

CreateStream will create a new stream and register it

func (*Server) HTTPHandler

func (s *Server) HTTPHandler(w http.ResponseWriter, r *http.Request)

HTTPHandler serves new connections with events for a given stream ...

func (*Server) Publish

func (s *Server) Publish(id string, event *Event)

Publish sends a mesage to every client in a streamID

func (*Server) RemoveStream

func (s *Server) RemoveStream(id string)

RemoveStream will remove a stream

func (*Server) StreamExists

func (s *Server) StreamExists(id string) bool

StreamExists checks whether a stream by a given id exists

type Stream

type Stream struct {
	// Enables replaying of eventlog to newly added subscribers
	AutoReplay bool
	Eventlog   EventLog
	// contains filtered or unexported fields
}

Stream ...

type StreamRegistration

type StreamRegistration struct {
	// contains filtered or unexported fields
}

StreamRegistration ...

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL