Documentation ¶
Index ¶
- Constants
- func ClientMaxBufferSize(s int) func(c *Client)
- type Client
- func (c *Client) OnConnect(fn ConnCallback)
- func (c *Client) OnDisconnect(fn ConnCallback)
- func (c *Client) Subscribe(stream string, handler func(msg *Event)) error
- func (c *Client) SubscribeChan(stream string, ch chan *Event) error
- func (c *Client) SubscribeChanRaw(ch chan *Event) error
- func (c *Client) SubscribeChanRawWithContext(ctx context.Context, ch chan *Event) error
- func (c *Client) SubscribeChanWithContext(ctx context.Context, stream string, ch chan *Event) error
- func (c *Client) SubscribeRaw(handler func(msg *Event)) error
- func (c *Client) SubscribeRawWithContext(ctx context.Context, handler func(msg *Event)) error
- func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error
- func (c *Client) Unsubscribe(ch chan *Event)
- type ConnCallback
- type Event
- type EventLog
- type EventStreamReader
- type ResponseValidator
- type Server
- func (s *Server) Close()
- func (s *Server) CreateStream(id string) *Stream
- func (s *Server) Publish(id string, event *Event)
- func (s *Server) RemoveStream(id string)
- func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *Server) StreamExists(id string) bool
- func (s *Server) TryPublish(id string, event *Event) bool
- type Stream
- type Subscriber
Constants ¶
const DefaultBufferSize = 1024
DefaultBufferSize size of the queue that holds the streams messages.
Variables ¶
This section is empty.
Functions ¶
func ClientMaxBufferSize ¶
Types ¶
type Client ¶
type Client struct { Retry time.Time ReconnectStrategy backoff.BackOff Headers map[string]string ReconnectNotify backoff.Notify ResponseValidator ResponseValidator Connection *http.Client URL string LastEventID atomic.Value // []byte EncodingBase64 bool Connected bool // contains filtered or unexported fields }
Client handles an incoming server stream
func (*Client) OnConnect ¶
func (c *Client) OnConnect(fn ConnCallback)
OnConnect specifies the function to run when the connection is successful
func (*Client) OnDisconnect ¶
func (c *Client) OnDisconnect(fn ConnCallback)
OnDisconnect specifies the function to run when the connection disconnects
func (*Client) SubscribeChan ¶
SubscribeChan sends all events to the provided channel
func (*Client) SubscribeChanRaw ¶
SubscribeChanRaw sends all events to the provided channel
func (*Client) SubscribeChanRawWithContext ¶
SubscribeChanRawWithContext sends all events to the provided channel with context
func (*Client) SubscribeChanWithContext ¶
SubscribeChanWithContext sends all events to the provided channel with context
func (*Client) SubscribeRaw ¶
SubscribeRaw to an sse endpoint
func (*Client) SubscribeRawWithContext ¶
SubscribeRawWithContext to an sse endpoint with context
func (*Client) SubscribeWithContext ¶
func (c *Client) SubscribeWithContext(ctx context.Context, stream string, handler func(msg *Event)) error
SubscribeWithContext to a data stream with context
func (*Client) Unsubscribe ¶
Unsubscribe unsubscribes a channel
type ConnCallback ¶
type ConnCallback func(c *Client)
ConnCallback defines a function to be called on a particular connection event
type Event ¶
type Event struct { ID []byte Data []byte Event []byte Retry []byte Comment []byte // contains filtered or unexported fields }
Event holds all of the event source fields
type EventLog ¶
EventLog holds unexpired previous events
func NewEventLog ¶
NewEventLog creates a new Event Log.
EventTTL determines for how long the event is considered valid. Valid events will be replayed for newly joined clients. Expired events are periodically removed from the log to save space if EventTTL != 0. To preserve backwards compatibility, with EventTTL == 0 all events ever published on a given stream are forever retained and replayed, bevare of the balooning memory as a result.
MaxCapacity is a soft limit on the number of entries in the Event Log. Filling the Event Log up to MaxCapacity will trigger an unscheduled removal of expired entries; if that is not enough, the oldest entry in the Event Log will be deleted to free up the space. MaxCapacity == 0 means unlimited capacity.
func (*EventLog) Replay ¶
func (e *EventLog) Replay(s *Subscriber)
Replay plays unexpired previous events to a subscriber
type EventStreamReader ¶
type EventStreamReader struct {
// contains filtered or unexported fields
}
EventStreamReader scans an io.Reader looking for EventStream messages.
func NewEventStreamReader ¶
func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader
NewEventStreamReader creates an instance of EventStreamReader.
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() ([]byte, error)
ReadEvent scans the EventStream for events.
type ResponseValidator ¶
ResponseValidator validates a response
type Server ¶
type Server struct { // Extra headers adding to the HTTP response to each client Headers map[string]string // Sets a ttl that prevents old events from being transmitted EventTTL time.Duration // Max messages stored for replaying per stream, measured in items MaxCapacity int // Specifies the size of the message buffer for each stream BufferSize int // Encodes all data as base64 EncodeBase64 bool // Splits an events data into multiple data: entries SplitData bool // Enables creation of a stream when a client connects AutoStream bool // Enables automatic replay for each new subscriber that connects AutoReplay bool // Specifies the function to run when client subscribe or un-subscribe OnSubscribe func(streamID string, sub *Subscriber) OnUnsubscribe func(streamID string, sub *Subscriber) // contains filtered or unexported fields }
Server Is our main struct
func NewWithCallback ¶
func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server
NewWithCallback will create a server and setup defaults with callback function
func (*Server) Close ¶
func (s *Server) Close()
Close shuts down the server, closes all of the streams and connections
func (*Server) CreateStream ¶
CreateStream will create a new stream and register it
func (*Server) Publish ¶
Publish sends a mesage to every client in a streamID. If the stream's buffer is full, it blocks until the message is sent out to all subscribers (but not necessarily arrived the clients), or when the stream is closed.
func (*Server) RemoveStream ¶
RemoveStream will remove a stream
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP serves new connections with events for a given stream ...
func (*Server) StreamExists ¶
StreamExists checks whether a stream by a given id exists
func (*Server) TryPublish ¶
TryPublish is the same as Publish except that when the operation would cause the call to be blocked, it simply drops the message and returns false. Together with a small BufferSize, it can be useful when publishing the latest message ASAP is more important than reliable delivery.
type Stream ¶
type Stream struct { ID string Eventlog *EventLog // Enables replaying of eventlog to newly added subscribers AutoReplay bool // Specifies the function to run when client subscribe or un-subscribe OnSubscribe func(streamID string, sub *Subscriber) OnUnsubscribe func(streamID string, sub *Subscriber) // contains filtered or unexported fields }
Stream ...
type Subscriber ¶
Subscriber ...