Documentation ¶
Overview ¶
Package sse provides utilities for creating and consuming fully spec-compliant HTML5 server-sent events streams.
The central piece of a server's implementation is the Provider interface. A Provider describes a publish-subscribe system that can be used to implement messaging for the SSE protocol. This package already has an implementation, called Joe, that is the default provider for any server. Abstracting the messaging system implementation away allows servers to use any arbitrary provider under the same interface. The default provider works for simple use cases; where scalability is required, a more suitable provider should be used. Adapters that satisfy the Provider interface can easily be created and then plugged into the server instance. Events themselves are represented using the Message type.
On the client-side, we use the Client struct to create connections to event streams. Using an `http.Request` we instantiate a Connection. Then we subscribe to incoming events using callback functions, and then we establish the connection by calling the Connection's Connect method.
Example (MessageWriter) ¶
e := Message{
	Type: Type("test"),
	ID:   ID("1"),
}
w := &strings.Builder{}

bw := base64.NewEncoder(base64.StdEncoding, w)
binary.Write(bw, binary.BigEndian, []byte{6, 9, 4, 2, 0})
binary.Write(bw, binary.BigEndian, []byte("data from sensor"))
bw.Close()
w.WriteByte('\n') // Ensures that the data written above will be a distinct `data` field.

enc := json.NewEncoder(w)
enc.SetIndent("", " ")
enc.Encode(map[string]string{"hello": "world"})
// Not necessary to add a newline here – json.Encoder.Encode adds a newline at the end.

// io.CopyN(hex.NewEncoder(w), rand.Reader, 8)
io.Copy(hex.NewEncoder(w), bytes.NewReader([]byte{5, 1, 6, 34, 234, 12, 143, 91}))
mw := io.MultiWriter(os.Stdout, w)
// The first newline adds the data written above as a `data field`.
io.WriteString(mw, "\nYou'll see me both in console and in event\n\n")

// Add the data to the event. It will be split into fields here,
// according to the newlines present in the input.
e.AppendData(w.String())
e.WriteTo(os.Stdout)
Output:
You'll see me both in console and in event

id: 1
event: test
data: BgkEAgBkYXRhIGZyb20gc2Vuc29y
data: {
data: "hello": "world"
data: }
data: 05010622ea0c8f5b
data: You'll see me both in console and in event
data:
Index ¶
- Constants
- Variables
- func Read(r io.Reader, cfg *ReadConfig) func(func(Event, error) bool)
- type Backoff
- type Client
- type Connection
- func (c *Connection) Buffer(buf []byte, maxSize int)
- func (c *Connection) Connect() error
- func (c *Connection) SubscribeEvent(typ string, cb EventCallback) EventCallbackRemover
- func (c *Connection) SubscribeMessages(cb EventCallback) EventCallbackRemover
- func (c *Connection) SubscribeToAll(cb EventCallback) EventCallbackRemover
- type ConnectionError
- type Event
- type EventCallback
- type EventCallbackRemover
- type EventID
- func (i EventID) IsSet() bool
- func (i *EventID) MarshalJSON() ([]byte, error)
- func (i *EventID) MarshalText() ([]byte, error)
- func (i *EventID) Scan(src interface{}) error
- func (i EventID) String() string
- func (i *EventID) UnmarshalJSON(data []byte) error
- func (i *EventID) UnmarshalText(data []byte) error
- func (i EventID) Value() (driver.Value, error)
- type EventType
- func (i EventType) IsSet() bool
- func (i *EventType) MarshalJSON() ([]byte, error)
- func (i *EventType) MarshalText() ([]byte, error)
- func (i *EventType) Scan(src interface{}) error
- func (i EventType) String() string
- func (i *EventType) UnmarshalJSON(data []byte) error
- func (i *EventType) UnmarshalText(data []byte) error
- func (i EventType) Value() (driver.Value, error)
- type FiniteReplayer
- type Joe
- type LogLevel
- type Logger
- type Message
- func (e *Message) AppendComment(comments ...string)
- func (e *Message) AppendData(chunks ...string)
- func (e *Message) Clone() *Message
- func (e *Message) MarshalText() ([]byte, error)
- func (e *Message) String() string
- func (e *Message) UnmarshalText(p []byte) error
- func (e *Message) WriteTo(w io.Writer) (int64, error)
- type MessageWriter
- type Provider
- type ReadConfig
- type Replayer
- type ResponseValidator
- type ResponseWriter
- type Server
- type Session
- type Subscription
- type UnmarshalError
- type ValidReplayer
Examples ¶
Constants ¶
const DefaultTopic = ""
DefaultTopic is the identifier for the topic that is implied when no topics are specified for a Subscription or a Message.
Variables ¶
var DefaultClient = &Client{
	HTTPClient:        http.DefaultClient,
	ResponseValidator: DefaultValidator,
	Backoff: Backoff{
		InitialInterval: time.Millisecond * 500,
		Multiplier:      1.5,
		Jitter:          0.5,
	},
}
DefaultClient is the client that is used when creating a new connection using the NewConnection function. Unset properties on new clients are replaced with the ones set for the default client.
var ErrNoGetBody = errors.New("the GetBody function doesn't exist on the request")
ErrNoGetBody is a sentinel error returned when the connection cannot be reattempted due to GetBody not existing on the original request.
var ErrNoTopic = errors.New("go-sse.server: no topics specified")
ErrNoTopic is a sentinel error returned when a Message is published without any topics. It is not an issue to call Server.Publish without topics, because the Server will add the DefaultTopic; it is an error to call Provider.Publish or Replayer.Put without any topics, though.
var ErrProviderClosed = errors.New("go-sse.server: provider is closed")
ErrProviderClosed is a sentinel error returned by providers when any operation is attempted after the provider is closed.
var ErrUnexpectedEOF = parser.ErrUnexpectedEOF
ErrUnexpectedEOF is returned when unmarshaling a Message from an input that doesn't end in a newline.
If it is returned from a Connection, it means that the data from the server has reached EOF in the middle of an incomplete event and retries are disabled (normally the client retries the connection in this situation).
var ErrUpgradeUnsupported = errors.New("go-sse.server: upgrade unsupported")
ErrUpgradeUnsupported is returned when a request can't be upgraded to support server-sent events.
Functions ¶
func Read ¶ added in v0.10.0
Read parses an SSE stream and yields all incoming events. On any encountered error, iteration stops and no further events are parsed – the loop can safely be ended on error. If EOF is reached, the Read operation is considered successful and no error is returned. An Event will never be yielded together with an error.
Read is especially useful for parsing responses from services which communicate using SSE but not over long-lived connections – for example, LLM APIs.
Read handles the Event.LastEventID value just as the browser SSE client (EventSource) would – for every event, the last encountered event ID will be given, even if the ID is not the current event's ID. Read, unlike EventSource, does not set Event.Type to "message" if no "event" field is received, leaving it blank.
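The LastEventID behaviour described above can be illustrated with a deliberately simplified parser. This is a sketch built only on the standard library, not the package's implementation: the `event` type and `parse` function are hypothetical names, only LF and CRLF line endings are handled, and the "retry" field is ignored.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// event mirrors the three fields documented for Event (hypothetical type).
type event struct {
	LastEventID string
	Type        string
	Data        string
}

// parse reads a whole stream and returns the dispatched events.
// The last seen ID is carried over to later events that have no "id" field,
// and Type stays empty when no "event" field is present.
func parse(stream string) []event {
	var (
		events []event
		lastID string
		typ    string
		data   []string
	)
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		if line == "" {
			// A blank line ends the event; events without data are dropped.
			if len(data) > 0 {
				events = append(events, event{lastID, typ, strings.Join(data, "\n")})
			}
			typ, data = "", nil
			continue
		}
		// Lines starting with ':' are comments: name is empty and is ignored.
		name, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch name {
		case "id":
			lastID = value
		case "event":
			typ = value
		case "data":
			data = append(data, value)
		}
	}
	return events
}

func main() {
	for _, ev := range parse("id: 1\ndata: first\n\ndata: second\n\n") {
		fmt.Printf("%+v\n", ev)
	}
}
```

Both events in this input report `LastEventID` as "1", although only the first one carries an "id" field.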
Read provides no way to handle the "retry" field and doesn't handle retrying. Use a Client and a Connection if you need to retry requests.
Types ¶
type Backoff ¶ added in v0.8.0
type Backoff struct {
	// The initial wait time before a reconnection is attempted.
	// Must be >0. Defaults to 500ms.
	InitialInterval time.Duration
	// How much should the reconnection time grow on subsequent attempts.
	// Must be >=1; 1 = constant interval. Defaults to 1.5.
	Multiplier float64
	// How much does the reconnection time vary relative to the base value.
	// This is useful to prevent multiple clients to reconnect at the exact
	// same time, as it makes the wait times distinct.
	// Must be in range (0, 1); -1 = no randomization. Defaults to 0.5.
	Jitter float64
	// How much can the wait time grow.
	// If <=0 = the wait time can infinitely grow. Defaults to infinite growth.
	MaxInterval time.Duration
	// How much time can retries be attempted.
	// For example, if this is 5 seconds, after 5 seconds the client
	// will stop retrying.
	// If <=0 = no limit. Defaults to no limit.
	MaxElapsedTime time.Duration
	// How many retries are allowed.
	// <0 = no retries, 0 = infinite. Defaults to infinite retries.
	MaxRetries int
}
Backoff configures the reconnection strategy of a Connection.
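To make the interaction between InitialInterval, Multiplier, Jitter and MaxInterval more concrete, here is a minimal sketch of one common way to compute exponential backoff with jitter. It is an assumption about how such fields are typically combined, not the package's actual algorithm; `nextInterval` and `applyJitter` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// nextInterval grows the previous wait time by multiplier and caps it at max.
// A max of zero or less means the interval may grow without bound.
func nextInterval(prev time.Duration, multiplier float64, max time.Duration) time.Duration {
	next := time.Duration(float64(prev) * multiplier)
	if max > 0 && next > max {
		return max
	}
	return next
}

// applyJitter spreads d over [d*(1-jitter), d*(1+jitter)).
// r is a random number in [0, 1), passed in so that results are reproducible.
func applyJitter(d time.Duration, jitter, r float64) time.Duration {
	return time.Duration(float64(d) * (1 - jitter + 2*jitter*r))
}

func main() {
	interval := 500 * time.Millisecond
	for attempt := 1; attempt <= 5; attempt++ {
		// r is fixed at 0.25 here; a real client would draw it randomly.
		fmt.Printf("attempt %d: base %v, with jitter %v\n",
			attempt, interval, applyJitter(interval, 0.5, 0.25))
		interval = nextInterval(interval, 1.5, 2*time.Second)
	}
}
```

With the documented defaults (500ms, multiplier 1.5) the base interval in this sketch grows 500ms, 750ms, 1.125s and so on until it reaches the cap.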
type Client ¶
type Client struct {
	// The HTTP client to be used. Defaults to http.DefaultClient.
	HTTPClient *http.Client
	// A callback that's executed whenever a reconnection attempt starts.
	// It receives the error that caused the retry and the reconnection time.
	OnRetry func(error, time.Duration)
	// A function to check if the response from the server is valid.
	// Defaults to a function that checks the response's status code is 200
	// and the content type is text/event-stream.
	//
	// If the error type returned has a Temporary or a Timeout method,
	// they will be used to determine whether to reattempt the connection.
	// Otherwise, the error will be considered permanent and no reconnections
	// will be attempted.
	ResponseValidator ResponseValidator
	// Backoff configures the backoff strategy. See the documentation of
	// each field for more information.
	Backoff Backoff
}
The Client struct is used to initialize new connections to different servers. It is safe for concurrent use.
After connections are created, the Connect method must be called to start receiving events.
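The ResponseValidator field comment says that errors exposing a Temporary or Timeout method influence whether a reconnection is attempted. The following sketch shows how such a check can be written with errors.As. It is an illustration under assumptions: `statusError` and `isRetryable` are hypothetical, and the package's own decision logic may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// statusError is a hypothetical validation error that reports itself
// as temporary for 5xx status codes.
type statusError struct{ code int }

func (e *statusError) Error() string   { return fmt.Sprintf("unexpected status %d", e.code) }
func (e *statusError) Temporary() bool { return e.code >= 500 }

// isRetryable reports whether err, or any error it wraps, declares itself
// temporary or a timeout. Errors with neither method count as permanent.
func isRetryable(err error) bool {
	var temp interface{ Temporary() bool }
	if errors.As(err, &temp) && temp.Temporary() {
		return true
	}
	var timeout interface{ Timeout() bool }
	return errors.As(err, &timeout) && timeout.Timeout()
}

func main() {
	wrapped := fmt.Errorf("validating response: %w", &statusError{code: 503})
	fmt.Println(isRetryable(wrapped))                    // true
	fmt.Println(isRetryable(&statusError{code: 404}))    // false
	fmt.Println(isRetryable(errors.New("plain error")))  // false
}
```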
func (*Client) NewConnection ¶
func (c *Client) NewConnection(r *http.Request) *Connection
NewConnection initializes and configures a connection. On connect, the given request is sent and if successful the connection starts receiving messages. Use the request's context to stop the connection.
If the request has a body, it is necessary to provide a GetBody function in order for the connection to be reattempted, in case of an error. Using readers such as bytes.Reader, strings.Reader or bytes.Buffer when creating a request using http.NewRequestWithContext will ensure this function is present on the request.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a connection to an events stream. Created using the Client struct, a Connection processes the incoming events and calls the subscribed event callbacks. If the connection to the server temporarily fails, the connection will be reattempted. Retry values received from servers will be taken into account.
Connections must not be copied after they are created.
func NewConnection ¶
func NewConnection(r *http.Request) *Connection
NewConnection creates a connection using the default client.
func (*Connection) Buffer ¶ added in v0.9.0
func (c *Connection) Buffer(buf []byte, maxSize int)
Buffer sets the underlying buffer to be used when scanning events. Use this if you need to read very large events (bigger than the default of 65K bytes).
Read the documentation of bufio.Scanner.Buffer for more information.
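Since Buffer is documented in terms of bufio.Scanner.Buffer, the effect of the size limit can be shown with the standard library alone. The sketch below does not use a Connection; `firstLineLen` is a hypothetical helper that scans one line with either the default limit or an enlarged one.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// firstLineLen scans the first line of input and returns its length.
// When maxSize is positive, the scanner's token size limit is raised to it.
func firstLineLen(input string, maxSize int) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(input))
	if maxSize > 0 {
		sc.Buffer(make([]byte, 0, 4096), maxSize)
	}
	if sc.Scan() {
		return len(sc.Bytes()), nil
	}
	return 0, sc.Err()
}

func main() {
	big := strings.Repeat("x", 70_000) + "\n" // larger than the 64 KiB default

	_, err := firstLineLen(big, 0)
	fmt.Println("default limit:", err)

	n, err := firstLineLen(big, 1<<20)
	fmt.Println("1 MiB limit:", n, err)
}
```

With the default limit the scan fails with bufio.ErrTooLong; with the larger limit the full 70,000-byte line is read.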
func (*Connection) Connect ¶
func (c *Connection) Connect() error
Connect sends the request the connection was created with to the server and, if successful, it starts receiving events. The caller goroutine is blocked until the request's context is done or an error occurs.
If the request's context is cancelled, Connect returns its error. Otherwise, if the maximum number of retries is made, the last error that occurred is returned. Connect never returns otherwise – either the context is cancelled, or it's done retrying.
All errors returned other than the context errors will be wrapped inside a *ConnectionError.
func (*Connection) SubscribeEvent ¶
func (c *Connection) SubscribeEvent(typ string, cb EventCallback) EventCallbackRemover
SubscribeEvent subscribes the given callback to all the events with the provided type (the `event` field has the value given here). Remove the callback by calling the returned function.
func (*Connection) SubscribeMessages ¶
func (c *Connection) SubscribeMessages(cb EventCallback) EventCallbackRemover
SubscribeMessages subscribes the given callback to all events without type (without or with empty `event` field). Remove the callback by calling the returned function.
func (*Connection) SubscribeToAll ¶
func (c *Connection) SubscribeToAll(cb EventCallback) EventCallbackRemover
SubscribeToAll subscribes the given callback to all events, with or without type. Remove the callback by calling the returned function.
type ConnectionError ¶
type ConnectionError struct {
	// The request for which the connection failed.
	Req *http.Request
	// The reason the operation failed.
	Err error
	// The reason why the request failed.
	Reason string
}
ConnectionError is the type that wraps all the connection errors that occur.
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Unwrap ¶
func (e *ConnectionError) Unwrap() error
type Event ¶
type Event struct {
	// The last non-empty ID of all the events received. This may not be
	// the ID of the latest event!
	LastEventID string
	// The event's type. It is empty if the event is unnamed.
	Type string
	// The event's payload.
	Data string
}
The Event struct represents an event sent to the client by a server.
type EventCallback ¶ added in v0.3.0
type EventCallback func(Event)
EventCallback is a function that is used to receive events from a Connection.
type EventCallbackRemover ¶ added in v0.3.0
type EventCallbackRemover func()
EventCallbackRemover is a function that removes an already registered callback from a connection. Calling it multiple times is a no-op.
type EventID ¶
type EventID struct {
// contains filtered or unexported fields
}
EventID is a value of the "id" field. It must have a single line.
func ID ¶ added in v0.5.0
ID creates an event ID and assumes it is valid. If it is not valid, it panics.
func NewID ¶ added in v0.5.0
NewID creates an event ID value. A valid ID must not have any newlines. If the input is not valid, an unset (invalid) ID is returned.
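The difference between a constructor that panics and one that returns an unset value can be sketched as follows. This is a stand-alone illustration, not the package's code: the `id` type, `newID` and `mustID` are hypothetical, and only the newline rule stated above is checked.

```go
package main

import (
	"fmt"
	"strings"
)

// id is a hypothetical stand-in for an optional, single-line identifier.
type id struct {
	value string
	set   bool
}

// newID returns an unset id when the input contains a newline character.
func newID(s string) id {
	if strings.ContainsAny(s, "\r\n") {
		return id{}
	}
	return id{value: s, set: true}
}

// mustID panics on invalid input; it suits values known to be valid up front.
func mustID(s string) id {
	v := newID(s)
	if !v.set {
		panic(fmt.Sprintf("invalid id %q", s))
	}
	return v
}

func main() {
	fmt.Println(newID("42").set)    // true
	fmt.Println(newID("4\n2").set)  // false
	fmt.Println(mustID("42").value) // 42
}
```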
func (EventID) IsSet ¶
func (i EventID) IsSet() bool
IsSet returns true if the receiver is a valid (set) value.
func (*EventID) MarshalJSON ¶
MarshalJSON returns a JSON representation of the underlying value if it is set. It otherwise returns the representation of the JSON null value.
func (*EventID) MarshalText ¶
MarshalText returns a copy of the underlying value if it is set. It returns an error when trying to marshal an unset value.
func (*EventID) Scan ¶
func (i *EventID) Scan(src interface{}) error
Scan implements the sql.Scanner interface. Values can be scanned from:
- nil interfaces (result: unset value)
- byte slice
- string
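A Scan method that accepts the three source kinds listed above could look like the sketch below. It is an assumption-laden illustration: `optionalString` is a hypothetical type, and the single-line validation that a real event ID would need is left out for brevity.

```go
package main

import "fmt"

// optionalString is a hypothetical optional value, similar in spirit
// to an ID that is either set or unset.
type optionalString struct {
	value string
	set   bool
}

// Scan follows the sql.Scanner contract for nil, []byte and string sources.
func (o *optionalString) Scan(src interface{}) error {
	switch v := src.(type) {
	case nil:
		*o = optionalString{} // nil yields an unset value
	case []byte:
		*o = optionalString{value: string(v), set: true}
	case string:
		*o = optionalString{value: v, set: true}
	default:
		return fmt.Errorf("cannot scan %T into optionalString", src)
	}
	return nil
}

func main() {
	var o optionalString
	fmt.Println(o.Scan("abc"), o.set, o.value)
	fmt.Println(o.Scan(nil), o.set)
	fmt.Println(o.Scan(3.14))
}
```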
func (EventID) String ¶
func (i EventID) String() string
String returns the underlying value. The value may be an empty string, so make sure to check whether the value is set before using it.
func (*EventID) UnmarshalJSON ¶
UnmarshalJSON sets the underlying value to the given JSON value if the value is a string. The previous value is discarded if the operation fails.
func (*EventID) UnmarshalText ¶
UnmarshalText sets the underlying value to the given string, if valid. If the input is invalid, no changes are made to the receiver.
type EventType ¶ added in v0.5.0
type EventType struct {
// contains filtered or unexported fields
}
EventType is a value of the "event" field. It must have a single line.
func NewType ¶ added in v0.5.0
NewType creates a value for the "event" field. It is valid if it does not have any newlines. If the input is not valid, an unset (invalid) EventType is returned.
func Type ¶ added in v0.5.0
Type creates an EventType and assumes it is valid. If it is not valid, it panics.
func (EventType) IsSet ¶ added in v0.5.0
func (i EventType) IsSet() bool
IsSet returns true if the receiver is a valid (set) value.
func (*EventType) MarshalJSON ¶ added in v0.5.0
MarshalJSON returns a JSON representation of the underlying value if it is set. It otherwise returns the representation of the JSON null value.
func (*EventType) MarshalText ¶ added in v0.5.0
MarshalText returns a copy of the underlying value if it is set. It returns an error when trying to marshal an unset value.
func (*EventType) Scan ¶ added in v0.5.0
func (i *EventType) Scan(src interface{}) error
Scan implements the sql.Scanner interface. Values can be scanned from:
- nil interfaces (result: unset value)
- byte slice
- string
func (EventType) String ¶ added in v0.5.0
func (i EventType) String() string
String returns the underlying value. The value may be an empty string, so make sure to check whether the value is set before using it.
func (*EventType) UnmarshalJSON ¶ added in v0.5.0
UnmarshalJSON sets the underlying value to the given JSON value if the value is a string. The previous value is discarded if the operation fails.
func (*EventType) UnmarshalText ¶ added in v0.5.0
UnmarshalText sets the underlying value to the given string, if valid. If the input is invalid, no changes are made to the receiver.
type FiniteReplayer ¶ added in v0.9.0
type FiniteReplayer struct {
// contains filtered or unexported fields
}
FiniteReplayer is a replayer that replays at maximum a certain number of events. The events must have an ID unless the replayer is configured to set IDs automatically.
func NewFiniteReplayer ¶ added in v0.9.0
func NewFiniteReplayer(count int, autoIDs bool) (*FiniteReplayer, error)
NewFiniteReplayer creates a finite replay provider with the given max count and auto ID behaviour.
Count is the maximum number of events FiniteReplayer should hold as valid. It must be greater than zero.
AutoIDs configures FiniteReplayer to automatically set the IDs of events.
func (*FiniteReplayer) Put ¶ added in v0.9.0
func (f *FiniteReplayer) Put(message *Message, topics []string) (*Message, error)
Put puts a message into the replayer's buffer. If there are more messages than the maximum number, the oldest message is removed.
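The eviction rule described for Put (drop the oldest entry once the maximum is reached) can be sketched with a plain slice. This is a simplified illustration rather than FiniteReplayer's implementation: the `buffer` type is hypothetical, stores strings instead of messages, ignores topics and IDs, and assumes `max` is greater than zero.

```go
package main

import "fmt"

// buffer keeps at most max items, discarding the oldest one first.
// It assumes max > 0, mirroring the documented constraint on count.
type buffer struct {
	max   int
	items []string
}

// put appends msg, evicting the oldest item when the buffer is full.
func (b *buffer) put(msg string) {
	if len(b.items) == b.max {
		// Shift everything left by one so the oldest item is overwritten.
		copy(b.items, b.items[1:])
		b.items = b.items[:len(b.items)-1]
	}
	b.items = append(b.items, msg)
}

func main() {
	b := &buffer{max: 2}
	for _, m := range []string{"first", "second", "third"} {
		b.put(m)
	}
	fmt.Println(b.items) // [second third]
}
```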
func (*FiniteReplayer) Replay ¶ added in v0.9.0
func (f *FiniteReplayer) Replay(subscription Subscription) error
Replay replays the stored messages to the listener.
type Joe ¶
type Joe struct {
	// An optional replayer that Joe uses to resend older messages to new subscribers.
	Replayer Replayer
	// contains filtered or unexported fields
}
Joe is a basic server provider that synchronously executes operations by queueing them in channels. Events are also sent synchronously to subscribers, so if a subscriber's callback blocks, the others have to wait.
Joe optionally supports event replaying with the help of a Replayer.
If the replayer panics, the subscription for which it panicked is considered failed and an error is returned, and thereafter the replayer is not used anymore – no replays will be attempted for future subscriptions. If due to some other unexpected scenario something panics internally, Joe will remove all subscribers and close itself, so subscribers don't end up blocked.
He serves simple use-cases well, as he's light on resources, and does not require any external services. Also, he is the default provider for Servers.
func (*Joe) Publish ¶
Publish tells Joe to send the given message to the subscribers. When a message is published to multiple topics, Joe makes sure to not send the Message multiple times to clients that are subscribed to more than one topic that receive the given Message. Every client receives each unique message once, regardless of how many topics it is subscribed to or to how many topics the message is published.
It returns ErrNoTopic if no topics are provided, any error returned by Replayer.Put, or ErrProviderClosed. If the replayer returns an error the message will still be sent, but most probably it won't be replayed to new subscribers, depending on how the error is handled by the replay provider.
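The once-per-client delivery described for Publish can be pictured with a small stand-alone sketch. It is not Joe's implementation: the `subscriber` type and `recipients` function are hypothetical, and real delivery would go through subscriptions rather than names.

```go
package main

import "fmt"

// subscriber is a hypothetical client with the topics it listens on.
type subscriber struct {
	name   string
	topics []string
}

// recipients returns every subscriber that listens on at least one of the
// message's topics, each exactly once, in the order of subs.
func recipients(subs []subscriber, msgTopics []string) []string {
	published := make(map[string]bool, len(msgTopics))
	for _, t := range msgTopics {
		published[t] = true
	}
	var out []string
	for _, s := range subs {
		for _, t := range s.topics {
			if published[t] {
				out = append(out, s.name)
				break // one matching topic is enough; avoids duplicate delivery
			}
		}
	}
	return out
}

func main() {
	subs := []subscriber{
		{name: "alice", topics: []string{"news", "sports"}},
		{name: "bob", topics: []string{"weather"}},
		{name: "carol", topics: []string{"sports"}},
	}
	fmt.Println(recipients(subs, []string{"news", "sports"})) // [alice carol]
}
```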
type LogLevel ¶ added in v0.8.0
type LogLevel int
LogLevel represents the log levels supported by the Server's Logger.
type Logger ¶ added in v0.4.1
type Logger interface {
	// Log is called by the Server to log an event. The http.Request context
	// is passed. The message string is useful for display and the data contains
	// additional information about the event.
	//
	// When the log level is Error, the data map will contain an "err" key
	// with a value of type error. This is the error that triggered the log
	// event.
	//
	// If the data map contains the "lastEventID" key, then it means that
	// a client is being subscribed. The value corresponding to "lastEventID"
	// is of type sse.EventID; there will also be a "topics" key, with a value of
	// type []string, which contains all the topics the client is being
	// subscribed to.
	Log(ctx context.Context, level LogLevel, msg string, data map[string]any)
}
Logger is the interface the Server expects for logging. Adapt your loggers to this interface in order to use them with the Server.
type Message ¶
type Message struct {
	ID    EventID
	Type  EventType
	Retry time.Duration
	// contains filtered or unexported fields
}
Message is the representation of an event sent from the server to its clients.
func (*Message) AppendComment ¶ added in v0.5.0
AppendComment adds comment fields to the message's event. If the comments span multiple lines, they are broken into multiple comment fields.
func (*Message) AppendData ¶
AppendData adds multiple data fields on the message's event from the given strings. Each string will be a distinct data field, and if the strings themselves span multiple lines they will be broken into multiple fields.
Server-sent events are not suited for binary data: the event fields are delimited by newlines, where a newline can be a LF, CR or CRLF sequence. When the client interprets the fields, it joins multiple data fields using LF, so information is altered. Here's an example:
initial payload: This is a\r\nmultiline\rtext.\nIt has multiple\nnewline\r\nvariations.
data sent over the wire:
data: This is a
data: multiline
data: text.
data: It has multiple
data: newline
data: variations.
data received by client: This is a\nmultiline\ntext.\nIt has multiple\nnewline\nvariations.
Each line prepended with "data:" is a field; multiple data fields are joined together using LF as the delimiter. If you attempted to send the same payload without prepending the "data:" prefix, like so:
data: This is a
multiline
text.
It has multiple
newline
variations.
there would be only one data field (the first one). The rest would be different fields, named "multiline", "text.", "It has multiple" etc., which are invalid fields according to the protocol.
Besides, the protocol explicitly states that event streams must always be UTF-8 encoded: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream.
If you need to send binary data, you can use a Base64 encoder or any other encoder that does not output any newline characters (\r or \n) and then append the resulting data.
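As a small illustration of the Base64 suggestion, the sketch below encodes a binary payload that itself contains CR and LF bytes and checks that the encoded form is newline-free, so it would fit in a single data field. It uses only the standard library; `encodeBinary` is a hypothetical helper and the Message API is not involved.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// encodeBinary turns arbitrary bytes into text without newline characters.
// base64.StdEncoding.EncodeToString does not insert line breaks.
func encodeBinary(p []byte) string {
	return base64.StdEncoding.EncodeToString(p)
}

func main() {
	raw := []byte{6, 9, 4, 2, 0, '\r', '\n'}
	encoded := encodeBinary(raw)

	fmt.Println("data:", encoded)
	fmt.Println("contains newline:", strings.ContainsAny(encoded, "\r\n"))

	// The receiver can restore the original bytes from the data field.
	decoded, err := base64.StdEncoding.DecodeString(encoded)
	fmt.Println(decoded, err)
}
```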
Given that clients treat all newlines the same and replace the original newlines with LF, for internal code simplicity AppendData replaces them as well.
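The newline handling discussed above can be reproduced with a few lines of standard-library code. This is a sketch of the general technique (normalize CRLF and CR to LF, then split), not AppendData's actual implementation; `dataFields` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// newlines maps CRLF and lone CR to LF. "\r\n" is listed first so that it is
// matched as one sequence instead of as a CR followed by a separate LF.
var newlines = strings.NewReplacer("\r\n", "\n", "\r", "\n")

// dataFields splits a payload into the lines that would each become
// a separate "data" field.
func dataFields(payload string) []string {
	return strings.Split(newlines.Replace(payload), "\n")
}

func main() {
	payload := "This is a\r\nmultiline\rtext.\nIt has multiple\nnewline\r\nvariations."
	fields := dataFields(payload)
	for _, f := range fields {
		fmt.Printf("data: %s\n", f)
	}
	// What a client reconstructs after joining the fields with LF.
	fmt.Printf("%q\n", strings.Join(fields, "\n"))
}
```

The payload from the example yields six data fields, and the joined result contains only LF separators.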
func (*Message) MarshalText ¶
MarshalText writes the standard textual representation of the message's event. Marshalling and unmarshalling will result in a message with an event that has the same fields; topic will be lost.
If you want to preserve everything, create your own custom marshalling logic. For an example using encoding/json, see the top-level MessageCustomJSONMarshal example.
Use the WriteTo method if you don't need the byte representation.
The representation is written to a bytes.Buffer, which means the error is always nil. If the buffer grows to a size bigger than the maximum allowed, MarshalText will panic. See the bytes.Buffer documentation for more info.
func (*Message) String ¶
String writes the standard textual representation of the message's event to a strings.Builder and returns the resulting string. It may panic if the representation is too long to be buffered.
Use the WriteTo method if you don't actually need the string representation.
func (*Message) UnmarshalText ¶
UnmarshalText extracts the first event found in the given byte slice into the receiver. The input is expected to be a wire format event, as defined by the spec. Therefore, previous fields present on the Message will be overwritten (i.e. event, ID, comments, data, retry).
Unmarshaling ignores fields with invalid names. If no valid fields are found, an error is returned. For a field to be valid it must end in a newline - if the last field of the event doesn't end in one, an error is returned.
All returned errors are of type UnmarshalError.
type MessageWriter ¶ added in v0.5.2
type MessageWriter interface {
	// Send sends the message to the client.
	// To make sure it is sent, call Flush.
	Send(m *Message) error
	// Flush sends any buffered messages to the client.
	Flush() error
}
MessageWriter is a special kind of response writer used by providers to send Messages to clients.
type Provider ¶
type Provider interface {
	// Subscribe to the provider. The context is used to remove the subscriber automatically
	// when it is done. Errors returned by the subscription's callback function must be returned
	// by Subscribe.
	//
	// Providers can assume that the topics list for a subscription has at least one topic.
	Subscribe(ctx context.Context, subscription Subscription) error
	// Publish a message to all the subscribers that are subscribed to the given topics.
	// The topics slice must be non-empty, or ErrNoTopic will be raised.
	Publish(message *Message, topics []string) error
	// Shutdown stops the provider. Calling Shutdown will clean up all the provider's resources
	// and make Subscribe and Publish fail with an error. All the listener channels will be
	// closed and any ongoing publishes will be aborted.
	//
	// If the given context times out before the provider is shut down – shutting it down takes
	// longer, the context error is returned.
	//
	// Calling Shutdown multiple times after it successfully returned the first time
	// does nothing but return ErrProviderClosed.
	Shutdown(ctx context.Context) error
}
A Provider is a publish-subscribe system that can be used to implement an HTML5 server-sent events protocol. A standard interface is required so HTTP request handlers are agnostic to the provider's implementation.
Providers are required to be thread-safe.
After Shutdown is called, trying to call any method of the provider must return ErrProviderClosed. The providers may return other implementation-specific errors too, but the close error is guaranteed to be the same across providers.
type ReadConfig ¶ added in v0.10.0
type ReadConfig struct {
	// MaxEventSize is the maximum expected length of the byte sequence
	// representing a single event. Parsing events longer than that
	// will result in an error.
	//
	// By default this limit is 64KB. You don't need to set this if it
	// is enough for your needs (e.g. the events you receive don't contain
	// larger amounts of data).
	MaxEventSize int
}
ReadConfig is used to configure how Read behaves.
type Replayer ¶ added in v0.9.0
type Replayer interface {
	// Put adds a new event to the replay buffer. The Message that is returned may not have the
	// same address, if the replayer automatically sets IDs.
	//
	// Put errors if the message couldn't be queued – if no topics are provided,
	// a message without an ID is put into a Replayer which does not
	// automatically set IDs, or a message with an ID is put into a Replayer which
	// does automatically set IDs. An error should be returned for other failures
	// related to the given message. When no topics are provided, ErrNoTopic should be
	// returned.
	//
	// The Put operation may be executed by the replayer in another goroutine only if
	// it can ensure that any Replay operation called after the Put goroutine is started
	// can replay the new received message. This also requires the replayer implementation
	// to be thread-safe.
	//
	// Replayers are not required to guarantee that immediately after Put returns
	// the new messages can be replayed. If an error occurs internally when putting the new message
	// and retrying the operation would block for too long, it can be aborted.
	//
	// To indicate a complete replayer failure (i.e. the replayer won't work after this point)
	// a panic should be used instead of an error.
	Put(message *Message, topics []string) (*Message, error)
	// Replay sends to a new subscriber all the valid events received by the replayer
	// since the event with the listener's ID. If the ID the listener provides
	// is invalid, the provider should not replay any events.
	//
	// Replay calls must return only after replaying is done.
	// Implementations should not keep references to the subscription client
	// after Replay returns.
	//
	// If an error is returned, then at least some messages weren't successfully replayed.
	// The error is nil if there were no messages to replay for the particular subscription
	// or if all messages were replayed successfully.
	//
	// If any messages are replayed, Client.Flush must be called by implementations.
	Replay(subscription Subscription) error
}
A Replayer is a type that can replay older published events to new subscribers. Replayers use event IDs, the topics the events were published to and optionally any other criteria to determine which are valid for replay.
While replayers can require events to have IDs beforehand, they can also set the IDs themselves, automatically - it's up to the implementation. Replayers should not overwrite or remove any existing IDs and return an error instead.
Replayers are not required to be thread-safe - server providers are required to ensure only one operation is executed on the replayer at any given time. Server providers may not execute replay operation concurrently with other operations, so make sure any action on the replayer blocks for as little as possible. If a replayer is thread-safe, some operations may be run in a separate goroutine - see the interface's method documentation.
Executing actions that require waiting for a long time on I/O, such as HTTP requests or database calls must be handled with great care, so the server provider is not blocked. Reducing them to the minimum by using techniques such as caching or by executing them in separate goroutines is recommended, as long as the implementation fulfills the requirements.
If not specified otherwise, the errors returned are implementation-specific.
type ResponseValidator ¶
The ResponseValidator type defines the type of the function that checks whether server responses are valid, before starting to read events from them. See the Client's documentation for more info.
These errors are considered permanent and thus if the client is configured to retry on error no retry is attempted and the error is returned.
var DefaultValidator ResponseValidator = func(r *http.Response) error {
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("expected status code %d %s, received %d %s", http.StatusOK, http.StatusText(http.StatusOK), r.StatusCode, http.StatusText(r.StatusCode))
	}
	cts := r.Header.Get("Content-Type")
	ct := contentType(cts)
	if expected := "text/event-stream"; ct != expected {
		return fmt.Errorf("expected content type to have %q, received %q", expected, cts)
	}
	return nil
}
DefaultValidator is the default client response validation function. As per the spec, it checks the content type to be text/event-stream and the response status code to be 200 OK.
If this validator fails, errors are considered permanent. No retry attempts are made.
See https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model.
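For readers who want to write a validator of their own, the same two checks can be sketched with the standard library. This is an assumption-based illustration: the package's `contentType` helper is unexported and may behave differently, so the sketch uses mime.ParseMediaType instead, and the `validate` function with its plain arguments is hypothetical.

```go
package main

import (
	"fmt"
	"mime"
	"net/http"
)

// validate checks the status code and the media type of a response.
// Parameters such as "charset=utf-8" in the header are ignored.
func validate(status int, contentTypeHeader string) error {
	if status != http.StatusOK {
		return fmt.Errorf("expected status code %d, received %d", http.StatusOK, status)
	}
	mediaType, _, err := mime.ParseMediaType(contentTypeHeader)
	if err != nil {
		return fmt.Errorf("invalid content type %q: %w", contentTypeHeader, err)
	}
	if expected := "text/event-stream"; mediaType != expected {
		return fmt.Errorf("expected content type %q, received %q", expected, mediaType)
	}
	return nil
}

func main() {
	fmt.Println(validate(http.StatusOK, "text/event-stream; charset=utf-8"))
	fmt.Println(validate(http.StatusOK, "application/json"))
	fmt.Println(validate(http.StatusNotFound, "text/event-stream"))
}
```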
var NoopValidator ResponseValidator = func(_ *http.Response) error { return nil }
NoopValidator is a client response validator function that treats all responses as valid.
type ResponseWriter ¶ added in v0.6.0
type ResponseWriter interface {
	http.ResponseWriter
	Flush() error
}
ResponseWriter is a http.ResponseWriter augmented with a Flush method.
type Server ¶
type Server struct {
	// The provider used to publish and subscribe clients to events.
	// Defaults to Joe.
	Provider Provider
	// A callback that's called when an SSE session is started.
	// You can use this to authorize the session, set the topics
	// the client should be subscribed to and so on. Using the
	// Res field of the Session you can write an error response
	// to the client.
	//
	// The boolean returned indicates whether the returned subscription
	// is valid or not. If it is valid, the Provider will receive it
	// and events will be sent to this client, otherwise the request
	// will be ended.
	//
	// If this is not set, the client will be subscribed to the provider
	// using the DefaultTopic.
	OnSession func(*Session) (Subscription, bool)
	// If Logger is not nil, the Server will log various information about
	// the request lifecycle. See the documentation of Logger for more info.
	Logger Logger
	// contains filtered or unexported fields
}
A Server is mostly a convenience wrapper around a Provider. It implements the http.Handler interface and has some methods for calling the underlying provider's methods.
When creating a server, if no provider is specified using the WithProvider option, the Joe provider found in this package with no replay provider is used.
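The decision an OnSession callback makes (reject the request with an error response, or return the topics to subscribe to) can be sketched with the standard library alone. Everything here is an assumption made for illustration: subscription is a reduced local stand-in for the Subscription type, authorize and try are hypothetical helpers, and the "default" topic name is a placeholder for the package's DefaultTopic. In a real callback the writer and request would come from the Session's Res and Req fields.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// subscription is a local stand-in for the package's Subscription type,
// reduced to the one field this sketch needs.
type subscription struct {
	Topics []string
}

// authorize is a hypothetical helper with the decision logic an OnSession
// callback might use: it rejects requests without credentials and otherwise
// subscribes the client to the topics named in the query string.
func authorize(w http.ResponseWriter, r *http.Request) (subscription, bool) {
	if r.Header.Get("Authorization") == "" {
		http.Error(w, "missing credentials", http.StatusUnauthorized)
		return subscription{}, false
	}
	topics := r.URL.Query()["topic"]
	if len(topics) == 0 {
		topics = []string{"default"} // placeholder for the package's DefaultTopic
	}
	return subscription{Topics: topics}, true
}

// try runs authorize against a synthetic request and reports the outcome.
func try(url, token string) ([]string, bool, int) {
	req := httptest.NewRequest(http.MethodGet, url, nil)
	if token != "" {
		req.Header.Set("Authorization", token)
	}
	rec := httptest.NewRecorder()
	sub, ok := authorize(rec, req)
	return sub.Topics, ok, rec.Code
}

func main() {
	fmt.Println(try("/events?topic=news&topic=sports", "Bearer abc"))
	fmt.Println(try("/events", ""))
}
```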
func (*Server) Publish ¶
Publish sends the event to all subscribers of the topics the event is published to. The topics are optional: if none are specified, the event is published to the DefaultTopic.
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements a default HTTP handler for a server.
This handler upgrades the request, subscribes it to the server's provider and starts sending incoming events to the client, while logging any errors. It also passes the Last-Event-ID header's value, if present, to the provider.
If the request isn't upgradeable, it writes a message to the client along with a 500 Internal Server Error response code. If the provider returns an error on subscribe, it writes the error message to the client along with a 500 Internal Server Error response code.
To customize behavior, use the OnSession callback or create your custom handler.
func (*Server) Shutdown ¶
Shutdown closes all the connections and stops the server. Publish operations will fail with the error sent by the underlying provider. New requests will be ignored.
Call this method when shutting down the HTTP server using http.Server's RegisterOnShutdown method. Not doing this will result in the server never shutting down or connections being abruptly stopped.
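The wiring described above can be sketched as follows. The sketch assumes the SSE server's Shutdown method takes a context and returns an error; shutdowner, fakeSSE, wireShutdown and runShutdownDemo are hypothetical names, and fakeSSE stands in for the real Server so that the example runs without this package.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// shutdowner is a hypothetical interface for anything with a
// context-aware Shutdown method, such as an SSE server.
type shutdowner interface {
	Shutdown(ctx context.Context) error
}

// fakeSSE stands in for the SSE server and records that Shutdown ran.
type fakeSSE struct{ done chan struct{} }

func (f *fakeSSE) Shutdown(context.Context) error {
	close(f.done)
	return nil
}

// wireShutdown registers s.Shutdown to run when httpSrv.Shutdown is called,
// giving it at most the given timeout to finish.
func wireShutdown(httpSrv *http.Server, s shutdowner, timeout time.Duration) {
	httpSrv.RegisterOnShutdown(func() {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		_ = s.Shutdown(ctx) // a real program would log this error
	})
}

// runShutdownDemo shuts down an http.Server that was never started and
// reports whether the registered hook ran.
func runShutdownDemo() bool {
	sse := &fakeSSE{done: make(chan struct{})}
	httpSrv := &http.Server{}
	wireShutdown(httpSrv, sse, time.Second)
	if err := httpSrv.Shutdown(context.Background()); err != nil {
		return false
	}
	// Hooks run in their own goroutines, so wait for the signal.
	select {
	case <-sse.done:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println(runShutdownDemo())
}
```

http.Server runs the registered functions in separate goroutines and does not wait for them, which is why the sketch waits on a channel before reporting success.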
See the Provider.Shutdown documentation for information on context usage and errors.
type Session ¶ added in v0.6.0
type Session struct { // The response writer for the request. Can be used to write an error response // back to the client. Must not be used after the Session was subscribed! Res ResponseWriter // The initial HTTP request. Can be used to retrieve authentication data, // topics, or data from context – a logger, for example. Req *http.Request // Last event ID of the client. It is unset if no ID was provided in the Last-Event-Id // request header. LastEventID EventID // contains filtered or unexported fields }
A Session is an HTTP request from an SSE client. Create one using the Upgrade function.
Using a Session you can also access the initial HTTP request, get the last event ID, or write data to the client.
func Upgrade ¶
Upgrade upgrades an HTTP request to support server-sent events. It returns a Session that's used to send events to the client, or an error if the upgrade failed.
The headers required by the SSE protocol are only sent when calling the Send method for the first time. If other operations are done before sending messages, other headers and status codes can safely be set.
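The idea of deferring the SSE headers until the first message can be illustrated with a small standard-library sketch. This is not the package's Session: lazySession, send and lazyDemo are hypothetical names, and the exact set of headers the package writes is an assumption (only the text/event-stream content type is required by the spec). The sketch also assumes data contains no newlines.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// lazySession is a hypothetical type that writes the SSE headers only
// when the first message is sent.
type lazySession struct {
	w       http.ResponseWriter
	started bool
}

// send writes a single-line data field as one event and flushes it.
func (s *lazySession) send(data string) error {
	if !s.started {
		h := s.w.Header()
		h.Set("Content-Type", "text/event-stream")
		h.Set("Cache-Control", "no-cache") // assumed; commonly sent with SSE responses
		s.w.WriteHeader(http.StatusOK)
		s.started = true
	}
	if _, err := fmt.Fprintf(s.w, "data: %s\n\n", data); err != nil {
		return err
	}
	return http.NewResponseController(s.w).Flush()
}

// lazyDemo reports the Content-Type header before and after the first
// send, along with the bytes written to the response body.
func lazyDemo() (string, string, string, error) {
	rec := httptest.NewRecorder()
	s := &lazySession{w: rec}
	before := rec.Header().Get("Content-Type")
	err := s.send("hello")
	return before, rec.Header().Get("Content-Type"), rec.Body.String(), err
}

func main() {
	before, after, body, err := lazyDemo()
	fmt.Printf("%q %q %q %v\n", before, after, body, err)
}
```

Until send is called, a handler using a type like this is still free to write a different status code or an error body.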
type Subscription ¶
type Subscription struct { // The client to which messages are sent. The implementation of the interface does not have to be // thread-safe – providers will not call methods on it concurrently. Client MessageWriter // An optional last event ID indicating the event to resume the stream from. // The events will replay starting from the first valid event sent after the one with the given ID. // If the ID is invalid, replaying events will be omitted and new events will be sent as normal. LastEventID EventID // The topics to receive messages from. Must be a non-empty list. // Topics are orthogonal to event types. They are used to filter what the server sends to each client. Topics []string }
The Subscription struct is used to subscribe to a given provider.
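The resume semantics described for LastEventID can be sketched as a lookup over an ordered buffer. This is a simplified illustration under stated assumptions, not how any provider in this package is implemented: event and replayAfter are hypothetical, IDs are plain strings, and expiry or topic filtering is ignored.

```go
package main

import "fmt"

// event is a hypothetical buffered message with a plain string ID.
type event struct {
	ID   string
	Data string
}

// replayAfter returns the buffered events that follow the one with the
// given ID. If lastID is empty or not found in the buffer, nothing is
// replayed and the client only receives new events.
func replayAfter(buffer []event, lastID string) []event {
	if lastID == "" {
		return nil
	}
	for i, e := range buffer {
		if e.ID == lastID {
			return buffer[i+1:]
		}
	}
	return nil
}

func main() {
	buffer := []event{{"1", "a"}, {"2", "b"}, {"3", "c"}}
	fmt.Println(len(replayAfter(buffer, "1")))       // events 2 and 3 are replayed
	fmt.Println(len(replayAfter(buffer, "3")))       // nothing newer to replay
	fmt.Println(len(replayAfter(buffer, "unknown"))) // invalid ID: replay is skipped
}
```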
type UnmarshalError ¶
type UnmarshalError struct { Reason error FieldName string // The value of the invalid field. FieldValue string }
UnmarshalError is the error returned by the Message's UnmarshalText method. If the error is related to a specific field, FieldName will be a non-empty string. If no fields were found in the target text or any other errors occurred, only a Reason will be provided. Reason is always present.
func (*UnmarshalError) Error ¶
func (u *UnmarshalError) Error() string
func (*UnmarshalError) Unwrap ¶
func (u *UnmarshalError) Unwrap() error
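Since the error type exposes Unwrap and carries the field details, callers can inspect it with errors.As and errors.Is. The sketch below uses a local copy of the struct so that it runs on its own; the Error message format, errInvalidRetry, sampleError and describe are all assumptions made for illustration and do not reflect the package's actual messages or sentinel errors.

```go
package main

import (
	"errors"
	"fmt"
)

// unmarshalError is a local copy of the struct layout shown above.
// The message format below is an assumption.
type unmarshalError struct {
	Reason     error
	FieldName  string
	FieldValue string
}

func (u *unmarshalError) Error() string {
	if u.FieldName == "" {
		return "unmarshal failed: " + u.Reason.Error()
	}
	return fmt.Sprintf("unmarshal failed: field %s=%q: %v", u.FieldName, u.FieldValue, u.Reason)
}

func (u *unmarshalError) Unwrap() error { return u.Reason }

// errInvalidRetry is a hypothetical sentinel used as a Reason.
var errInvalidRetry = errors.New("retry value is not a number")

// sampleError wraps a field-specific unmarshal error one level deep.
func sampleError() error {
	return fmt.Errorf("reading event: %w", &unmarshalError{
		Reason: errInvalidRetry, FieldName: "retry", FieldValue: "soon",
	})
}

// describe distinguishes field-specific failures from general ones.
func describe(err error) string {
	var ue *unmarshalError
	if !errors.As(err, &ue) {
		return "other error: " + err.Error()
	}
	if ue.FieldName == "" {
		return "malformed message: " + ue.Reason.Error()
	}
	return fmt.Sprintf("bad field %s=%q: %v", ue.FieldName, ue.FieldValue, ue.Reason)
}

func main() {
	err := sampleError()
	fmt.Println(describe(err))
	fmt.Println(errors.Is(err, errInvalidRetry)) // Unwrap exposes the Reason
}
```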
type ValidReplayer ¶ added in v0.9.0
type ValidReplayer struct { // The function used to retrieve the current time. Defaults to time.Now. // Useful when testing. Now func() time.Time // After how long the replayer should attempt to clean up expired events. // By default cleanup is done after a fourth of the TTL has passed; this means // that messages may be stored for a duration equal to 5/4*TTL. If this is not // desired, set the GC interval to a value sensible for your use case or set // it to 0 – this disables automatic cleanup, enabling you to do it manually // using the GC method. GCInterval time.Duration // contains filtered or unexported fields }
ValidReplayer is a Replayer that replays all the buffered non-expired events.
The replayer removes any expired events when a new event is put, provided that at least a GCInterval period has passed since the last cleanup.
The events must have an ID unless the replayer is configured to set IDs automatically.
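The interaction between the TTL, the cleanup interval and the injectable clock can be sketched with a much simplified buffer. This is an illustration under stated assumptions rather than the replayer's implementation: ttlBuffer and its methods are hypothetical, cleanup is only attempted on put, and IDs and topics are left out. It shows why an expired message can remain stored for a while even though it is no longer replayed.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a buffered message with its expiry time.
type entry struct {
	data    string
	expires time.Time
}

// ttlBuffer is a hypothetical, simplified stand-in for a TTL-based replayer.
type ttlBuffer struct {
	ttl        time.Duration
	gcInterval time.Duration // 0 disables automatic cleanup
	now        func() time.Time
	lastGC     time.Time
	entries    []entry
}

// newTTLBuffer uses a cleanup interval of a fourth of the TTL.
func newTTLBuffer(ttl time.Duration, now func() time.Time) *ttlBuffer {
	return &ttlBuffer{ttl: ttl, gcInterval: ttl / 4, now: now, lastGC: now()}
}

// put stores a message, first cleaning up if the interval has elapsed.
func (b *ttlBuffer) put(data string) {
	t := b.now()
	if b.gcInterval > 0 && t.Sub(b.lastGC) >= b.gcInterval {
		b.gc()
	}
	b.entries = append(b.entries, entry{data: data, expires: t.Add(b.ttl)})
}

// gc drops expired entries; entries are ordered by insertion time.
func (b *ttlBuffer) gc() {
	t := b.now()
	i := 0
	for i < len(b.entries) && !b.entries[i].expires.After(t) {
		i++
	}
	b.entries = b.entries[i:]
	b.lastGC = t
}

// valid returns the messages that have not expired yet.
func (b *ttlBuffer) valid() []string {
	t := b.now()
	var out []string
	for _, e := range b.entries {
		if e.expires.After(t) {
			out = append(out, e.data)
		}
	}
	return out
}

// ttlDemo drives the buffer with a fake clock and returns how many entries
// are stored and which of them would still be replayed.
func ttlDemo() (int, []string) {
	clock := time.Unix(0, 0)
	b := newTTLBuffer(4*time.Second, func() time.Time { return clock })
	b.put("a") // stored at t=0s, expires at t=4s
	clock = clock.Add(3 * time.Second)
	b.put("b") // stored at t=3s, expires at t=7s
	clock = clock.Add(2 * time.Second)
	// At t=5s "a" has expired but stays stored until the next cleanup.
	return len(b.entries), b.valid()
}

func main() {
	fmt.Println(ttlDemo())
}
```

Setting the interval to 0 in this sketch leaves cleanup entirely to explicit gc calls, which mirrors the manual mode described for GCInterval.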
func NewValidReplayer ¶ added in v0.9.0
func NewValidReplayer(ttl time.Duration, autoIDs bool) (*ValidReplayer, error)
NewValidReplayer creates a ValidReplayer with the given message lifetime duration (time-to-live) and auto ID behavior.
The TTL must be a positive duration. It is technically possible to use a very large duration in order to store and replay every message put for the lifetime of the program; this is not recommended, as memory usage becomes effectively unbounded, which might lead to a crash.
func (*ValidReplayer) GC ¶ added in v0.9.0
func (v *ValidReplayer) GC()
GC removes all the expired messages from the replayer's buffer.
func (*ValidReplayer) Put ¶ added in v0.9.0
func (v *ValidReplayer) Put(message *Message, topics []string) (*Message, error)
Put puts the message into the replayer's buffer.
func (*ValidReplayer) Replay ¶ added in v0.9.0
func (v *ValidReplayer) Replay(subscription Subscription) error
Replay replays all the valid messages to the listener.