streaming

package
v1.0.3
Published: Oct 21, 2024 License: MIT Imports: 17 Imported by: 0

README

Streaming

Pulse leverages Redis streams to provide scalable and reliable event streams that can be used to implement distributed architectures. Pulse provides a simple API to create and consume streams, for example:

[Code snippet: Single Reader]

The snippet above creates a stream and adds a new event to it. A reader then consumes the event and is closed afterward.

%%{init: {'themeVariables': { 'background': '#282828', 'mainBkg': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    main-->|Add|Stream
    Stream-.->|Event|Reader
    Reader-.->|Event|main

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Reader pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;

Multiple readers can be created for the same stream across many nodes. Readers are independent and each instance receives a copy of the same events. Readers can specify a start position for the stream cursor. The default start position is the last event in the stream.

[Code snippet: Multi Reader]

%%{init: {'themeVariables': { 'background': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    main-->|Add 1, 2|Stream
    Reader-.->|Events 1, 2|main
    Reader2-.->|Event 2|main
    Stream-.->|Events 1, 2|Reader
    Stream-.->|Event 2|Reader2[Other Reader]

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Reader,Reader2 pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
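
The diagram above can be reproduced without Redis using a small in-memory model. This is only a sketch of the semantics described in this section (independent cursors, default start after the last event); memStream and memReader are hypothetical types and not part of the Pulse API.

```go
package main

import "fmt"

// memStream is a hypothetical in-memory stand-in for a Redis stream.
type memStream struct {
	events []string
}

// Add appends an event to the stream.
func (s *memStream) Add(payload string) {
	s.events = append(s.events, payload)
}

// memReader keeps its own cursor, so readers never affect each other.
type memReader struct {
	stream *memStream
	next   int // index of the next event to deliver
}

// NewReader starts after the last stored event by default, or at the
// oldest stored event when fromOldest is true.
func (s *memStream) NewReader(fromOldest bool) *memReader {
	r := &memReader{stream: s, next: len(s.events)}
	if fromOldest {
		r.next = 0
	}
	return r
}

// Read returns every event past the reader's cursor and advances it.
func (r *memReader) Read() []string {
	out := append([]string(nil), r.stream.events[r.next:]...)
	r.next = len(r.stream.events)
	return out
}

func main() {
	s := &memStream{}
	s.Add("event 1")
	other := s.NewReader(false) // default start: only later events
	s.Add("event 2")
	reader := s.NewReader(true) // starts at the oldest event

	fmt.Println("reader:", reader.Read())
	fmt.Println("other reader:", other.Read())
}
```

Each reader advances only its own cursor, which is why the second reader misses event 1 while the first still receives both.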

Event Sinks

Event sinks enable concurrent processing of a sequence of events for better performance. They also enable redundancy in case of node failure or network partitions.

Event sinks make it possible for multiple nodes to share the same stream cursor. If a stream contains 3 events and 3 nodes are consuming the stream using the same sink (i.e. a sink with the same name), then each node will receive a unique event from the sequence. Nodes using a different sink (or a reader) will receive copies of the same events.

Events read from a sink must be acknowledged by the client. Pulse automatically requeues events added to a sink that have been read by a node but not acknowledged.

Creating a sink is as simple as:

[Code snippet: Single Sink]

Note a couple of differences with the reader example above:

  • Sinks are given a name at creation. Multiple nodes using the same name share the same stream cursor.
  • Events are acknowledged using sink.Ack. This provides an at-least-once delivery guarantee: unacknowledged events are automatically requeued.
%%{init: {'themeVariables': { 'background': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    main
    Stream
    Sink
    main-->|Add|Stream
    Stream-.->|Event|Sink
    Sink-.->|Event|main
    main-->|Ack|Sink

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Sink pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
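
To illustrate the shared cursor and the requeue behavior, here is a minimal in-memory model. It is a sketch under simplifying assumptions: memSink is a hypothetical type, requeueing is triggered by hand through RequeueUnacked, and nothing here reflects how Pulse implements sinks on top of Redis.

```go
package main

import (
	"fmt"
	"sort"
)

// memSink models a named sink: one cursor shared by all consumers plus
// the set of events that were delivered but not yet acknowledged.
type memSink struct {
	events   []string     // the underlying stream
	next     int          // shared cursor
	pending  map[int]bool // delivered, not acked
	requeued []int        // waiting to be delivered again
}

func newMemSink(events ...string) *memSink {
	return &memSink{events: events, pending: map[int]bool{}}
}

// Next hands the next event to whichever consumer asks first.
// Requeued events are served before new ones.
func (s *memSink) Next() (int, string, bool) {
	var id int
	switch {
	case len(s.requeued) > 0:
		id, s.requeued = s.requeued[0], s.requeued[1:]
	case s.next < len(s.events):
		id = s.next
		s.next++
	default:
		return 0, "", false
	}
	s.pending[id] = true
	return id, s.events[id], true
}

// Ack marks an event as processed so it is never delivered again.
func (s *memSink) Ack(id int) { delete(s.pending, id) }

// RequeueUnacked makes every pending event deliverable again, which is
// what should happen when a consumer dies before acknowledging.
func (s *memSink) RequeueUnacked() {
	ids := make([]int, 0, len(s.pending))
	for id := range s.pending {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	s.requeued = append(s.requeued, ids...)
	s.pending = map[int]bool{}
}

func main() {
	sink := newMemSink("e1", "e2", "e3")
	a, _, _ := sink.Next() // node A
	b, _, _ := sink.Next() // node B
	c, _, _ := sink.Next() // node C
	sink.Ack(a)
	sink.Ack(c)
	_ = b // node B crashes before acknowledging

	sink.RequeueUnacked()
	id, payload, _ := sink.Next()
	fmt.Println("redelivered:", id, payload)
}
```

Three consumers each receive a different event, and only the event that was never acknowledged comes back.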

As with readers, multiple sinks can be created for the same stream. Each sink receives its own copy of every event.

[Code snippet: Multi Sink]

%%{init: {'themeVariables': { 'background': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    main-->|Add 1, 2|Stream
    Sink-.->|Events 1, 2|main
    main-->|Ack 1, 2|Sink
    Sink2-.->|Event 2|main
    main-->|Ack 2|Sink2
    Stream-.->|Events 1, 2|Sink
    Stream-.->|Event 2|Sink2[Other Sink]

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Sink,Sink2 pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;

Reading from multiple streams

Readers and sinks can also read concurrently from multiple streams:

[Code snippet: Multi Stream]

%%{init: {'themeVariables': { 'background': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart LR
    main-->|Add 1|Stream
    main-->|Add 2|Stream2[Other Stream]
    Sink-.->|Event 1|main
    Sink-.->|Event 2|main
    main-->|Ack 1|Sink
    main-->|Ack 2|Sink
    Stream-.->|Event 1|Sink
    Stream2-.->|Event 2|Sink

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Stream2,Sink pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
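
From the consumer's point of view, reading from several streams looks like a fan-in: events from every stream arrive on a single channel. The sketch below shows that pattern with plain Go channels. The merge and feed helpers are hypothetical, and this is not a description of Pulse's implementation, which may well read all streams through a single Redis call.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards events from every input channel to one output channel
// and closes the output once all inputs are closed.
func merge(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for ev := range in {
				out <- ev
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// feed returns an already-closed channel holding the given events.
// It stands in for a stream in this example.
func feed(events ...string) <-chan string {
	ch := make(chan string, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)
	return ch
}

func main() {
	stream := feed("event 1")
	other := feed("event 2")
	// Order is preserved within a stream but not across streams.
	for ev := range merge(stream, other) {
		fmt.Println(ev)
	}
}
```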

AddStream can be called at any time to add new streams to a reader or a sink. Streams can also be removed using RemoveStream.

[Code snippet: Remove Stream]

Pub/Sub

Streams support a flexible pub/sub mechanism: events can be attached to topics, and readers or sinks can define simple or custom matching logic.

[Code snippet: Pub/Sub]

%%{init: {'themeVariables': { 'background': '#282828', 'edgeLabelBackground': '#7A7A7A'}}}%%
flowchart RL
    main-->|Add 1|Topic
    main-->|Add 2|Topic2
    subgraph Stream
        Topic2[Other Topic]
        Topic
    end
    Topic-.->|Event 1|Sink
    Topic2-.->|Event 2|Sink
    Sink-.->|Event 1|main
    Sink-.->|Event 2|main
    main-->|Ack 1|Sink
    main-->|Ack 2|Sink

    classDef userCode fill:#9A6D1F, stroke:#D9B871, stroke-width:2px, color:#FFF2CC;
    classDef pulse fill:#25503C, stroke:#5E8E71, stroke-width:2px, color:#D6E9C6;

    class main userCode;
    class Stream,Topic,Topic2,Sink pulse;

    linkStyle 0 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 1 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 2 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 3 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 4 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 5 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;
    linkStyle 6 stroke:#DDDDDD,color:#DDDDDD,stroke-width:3px;

Topics can be matched using their name as in the example above or using complex patterns. For example:

[Code snippet: Pub/Sub with topic pattern]

Note: Event filtering is done client-side in the sink or reader and does not affect the underlying stream. This means that events are still stored in the stream and can be consumed by other sinks.
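
The following sketch shows what client-side filtering by topic pattern could look like. It assumes patterns are regular expressions, which this page does not state; the event type and filterByTopic are hypothetical and only illustrate that filtering selects events without modifying the stream.

```go
package main

import (
	"fmt"
	"regexp"
)

// event is a hypothetical, trimmed-down stand-in for a stream event.
type event struct {
	Topic   string
	Payload string
}

// filterByTopic returns the events whose topic matches pattern.
// The input slice (the "stream") is left untouched.
func filterByTopic(events []event, pattern string) ([]event, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []event
	for _, ev := range events {
		if re.MatchString(ev.Topic) {
			matched = append(matched, ev)
		}
	}
	return matched, nil
}

func main() {
	stream := []event{
		{Topic: "orders.created", Payload: "1"},
		{Topic: "users.created", Payload: "2"},
		{Topic: "orders.deleted", Payload: "3"},
	}
	matched, err := filterByTopic(stream, `^orders\.`)
	if err != nil {
		panic(err)
	}
	for _, ev := range matched {
		fmt.Println(ev.Topic, ev.Payload)
	}
	fmt.Println("events still in stream:", len(stream))
}
```

Because the filter runs in the consumer, a second sink with a different pattern would still see all three events.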

Examples

The examples directory contains a number of examples that demonstrate the basic usage of the streaming package.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acker added in v1.0.0

type Acker interface {
	XAck(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd
}

Acker is the interface used by events to acknowledge themselves.

type Event

type Event struct {
	// ID is the unique event ID.
	ID string
	// StreamName is the name of the stream the event belongs to.
	StreamName string
	// SinkName is the name of the sink the event belongs to.
	SinkName string
	// EventName is the producer-defined event name.
	EventName string
	// Topic is the producer-defined event topic if any, empty string if none.
	Topic string
	// Payload is the event payload.
	Payload []byte
	// Acker is the redis client used to acknowledge events.
	Acker Acker
	// contains filtered or unexported fields
}

Event is a stream event.

func (*Event) CreatedAt

func (e *Event) CreatedAt() time.Time

CreatedAt returns the event creation time (millisecond precision).

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader represents a stream reader.

func (*Reader) AddStream

func (r *Reader) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error

AddStream adds the stream to the reader. By default the stream cursor starts at the same timestamp as the reader's main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the reader.

func (*Reader) Close

func (r *Reader) Close()

Close stops event polling and closes the reader channel. It is safe to call Close multiple times.
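
A common way to make a Close method safe to call repeatedly in Go is sync.Once. The sketch below shows the pattern on a hypothetical closer type; it is an illustration of the guarantee, not the Reader's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical type with an idempotent Close.
type closer struct {
	once   sync.Once
	events chan string
	done   chan struct{}
}

func newCloser() *closer {
	return &closer{events: make(chan string), done: make(chan struct{})}
}

// Close closes both channels exactly once, however often it is called.
func (c *closer) Close() {
	c.once.Do(func() {
		close(c.done)
		close(c.events)
	})
}

// IsClosed reports whether Close has been called.
func (c *closer) IsClosed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	c := newCloser()
	c.Close()
	c.Close() // does not panic with "close of closed channel"

	_, open := <-c.events
	fmt.Println("closed:", c.IsClosed(), "channel open:", open)
}
```

Consumers ranging over the event channel exit their loop as soon as the channel is closed.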

func (*Reader) IsClosed

func (r *Reader) IsClosed() bool

IsClosed returns true if the reader is stopped.

func (*Reader) RemoveStream

func (r *Reader) RemoveStream(ctx context.Context, stream *Stream) error

RemoveStream removes the stream from the reader. It is idempotent.

func (*Reader) Subscribe

func (r *Reader) Subscribe() <-chan *Event

Subscribe returns a channel that receives events from the stream. The channel is closed when the reader is closed.

func (*Reader) Unsubscribe

func (r *Reader) Unsubscribe(c <-chan *Event)

Unsubscribe removes the channel from the reader subscribers and closes it.

type Sink

type Sink struct {
	// Name is the sink name.
	Name string
	// contains filtered or unexported fields
}

Sink represents a stream sink.

func (*Sink) Ack

func (s *Sink) Ack(ctx context.Context, e *Event) error

Ack acknowledges the event.
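
Since unacknowledged events are requeued, a handler may see the same event more than once. One way to cope is to remember which event IDs were already processed. The sketch below uses an in-memory set; the dedupe type is hypothetical and a production system would more likely persist this state.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupe remembers which event IDs have been handled.
type dedupe struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDedupe() *dedupe {
	return &dedupe{seen: map[string]struct{}{}}
}

// FirstTime records id and reports whether it had not been seen before.
func (d *dedupe) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	seen := newDedupe()
	// "1-0" arrives twice, as it could after a requeue.
	for _, id := range []string{"1-0", "2-0", "1-0"} {
		if !seen.FirstTime(id) {
			fmt.Println("skip duplicate", id)
			continue
		}
		fmt.Println("process", id)
	}
}
```

A duplicate should still be acknowledged after it is skipped, otherwise it keeps coming back.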

func (*Sink) AddStream

func (s *Sink) AddStream(ctx context.Context, stream *Stream, opts ...options.AddStream) error

AddStream adds the stream to the sink. By default the stream cursor starts at the same timestamp as the sink main stream cursor. This can be overridden with opts. AddStream does nothing if the stream is already part of the sink.

func (*Sink) Close

func (s *Sink) Close()

Close stops event polling, waits for all events to be processed, and closes the sink channel. It is safe to call Close multiple times.

func (*Sink) IsClosed

func (s *Sink) IsClosed() bool

IsClosed returns true if the sink was closed.

func (*Sink) RemoveStream

func (s *Sink) RemoveStream(ctx context.Context, stream *Stream) error

RemoveStream removes the stream from the sink. It is idempotent.

func (*Sink) Subscribe

func (s *Sink) Subscribe() <-chan *Event

Subscribe returns a channel that receives events from the sink.

func (*Sink) Unsubscribe

func (s *Sink) Unsubscribe(c <-chan *Event)

Unsubscribe removes the channel from the sink and closes it.

type Stream

type Stream struct {
	// Name of the stream.
	Name string
	// MaxLen is the maximum number of events in the stream.
	MaxLen int
	// contains filtered or unexported fields
}

Stream encapsulates a stream of events. Events published to a stream can optionally be associated with a topic. Stream consumers can subscribe to a stream and optionally provide topic-matching criteria. Consumers can be created within a group. Each consumer group receives its own copy of the stream events.
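
The effect of MaxLen can be pictured as a capped append that drops the oldest events once the limit is exceeded. This is a sketch only: appendCapped is a hypothetical helper, treating a MaxLen of zero as "unbounded" is an assumption, and Redis itself may trim streams approximately rather than exactly.

```go
package main

import "fmt"

// appendCapped appends ev and, if maxLen is positive, drops the oldest
// events so that at most maxLen remain.
func appendCapped(events []string, maxLen int, ev string) []string {
	events = append(events, ev)
	if maxLen > 0 && len(events) > maxLen {
		events = events[len(events)-maxLen:]
	}
	return events
}

func main() {
	var events []string
	for _, ev := range []string{"e1", "e2", "e3", "e4"} {
		events = appendCapped(events, 3, ev)
	}
	fmt.Println(events) // e1 has been trimmed
}
```

Trimming is why a reader or sink that asks to resume from an old event may find that the event is no longer in the stream.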

func NewStream

func NewStream(name string, rdb *redis.Client, opts ...options.Stream) (*Stream, error)

NewStream returns the stream with the given name. All stream instances with the same name share the same events.

func (*Stream) Add

func (s *Stream) Add(ctx context.Context, name string, payload []byte, opts ...options.AddEvent) (string, error)

Add appends an event to the stream and returns its ID. If the option WithOnlyIfStreamExists is used and the stream does not exist then no event is added and the empty string is returned. The stream is created if the option is omitted or when NewSink is called.

func (*Stream) Destroy

func (s *Stream) Destroy(ctx context.Context) error

Destroy deletes the entire stream and all its messages.

func (*Stream) NewReader

func (s *Stream) NewReader(ctx context.Context, opts ...options.Reader) (*Reader, error)

NewReader creates a new stream reader. All reader instances get all the events in the stream. Events are read starting:

  • from the last event by default
  • from the oldest event stored in the stream if the WithReaderStartAtOldest option is used
  • after the event with the ID provided via WithReaderLastEventID if the event is still in the stream, oldest event otherwise
  • from the event added on or after the timestamp provided via WithReaderStartAt if still in the stream, oldest event otherwise
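
The first three rules above, including the fallback to the oldest event, can be sketched as a small function over the IDs currently stored in the stream. firstIndex is a hypothetical helper; the precedence it gives to the last-event ID over the oldest flag is an assumption, and the timestamp rule is left out.

```go
package main

import "fmt"

// firstIndex returns the index of the first event a new reader would
// receive. ids holds the IDs currently stored in the stream, oldest
// first. A result of len(ids) means "wait for new events".
func firstIndex(ids []string, oldest bool, lastEventID string) int {
	if lastEventID != "" {
		for i, id := range ids {
			if id == lastEventID {
				return i + 1 // resume right after the known event
			}
		}
		return 0 // event no longer in the stream: fall back to oldest
	}
	if oldest {
		return 0
	}
	return len(ids) // default: start after the last stored event
}

func main() {
	ids := []string{"10-0", "20-0", "30-0"}
	fmt.Println(firstIndex(ids, false, ""))     // default
	fmt.Println(firstIndex(ids, true, ""))      // start at oldest
	fmt.Println(firstIndex(ids, false, "20-0")) // resume after 20-0
	fmt.Println(firstIndex(ids, false, "5-0"))  // 5-0 was trimmed
}
```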

func (*Stream) NewSink

func (s *Stream) NewSink(ctx context.Context, name string, opts ...options.Sink) (*Sink, error)

NewSink creates a new stream sink with the given name. All sink instances with the same name share the same stream cursor. Events read through a sink are not removed from the stream until they are acked by the client unless the WithNoAck option is used. Events are read starting:

  • from the last event by default
  • from the oldest event stored in the stream if the WithSinkStartAtOldest option is used
  • after the event with the ID provided via WithSinkLastEventID if the event is still in the stream, oldest event otherwise
  • from the event added on or after the timestamp provided via WithSinkStartAt if still in the stream, oldest event otherwise

func (*Stream) Remove

func (s *Stream) Remove(ctx context.Context, ids ...string) error

Remove removes the events with the given IDs from the stream. Note: clients should not need to call this method in normal operation. They should use the Ack method to acknowledge events instead.
