jetcapture

package module
v0.2.5 (Latest)
Warning

This package is not in the latest version of its module.

Published: Jun 12, 2024 | License: Apache-2.0 | Imports: 24 | Imported by: 0

README

NATS JetStream Capture

Note: requires Go 1.19 or later

Overview

jetcapture is a library for building reliable, continuous NATS JetStream backup processes. A capture runs in four stages:

  • Decode -- take the incoming NATS message and deserialize it into something concrete
  • Split -- optionally split the incoming message stream and group the messages by one or more user-defined attributes
  • Serialize -- write decoded messages to group-specific "blocks" using a provided serialization method (e.g. CSV, Parquet, raw-bytes)
  • Store -- copy the completed blocks to a user-provided storage endpoint (e.g. local folders, Azure, etc.)

For example, you can take a stream of JSON pizza orders, group them by pizza_store_id, and write them out as flattened CSV in 15-minute blocks, with a separate location (e.g. a folder or an S3 bucket) for each store.

jetcapture uses a pull consumer, so horizontal scaling is built in: run more instances against the same consumer to increase throughput.

Internal Data Flow
  1. jetcapture begins pulling messages from a named consumer and stream
  2. For each message:
    1. Pass the raw NATS message into the user-provided decoder, returning a typed struct and an optional "destination key"
    2. Find a corresponding "block" using the destination key and the message timestamp (truncated to the storage interval)
    3. Call the user-provided serialize function and write the decoded message into a block-specific temporary buffer
    4. Cache the message ack information for later
  3. After each storage interval has passed, for each block:
    1. Call the user-provided store function to persist the block to a permanent storage location
    2. Assuming storage of the block succeeded, "ack" all the messages in the block
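The block lookup in the data flow above (destination key plus the message timestamp truncated to the storage interval) can be sketched as follows. This is a hypothetical illustration, not jetcapture's internal code: `blockID`, `blockFor`, `groupCounts`, and the 15-minute `window` are names and values chosen here, and the sketch assumes truncation via `time.Time.Truncate`.

```go
package main

import (
	"fmt"
	"time"
)

// window is the storage interval assumed in this sketch.
const window = 15 * time.Minute

// blockID is a hypothetical block key: one block per destination key per interval.
type blockID[K comparable] struct {
	Dest  K
	Start time.Time
}

// blockFor truncates the timestamp to the interval. Calling UTC() and Truncate
// fixes the location and strips any monotonic reading, so blockID values
// compare correctly with == and are safe to use as map keys.
func blockFor[K comparable](dest K, ts time.Time, interval time.Duration) blockID[K] {
	return blockID[K]{Dest: dest, Start: ts.UTC().Truncate(interval)}
}

// msg stands in for a decoded message: its destination key and timestamp.
type msg struct {
	Dest string
	TS   time.Time
}

// groupCounts counts how many messages fall into each block.
func groupCounts(msgs []msg, interval time.Duration) map[blockID[string]]int {
	counts := make(map[blockID[string]]int)
	for _, m := range msgs {
		counts[blockFor(m.Dest, m.TS, interval)]++
	}
	return counts
}

// day builds a UTC timestamp on a fixed date (sample data only).
func day(hour, minute int) time.Time {
	return time.Date(2024, 6, 12, hour, minute, 0, 0, time.UTC)
}

func main() {
	msgs := []msg{
		{"eu", day(10, 3)},
		{"eu", day(10, 14)},
		{"us", day(10, 5)},
		{"eu", day(10, 20)}, // falls into the next 15-minute block
	}
	for id, n := range groupCounts(msgs, window) {
		fmt.Printf("%s %s: %d message(s)\n", id.Dest, id.Start.Format(time.RFC3339), n)
	}
}
```

Keying blocks on both fields means a late-arriving message still lands in the block for its own interval rather than the one currently open.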

Custom Implementation Requirements

Note: jetcapture uses Go generics so that callback implementations are strongly typed.

  1. Define your Payload P and DestKey K types
  2. Implement a MessageDecoder that takes a *nats.Msg and returns a decoded message of type P, a "destination key" of type K, and an error
  3. Implement a FormattedDataWriter[P Payload] that takes a payload P and "writes" it to an underlying io.Writer, or use a helper writer such as CSVWriter[P Payload] or NewLineDelimitedJSON[P Payload]
  4. Implement a BlockStore[K DestKey] that writes out the finalized "block" (exposed as an io.Reader), or use a helper such as LocalFSStore[K DestKey] or AzureBlobStore[K DestKey]
  5. Create a typed jetcapture.Options[P, K] instance and set its fields
  6. Connect to a NATS server
  7. Call options.Build().Run(ctx, natsConn)

For a full example, see the sample application, which takes incoming NATS messages, encodes each entire message as JSON, and writes it out as newline-delimited JSON.

For an example of a custom decoder (which most library users will need), see the example below.

Types
// Payload is a type that represents your deserialized NATS message
type Payload interface {
	any
}

// DestKey is a type that represents how you want to group (i.e. split) your messages.
type DestKey interface {
	comparable
}
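Because DestKey only requires `comparable`, a key can be a struct with several fields, for example to split pizza orders by store and by day. A minimal sketch with hypothetical names (`StoreDay`, `order`, `splitByKey`); the local `DestKey` declaration mirrors the one above so the snippet compiles on its own, and `splitByKey` only approximates what the Split stage does with the decoder's key.

```go
package main

import "fmt"

// DestKey mirrors jetcapture's constraint: any comparable type works.
type DestKey interface {
	comparable
}

// StoreDay is a hypothetical composite key. All fields are comparable,
// so the struct is comparable and can be used as a map key.
type StoreDay struct {
	StoreID string
	Day     string // e.g. "2024-06-12"
}

// order is hypothetical sample payload data.
type order struct {
	StoreID string
	Day     string
	Pizza   string
}

// splitByKey groups payloads by the key returned from keyFn.
func splitByKey[P any, K DestKey](items []P, keyFn func(P) K) map[K][]P {
	out := make(map[K][]P)
	for _, it := range items {
		k := keyFn(it)
		out[k] = append(out[k], it)
	}
	return out
}

func main() {
	orders := []order{
		{"s1", "2024-06-12", "margherita"},
		{"s1", "2024-06-12", "funghi"},
		{"s2", "2024-06-12", "diavola"},
	}
	groups := splitByKey(orders, func(o order) StoreDay { return StoreDay{o.StoreID, o.Day} })
	fmt.Println(len(groups[StoreDay{"s1", "2024-06-12"}])) // 2
}
```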

Example

// ExamplePayload is our explicit struct for the NATS messages
type ExamplePayload struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	Region    string `json:"region"`
}

// ExampleDestKey is just a simple string alias
type ExampleDestKey = string

// JSONToLocalFsCSV is an example configuration that decodes JSON messages sent over NATS, writes them out as
// CSV files, and groups the output into `region`-specific folders on the local file system.
//
// Use a pointer to an ExamplePayload as the Payload type parameter and ExampleDestKey as the DestKey type parameter
var JSONToLocalFsCSV = &jetcapture.Options[*ExamplePayload, ExampleDestKey]{
	Compression: jetcapture.GZip, // use gzip compression
	Suffix:      "csv",           // suffix will end up being `.csv.gz`
	MaxAge:      time.Hour,       // each block spans at most one hour of messages

	// configure the decoder
	// the incoming NATS messages contain a JSON string which we will decode
	// we also need to return a `DestKey`, which we've defined to be a string;
	// the returned key is the _region_ field of the decoded message
	MessageDecoder: func(msg *nats.Msg) (*ExamplePayload, ExampleDestKey, error) {
		var p ExamplePayload
		if err := json.Unmarshal(msg.Data, &p); err != nil {
			return nil, "", err
		}
		return &p, p.Region, nil
	},

	// use the jetcapture.NewCSVWriter helper
	// we need to specify the headers, and a function that will "flatten" the payload
	// into one or more CSV rows
	WriterFactory: func() jetcapture.FormattedDataWriter[*ExamplePayload] {
		return jetcapture.NewCSVWriter(
			[]string{"first_name", "last_name", "region"},
			func(p *ExamplePayload) ([][]string, error) {
				return [][]string{{
					p.FirstName,
					p.LastName,
					p.Region,
				}}, nil
			},
		)
	},

	// use the jetcapture.LocalFSStore helper
	// we need to provide a `Resolver` that returns a filesystem path using the destination key
	// the path will use the `region` field to group output
	Store: &jetcapture.LocalFSStore[ExampleDestKey]{
		Resolver: func(dk ExampleDestKey) (string, error) {
			return filepath.Join("backup", dk), nil
		},
	},
}
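To see roughly what one block from the example above would contain before compression, the same header and flattening logic can be fed to the standard library's `encoding/csv` writer. This is a sketch under assumptions: it presumes `NewCSVWriter` emits the header once per block followed by the flattened rows, and it uses `encoding/csv` directly rather than jetcapture's helper; `renderBlock` is a hypothetical name.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
)

type ExamplePayload struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	Region    string `json:"region"`
}

// renderBlock decodes raw JSON messages and writes them as CSV with a header row.
func renderBlock(raw [][]byte) (string, error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write([]string{"first_name", "last_name", "region"}); err != nil {
		return "", err
	}
	for _, data := range raw {
		var p ExamplePayload
		if err := json.Unmarshal(data, &p); err != nil {
			return "", err
		}
		if err := w.Write([]string{p.FirstName, p.LastName, p.Region}); err != nil {
			return "", err
		}
	}
	w.Flush() // csv.Writer buffers internally; Flush pushes rows into buf
	return buf.String(), w.Error()
}

func main() {
	out, err := renderBlock([][]byte{
		[]byte(`{"first_name":"Ada","last_name":"Lovelace","region":"eu"}`),
		[]byte(`{"first_name":"Grace","last_name":"Hopper","region":"eu"}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```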

TODO

  • Decide on explicit nack strategy where possible
  • Add S3 store example
  • Stats export
  • Add DrainTimeout for Capture.sweepBlocks. Right now a canceled context (e.g. CTRL-C) triggers a final sweep. However, BlockStore.Write implementations that honor the context (e.g. the Azure blob store) are often short-circuited by that same canceled context. A separate drain/sweep context with a timeout should be created.
  • Add better logging configuration/interface
  • Add support for checking outstanding acks and warning if near or at limit
  • Investigate a goroutine pool for BlockStore.Write (the current code blocks during the write phase)
  • Output filenames need some more thought

Credits

  • Jonathan Camp @intelecy

Documentation

Constants

const (
	None   Compression = "none"
	GZip               = "gzip"
	Snappy             = "snappy"
)
const (
	DefaultMaxAge = time.Minute * 15
)

Variables

This section is empty.

Functions

func NatsToNats

func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)

func NewAppSkeleton

func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App

NewAppSkeleton returns an opinionated, partially filled-out cli app struct. The caller is responsible for setting any additional app properties (e.g. name), adding cli flags, and completing the options struct (via the setup callback). See /apps/simple/main.go for an example.

func SetDefaultLogger

func SetDefaultLogger(newLogger *zap.Logger)

func SubjectToDestKey

func SubjectToDestKey(msg *nats.Msg) string

Types

type AzureBlobStore

type AzureBlobStore[K DestKey] struct {
	// contains filtered or unexported fields
}

func NewAzureBlobStore

func NewAzureBlobStore[K DestKey](
	credential azcore.TokenCredential,
	buildURLBaseFn BuildURLBase[K],
	optionsFn OverrideUploadOptions[K],
) (*AzureBlobStore[K], error)

func (*AzureBlobStore[K]) Write

func (a *AzureBlobStore[K]) Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, int64, time.Duration, error)

type BlockStore

type BlockStore[K DestKey] interface {
	// Write takes a block and writes it out to a "final" destination as specified by the destKey, dir and file name.
	// It returns a stringified version of the destination, the number of bytes written, and the duration of the write.
	// TODO(jonathan): should the block be an io.ReadCloser?
	Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, int64, time.Duration, error)
}

func SingleDirStore

func SingleDirStore[K DestKey](path string) BlockStore[K]

type BuildURLBase

type BuildURLBase[K DestKey] func(ctx context.Context, destKey K) (string, error)

BuildURLBase should return a URL that serves as the base for the block. For example: https://capture.blob.core.windows.net/backup/from-stream-foo/

type CSVWriter

type CSVWriter[P Payload] struct {
	// contains filtered or unexported fields
}

func (*CSVWriter[P]) Flush

func (c *CSVWriter[P]) Flush() error

func (*CSVWriter[P]) InitNew

func (c *CSVWriter[P]) InitNew(out io.Writer) error

func (*CSVWriter[P]) Write

func (c *CSVWriter[P]) Write(m P) (int, error)

type Capture

type Capture[P Payload, K DestKey] struct {
	// contains filtered or unexported fields
}

func New

func New[P Payload, K DestKey](opts Options[P, K]) *Capture[P, K]

func (*Capture[P, K]) Run

func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) (err error)

type Compression

type Compression string

type DestKey

type DestKey interface {
	comparable
}

type FormattedDataWriter

type FormattedDataWriter[P Payload] interface {
	// InitNew is called by jetcapture passing in an `io.Writer` that is the underlying temporary storage for this block
	// The implementation is expected to keep a reference to it and use it during the `Write()` calls
	InitNew(out io.Writer) error

	// Write should write the payload, internally at first and eventually to the underlying `io.Writer`
	// Buffering data/writes is fine as long as a call to `Flush` ensures that everything is written
	Write(payload P) (int, error)

	// Flush is called after each block of messages is processed. Can be a nop depending on the implementation.
	Flush() error
}

FormattedDataWriter specifies a method for writing a `Payload` to an `io.Writer`. It is obtained from a "factory" function, meaning a new writer is created for each block.

func NewCSVWriter

func NewCSVWriter[P Payload](
	header []string,
	flattenFn func(payload P) ([][]string, error),
) FormattedDataWriter[P]

type LocalFSStore

type LocalFSStore[K DestKey] struct {
	Resolver func(destKey K) (string, error)
}

func (*LocalFSStore[K]) Write

func (f *LocalFSStore[K]) Write(_ context.Context, block io.Reader, destKey K, dir, fileName string) (string, int64, time.Duration, error)

type NatsMessage

type NatsMessage struct {
	Subject  string              `json:"subject"`
	Reply    string              `json:"reply"` // TODO(jonathan): should reply be included?
	Header   map[string][]string `json:"header"`
	Data     []byte              `json:"data"`
	Metadata *nats.MsgMetadata   `json:"metadata"`
}

type NewLineDelimitedJSON

type NewLineDelimitedJSON[P Payload] struct {
	// contains filtered or unexported fields
}

func (*NewLineDelimitedJSON[P]) Flush

func (j *NewLineDelimitedJSON[P]) Flush() error

func (*NewLineDelimitedJSON[P]) InitNew

func (j *NewLineDelimitedJSON[P]) InitNew(out io.Writer) error

func (*NewLineDelimitedJSON[P]) Write

func (j *NewLineDelimitedJSON[P]) Write(m P) (int, error)

type Options

type Options[P Payload, K DestKey] struct {
	NATSStreamName   string        // which stream should jetcapture bind to
	NATSConsumerName string        // which consumer should jetcapture bind to
	Compression      Compression   // apply compression to the resulting files
	Suffix           string        // file name suffix, e.g. "csv" (with GZip compression the file ends in `.csv.gz`)
	BufferToDisk     bool          // should jetcapture buffer to disk using temp files, or keep blocks in memory
	MaxAge           time.Duration // what is the max duration for a single block
	MaxMessages      int           // rough limit to the number of messages in a block before a new one is created
	TempDir          string        // override the default OS temp dir

	MessageDecoder  func(*nats.Msg) (P, K, error)
	WriterFactory   func() FormattedDataWriter[P]
	Store           BlockStore[K]
	OnStoreComplete func(K, string, int64, time.Duration, error) // optional callback for metrics capture
}

func DefaultOptions

func DefaultOptions[P Payload, K DestKey]() *Options[P, K]

func (*Options[P, K]) Build

func (o *Options[P, K]) Build() *Capture[P, K]

func (*Options[P, K]) Validate

func (o *Options[P, K]) Validate() error

type OverrideUploadOptions added in v0.2.4

type OverrideUploadOptions[K DestKey] func(options *azblob.UploadStreamOptions, destKey K)

OverrideUploadOptions is an optional function to override various upload options (e.g. AccessTier).

type Payload

type Payload interface {
	any
}

Directories

Path Synopsis
apps
