Documentation ¶
Index ¶
- type Acker
- type Event
- type Reader
- func (r *Reader) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error
- func (r *Reader) Close()
- func (r *Reader) IsClosed() bool
- func (r *Reader) RemoveStream(ctx context.Context, stream *Stream) error
- func (r *Reader) Subscribe() <-chan *Event
- func (r *Reader) Unsubscribe(c <-chan *Event)
- type Sink
- func (s *Sink) Ack(ctx context.Context, e *Event) error
- func (s *Sink) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error
- func (s *Sink) Close()
- func (s *Sink) IsClosed() bool
- func (s *Sink) RemoveStream(ctx context.Context, stream *Stream) error
- func (s *Sink) Subscribe() <-chan *Event
- func (s *Sink) Unsubscribe(c <-chan *Event)
- type Stream
- func (s *Stream) Add(ctx context.Context, name string, payload []byte, opts ...options.AddEvent) (string, error)
- func (s *Stream) Destroy(ctx context.Context) error
- func (s *Stream) NewReader(ctx context.Context, opts ...options.Reader) (*Reader, error)
- func (s *Stream) NewSink(ctx context.Context, name string, opts ...options.Sink) (*Sink, error)
- func (s *Stream) Remove(ctx context.Context, ids ...string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Acker ¶ added in v1.0.0
type Acker interface {
XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
}
Acker is the interface used by events to acknowledge themselves.
type Event ¶
type Event struct { // ID is the unique event ID. ID string // StreamName is the name of the stream the event belongs to. StreamName string // SinkName is the name of the sink the event belongs to. SinkName string // EventName is the producer-defined event name. EventName string // Topic is the producer-defined event topic if any, empty string if none. Topic string // Payload is the event payload. Payload []byte // Acker is the redis client used to acknowledge events. Acker Acker // contains filtered or unexported fields }
Event is a stream event.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader represents a stream reader.
func (*Reader) AddStream ¶
AddStream adds the stream to the sink. By default the stream cursor starts at the same timestamp as the sink main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the sink.
func (*Reader) Close ¶
func (r *Reader) Close()
Close stops event polling and closes the reader channel. It is safe to call Close multiple times.
func (*Reader) RemoveStream ¶
RemoveStream removes the stream from the sink, it is idempotent.
func (*Reader) Subscribe ¶
Subscribe returns a channel that receives events from the stream. The channel is closed when the reader is closed.
func (*Reader) Unsubscribe ¶
Unsubscribe removes the channel from the reader subscribers and closes it.
type Sink ¶
type Sink struct { // Name is the sink name. Name string // contains filtered or unexported fields }
Sink represents a stream sink.
func (*Sink) AddStream ¶
AddStream adds the stream to the sink. By default the stream cursor starts at the same timestamp as the sink main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the sink.
func (*Sink) Close ¶
func (s *Sink) Close()
Close stops event polling, waits for all events to be processed, and closes the sink channel. It is safe to call Close multiple times.
func (*Sink) RemoveStream ¶
RemoveStream removes the stream from the sink, it is idempotent.
func (*Sink) Unsubscribe ¶
Unsubscribe removes the channel from the sink and closes it.
type Stream ¶
type Stream struct { // Name of the stream. Name string // MaxLen is the maximum number of events in the stream. MaxLen int // contains filtered or unexported fields }
Stream encapsulates a stream of events. Events published to a stream can optionally be associated with a topic. Stream consumers can subscribe to a stream and optionally provide a topic matching criteria. Consumers can be created within a group. Each consumer group receives a unique copy of the stream events.
func NewStream ¶
NewStream returns the stream with the given name. All stream instances with the same name share the same events.
func (*Stream) Add ¶
func (s *Stream) Add(ctx context.Context, name string, payload []byte, opts ...options.AddEvent) (string, error)
Add appends an event to the stream and returns its ID. If the option WithOnlyIfStreamExists is used and the stream does not exist then no event is added and the empty string is returned. The stream is created if the option is omitted or when NewSink is called.
func (*Stream) NewReader ¶
NewReader creates a new stream reader. All reader instances get all the events in the stream. Events are read starting:
- from the last event by default
- from the oldest event stored in the stream if the WithReaderStartAtOldest option is used
- after the event with the ID provided via WithReaderLastEventID if the event is still in the stream, oldest event otherwise
- from the event added on or after the timestamp provided via WithReaderStartAt if still in the stream, oldest event otherwise
func (*Stream) NewSink ¶
NewSink creates a new stream sink with the given name. All sink instances with the same name share the same stream cursor. Events read through a sink are not removed from the stream until they are acked by the client unless the WithNoAck option is used. Events are read starting:
- from the last event by default
- from the oldest event stored in the stream if the WithSinkStartAtOldest option is used
- after the event with the ID provided via WithSinkLastEventID if the event is still in the stream, oldest event otherwise
- from the event added on or after the timestamp provided via WithSinkStartAt if still in the stream, oldest event otherwise