events

package
v0.0.0-...-87a4e12 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 31, 2024 License: MIT Imports: 9 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrConsumerRequired = NewValidationError("consumer is required")

ErrConsumerRequired is used when a consumer is required but not provided.

var ErrDataRequired = errors.New("data is required")

ErrDataRequired is used when data is required but not provided.

var ErrPublishTimeout = errors.New("publish timeout")

ErrPublishTimeout is used when publishing of an event does not meet the deadline.

var ErrStreamRequired = NewValidationError("stream is required")

ErrStreamRequired is used when a stream is required but not provided.

var ErrSubjectRequired = NewValidationError("subject is required")

ErrSubjectRequired is used when a subject is required but not provided.

var ErrSubjectsRequired = NewValidationError("one or more subjects are required")

ErrSubjectsRequired is used when subjects are required but not provided.

var ErrUnboundSubject = errors.New("unbound subject, no stream found for subject")

ErrUnboundSubject is used when a subject is not bound to a stream.

var ErrWrongSequence = errors.New("wrong sequence")

ErrWrongSequence is used when the expected sequence number does not match the actual sequence number.

Functions

func IsValidConsumerName

func IsValidConsumerName(name string) bool

IsValidConsumerName checks if the consumer name is valid.

NATS provides the guidance that durable names of consumers cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.

In Windshift consumer names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.

Empty consumer names are not allowed.

func IsValidStreamName

func IsValidStreamName(name string) bool

IsValidStreamName checks if the stream name is valid.

NATS provides the guidance that stream names cannot contain whitespace, `.`, `*`, `>`, path separators (forward or backwards slash), and non-printable characters.

In Windshift stream names are aligned with subject names and allow only the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, and `-`.

Empty stream names are not allowed.
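
The character rule described for consumer and stream names can be sketched as a small check. This is a minimal illustration, not the package's implementation: `isValidName` is a hypothetical helper written only from the rules stated above (non-empty, and only `a`-`z`, `A`-`Z`, `0`-`9`, `_`, `-`).

```go
package main

import "fmt"

// isValidName is a hypothetical stand-in for the documented rule: a name is
// valid when it is non-empty and contains only a-z, A-Z, 0-9, '_' and '-'.
func isValidName(name string) bool {
	if name == "" {
		return false
	}
	for i := 0; i < len(name); i++ {
		c := name[i]
		switch {
		case c >= 'a' && c <= 'z':
		case c >= 'A' && c <= 'Z':
		case c >= '0' && c <= '9':
		case c == '_' || c == '-':
		default:
			// Anything else (whitespace, '.', '*', '>', slashes, ...) is rejected.
			return false
		}
	}
	return true
}

func main() {
	for _, name := range []string{"orders", "order-events_v2", "orders.created", ""} {
		fmt.Printf("%q valid: %v\n", name, isValidName(name))
	}
}
```

Because the allowed set is a strict subset of what NATS accepts, a name that passes this kind of check should also satisfy the NATS guidance quoted above.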

func IsValidSubject

func IsValidSubject(subject string, allowWildcards bool) bool

IsValidSubject checks if the subject is valid, either allowing or disallowing wildcards.

NATS recommends ASCII characters for subjects, but does not enforce it. In Windshift subjects are limited to the characters `a`-`z`, `A`-`Z`, `0`-`9`, `_`, `-`, and `.`.

If wildcards are allowed, then `*` and `>` are also allowed. `*` can be used to match a single token, and `>` can be used to match one or more tokens but only at the end of the subject.

Period is a special character that indicates a subject hierarchy. We validate that the subject does not start or end with a period, and that periods are not adjacent.

Empty subjects are not allowed.
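
The subject rules above can be sketched as a token-by-token check. This is an illustration under assumptions, not the package's code: `isValidSubject` and `isPlainChar` are hypothetical helpers, and the sketch assumes a wildcard must occupy a whole token (so `orders.a*` is rejected), which the text above does not state explicitly.

```go
package main

import (
	"fmt"
	"strings"
)

// isPlainChar reports whether c is one of a-z, A-Z, 0-9, '_' or '-'.
func isPlainChar(c byte) bool {
	return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
		(c >= '0' && c <= '9') || c == '_' || c == '-'
}

// isValidSubject is a hypothetical re-implementation of the documented rules.
func isValidSubject(subject string, allowWildcards bool) bool {
	if subject == "" {
		return false
	}
	tokens := strings.Split(subject, ".")
	for i, token := range tokens {
		// An empty token means a leading, trailing, or doubled period.
		if token == "" {
			return false
		}
		if allowWildcards && token == "*" {
			continue
		}
		if allowWildcards && token == ">" {
			// '>' is only accepted as the final token.
			if i != len(tokens)-1 {
				return false
			}
			continue
		}
		for j := 0; j < len(token); j++ {
			if !isPlainChar(token[j]) {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(isValidSubject("orders.created", false))
	fmt.Println(isValidSubject("orders.*", true))
	fmt.Println(isValidSubject("orders.>.created", true))
	fmt.Println(isValidSubject("orders..created", false))
}
```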

func IsValidationError

func IsValidationError(err error) bool

func NewValidationError

func NewValidationError(err string) error

NewValidationError creates a new validation error.
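
One common way to build a pair like NewValidationError and IsValidationError is a dedicated error type checked with `errors.As`. The sketch below is an assumption about the general pattern, not this package's source: `validationError`, `newValidationError`, `isValidationError`, and `publish` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// validationError is a hypothetical error type marking invalid input.
type validationError struct{ msg string }

func (e *validationError) Error() string { return e.msg }

// newValidationError mirrors the shape of NewValidationError(err string) error.
func newValidationError(msg string) error { return &validationError{msg: msg} }

// isValidationError reports whether err, or any error it wraps, is a
// validationError.
func isValidationError(err error) bool {
	var ve *validationError
	return errors.As(err, &ve)
}

// errSubjectRequired plays the role of a sentinel such as ErrSubjectRequired.
var errSubjectRequired = newValidationError("subject is required")

// publish is a hypothetical caller that wraps the sentinel with context.
func publish(subject string) error {
	if subject == "" {
		return fmt.Errorf("publish: %w", errSubjectRequired)
	}
	return nil
}

func main() {
	err := publish("")
	fmt.Println(err)
	fmt.Println("validation error:", isValidationError(err))
	fmt.Println("is sentinel:", errors.Is(err, errSubjectRequired))
}
```

With this pattern a caller can separate bad input from transport or server failures without comparing against every sentinel individually.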

Types

type AckOption

type AckOption interface {
	// contains filtered or unexported methods
}

type AckOptions

type AckOptions struct {
	Backoff delays.DelayDecider
}

func (*AckOptions) Apply

func (o *AckOptions) Apply(opts []AckOption)

type CallOption

type CallOption interface {
	AckOption
	RejectOption
	PingOption
}

func WithBackoff

func WithBackoff(decider delays.DelayDecider) CallOption

WithBackoff sets the backoff strategy to use when retrying an operation.

The default retry strategy for acknowledging, rejecting and pinging events is to retry after 10 milliseconds, with a maximum total time of 5 seconds.

Example:

event.Ack(ctx, events.WithBackoff(
  delays.StopAfterMaxTime(delays.Exponential(10*time.Millisecond, 2), 10*time.Second),
))
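
To show what a strategy like the one in the example produces, here is a self-contained sketch of exponential delays capped by a total time budget. It does not use the `delays` package: `delayDecider`, `exponential`, `stopAfterMaxTime`, and `schedule` are hypothetical stand-ins, and the real `delays.DelayDecider` may have a different shape. The cap here is measured against accumulated delay rather than wall-clock time, which is a simplification.

```go
package main

import (
	"fmt"
	"time"
)

// delayDecider is a hypothetical decider: given the attempt number (starting
// at 1) it returns the delay before the next retry, or ok=false to stop.
type delayDecider func(attempt uint) (delay time.Duration, ok bool)

// exponential returns initial, initial*factor, initial*factor^2, ...
func exponential(initial time.Duration, factor float64) delayDecider {
	return func(attempt uint) (time.Duration, bool) {
		d := float64(initial)
		for i := uint(1); i < attempt; i++ {
			d *= factor
		}
		return time.Duration(d), true
	}
}

// stopAfterMaxTime stops retrying once the accumulated delay would exceed max.
func stopAfterMaxTime(next delayDecider, max time.Duration) delayDecider {
	var total time.Duration
	return func(attempt uint) (time.Duration, bool) {
		d, ok := next(attempt)
		if !ok || total+d > max {
			return 0, false
		}
		total += d
		return d, true
	}
}

// schedule lists every delay a decider yields before it gives up.
func schedule(decide delayDecider) []time.Duration {
	var out []time.Duration
	for attempt := uint(1); ; attempt++ {
		d, ok := decide(attempt)
		if !ok {
			return out
		}
		out = append(out, d)
	}
}

// exampleSchedule uses the same numbers as the example above.
func exampleSchedule() []time.Duration {
	return schedule(stopAfterMaxTime(exponential(10*time.Millisecond, 2), 10*time.Second))
}

func main() {
	fmt.Println(exampleSchedule())
}
```

Under these assumptions the delays double from 10ms up to 2.56s, for nine retries totalling 5.11s; a tenth delay of 5.12s would exceed the 10 second budget.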

func WithNoRetry

func WithNoRetry() CallOption

WithNoRetry disables retrying an operation.

type Client

type Client interface {
	// EnsureStream creates or updates a stream with the given name.
	//
	// Streams are collections of events that can later be consumed, they can
	// source events from subjects and other streams.
	EnsureStream(ctx context.Context, name string, opts ...streams.Option) (streams.Stream, error)

	// Publish an event to a stream.
	Publish(ctx context.Context, event *OutgoingEvent) (PublishedEvent, error)

	// EnsureConsumer creates or updates a consumer for a given stream. There
	// are two types of consumers, durable and ephemeral.
	//
	// Durable consumers are created when a name is provided via
	// [consumers.WithName]. Durable consumers can be subscribed to by several
	// clients to distribute the load of processing events.
	//
	// Ephemeral consumers are created when no name is provided. An ephemeral
	// consumer will have an auto-generated name that is returned in the
	// response - this can be used when subscribing to it. If an ephemeral
	// consumer is unused for a period of time, an hour by default, it will be
	// automatically deleted.
	//
	// Filtering of what subjects a consumer should receive events from can be
	// done via [consumers.WithSubjects].
	//
	// [consumers.WithConsumeFrom] can be used to control which events the
	// consumer should receive, such as starting from the beginning of the stream
	// or from a specific event id.
	//
	// To subscribe to the events from a consumer, use [Client.Subscribe].
	EnsureConsumer(ctx context.Context, stream string, opts ...consumers.Option) (consumers.Consumer, error)

	// Subscribe starts consuming events from the given stream. The consumer
	// must have been created before calling this method, use [Client.EnsureConsumer]
	// to create a consumer.
	//
	// Subscriptions are valid until the context is canceled.
	//
	// To control the number of events that can be processed concurrently, use
	// [subscribe.MaxProcessingEvents].
	Subscribe(ctx context.Context, stream string, consumer string, opts ...subscribe.Option) (<-chan Event, error)
}

Client is used to interact with events.

type DataInvalidError

type DataInvalidError struct {
	Err error
}

DataInvalidError represents an error where the data is invalid.

func (*DataInvalidError) Error

func (e *DataInvalidError) Error() string

func (*DataInvalidError) Is

func (e *DataInvalidError) Is(target error) bool

func (*DataInvalidError) Unwrap

func (e *DataInvalidError) Unwrap() error

type Event

type Event interface {
	// Context is the context that the event was created with. This context
	// will carry OpenTelemetry tracing information.
	Context() context.Context

	// ID is the identifier of the event.
	ID() uint64

	// Subject contains the subject the event was published to.
	Subject() string

	// DeliveryAttempt returns the number of times the event has been
	// delivered. The first delivery attempt will return 1.
	DeliveryAttempt() uint

	// Headers contain static information about the event, such as when it was
	// published.
	Headers() Headers

	// UnmarshalNew unmarshals the data of the event into a new instance of
	// the correct type. The type must be imported into your code before
	// calling this method.
	//
	// Example:
	//
	//   import "path/to/your/proto/messages"
	//
	//   data, err := event.UnmarshalNew()
	//
	// Use [Event.UnmarshalTo] if you want to unmarshal into an existing instance.
	UnmarshalNew() (proto.Message, error)

	// UnmarshalTo unmarshals the data of the event into the provided
	// instance. The instance must be a pointer to the correct type.
	//
	// Example:
	//
	//   import "path/to/your/proto/messages"
	//
	//   var data messages.YourMessageType
	//   if err := event.UnmarshalTo(&data); err != nil {
	//     return err
	//   }
	//
	// Use [Event.UnmarshalNew] if you want to unmarshal into a new instance.
	UnmarshalTo(v proto.Message) error

	// Ack acknowledges the event, indicating that it was processed
	// successfully.
	Ack(ctx context.Context, opts ...AckOption) error

	// Reject rejects the event, indicating that it was not processed
	// successfully. Depending on the options passed the event may be
	// redelivered.
	//
	// Options may be passed to control how the event is rejected, such as
	// how long to wait before redelivering the event or to indicate that
	// the event should not be redelivered.
	//
	// Examples:
	//
	//   event.Reject(ctx, events.WithRedeliveryDelay(5 * time.Second))
	//   event.Reject(ctx, events.Permanently())
	Reject(ctx context.Context, opts ...RejectOption) error

	// Ping indicates that the event is still being processed and that the
	// event should not be redelivered. This is useful for long running
	// processes.
	Ping(ctx context.Context, opts ...PingOption) error
}

Event is a received event that should be processed. Events must be acknowledged using [Event.Ack] or rejected using [Event.Reject]. If the event is not acknowledged or rejected within the time frame set by the consumer configuration, the event will be redelivered.

During processing the event can be pinged using [Event.Ping] to indicate that the event is still being processed and that the event should not be redelivered.

Events include a [Event.Context] that contains OpenTelemetry tracing information. This context should be used when creating new spans to ensure that the spans are correctly linked to the event.

Information about the event, such as when it occurred and its idempotency key, can be found in [Event.Headers]. The delivery attempt is available via [Event.DeliveryAttempt].
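
The acknowledge-or-reject contract described above can be sketched as a processing loop over a channel of events. The sketch is self-contained and does not import this package: `event` is a hypothetical, trimmed-down stand-in for the Event interface (no options, headers, or unmarshalling), and `fakeEvent`, `consume`, and `runExample` exist only for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// event is a hypothetical subset of the Event interface documented above.
type event interface {
	Subject() string
	Ack(ctx context.Context) error
	Reject(ctx context.Context) error
}

// fakeEvent records how it was settled so the example can be inspected.
type fakeEvent struct {
	subject string
	state   string
}

func (e *fakeEvent) Subject() string                  { return e.subject }
func (e *fakeEvent) Ack(ctx context.Context) error    { e.state = "acked"; return nil }
func (e *fakeEvent) Reject(ctx context.Context) error { e.state = "rejected"; return nil }

// consume settles every event: Ack when handle succeeds, Reject otherwise.
// It returns when ctx is canceled or the channel is closed.
func consume(ctx context.Context, in <-chan event, handle func(event) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-in:
			if !ok {
				return
			}
			if err := handle(ev); err != nil {
				_ = ev.Reject(ctx)
				continue
			}
			_ = ev.Ack(ctx)
		}
	}
}

// runExample feeds two events through consume and reports their final states.
func runExample() []string {
	evs := []*fakeEvent{{subject: "orders.created"}, {subject: "orders.broken"}}
	ch := make(chan event, len(evs))
	for _, e := range evs {
		ch <- e
	}
	close(ch)

	consume(context.Background(), ch, func(e event) error {
		if e.Subject() == "orders.broken" {
			return errors.New("cannot process")
		}
		return nil
	})

	states := make([]string, 0, len(evs))
	for _, e := range evs {
		states = append(states, e.state)
	}
	return states
}

func main() {
	fmt.Println(runExample())
}
```

In real code the channel would come from [Client.Subscribe], and errors returned by Ack and Reject should be handled rather than discarded.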

type Headers

type Headers interface {
	// OccurredAt returns the time that the event occurred.
	OccurredAt() time.Time

	// IdempotencyKey returns the idempotency key of the event. Will be an
	// empty string if the event was not published with an idempotency key.
	IdempotencyKey() string
}

Headers contains information about an event.

type OutgoingEvent

type OutgoingEvent struct {
	// Subject of the event, used to route the event to the correct stream and
	// by consumers to filter events. If no stream exists that can handle the
	// subject then the publish will fail.
	//
	// Can not be blank.
	Subject string
	// Data is the data of the message, can not be nil. Will be marshaled
	// into a [anypb.Any] instance. If the message is already an [anypb.Any]
	// instance then it will be used as is.
	Data proto.Message
	// Timestamp is an optional time when the event occurred, if not set the
	// current time will be used.
	Timestamp time.Time
	// IdempotencyKey is used to deduplicate events. Setting this to the same
	// value as a previous event will prevent the new event from being
	// published for a certain period of time.
	//
	// The time period in which deduplication occurs is controlled by
	// the stream configuration.
	//
	// Leave blank to use no deduplication.
	IdempotencyKey string
	// ExpectedLastID is the last id that the stream is expected to have
	// received, if the stream has not received this id then the event will
	// not be published. This can be used for optimistic concurrency control.
	ExpectedLastID *uint64
}

OutgoingEvent describes an event that is to be published to a stream.
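
The optimistic concurrency check behind ExpectedLastID can be illustrated with a toy in-memory stream. This is a model of the idea only, not how the server or this package implements it: `memoryStream`, its `publish` method, `errWrongSequence`, and `demo` are hypothetical names, and the sketch assumes a mismatch is reported with an error comparable to ErrWrongSequence.

```go
package main

import (
	"errors"
	"fmt"
)

// errWrongSequence plays the role of ErrWrongSequence in this toy model.
var errWrongSequence = errors.New("wrong sequence")

// memoryStream is a hypothetical stream that only tracks the last assigned ID.
type memoryStream struct {
	lastID uint64
}

// publish appends an event. When expectedLastID is set, the event is only
// accepted if the stream's last ID still matches it.
func (s *memoryStream) publish(expectedLastID *uint64) (uint64, error) {
	if expectedLastID != nil && *expectedLastID != s.lastID {
		return 0, errWrongSequence
	}
	s.lastID++
	return s.lastID, nil
}

// demo publishes once, retries with a stale expectation, then with the
// ID returned by the first publish.
func demo() (first uint64, staleErr error, second uint64) {
	s := &memoryStream{}
	first, _ = s.publish(nil)

	stale := uint64(0)
	_, staleErr = s.publish(&stale)

	current := first
	second, _ = s.publish(&current)
	return first, staleErr, second
}

func main() {
	first, staleErr, second := demo()
	fmt.Println(first, staleErr, second)
}
```

A writer that reads state up to some ID and then publishes with that ID as the expectation learns from the error that another writer got in first, and can reload and retry.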

type PingOption

type PingOption interface {
	// contains filtered or unexported methods
}

type PingOptions

type PingOptions struct {
	Backoff delays.DelayDecider
}

func (*PingOptions) Apply

func (o *PingOptions) Apply(opts []PingOption)

type PublishedEvent

type PublishedEvent interface {
	// ID of the event, this is the id that the stream assigned to the event.
	// This will also be the [Event.ID] when the event is consumed.
	//
	// See [OutgoingEvent.ExpectedLastID] to use this for optimistic concurrency
	// control.
	ID() uint64
}

PublishedEvent contains information about an event that has been published to a stream.

type RejectOption

type RejectOption interface {
	// contains filtered or unexported methods
}

RejectOption is an option that can be passed to [Event.Reject] to control how the event is rejected.

func Permanently

func Permanently() RejectOption

Permanently indicates that the event should not be redelivered. Use this option when the event is invalid and will never be valid, meaning the processing of the event will never succeed.

func WithRedeliveryDecider

func WithRedeliveryDecider(decider func(Event) time.Duration) RejectOption

WithRedeliveryDecider indicates that the event might be redelivered after a certain time based on what the decider returns:

  • A negative duration indicates a permanent rejection.
  • A zero duration indicates that the event should be redelivered according to the consumer defaults.
  • A positive duration indicates that the event should be redelivered after the specified duration.
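
The three cases above can be sketched as a decider that looks at the delivery attempt. The example is self-contained, so it uses a hypothetical `attemptEvent` interface with only `DeliveryAttempt()` instead of the full Event interface; the cut-off of five attempts and the one-second-per-attempt delay are arbitrary choices for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// attemptEvent is a hypothetical subset of Event used to keep the sketch
// independent of the package.
type attemptEvent interface {
	DeliveryAttempt() uint
}

// fixedAttempt is a test double that reports a fixed delivery attempt.
type fixedAttempt uint

func (a fixedAttempt) DeliveryAttempt() uint { return uint(a) }

// redeliveryDelay follows the documented contract for decider results.
func redeliveryDelay(ev attemptEvent) time.Duration {
	attempt := ev.DeliveryAttempt()
	switch {
	case attempt >= 5:
		// Negative: give up and reject permanently.
		return -1
	case attempt <= 1:
		// Zero: fall back to the consumer's default redelivery behaviour.
		return 0
	default:
		// Positive: wait longer the more often delivery has been attempted.
		return time.Duration(attempt) * time.Second
	}
}

func main() {
	for _, a := range []fixedAttempt{1, 3, 5} {
		fmt.Printf("attempt %d -> %v\n", a, redeliveryDelay(a))
	}
}
```

With the real package, a function of type `func(Event) time.Duration` with the same body would be passed to WithRedeliveryDecider.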

func WithRedeliveryDelay

func WithRedeliveryDelay(delay time.Duration) RejectOption

WithRedeliveryDelay indicates that the event should be redelivered after the specified delay. This can be used to control how long to wait in case of a temporary error.

type RejectOptions

type RejectOptions struct {
	Backoff delays.DelayDecider

	RejectPermanently bool
	Delay             time.Duration
	RedeliveryDecider func(Event) time.Duration
}

func (*RejectOptions) Apply

func (o *RejectOptions) Apply(opts []RejectOption)
