rblob

package
v0.0.0-...-05f78b5 Latest
Published: Sep 15, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package rblob builds on the gocloud.dev/blob package to provide a reflex stream of events persisted in a bucket that holds a strictly ordered, append-only log of flat files.

rblob provides at-least-once delivery semantics ONLY under the following conditions:

  • Blobs (files) in the bucket must form a strictly ordered append-only log. A new blob must become available for reading as the last blob in the log. This ensures blobs are never skipped.
  • Blobs must be immutable: they may not be modified or deleted prematurely. This ensures a consistent ordered log.

This is most commonly achieved by a single writer that a) writes blobs ordered (named) by timestamp and b) writes blobs slowly enough that they become available for reading in the order they are written. The bucket produced by an AWS Kinesis Firehose delivery stream is a perfect example.

Variables

var JSONDecoder = func(r io.Reader) (Decoder, error) {
	return &jsonDecoder{
		decoder: json.NewDecoder(r),
	}, nil
}

JSONDecoder is the default decoder function. It decodes each blob into a sequence of raw JSON byte slices, one per JSON value.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket defines a bucket from which to stream the content of consecutive blobs as events.

func NewBucket

func NewBucket(label string, bucket *blob.Bucket, opts ...Option) *Bucket

NewBucket returns a bucket using the provided underlying bucket.

func OpenBucket

func OpenBucket(ctx context.Context, label, urlstr string,
	opts ...Option,
) (*Bucket, error)

OpenBucket opens and returns a bucket for the provided url.

label defines the bucket label used for metrics.

urlstr defines the url of the blob bucket. See the gocloud URLOpener documentation in driver subpackages for details on supported URL formats. Also see https://gocloud.dev/concepts/urls/ and https://gocloud.dev/howto/blob/.

func (*Bucket) Close

func (b *Bucket) Close() error

Close releases any resources used by the underlying bucket.

func (*Bucket) Stream

func (b *Bucket) Stream(ctx context.Context, after string,
	opts ...reflex.StreamOption,
) (reflex.StreamClient, error)

Stream implements reflex.StreamFunc and returns a StreamClient that streams events from bucket blobs after the provided cursor. Stream is safe to call from multiple goroutines, but the returned StreamClient is only safe for a single goroutine to use.

Note: The returned StreamClient implementation also exposes a Close method which releases underlying resources. Close is called internally when Recv returns an error.

type Decoder

type Decoder interface {
	// Decode returns the next non-empty byte slice or an error. It returns io.EOF if no more
	// are available.
	Decode() ([]byte, error)
}

Decoder decodes a blob into event byte slices (usually DTOs) which are streamed as event metadata.

type Option

type Option func(*Bucket)

Option is a functional option that configures a bucket.

func WithBackoff

func WithBackoff(d time.Duration) Option

WithBackoff returns an option to configure the backoff duration before querying the underlying bucket for new blobs. It defaults to one minute.

func WithDecoder

func WithDecoder(fn func(io.Reader) (Decoder, error)) Option

WithDecoder returns an option to configure the blob content decoder function. It defaults to the JSONDecoder.
