jetcapture

package module
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2022 License: Apache-2.0 Imports: 24 Imported by: 0

README

NATS JetStream Capture

Implementation Requirements

Types
// Payload is the constraint for the decoded message type P returned by a
// MessageDecoder. It permits any type; the example below uses *ExamplePayload.
type Payload interface {
	any
}

// DestKey is a type that represents how you want to group (i.e. bucket) your messages.
// It must be comparable so that keys can be checked for equality. The example
// below simply uses a string holding the message's region.
type DestKey interface {
	comparable
}
Implement
// FormattedDataWriter writes payloads in a concrete format (e.g. CSV, Parquet)
// to an io.Writer. A fresh one is produced by the WriterFactory for each block.
type FormattedDataWriter[P Payload] interface {
	// InitNew presumably prepares the writer to emit a new block to out — confirm.
	InitNew(out io.Writer) error
	// Write encodes one payload; NOTE(review): the int is presumably a
	// byte or row count — confirm against implementations.
	Write(payload P) (int, error)
	// Flush presumably pushes any buffered output to the underlying writer.
	Flush() error
}

// BlockStore writes completed blocks to their final destination.
type BlockStore[K DestKey] interface {
	// Write takes a block and writes it out to a "final" destination as specified
	// by the destKey, dir and file name. It returns a stringified version of the
	// destination.
	Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, error)
}

// Options configures a capture. This excerpt elides the other fields ("...")
// and shows only the three pieces a user must implement.
type Options[P Payload, K DestKey] struct {
	...
	// MessageDecoder turns a raw NATS message into a payload and its destination key.
	MessageDecoder func(*nats.Msg) (P, K, error)
	// WriterFactory returns a new FormattedDataWriter for each block.
	WriterFactory  func() FormattedDataWriter[P]
	// Store writes each finished block to its final destination.
	Store          BlockStore[K]
}

MessageDecoder

func(*nats.Msg) (P, K, error)

A MessageDecoder is a function that takes a *nats.Msg and returns a decoded message of type P and a "destination key" of type K, or an error if the message cannot be decoded.

WriterFactory

func() FormattedDataWriter[P]

A WriterFactory is a function that returns a FormattedDataWriter[P]. It is called once for each new "block", where a block is a group of messages that share the same DestKey and fall within a specific time range.

The FormattedDataWriter[P] returned by a WriterFactory writes formatted data (e.g. CSV or Parquet) to an io.Writer.

// FormattedDataWriter encodes payloads of type P into a specific format on an io.Writer.
type FormattedDataWriter[P Payload] interface {
	// InitNew presumably starts a new block on out — confirm.
	InitNew(out io.Writer) error
	// Write encodes one payload.
	Write(payload P) (int, error)
	// Flush presumably pushes buffered output to the underlying writer.
	Flush() error
}

Store

A BlockStore takes a block and writes it out to a "final" destination determined by the DestKey, directory and file name. It returns a string describing where the block was written.

// BlockStore writes a block to the destination chosen by destKey, dir and
// fileName, and returns a stringified version of that destination.
type BlockStore[K DestKey] interface {
	Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, error)
}

Example

// ExamplePayload is our explicit struct for the NATS messages.
// The json tags map the snake_case keys of the incoming JSON onto the fields.
type ExamplePayload struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	Region    string `json:"region"`
}

// ExampleDestKey is just a simple string alias. In this example it holds the
// payload's Region, so output is grouped by region.
type ExampleDestKey = string

// JSONToLocalFsCSV is an example configuration that decodes JSON messages sent
// over NATS, writes them out as CSV files, and groups the output into
// region-specific folders on the local file system.
//
// It uses a pointer to an ExamplePayload as the Payload type parameter and
// ExampleDestKey as the DestKey type parameter.
var JSONToLocalFsCSV = &jetcapture.Options[*ExamplePayload, ExampleDestKey]{
	Compression: jetcapture.GZip, // use gzip compression
	Suffix:      "csv",           // suffix will end up being `.csv.gz`
	MaxAge:      time.Hour,       // messages will be written once an hour

	// configure the decoder
	// the incoming NATS messages contain a JSON string which we will decode
	// we also need to return a `DestKey`, which we've defined to be a string;
	// the key returned is the _region_ field of the decoded message
	MessageDecoder: func(msg *nats.Msg) (*ExamplePayload, ExampleDestKey, error) {
		var p ExamplePayload
		if err := json.Unmarshal(msg.Data, &p); err != nil {
			return nil, "", err
		}
		return &p, p.Region, nil
	},

	// use the jetcapture.NewCSVWriter helper
	// we need to specify the headers, and a function that will "flatten" the payload
	// into one or more CSV rows
	WriterFactory: func() jetcapture.FormattedDataWriter[*ExamplePayload] {
		return jetcapture.NewCSVWriter(
			[]string{"first_name", "last_name", "region"},
			func(p *ExamplePayload) ([][]string, error) {
				return [][]string{{
					p.FirstName,
					p.LastName,
					p.Region,
				}}, nil
			},
		)
	},

	// use the jetcapture.LocalFSStore helper
	// we need to provide a `Resolver` that returns a filesystem path using the destination key
	// the path will use the `region` field to group output
	Store: &jetcapture.LocalFSStore[ExampleDestKey]{
		Resolver: func(dk ExampleDestKey) (string, error) {
			// dk is copied straight from message data, so it is untrusted: a
			// region such as "../../etc" would otherwise resolve outside
			// "backup". Cleaning it as a rooted path drops any leading ".."
			// elements, so the result always stays under "backup". An ordinary
			// region like "us-east" still maps to "backup/us-east".
			return filepath.Join("backup", filepath.Clean("/"+dk)), nil
		},
	},
}

Documentation

Index

Constants

View Source
// Compression algorithms that can be applied to written blocks (e.g. GZip
// turns a "csv" suffix into ".csv.gz").
//
// Every constant names Compression explicitly. In a const block, a spec that
// has its own value does not inherit the previous spec's type, so without the
// explicit type GZip and Snappy would be untyped strings that default to
// `string` rather than Compression.
const (
	None   Compression = "none"
	GZip   Compression = "gzip"
	Snappy Compression = "snappy"
)
View Source
// DefaultMaxAge is a fifteen-minute duration. Going by its name, it is
// presumably the fallback for Options.MaxAge — confirm against DefaultOptions.
const DefaultMaxAge = 15 * time.Minute

Variables

This section is empty.

Functions

func NatsToNats

// NatsToNats builds a function with the MessageDecoder signature. Its payload
// is a *NatsMessage, and its destination key is computed by resolve.
// NOTE(review): the payload presumably mirrors the raw message's fields — confirm.
func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)

func NewAppSkeleton

// NewAppSkeleton returns an opinionated, partially filled out cli app. The
// caller sets any remaining app properties (e.g. name) and adds cli flags. It
// also completes the Options in the setup callback, which receives the cli context.
func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App

NewAppSkeleton returns an opinionated, partially filled out cli app struct. The caller is responsible for setting any additional app properties (e.g. name), adding cli flags, and completing the options struct (via the setup callback). See /apps/simple/main.go for an example.

func SetDefaultLogger

// SetDefaultLogger presumably replaces the package-level zap logger with
// newLogger — confirm.
func SetDefaultLogger(newLogger *zap.Logger)

func SubjectToDestKey

// SubjectToDestKey derives a string destination key from msg. Judging by the
// name it returns msg.Subject, grouping output by subject — confirm. Its
// signature fits the resolve parameter of NatsToNats with K = string.
func SubjectToDestKey(msg *nats.Msg) string

Types

type AzureBlobStore

// AzureBlobStore is a BlockStore (it has the matching Write method) that
// stores blocks in Azure Blob Storage. Create one with NewAzureBlobStore.
type AzureBlobStore[K DestKey] struct {
	// contains filtered or unexported fields
}

func NewAzureBlobStore

// NewAzureBlobStore returns an AzureBlobStore that authenticates with
// credential. It calls buildURLBase to obtain the base URL for each
// destination key.
func NewAzureBlobStore[K DestKey](
	credential azcore.TokenCredential,
	buildURLBase BuildURLBase[K],
) (*AzureBlobStore[K], error)

func (*AzureBlobStore[K]) Write

// Write uploads block under the URL base built for destKey and returns the
// stringified destination. NOTE(review): dir and fileName are presumably
// appended as path segments — confirm.
func (a *AzureBlobStore[K]) Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, error)

type BlockStore

// BlockStore writes completed blocks to their final destination.
type BlockStore[K DestKey] interface {
	// Write takes a block and writes it out to a "final" destination as specified by the destKey, dir and file name.
	// It returns a stringified version of the destination.
	// TODO(jonathan): should the block be an io.ReadCloser?
	Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, error)
}

func SingleDirStore

// SingleDirStore returns a BlockStore rooted at path. NOTE(review): the name
// suggests every destination key is written into that one directory — confirm.
func SingleDirStore[K DestKey](path string) BlockStore[K]

type BuildURLBase

// BuildURLBase returns the URL that serves as the base for blocks with the
// given destKey, for example
// https://capture.blob.core.windows.net/backup/from-stream-foo/
type BuildURLBase[K DestKey] func(ctx context.Context, destKey K) (string, error)

BuildURLBase should return a URL that serves as the base for the block. For example: https://capture.blob.core.windows.net/backup/from-stream-foo/

type CSVWriter

// CSVWriter writes payloads as CSV. Its InitNew/Write/Flush methods match
// FormattedDataWriter. NOTE(review): NewCSVWriter presumably constructs it — confirm.
type CSVWriter[P Payload] struct {
	// contains filtered or unexported fields
}

func (*CSVWriter[P]) Flush

// Flush implements FormattedDataWriter.Flush; presumably it flushes buffered CSV rows.
func (c *CSVWriter[P]) Flush() error

func (*CSVWriter[P]) InitNew

// InitNew implements FormattedDataWriter.InitNew, starting a new block on out.
// NOTE(review): presumably this is where the header row is written — confirm.
func (c *CSVWriter[P]) InitNew(out io.Writer) error

func (*CSVWriter[P]) Write

// Write implements FormattedDataWriter.Write for a single payload m, which the
// flatten function may turn into one or more CSV rows.
func (c *CSVWriter[P]) Write(m P) (int, error)

type Capture

// Capture consumes NATS messages and writes them out according to its Options.
// Create one with New or Options.Build, then call Run.
type Capture[P Payload, K DestKey] struct {
	// contains filtered or unexported fields
}

func New

// New returns a Capture configured by opts. NOTE(review): it is not visible
// here whether New validates opts; Options.Validate exists for that.
func New[P Payload, K DestKey](opts Options[P, K]) *Capture[P, K]

func (*Capture[P, K]) Run

// Run performs the capture over the connection nc. NOTE(review): it presumably
// blocks until ctx is done or an error occurs — confirm.
func (c *Capture[P, K]) Run(ctx context.Context, nc *nats.Conn) error

type Compression

// Compression names the algorithm applied to written blocks; see the None,
// GZip and Snappy constants.
type Compression string

type DestKey

// DestKey is the constraint for the key used to group (bucket) messages into
// blocks and destinations. It must be comparable.
type DestKey interface {
	comparable
}

type FormattedDataWriter

// FormattedDataWriter encodes payloads in a specific format (e.g. CSV, Parquet)
// onto an io.Writer. Options.WriterFactory produces a new one per block.
type FormattedDataWriter[P Payload] interface {
	InitNew(out io.Writer) error
	Write(payload P) (int, error)
	Flush() error
}

func NewCSVWriter

// NewCSVWriter returns a CSV FormattedDataWriter. header lists the column
// names. flattenFn "flattens" one payload into one or more CSV rows.
func NewCSVWriter[P Payload](
	header []string,
	flattenFn func(payload P) ([][]string, error),
) FormattedDataWriter[P]

type LocalFSStore

// LocalFSStore is a BlockStore that writes blocks to the local file system.
type LocalFSStore[K DestKey] struct {
	// Resolver returns the filesystem path for a destination key. If the key
	// comes from message data, the Resolver must keep the path inside the
	// intended root.
	Resolver func(destKey K) (string, error)
}

func (*LocalFSStore[K]) Write

// Write stores block at the path Resolver returns for destKey, presumably
// combined with dir and fileName — confirm. The context is ignored, and the
// result is the stringified destination.
func (f *LocalFSStore[K]) Write(_ context.Context, block io.Reader, destKey K, dir, fileName string) (string, error)

type NatsMessage

// NatsMessage is a JSON-serializable form of a NATS message. Its fields
// mirror a *nats.Msg plus its JetStream metadata. It is the payload type
// produced by decoders built with NatsToNats.
type NatsMessage struct {
	Subject  string              `json:"subject"`
	Reply    string              `json:"reply"` // TODO(jonathan): should reply be included?
	Header   map[string][]string `json:"header"`
	// Data is encoded as base64 by encoding/json because it is a []byte.
	Data     []byte              `json:"data"`
	Metadata *nats.MsgMetadata   `json:"metadata"`
}

type NewLineDelimitedJSON

// NewLineDelimitedJSON has the InitNew/Write/Flush methods of a
// FormattedDataWriter. NOTE(review): by its name it writes one JSON document
// per payload per line — confirm.
type NewLineDelimitedJSON[P Payload] struct {
	// contains filtered or unexported fields
}

func (*NewLineDelimitedJSON[P]) Flush

// Flush implements FormattedDataWriter.Flush.
func (j *NewLineDelimitedJSON[P]) Flush() error

func (*NewLineDelimitedJSON[P]) InitNew

// InitNew implements FormattedDataWriter.InitNew, starting a new block on out.
func (j *NewLineDelimitedJSON[P]) InitNew(out io.Writer) error

func (*NewLineDelimitedJSON[P]) Write

// Write implements FormattedDataWriter.Write for a single payload m.
func (j *NewLineDelimitedJSON[P]) Write(m P) (int, error)

type Options

// Options configures a Capture; turn it into one with New or Options.Build.
type Options[P Payload, K DestKey] struct {
	// NOTE(review): presumably the JetStream stream and consumer to read from — confirm.
	NATSStreamName   string
	NATSConsumerName string
	// Compression is applied to each written block (e.g. GZip with Suffix
	// "csv" yields ".csv.gz" files).
	Compression      Compression
	// Suffix is the file extension used for written blocks.
	Suffix           string
	// NOTE(review): presumably stages blocks on disk (in TempDir) instead of
	// in memory — confirm.
	BufferToDisk     bool
	// MaxAge bounds how long messages accumulate before a block is written
	// (time.Hour writes once an hour).
	MaxAge           time.Duration
	// NOTE(review): presumably caps the number of messages per block — confirm.
	MaxMessages      int
	// NOTE(review): presumably the directory used for temporary files — confirm.
	TempDir          string

	// MessageDecoder decodes a raw message into a payload and destination key.
	MessageDecoder func(*nats.Msg) (P, K, error)
	// WriterFactory returns a new FormattedDataWriter for each block.
	WriterFactory  func() FormattedDataWriter[P]
	// Store writes finished blocks to their final destination.
	Store          BlockStore[K]
}

func DefaultOptions

// DefaultOptions returns a new Options, presumably pre-filled with defaults
// such as DefaultMaxAge — confirm. The caller still supplies MessageDecoder,
// WriterFactory and Store.
func DefaultOptions[P Payload, K DestKey]() *Options[P, K]

func (*Options[P, K]) Build

// Build returns a Capture configured by o. It returns no error, so call
// Validate first if o may be incomplete.
func (o *Options[P, K]) Build() *Capture[P, K]

func (*Options[P, K]) Validate

// Validate reports, via a non-nil error, whether o is unusable.
func (o *Options[P, K]) Validate() error

type Payload

// Payload is the constraint for decoded message types; any type is allowed.
type Payload interface {
	any
}

Directories

Path Synopsis
apps

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL