Documentation ¶
Index ¶
- type DataSource
- type DataSourceAggregate
- type DataSourceSubjects
- type DiscardPolicy
- type Option
- func AggregateStreams(sources ...*StreamSource) Option
- func WithDeduplicationWindow(deduplicationWindow time.Duration) Option
- func WithDiscardNewPerSubject(discardNewPerSubject bool) Option
- func WithDiscardPolicy(discardPolicy DiscardPolicy) Option
- func WithMaxAge(maxAge time.Duration) Option
- func WithMaxBytes(maxBytes uint) Option
- func WithMaxEventSize(maxEventSize uint) Option
- func WithMaxEvents(maxEvents uint) Option
- func WithMaxEventsPerSubject(maxEventsPerSubject uint) Option
- func WithStorageReplicas(replicas uint) Option
- func WithStorageType(storageType StorageType) Option
- func WithSubjects(subjects ...string) Option
- type Options
- type Pointer
- type PointerEnd
- type PointerOffset
- type PointerStart
- type PointerTimestamp
- type RetentionPolicy
- type Storage
- type StorageType
- type Stream
- type StreamSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataSource ¶
type DataSource interface {
// contains filtered or unexported methods
}
DataSource is a source of events for a stream. It can be a list of subjects, a mirror of another stream, or an aggregate of multiple streams.
type DataSourceAggregate ¶
type DataSourceAggregate struct {
Sources []*StreamSource
}
DataSourceAggregate is used to indicate that a stream is an aggregate of multiple streams.
type DataSourceSubjects ¶
type DataSourceSubjects struct {
Subjects []string
}
DataSourceSubjects is a list of subjects that will be used as a source of events for a stream.
type DiscardPolicy ¶
type DiscardPolicy string
DiscardPolicy defines the behavior when a limit on events is reached. The default behavior is to discard old events, but some applications may prefer to discard new events instead.
const (
	// DiscardPolicyDefault is the default discard policy, which is equivalent
	// to [DiscardPolicyOld].
	DiscardPolicyDefault DiscardPolicy = ""

	// DiscardPolicyOld discards old events when a limit is reached.
	DiscardPolicyOld DiscardPolicy = "old"

	// DiscardPolicyNew discards new events when a limit is reached.
	DiscardPolicyNew DiscardPolicy = "new"
)
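The difference between the two policies can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `appendWithLimit` is a hypothetical helper, the stream is modelled as a plain slice, and the `DiscardPolicy` type is redeclared locally so the example compiles on its own. It does not show how the package or the underlying server enforces limits.

```go
package main

import "fmt"

// DiscardPolicy is redeclared here so the sketch is self-contained.
type DiscardPolicy string

const (
	DiscardPolicyDefault DiscardPolicy = ""
	DiscardPolicyOld     DiscardPolicy = "old"
	DiscardPolicyNew     DiscardPolicy = "new"
)

// appendWithLimit is a hypothetical helper that models a stream capped at
// maxEvents. It returns the resulting events and whether the new event was
// stored. A maxEvents of zero or less is treated as "no limit".
func appendWithLimit(events []string, event string, maxEvents int, policy DiscardPolicy) ([]string, bool) {
	if maxEvents <= 0 || len(events) < maxEvents {
		return append(events, event), true
	}
	if policy == DiscardPolicyNew {
		// The limit is reached: reject the incoming event.
		return events, false
	}
	// DiscardPolicyOld and DiscardPolicyDefault: drop the oldest events to
	// make room for the incoming one.
	kept := events[len(events)-maxEvents+1:]
	out := make([]string, 0, maxEvents)
	out = append(out, kept...)
	return append(out, event), true
}

func main() {
	full := []string{"a", "b"}

	out, stored := appendWithLimit(full, "c", 2, DiscardPolicyOld)
	fmt.Println(out, stored) // [b c] true

	out, stored = appendWithLimit(full, "c", 2, DiscardPolicyNew)
	fmt.Println(out, stored) // [a b] false
}
```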
type Option ¶
Option defines an option for configuring a stream.
func AggregateStreams ¶
func AggregateStreams(sources ...*StreamSource) Option
AggregateStreams is used to indicate that a stream is an aggregate of multiple streams. At least one stream source is required.
This option is mutually exclusive with WithSubjects and [MirrorStream].
func WithDeduplicationWindow ¶
func WithDeduplicationWindow(deduplicationWindow time.Duration) Option
WithDeduplicationWindow is used to indicate how long events should be deduplicated for.
Defaults to 2 minutes if not set.
func WithDiscardNewPerSubject ¶
func WithDiscardNewPerSubject(discardNewPerSubject bool) Option
WithDiscardNewPerSubject sets whether to discard new events per subject when a limit is reached. The default is false.
Use with WithDiscardPolicy set to DiscardPolicyNew.
func WithDiscardPolicy ¶
func WithDiscardPolicy(discardPolicy DiscardPolicy) Option
WithDiscardPolicy sets the discard policy to use when a limit is reached. The default is to discard old events.
If this is set to DiscardPolicyNew, use WithDiscardNewPerSubject to control whether to discard new events per subject.
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) Option
WithMaxAge sets the maximum age of events in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.
func WithMaxBytes ¶
func WithMaxBytes(maxBytes uint) Option
WithMaxBytes sets the maximum number of bytes in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.
func WithMaxEventSize ¶
func WithMaxEventSize(maxEventSize uint) Option
WithMaxEventSize is used to indicate the maximum size of an event. It cannot be larger than 1 MiB.
Defaults to 1 MiB if not set.
func WithMaxEvents ¶
func WithMaxEvents(maxEvents uint) Option
WithMaxEvents sets the maximum number of events in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.
func WithMaxEventsPerSubject ¶
func WithMaxEventsPerSubject(maxEventsPerSubject uint) Option
WithMaxEventsPerSubject sets the maximum number of events per subject in the stream. Use WithDiscardPolicy to change the behavior when this limit is reached.
func WithStorageReplicas ¶
func WithStorageReplicas(replicas uint) Option
WithStorageReplicas is used to indicate how many replicas of a stream should be stored. If not set, the default value of 1 will be used.
func WithStorageType ¶
func WithStorageType(storageType StorageType) Option
WithStorageType is used to indicate where a stream should be stored. If not set, the default StorageTypeFile will be used.
func WithSubjects ¶
func WithSubjects(subjects ...string) Option
WithSubjects is used to indicate that a stream should receive events from the given subjects. At least one subject is required and wildcards are supported.
This option is mutually exclusive with [MirrorStream] and AggregateStreams.
Examples:
WithSubjects("orders.created", "orders.updated")
WithSubjects("orders.*")
WithSubjects("orders.>")
Subjects are case-sensitive and should only contain the following characters:
`a` to `z`, `A` to `Z` and `0` to `9` are allowed.
`_` and `-` are allowed for separating words, but the use of camelCase is recommended.
`.` is allowed and used as a hierarchy separator, such as `time.us.east` and `time.eu.sweden`, which share the `time` prefix.
`*` matches a single token at any level of the subject. For example, `time.*.east` matches `time.us.east` and `time.eu.east` but not `time.us.west` or `time.us.central.east`. Similarly, `time.us.*` matches `time.us.east` but not `time.us.east.atlanta`.
The `*` wildcard can be used multiple times in a subject. For example, `time.*.*` matches `time.us.east` and `time.eu.west` but not `time.us.east.atlanta`.
`>` matches one or more tokens at the tail of a subject and can only be used as the last token. For example, `time.us.>` matches `time.us.east` and `time.us.east.atlanta` but not `time.eu.east`.
See NATS concepts: https://docs.nats.io/nats-concepts/subjects
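The wildcard rules above can be sketched as a small token-by-token matcher. This is an illustrative helper only: `matchSubject` is a hypothetical name and is not part of this package or of NATS, where matching is performed by the server. It is meant for reasoning about which subjects a pattern would cover.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern, using "." as the
// token separator, "*" for exactly one token, and ">" for one or more
// trailing tokens. It is a hypothetical helper written for illustration.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and must consume at
			// least one token of the subject.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || s[i] == "" {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	// Without a trailing ">", both sides need the same number of tokens.
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("time.*.east", "time.us.east"))         // true
	fmt.Println(matchSubject("time.*.east", "time.us.central.east")) // false
	fmt.Println(matchSubject("time.us.*", "time.us.east.atlanta"))   // false
	fmt.Println(matchSubject("time.us.>", "time.us.east.atlanta"))   // true
	fmt.Println(matchSubject("time.us.>", "time.eu.east"))           // false
}
```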
type Options ¶
type Options struct {
	// RetentionPolicy defines the policy for retaining events in the stream.
	RetentionPolicy RetentionPolicy

	// Source defines how this stream receives events.
	Source DataSource

	// Storage defines where this stream is stored.
	Storage Storage

	// DeduplicationWindow defines the window of time in which duplicate events
	// are discarded.
	DeduplicationWindow *time.Duration

	// MaxEventSize defines the maximum size of an event in bytes.
	MaxEventSize uint
}
type Pointer ¶
type Pointer interface {
// contains filtered or unexported methods
}
Pointer describes where to start receiving events from. See the functions AtStreamStart, AtStreamEnd, AtStreamOffset, and AtStreamTimestamp to create a pointer.
func AtStreamEnd ¶
func AtStreamEnd() Pointer
AtStreamEnd returns a pointer that points at the end of the stream. Using this pointer means only new events are received.
func AtStreamOffset ¶
AtStreamOffset returns a pointer that points at the given offset. The ID should be the ID of an event in the stream.
func AtStreamStart ¶
func AtStreamStart() Pointer
AtStreamStart returns a pointer that points at the beginning of the stream.
func AtStreamTimestamp ¶
AtStreamTimestamp returns a pointer that points at the given timestamp.
type PointerEnd ¶
type PointerEnd struct{}
PointerEnd is a pointer that starts at the end of the stream.
type PointerOffset ¶
type PointerOffset struct {
ID uint64
}
PointerOffset is a pointer that starts at the given offset.
type PointerStart ¶
type PointerStart struct{}
PointerStart is a pointer that starts at the beginning of the stream.
type PointerTimestamp ¶
PointerTimestamp is a pointer that starts at the given timestamp.
type RetentionPolicy ¶
type RetentionPolicy struct {
	// MaxAge defines the maximum age of events in the stream.
	MaxAge time.Duration

	// MaxEvents defines the maximum number of events in the stream.
	MaxEvents uint

	// MaxEventsPerSubject defines the maximum number of events per subject in
	// the stream.
	MaxEventsPerSubject uint

	// MaxBytes defines the maximum number of bytes in the stream.
	MaxBytes uint

	// DiscardPolicy defines the behavior when a limit on events is reached.
	DiscardPolicy DiscardPolicy

	// DiscardNewPerSubject defines whether to discard new events per subject
	// if the discard policy is set to [DiscardPolicyNew].
	DiscardNewPerSubject bool
}
RetentionPolicy defines the policy for retaining events in the stream.
type Storage ¶
type Storage struct {
	// Type indicates where the stream should be stored. Defaults to
	// [StorageTypeFile] if not set.
	Type StorageType

	// Replicas indicates how many replicas of the stream should be stored. If
	// zero, the default value of 1 will be used.
	Replicas uint
}
Storage is used to define how to store a stream.
type StorageType ¶
type StorageType string
StorageType indicates where the stream should be stored.
const (
	// StorageTypeDefault is the default storage type, which is equivalent to
	// [StorageTypeFile].
	StorageTypeDefault StorageType = ""

	// StorageTypeFile indicates that the stream should be stored in a file.
	// This is the default.
	StorageTypeFile StorageType = "file"

	// StorageTypeMemory indicates that the stream should be stored in memory.
	StorageTypeMemory StorageType = "memory"
)
type Stream ¶
type Stream interface {
	// Name is the unique identifier of the stream.
	Name() string

	// RetentionPolicy defines the retention policy for the stream.
	RetentionPolicy() RetentionPolicy

	// Source defines how events are sourced into this stream.
	Source() DataSource

	// Storage defines the storage configuration for the stream.
	Storage() Storage

	// DeduplicationWindow defines the window of time in which duplicate events
	// are discarded.
	DeduplicationWindow() time.Duration

	// MaxEventSize defines the maximum size of an event in bytes.
	MaxEventSize() uint
}
Stream contains information about a defined stream.
type StreamSource ¶
type StreamSource struct {
	// Name of the stream to receive events from.
	Name string

	// Pointer is a pointer where to start receiving events from. If nil, events
	// will be received from the beginning of the stream.
	Pointer Pointer

	// FilterSubjects is a list of subjects to filter events by. If empty, all
	// subjects will be received.
	//
	// See [WithSubjects] for more information on subjects.
	FilterSubjects []string
}
StreamSource is used to define how to source events from a given stream. It can be used to specify where events are to be received from, and optionally filter the subjects that are received.
func CopyFromStream ¶
func CopyFromStream(name string) *StreamSource
CopyFromStream creates a new StreamSource that will receive events from the given stream.
func CopyFromStreamAt ¶
func CopyFromStreamAt(name string, pointer Pointer) *StreamSource
CopyFromStreamAt creates a new StreamSource that will receive events from the given stream, starting at the given Pointer.
func CopyFromStreamAtWithSubjects ¶
func CopyFromStreamAtWithSubjects(name string, pointer Pointer, subjects ...string) *StreamSource
CopyFromStreamAtWithSubjects creates a new StreamSource that will receive events from the given stream, starting at the given Pointer and filtering by the given subjects.
See WithSubjects for more information on subjects.
func CopyFromStreamWithSubjects ¶
func CopyFromStreamWithSubjects(name string, subjects ...string) *StreamSource
CopyFromStreamWithSubjects creates a new StreamSource that will receive events from the given stream, filtering by the given subjects.
See WithSubjects for more information on subjects.