streams

package
v0.0.0-...-87a4e12
Published: Jul 31, 2024 License: MIT Imports: 2 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataSource

type DataSource interface {
	// contains filtered or unexported methods
}

DataSource is a source of events for a stream. It can be a list of subjects, a mirror of another stream, or an aggregate of multiple streams.

type DataSourceAggregate

type DataSourceAggregate struct {
	Sources []*StreamSource
}

DataSourceAggregate is used to indicate that a stream is an aggregate of multiple streams.

type DataSourceSubjects

type DataSourceSubjects struct {
	Subjects []string
}

DataSourceSubjects is a list of subjects that will be used as a source of events for a stream.

type DiscardPolicy

type DiscardPolicy string

DiscardPolicy defines the behavior when a limit on events is reached. The default behavior is to discard old events, but some applications may prefer to discard new events instead.

const (
	// DiscardPolicyDefault is the default discard policy, which is equivalent
	// to [DiscardPolicyOld].
	DiscardPolicyDefault DiscardPolicy = ""
	// DiscardPolicyOld discards old events when a limit is reached.
	DiscardPolicyOld DiscardPolicy = "old"
	// DiscardPolicyNew discards new events when a limit is reached.
	DiscardPolicyNew DiscardPolicy = "new"
)
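The difference between the two policies can be illustrated with a small in-memory buffer. This is only a sketch: `appendWithLimit` is a hypothetical helper and not part of this package, the types are local copies so the example compiles on its own, and treating a limit of zero as "no limit" is an assumption.

```go
package main

import "fmt"

// DiscardPolicy is a local copy of the documented type, declared here only
// so that the sketch is self-contained.
type DiscardPolicy string

const (
	DiscardPolicyDefault DiscardPolicy = ""
	DiscardPolicyOld     DiscardPolicy = "old"
	DiscardPolicyNew     DiscardPolicy = "new"
)

// appendWithLimit is a hypothetical helper. It adds ev to events while
// keeping at most max entries and reports whether ev was accepted.
// A max of zero or less is treated as "no limit" (an assumption).
func appendWithLimit(events []string, ev string, max int, policy DiscardPolicy) ([]string, bool) {
	if max <= 0 || len(events) < max {
		return append(events, ev), true
	}
	if policy == DiscardPolicyNew {
		// Limit reached: reject the incoming event, keep what is stored.
		return events, false
	}
	// DiscardPolicyOld and DiscardPolicyDefault: drop the oldest events to
	// make room for the new one. A fresh slice avoids aliasing the input.
	out := make([]string, 0, max)
	out = append(out, events[len(events)-max+1:]...)
	out = append(out, ev)
	return out, true
}

func main() {
	stored := []string{"e1", "e2"}

	kept, ok := appendWithLimit(stored, "e3", 2, DiscardPolicyNew)
	fmt.Println(kept, ok)

	kept, ok = appendWithLimit(stored, "e3", 2, DiscardPolicyOld)
	fmt.Println(kept, ok)
}
```

With DiscardPolicyNew the publisher learns that the event was rejected, which suits applications that would rather fail a write than lose history.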

type Option

type Option func(*Options) error

Option defines an option for configuring a stream.

func AggregateStreams

func AggregateStreams(sources ...*StreamSource) Option

AggregateStreams is used to indicate that a stream is an aggregate of multiple streams. At least one stream source is required.

This option is mutually exclusive with WithSubjects and [MirrorStream].
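Because an Option returns an error, both the "at least one source" rule and the mutual exclusivity can be checked when the option is applied. The sketch below shows one way this could work; it is not the package's implementation. The lowercase functions, the `isDataSource` marker method, `errSourceSet`, and `resolve` are all hypothetical names, and the types are reduced local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented types, reduced to what the sketch needs.
type StreamSource struct{ Name string }

// DataSource is sealed with an unexported method; the method name is made up.
type DataSource interface{ isDataSource() }

type DataSourceSubjects struct{ Subjects []string }
type DataSourceAggregate struct{ Sources []*StreamSource }

func (DataSourceSubjects) isDataSource()  {}
func (DataSourceAggregate) isDataSource() {}

type Options struct{ Source DataSource }

type Option func(*Options) error

// errSourceSet is hypothetical; the real package may report conflicts
// between source options differently.
var errSourceSet = errors.New("stream source already set")

// aggregateStreams mimics the documented AggregateStreams option.
func aggregateStreams(sources ...*StreamSource) Option {
	return func(o *Options) error {
		if len(sources) == 0 {
			return errors.New("at least one stream source is required")
		}
		if o.Source != nil {
			return errSourceSet
		}
		o.Source = DataSourceAggregate{Sources: sources}
		return nil
	}
}

// withSubjects mimics the documented WithSubjects option.
func withSubjects(subjects ...string) Option {
	return func(o *Options) error {
		if len(subjects) == 0 {
			return errors.New("at least one subject is required")
		}
		if o.Source != nil {
			return errSourceSet
		}
		o.Source = DataSourceSubjects{Subjects: subjects}
		return nil
	}
}

// resolve applies opts in order and stops at the first error.
func resolve(opts ...Option) (*Options, error) {
	o := &Options{}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	// Combining two source options is rejected by the second one.
	_, err := resolve(withSubjects("orders.>"), aggregateStreams(&StreamSource{Name: "orders"}))
	fmt.Println(err)
}
```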

func WithDeduplicationWindow

func WithDeduplicationWindow(deduplicationWindow time.Duration) Option

WithDeduplicationWindow sets the window of time in which duplicate events are discarded.

Defaults to 2 minutes if not set.

func WithDiscardNewPerSubject

func WithDiscardNewPerSubject(discardNewPerSubject bool) Option

WithDiscardNewPerSubject sets whether to discard new events per subject when a limit is reached. The default is false.

Use with WithDiscardPolicy set to DiscardPolicyNew.

func WithDiscardPolicy

func WithDiscardPolicy(discardPolicy DiscardPolicy) Option

WithDiscardPolicy sets the discard policy to use when a limit is reached. The default is to discard old events.

If this is set to DiscardPolicyNew, use WithDiscardNewPerSubject to control whether to discard new events per subject.

func WithMaxAge

func WithMaxAge(maxAge time.Duration) Option

WithMaxAge sets the maximum age of events in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.

func WithMaxBytes

func WithMaxBytes(maxBytes uint) Option

WithMaxBytes sets the maximum number of bytes in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.

func WithMaxEventSize

func WithMaxEventSize(maxEventSize uint) Option

WithMaxEventSize sets the maximum size of an event in bytes. It cannot be larger than 1 MiB.

Defaults to 1 MiB if not set.
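Since Option has the signature `func(*Options) error`, a size above the limit can be rejected at configuration time. A minimal sketch, assuming the limit is enforced inside the option itself; `withMaxEventSize` and `maxAllowedEventSize` are hypothetical names, and `Options` is a reduced local stand-in:

```go
package main

import "fmt"

// maxAllowedEventSize is the documented upper bound of 1 MiB, in bytes.
const maxAllowedEventSize uint = 1 << 20

// Options and Option are local stand-ins for the documented types.
type Options struct{ MaxEventSize uint }

type Option func(*Options) error

// withMaxEventSize mimics the documented WithMaxEventSize option. Where the
// real package validates the limit is an assumption of this sketch.
func withMaxEventSize(size uint) Option {
	return func(o *Options) error {
		if size > maxAllowedEventSize {
			return fmt.Errorf("max event size %d exceeds limit of %d bytes", size, maxAllowedEventSize)
		}
		o.MaxEventSize = size
		return nil
	}
}

func main() {
	var o Options

	// 512 KiB is within the limit and is stored.
	fmt.Println(withMaxEventSize(512*1024)(&o), o.MaxEventSize)

	// 2 MiB is above the limit and leaves Options unchanged.
	fmt.Println(withMaxEventSize(2<<20)(&o), o.MaxEventSize)
}
```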

func WithMaxEvents

func WithMaxEvents(maxEvents uint) Option

WithMaxEvents sets the maximum number of events in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.

func WithMaxEventsPerSubject

func WithMaxEventsPerSubject(maxEventsPerSubject uint) Option

WithMaxEventsPerSubject sets the maximum number of events per subject in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.

func WithStorageReplicas

func WithStorageReplicas(replicas uint) Option

WithStorageReplicas is used to indicate how many replicas of a stream should be stored. If not set, the default value of 1 will be used.

func WithStorageType

func WithStorageType(storageType StorageType) Option

WithStorageType is used to indicate where a stream should be stored. If not set, the default StorageTypeFile will be used.

func WithSubjects

func WithSubjects(subjects ...string) Option

WithSubjects is used to indicate that a stream should receive events from the given subjects. At least one subject is required and wildcards are supported.

This option is mutually exclusive with [MirrorStream] and AggregateStreams.

Examples:

WithSubjects("orders.created", "orders.updated")
WithSubjects("orders.*")
WithSubjects("orders.>")

Subjects are case-sensitive and should only contain the following characters:

  • `a` to `z`, `A` to `Z` and `0` to `9` are allowed.

  • `_` and `-` are allowed for separating words, but the use of camelCase is recommended.

  • `.` is allowed and used as a hierarchy separator, such as `time.us.east` and `time.eu.sweden`, which share the `time` prefix.

  • `*` matches a single token at any level of the subject. For example, `time.*.east` matches `time.us.east` and `time.eu.east` but not `time.us.west` or `time.us.central.east`. Similarly, `time.us.*` matches `time.us.east` but not `time.us.east.atlanta`.

    The `*` wildcard can be used multiple times in a subject. For example, `time.*.*` matches `time.us.east` and `time.eu.west` but not `time.us.east.atlanta`.

  • `>` matches one or more tokens at the tail of a subject, and can only be used as the last token. For example, `time.us.>` matches `time.us.east` and `time.us.east.atlanta` but not `time.eu.east`.

See NATS concepts: https://docs.nats.io/nats-concepts/subjects
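The wildcard rules above can be sketched as a token-by-token comparison. `matchSubject` is a hypothetical helper written for illustration; it is not part of this package, and it does not validate the allowed character set.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, following the `*`
// and `>` rules described above. Both arguments are split on `.` into
// tokens and compared position by position.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// `>` is only valid as the last token and needs at least one
			// remaining subject token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			// The subject has fewer tokens than the pattern.
			return false
		}
		if tok != "*" && tok != s[i] {
			// A literal token must match exactly (case-sensitive).
			return false
		}
	}
	// Without a trailing `>`, the token counts must be equal.
	return len(p) == len(s)
}

func main() {
	for _, subject := range []string{"time.us.east", "time.us.east.atlanta", "time.eu.east"} {
		fmt.Println(subject,
			matchSubject("time.*.east", subject),
			matchSubject("time.us.>", subject))
	}
}
```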

type Options

type Options struct {
	// RetentionPolicy defines the policy for retaining events in the stream.
	RetentionPolicy RetentionPolicy

	// Source defines how this stream receives events.
	Source DataSource

	// Storage defines where this stream is stored.
	Storage Storage

	// DeduplicationWindow defines the window of time in which duplicate events
	// are discarded.
	DeduplicationWindow *time.Duration

	// MaxEventSize defines the maximum size of an event in bytes.
	MaxEventSize uint
}
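The defaults documented for the individual options can be summarized in one place. The sketch below is an illustration only: `withDefaults` is a hypothetical helper, the types are reduced local copies, and treating a zero MaxEventSize as "not set" is an assumption. The pointer type of DeduplicationWindow presumably lets an explicit zero duration be told apart from an unset value.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented types, reduced to the fields with defaults.
type StorageType string

const (
	StorageTypeDefault StorageType = ""
	StorageTypeFile    StorageType = "file"
	StorageTypeMemory  StorageType = "memory"
)

type Storage struct {
	Type     StorageType
	Replicas uint
}

type Options struct {
	Storage             Storage
	DeduplicationWindow *time.Duration
	MaxEventSize        uint
}

// withDefaults is a hypothetical helper that returns a copy of o with the
// documented default values filled in for anything left unset.
func withDefaults(o Options) Options {
	if o.Storage.Type == StorageTypeDefault {
		o.Storage.Type = StorageTypeFile // documented default storage type
	}
	if o.Storage.Replicas == 0 {
		o.Storage.Replicas = 1 // documented default replica count
	}
	if o.DeduplicationWindow == nil {
		w := 2 * time.Minute // documented default window
		o.DeduplicationWindow = &w
	}
	if o.MaxEventSize == 0 {
		o.MaxEventSize = 1 << 20 // 1 MiB; zero meaning "unset" is assumed
	}
	return o
}

func main() {
	o := withDefaults(Options{})
	fmt.Println(o.Storage.Type, o.Storage.Replicas, *o.DeduplicationWindow, o.MaxEventSize)
}
```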

type Pointer

type Pointer interface {
	// contains filtered or unexported methods
}

Pointer describes where to start receiving events from. See the functions AtStreamStart, AtStreamEnd, AtStreamOffset, and AtStreamTimestamp to create a pointer.

func AtStreamEnd

func AtStreamEnd() Pointer

AtStreamEnd returns a pointer that points at the end of the stream. Using this pointer means only new events are received.

func AtStreamOffset

func AtStreamOffset(id uint64) Pointer

AtStreamOffset returns a pointer that points at the given offset. The ID should be the ID of an event in the stream.

func AtStreamStart

func AtStreamStart() Pointer

AtStreamStart returns a pointer that points at the beginning of the stream.

func AtStreamTimestamp

func AtStreamTimestamp(timestamp time.Time) Pointer

AtStreamTimestamp returns a pointer that points at the given timestamp.

type PointerEnd

type PointerEnd struct{}

PointerEnd is a pointer that starts at the end of the stream.

type PointerOffset

type PointerOffset struct {
	ID uint64
}

PointerOffset is a pointer that starts at the given offset.

type PointerStart

type PointerStart struct{}

PointerStart is a pointer that starts at the beginning of the stream.

type PointerTimestamp

type PointerTimestamp struct {
	Timestamp time.Time
}

PointerTimestamp is a pointer that starts at the given timestamp.
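To make the four pointer kinds concrete, the sketch below resolves each one to a starting index in an ordered, in-memory list of events. Everything here is an assumption made for illustration: the `isPointer` marker method, the `event` type, `sampleEvents`, and `startIndex` are hypothetical, and so is the choice that offsets and timestamps are inclusive.

```go
package main

import (
	"fmt"
	"time"
)

// Pointer is sealed with an unexported method; the method name is made up.
type Pointer interface{ isPointer() }

type PointerStart struct{}
type PointerEnd struct{}
type PointerOffset struct{ ID uint64 }
type PointerTimestamp struct{ Timestamp time.Time }

func (PointerStart) isPointer()     {}
func (PointerEnd) isPointer()       {}
func (PointerOffset) isPointer()    {}
func (PointerTimestamp) isPointer() {}

// event is a hypothetical stored event with an ID and a publish time.
type event struct {
	ID uint64
	At time.Time
}

// sampleEvents returns three events published one minute apart.
func sampleEvents() []event {
	base := time.Date(2024, 7, 31, 12, 0, 0, 0, time.UTC)
	return []event{
		{ID: 1, At: base},
		{ID: 2, At: base.Add(time.Minute)},
		{ID: 3, At: base.Add(2 * time.Minute)},
	}
}

// startIndex returns the index in events (ordered oldest first) at which
// delivery would begin for p. An index equal to len(events) means that only
// events published later are received.
func startIndex(p Pointer, events []event) int {
	switch p := p.(type) {
	case PointerEnd:
		return len(events)
	case PointerOffset:
		for i, e := range events {
			if e.ID >= p.ID { // inclusive: start at the event with this ID
				return i
			}
		}
		return len(events)
	case PointerTimestamp:
		for i, e := range events {
			if !e.At.Before(p.Timestamp) { // first event at or after the timestamp
				return i
			}
		}
		return len(events)
	default:
		// PointerStart, or a nil Pointer: begin at the first stored event.
		return 0
	}
}

func main() {
	evs := sampleEvents()
	fmt.Println(startIndex(PointerStart{}, evs))
	fmt.Println(startIndex(PointerOffset{ID: 2}, evs))
	fmt.Println(startIndex(PointerTimestamp{Timestamp: evs[2].At}, evs))
	fmt.Println(startIndex(PointerEnd{}, evs))
}
```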

type RetentionPolicy

type RetentionPolicy struct {
	// MaxAge defines the maximum age of events in the stream.
	MaxAge time.Duration
	// MaxEvents defines the maximum number of events in the stream.
	MaxEvents uint
	// MaxEventsPerSubject defines the maximum number of events per subject in
	// the stream.
	MaxEventsPerSubject uint
	// MaxBytes defines the maximum number of bytes in the stream.
	MaxBytes uint

	// DiscardPolicy defines the behavior when a limit on events is reached.
	DiscardPolicy DiscardPolicy
	// DiscardNewPerSubject defines whether to discard new events per subject
	// if the discard policy is set to [DiscardPolicyNew].
	DiscardNewPerSubject bool
}

RetentionPolicy defines the policy for retaining events in the stream.

type Storage

type Storage struct {
	// Type indicates where the stream should be stored. Defaults to
	// [StorageTypeFile] if not set.
	Type StorageType
	// Replicas indicates how many replicas of the stream should be stored. If
	// zero, the default value of 1 will be used.
	Replicas uint
}

Storage is used to define how to store a stream.

type StorageType

type StorageType string

StorageType indicates where the stream should be stored.

const (
	// StorageTypeDefault is the default storage type, which is equivalent to
	// [StorageTypeFile].
	StorageTypeDefault StorageType = ""
	// StorageTypeFile indicates that the stream should be stored in a file.
	// This is the default.
	StorageTypeFile StorageType = "file"
	// StorageTypeMemory indicates that the stream should be stored in memory.
	StorageTypeMemory StorageType = "memory"
)

type Stream

type Stream interface {
	// Name is the unique identifier of the stream.
	Name() string

	// RetentionPolicy defines the retention policy for the stream.
	RetentionPolicy() RetentionPolicy

	// Source defines how events are sourced into this stream.
	Source() DataSource

	// Storage defines the storage configuration for the stream.
	Storage() Storage

	// DeduplicationWindow defines the window of time in which duplicate events
	// are discarded.
	DeduplicationWindow() time.Duration

	// MaxEventSize defines the maximum size of an event in bytes.
	MaxEventSize() uint
}

Stream contains information about a defined stream.

type StreamSource

type StreamSource struct {
	// Name of the stream to receive events from.
	Name string
	// Pointer indicates where to start receiving events from. If nil, events
	// will be received from the beginning of the stream.
	Pointer Pointer
	// FilterSubjects is a list of subjects to filter events by. If empty, all
	// subjects will be received.
	//
	// See [WithSubjects] for more information on subjects.
	FilterSubjects []string
}

StreamSource is used to define how to source events from a given stream. It can be used to specify where events are to be received from, and optionally filter the subjects that are received.

func CopyFromStream

func CopyFromStream(name string) *StreamSource

CopyFromStream creates a new StreamSource that will receive events from the given stream.

func CopyFromStreamAt

func CopyFromStreamAt(name string, pointer Pointer) *StreamSource

CopyFromStreamAt creates a new StreamSource that will receive events from the given stream, starting at the given Pointer.

func CopyFromStreamAtWithSubjects

func CopyFromStreamAtWithSubjects(name string, pointer Pointer, subjects ...string) *StreamSource

CopyFromStreamAtWithSubjects creates a new StreamSource that will receive events from the given stream, starting at the given Pointer and filtering by the given subjects.

See WithSubjects for more information on subjects.

func CopyFromStreamWithSubjects

func CopyFromStreamWithSubjects(name string, subjects ...string) *StreamSource

CopyFromStreamWithSubjects creates a new StreamSource that will receive events from the given stream, filtering by the given subjects.

See WithSubjects for more information on subjects.
