sse

package
v0.0.0-...-5783bb9
Published: Dec 27, 2024 License: BSD-3-Clause Imports: 12 Imported by: 0

README

Server-Sent Events

Traditionally, a web page has to send a request to the server to receive new data; that is, the page requests data from the server. With server-sent events, it's possible for a server to send new data to a web page at any time, by pushing messages to the web page.

More information: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
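
To make the push model concrete, here is a minimal sketch of the wire-level exchange using only the Go standard library. It does not use this package; `tickHandler`, `collectData`, and `demo` are hypothetical names introduced for illustration. The server writes three events in the `text/event-stream` format and flushes after each one, and the client reads the `data` lines as they arrive.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// tickHandler streams three events using the text/event-stream format.
// Hypothetical handler, not part of the sse package.
func tickHandler(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	for i := 1; i <= 3; i++ {
		// Each event is a block of "field: value" lines ended by a blank line.
		fmt.Fprintf(w, "id: %d\ndata: tick %d\n\n", i, i)
		flusher.Flush() // push the event to the client right away
	}
}

// collectData connects to url and returns the payload of every "data" line
// received until the server closes the stream.
func collectData(url string) ([]string, error) {
	res, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	var out []string
	sc := bufio.NewScanner(res.Body)
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "data: ") {
			out = append(out, strings.TrimPrefix(line, "data: "))
		}
	}
	return out, sc.Err()
}

// demo runs the handler on a local test server and collects its events.
func demo() ([]string, error) {
	srv := httptest.NewServer(http.HandlerFunc(tickHandler))
	defer srv.Close()
	return collectData(srv.URL)
}

func main() {
	data, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(data)
}
```

A real SSE endpoint would normally keep the connection open and emit events as they occur; this sketch ends the response after three events only so that the example terminates.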

Server

On the server side, you can use the Handler function to automatically create and handle SSE subscriptions based on incoming client HTTP requests.

// Handler
router := lib.NewServeMux()
router.HandleFunc("/sse", Handler(yourStreamSetupFunction))

// Server options
opts := []http.Option{
  // SSE requires no timeout on "keep-alive" connections
  http.WithIdleTimeout(0),
  http.WithHandler(router),
  http.WithPort(8080),
}

// Start server
srv, _ := http.NewServer(opts...)
go func() {
  _ = srv.Start()
}()

Client

A client instance can be used to subscribe to an SSE stream on the server.

// Create client instance
cl, err := NewClient(nil)
if err != nil {
  panic(err)
}

// Prepare a request and submit it to the server to obtain
// a subscription instance in return.
req, err := PrepareRequest(context.Background(), "http://localhost:8080/sse", nil)
if err != nil {
  panic(err)
}
sub, err := cl.Subscribe(req)
if err != nil {
  panic(err)
}

// Handle incoming events
for ev := range sub.Receive() {
  fmt.Printf("server event: %+v\n", ev)
}

Documentation

Overview

Package sse provides a "Server-Sent Events" server/client implementation.

Functions

func Handler

func Handler(setup func(req *http.Request) *Subscription) http.HandlerFunc

Handler provides a basic "Server-Sent Events" handler implementation. The provided `setup` function lets you create streams and/or subscriptions based on the contents of the HTTP request, e.g., credentials, headers, or query parameters.

  • SSE requires no timeouts on "keep-alive" connections on the server side.
  • If the subscription is closed by the server, the client connection will be closed as well.
  • If the client connection drops, the subscription will be closed on the server as well.

func PrepareRequest

func PrepareRequest(ctx context.Context, url string, headers map[string]string) (*http.Request, error)

PrepareRequest returns an HTTP request configured to receive an incoming stream of SSE events from the provided `url` endpoint.
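
The documentation does not state which headers PrepareRequest sets. As a rough sketch of what such a helper might do, the standard-library version below builds a GET request that advertises `text/event-stream`. The function names `newSSERequest` and `demoRequest`, the choice of `Accept` and `Cache-Control` headers, and the sample token are assumptions for illustration, not the package's actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// newSSERequest is a hypothetical stand-in for PrepareRequest. It assumes
// an SSE request is a GET that asks for the text/event-stream media type.
func newSSERequest(ctx context.Context, url string, headers map[string]string) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Cache-Control", "no-cache")
	// Caller-supplied headers are applied last so they can override defaults.
	for k, v := range headers {
		req.Header.Set(k, v)
	}
	return req, nil
}

// demoRequest builds a sample request carrying a placeholder credential.
func demoRequest() (*http.Request, error) {
	headers := map[string]string{"Authorization": "Bearer example-token"}
	return newSSERequest(context.Background(), "http://localhost:8080/sse", headers)
}

func main() {
	req, err := demoRequest()
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("Accept"))
}
```

Binding the request to a context is what later allows the caller to cancel the stream.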

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client instances can be used to receive events published by a server via subscriptions. A single client can be used to open any number of subscriptions.

func NewClient

func NewClient(hc *http.Client) (*Client, error)

NewClient returns a new, ready-to-use client instance. If `hc` is nil, `http.DefaultClient` is used.

func (*Client) Subscribe

func (cl *Client) Subscribe(req *http.Request) (*Subscription, error)

Subscribe opens a new subscription instance for the provided HTTP request. The subscription can be closed by the client using the `context` in the provided HTTP request.
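
The cancellation behavior can be illustrated with the standard library alone. In this sketch (which does not use this package; `demo` and the handler are hypothetical), the client cancels the request context after reading one line, and the server handler observes the dropped connection through `r.Context()`. This mirrors the documented behavior that a dropped client connection also ends the subscription on the server.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// demo reads one line from an endless stream, cancels the request context,
// and reports whether the server handler noticed the disconnect.
func demo() (firstLine string, serverSawClose bool) {
	closed := make(chan struct{})
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer close(closed) // signals that the handler has returned
		fl, ok := w.(http.Flusher)
		if !ok {
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		tick := time.NewTicker(10 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-r.Context().Done(): // client went away
				return
			case <-tick.C:
				fmt.Fprint(w, "data: ping\n\n")
				fl.Flush()
			}
		}
	}))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, srv.URL, nil)
	if err != nil {
		return "", false
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", false
	}
	defer res.Body.Close()

	sc := bufio.NewScanner(res.Body)
	if sc.Scan() {
		firstLine = sc.Text()
	}
	cancel() // the client closes the stream through its context

	select {
	case <-closed:
		serverSawClose = true
	case <-time.After(2 * time.Second): // give up if nothing was observed
	}
	return firstLine, serverSawClose
}

func main() {
	line, closed := demo()
	fmt.Printf("first line: %q, server saw close: %v\n", line, closed)
}
```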

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event instances are the smallest unit of communication between the server (publisher) and its clients (subscribers). More information: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields

func (Event) Data

func (e Event) Data() interface{}

Data returns the event's payload.

func (Event) Decode

func (e Event) Decode(target interface{}) error

Decode decodes the event data into the provided `target` element. If `target` is `nil` or not a pointer, this method returns `json.InvalidUnmarshalError`.
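
The error condition above matches how `encoding/json` treats invalid targets. The sketch below reproduces it with `json.Unmarshal` directly, assuming the event payload is JSON. The `reading` type and the `decode` and `isInvalidTarget` helpers are hypothetical and stand in for an event's data.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// reading is a hypothetical payload type used only for this example.
type reading struct {
	Sensor string  `json:"sensor"`
	Value  float64 `json:"value"`
}

// decode unmarshals a raw JSON payload into target, the way a Decode
// method built on encoding/json would (an assumption about the package).
func decode(raw []byte, target interface{}) error {
	return json.Unmarshal(raw, target)
}

// isInvalidTarget reports whether err is a *json.InvalidUnmarshalError.
func isInvalidTarget(err error) bool {
	var e *json.InvalidUnmarshalError
	return errors.As(err, &e)
}

func main() {
	raw := []byte(`{"sensor":"t1","value":21.5}`)

	var r reading
	fmt.Println(decode(raw, &r), r.Sensor, r.Value) // pointer target: decodes

	fmt.Println(isInvalidTarget(decode(raw, reading{}))) // non-pointer target
	fmt.Println(isInvalidTarget(decode(raw, nil)))       // nil target
}
```

Using `errors.As` rather than a direct type comparison keeps the check working if the error is wrapped.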

func (Event) Encode

func (e Event) Encode() ([]byte, error)

Encode returns the event in the wire format used for HTTP transmission.
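
For reference, the `text/event-stream` format is a sequence of `field: value` lines (`id`, `event`, `retry`, `data`) terminated by a blank line. The sketch below shows one plausible encoding. The `wireEvent` type, its field names, and the choice to JSON-encode the payload are assumptions for illustration; the exact bytes produced by this package may differ.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// wireEvent is a hypothetical event representation for this example only.
type wireEvent struct {
	ID    int
	Name  string      // optional event type; empty means a plain message
	Retry uint        // reconnection delay in milliseconds; 0 omits the field
	Data  interface{} // payload, JSON-encoded here by assumption
}

// encode renders the event as text/event-stream fields followed by the
// blank line that terminates an event.
func (e wireEvent) encode() ([]byte, error) {
	payload, err := json.Marshal(e.Data)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "id: %d\n", e.ID)
	if e.Name != "" {
		fmt.Fprintf(&buf, "event: %s\n", e.Name)
	}
	if e.Retry > 0 {
		fmt.Fprintf(&buf, "retry: %d\n", e.Retry)
	}
	// json.Marshal emits no raw newlines, so one data line is enough.
	fmt.Fprintf(&buf, "data: %s\n\n", payload)
	return buf.Bytes(), nil
}

func main() {
	ev := wireEvent{ID: 1, Name: "status", Retry: 2000, Data: map[string]bool{"ok": true}}
	out, err := ev.encode()
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```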

func (Event) ID

func (e Event) ID() int

ID returns the event's unique identifier.

func (Event) Name

func (e Event) Name() string

Name returns the event's type identifier, if any. Events with no type value are considered messages by the spec.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream operators provide a simple pub/sub one-directional mechanism that allows a sender (i.e., server) to broadcast events to one or more subscribers (i.e., clients).

func NewStream

func NewStream(name string, opts ...StreamOption) (*Stream, error)

NewStream returns a new stream operator with the provided name. A stream operator can be used on the server (i.e., sender) side to broadcast events and messages to connected subscribers.

func (*Stream) Close

func (st *Stream) Close()

Close the stream and free any related resources. Once closed, all send operations on the stream instance are no-ops.

func (*Stream) SendEvent

func (st *Stream) SendEvent(name string, payload interface{})

SendEvent broadcasts an event with the provided `name` and `payload` to the stream clients.

func (*Stream) SendMessage

func (st *Stream) SendMessage(payload interface{})

SendMessage broadcasts a message with the provided `payload` to the stream clients.

func (*Stream) Subscribe

func (st *Stream) Subscribe(ctx context.Context, id string) *Subscription

Subscribe will register a new client/receiver for the stream. The provided `id` value MUST be unique. If a subscriber already exists with the `id`, a reference to it will be returned.

func (*Stream) Unsubscribe

func (st *Stream) Unsubscribe(id string) bool

Unsubscribe will terminate and remove an existing client/receiver. If no client exists for `id` this method returns `false`.
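
The subscribe, broadcast, and unsubscribe semantics described for Stream can be sketched with channels and a mutex. The `hub` type below is a hypothetical, simplified stand-in and not the package's implementation: it reuses the existing channel when an `id` is already registered, broadcasts to every subscriber, and reports whether an unsubscribe found a matching `id`.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical, simplified stand-in for a stream operator.
type hub struct {
	mu   sync.Mutex
	subs map[string]chan string
}

func newHub() *hub { return &hub{subs: make(map[string]chan string)} }

// subscribe registers a receiver under id. If id is already registered,
// the existing channel is returned instead of creating a new one.
func (h *hub) subscribe(id string) <-chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ch, ok := h.subs[id]; ok {
		return ch
	}
	ch := make(chan string, 8) // buffered so send does not block in this sketch
	h.subs[id] = ch
	return ch
}

// unsubscribe closes and removes a receiver, reporting whether it existed.
func (h *hub) unsubscribe(id string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch, ok := h.subs[id]
	if ok {
		close(ch)
		delete(h.subs, id)
	}
	return ok
}

// send broadcasts msg and returns the number of subscribers that got it.
func (h *hub) send(msg string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- msg:
			delivered++
		default: // buffer full: this sketch drops the message
		}
	}
	return delivered
}

func main() {
	h := newHub()
	a, b := h.subscribe("a"), h.subscribe("b")
	fmt.Println(h.send("hello"))                        // 2
	fmt.Println(<-a, <-b)                               // hello hello
	fmt.Println(h.unsubscribe("a"), h.unsubscribe("a")) // true false
}
```

Dropping messages on a full buffer is a simplification made for brevity. The package instead documents a configurable send timeout (see WithSendTimeout).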

type StreamOption

type StreamOption func(st *Stream) error

StreamOption provides a functional-style mechanism to adjust the behavior of a stream operator instance.

func WithLogger

func WithLogger(logger xlog.Logger) StreamOption

WithLogger sets the log handler for the stream operator. Logs are discarded by default.

func WithMessageRetry

func WithMessageRetry(retry uint) StreamOption

WithMessageRetry adjusts the `retry` message value, in milliseconds, set by the stream for all sent messages and events. Default value is `2000`.

func WithSendTimeout

func WithSendTimeout(timeout time.Duration) StreamOption

WithSendTimeout adjusts the maximum time to wait for message delivery on send operations. Default value is 2 seconds.
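
A bounded wait on delivery is commonly written as a `select` over the send and a timer. The sketch below shows that pattern in isolation. `sendWithTimeout` and `demo` are hypothetical helpers, and whether the package implements its timeout this way is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// sendWithTimeout tries to deliver msg on ch and gives up after timeout.
// It reports whether the message was delivered.
func sendWithTimeout(ch chan<- string, msg string, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- msg:
		return true
	case <-timer.C:
		return false
	}
}

// demo sends to one receiver with buffer space and one that never reads.
func demo() (delivered bool, stalledDelivered bool) {
	ready := make(chan string, 1) // has room for one message
	stalled := make(chan string)  // unbuffered and nobody is receiving
	delivered = sendWithTimeout(ready, "ping", 50*time.Millisecond)
	stalledDelivered = sendWithTimeout(stalled, "ping", 50*time.Millisecond)
	return delivered, stalledDelivered
}

func main() {
	fmt.Println(demo()) // true false
}
```

The timeout keeps one slow subscriber from blocking the sender indefinitely, at the cost of that subscriber missing the message.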

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription instances can be used to receive events published by the originating stream operator.

func (*Subscription) Done

func (sb *Subscription) Done() <-chan struct{}

Done returns a channel that's closed when the subscription is being terminated. No further activity should be expected on `Receive`.
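
A consumer that needs to react to termination can select on both channels. The sketch below uses a hypothetical local `subscription` type with plain channels. With the real type, the two cases would read from `sub.Receive()` and `sub.Done()`.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in exposing the same two signals
// as the package's Subscription: an event channel and a done channel.
type subscription struct {
	events chan string
	done   chan struct{}
}

// consume collects events until the subscription reports termination.
func consume(sub *subscription) []string {
	var got []string
	for {
		select {
		case ev := <-sub.events:
			got = append(got, ev)
		case <-sub.done:
			return got
		}
	}
}

// demo publishes two events and then terminates the subscription.
func demo() []string {
	sub := &subscription{
		events: make(chan string), // unbuffered: each send waits for consume
		done:   make(chan struct{}),
	}
	go func() {
		sub.events <- "one"
		sub.events <- "two"
		close(sub.done) // runs only after both events were received
	}()
	return consume(sub)
}

func main() {
	fmt.Println(demo()) // [one two]
}
```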

func (*Subscription) ID

func (sb *Subscription) ID() string

ID returns the subscriber's unique identifier.

func (*Subscription) Receive

func (sb *Subscription) Receive() <-chan Event

Receive returns a channel that delivers the events published by the stream.
