estream

package
v0.0.1-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2024 License: BSD-3-Clause, BSD-3-Clause Imports: 5 Imported by: 0

README

EventStream

Event stream which uses JSON under the hood.

Documentation

Overview

Package estream - JSON-based event stream

Example
/* Set up a pair of event handlers. */
done := make(chan struct{})
leftConn, rightConn := net.Pipe()
leftStream := New(leftConn)
AddHandler[int](leftStream, "number", func(name string, number int) {
	fmt.Printf("Left got a number: %d\n", number)
	if err := leftStream.Send("boolean", true); nil != err {
		fmt.Printf(
			"Error sending boolean to right side: %s",
			err,
		)
	}
})
rightStream := New(rightConn)
AddHandler[bool](rightStream, "boolean", func(name string, tf bool) {
	fmt.Printf("Right got a boolean: %t\n", tf)
	close(done)
})

/* Start processing events. */
go func() {
	if err := leftStream.Run(); nil != err &&
		!errors.Is(err, io.EOF) {
		fmt.Printf("Left run error: %s\n", err)
	}
}()
go func() {
	if err := rightStream.Run(); nil != err &&
		!errors.Is(err, io.EOF) {
		fmt.Printf("Right run error: %s\n", err)
	}
}()

/* Send an event which makes an event. */
if err := rightStream.Send("number", 100); nil != err {
	fmt.Printf("Error sending number to left side: %s", err)
}

/* Wait for the events to finish processing. */
<-done
Output:

Left got a number: 100
Right got a boolean: true

Index

Examples

Constants

This section is empty.

Variables

View Source
var Delete = (func(string, any))(nil)

Delete may be passed to AddHandler to request deletion of a handler. It is equivalent to AddHandler[any](s, name, nil), but somewhat easier to read.

Functions

func AddHandler

func AddHandler[T any](s *Stream, name string, handler func(string, T))

AddHandler adds a handler to a stream. It is not a method on Stream due to restrictions on generic functions. AddHandler may be called at any time, even during a call to s.Run. If handler is nil, s's handler for the given name is deleted, if it has one. To set a default handler, pass the empty string as the name.

Types

type HandlerStoppedError

type HandlerStoppedError struct {
	Name string /* Event name. */
	Data any    /* Event data, unmarshalled. */
}

HandlerStoppedError is returned from Run and RunOnce when an event is received while a call to WaitForHandlers is blocking.

func (HandlerStoppedError) Error

func (err HandlerStoppedError) Error() string

Error implements the error interface.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a bidirectional stream of events.

func New

func New(rwc io.ReadWriteCloser) *Stream

New creats a new stream. After creating the stream, call AddHandler to add handlers and then call Run to process events.

func (*Stream) Close

func (s *Stream) Close() error

Close closes s's underlying io.ReadWriteCloser.

func (*Stream) Run

func (s *Stream) Run() error

Run start event processing on the stream. The stream will not read from its underlying io.ReadWriteCloser until Run or RunOnce is called, which may cause blocking if care is not taken.

func (*Stream) RunOnce

func (s *Stream) RunOnce() error

RunOnce processes a single event on the stream.

func (*Stream) Send

func (s *Stream) Send(name string, data any) error

Send sends an event with the given event name and data via the Stream's underlying io.ReadWriteCloser.

func (*Stream) SendJSONSLogs

func (s *Stream) SendJSONSLogs(r io.Reader) error

SendJSONSLogs reads JSON log objects as produced by slog.JSONHandler and sends them via s.Send using the msg field of the JSON objects as the event names.

func (*Stream) WaitForHandlers

func (s *Stream) WaitForHandlers()

WaitForHandlers waits until no more handlers are running. While WaitForHandlers/ is blocking, no new handlers will be started. This should usually only be called after Run or RunOnce return an error and shouldn't be called concurrently witht either.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL