Documentation ¶
Overview ¶
Package estream - JSON-based event stream
Example ¶
/* Set up a pair of event handlers. */ done := make(chan struct{}) leftConn, rightConn := net.Pipe() leftStream := New(leftConn) AddHandler[int](leftStream, "number", func(name string, number int) { fmt.Printf("Left got a number: %d\n", number) if err := leftStream.Send("boolean", true); nil != err { fmt.Printf( "Error sending boolean to right side: %s", err, ) } }) rightStream := New(rightConn) AddHandler[bool](rightStream, "boolean", func(name string, tf bool) { fmt.Printf("Right got a boolean: %t\n", tf) close(done) }) /* Start processing events. */ go func() { if err := leftStream.Run(); nil != err && !errors.Is(err, io.EOF) { fmt.Printf("Left run error: %s\n", err) } }() go func() { if err := rightStream.Run(); nil != err && !errors.Is(err, io.EOF) { fmt.Printf("Right run error: %s\n", err) } }() /* Send an event which makes an event. */ if err := rightStream.Send("number", 100); nil != err { fmt.Printf("Error sending number to left side: %s", err) } /* Wait for the events to finish processing. */ <-done
Output: Left got a number: 100 Right got a boolean: true
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var Delete = (func(string, any))(nil)
Delete may be passed to AddHandler to request deletion of a handler. It is equivalent to AddHandler[any](s, name, nil), but somewhat easier to read.
Functions ¶
func AddHandler ¶
AddHandler adds a handler to a stream. It is not a method on Stream due to restrictions on generic functions. AddHandler may be called at any time, even during a call to s.Run. If handler is nil, s's handler for the given name is deleted, if it has one. To set a default handler, pass the empty string as the name.
Types ¶
type HandlerStoppedError ¶
type HandlerStoppedError struct { Name string /* Event name. */ Data any /* Event data, unmarshalled. */ }
HandlerStoppedError is returned from Run and RunOnce when an event is received while a call to WaitForHandlers is blocking.
func (HandlerStoppedError) Error ¶
func (err HandlerStoppedError) Error() string
Error implements the error interface.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is a bidirectional stream of events.
func New ¶
func New(rwc io.ReadWriteCloser) *Stream
New creats a new stream. After creating the stream, call AddHandler to add handlers and then call Run to process events.
func (*Stream) Run ¶
Run start event processing on the stream. The stream will not read from its underlying io.ReadWriteCloser until Run or RunOnce is called, which may cause blocking if care is not taken.
func (*Stream) Send ¶
Send sends an event with the given event name and data via the Stream's underlying io.ReadWriteCloser.
func (*Stream) SendJSONSLogs ¶
SendJSONSLogs reads JSON log objects as produced by slog.JSONHandler and sends them via s.Send using the msg field of the JSON objects as the event names.
func (*Stream) WaitForHandlers ¶
func (s *Stream) WaitForHandlers()
WaitForHandlers waits until no more handlers are running. While WaitForHandlers/ is blocking, no new handlers will be started. This should usually only be called after Run or RunOnce return an error and shouldn't be called concurrently witht either.