Documentation ¶
Index ¶
- Constants
- func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)
- func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App
- func SetDefaultLogger(newLogger *zap.Logger)
- func SubjectToDestKey(msg *nats.Msg) string
- type AzureBlobStore
- type BlockStore
- type BuildURLBase
- type CSVWriter
- type Capture
- type Compression
- type DestKey
- type FormattedDataWriter
- type LocalFSStore
- type NatsMessage
- type NewLineDelimitedJSON
- type Options
- type OverrideUploadOptions
- type Payload
Constants ¶
const ( None Compression = "none" GZip = "gzip" Snappy = "snappy" )
const (
DefaultMaxAge = time.Minute * 15
)
Variables ¶
This section is empty.
Functions ¶
func NatsToNats ¶
func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)
func NewAppSkeleton ¶
func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App
NewAppSkeleton returns an opinionated, partially filled out cli app struct. The caller is responsible for setting any additional app properties (i.e. name), adding cli flags, and completing the options struct (via the setup callback) See /apps/simple/main.go for an example
func SetDefaultLogger ¶
func SubjectToDestKey ¶
func SubjectToDestKey(msg *nats.Msg) string
Types ¶
type AzureBlobStore ¶
type AzureBlobStore[K DestKey] struct { // contains filtered or unexported fields }
func NewAzureBlobStore ¶
func NewAzureBlobStore[K DestKey]( credential azcore.TokenCredential, buildURLBaseFn BuildURLBase[K], optionsFn OverrideUploadOptions[K], ) (*AzureBlobStore[K], error)
type BlockStore ¶
type BlockStore[K DestKey] interface { // Write takes a block and writes it out to a "final" destination as specified by the deskKey, dir and file name. // returns a stringified version of the destination, number of bytes written, and the duration // TODO(jonathan): should the block be an io.ReadCloser? Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, int64, time.Duration, error) }
func SingleDirStore ¶
func SingleDirStore[K DestKey](path string) BlockStore[K]
type BuildURLBase ¶
BuildURLBase should return a URL that serves as the base for the block For example: https://capture.blob.core.windows.net/backup/from-stream-foo/
type Compression ¶
type Compression string
type DestKey ¶
type DestKey interface { comparable }
type FormattedDataWriter ¶
type FormattedDataWriter[P Payload] interface { // InitNew is called by jetcapture passing in an `io.Writer` that is the underlying temporary storage for this block // The implementation is expected to keep a reference to it and use it during the `Write()` calls InitNew(out io.Writer) error // Write should write the payload internally and eventually the underlying `io.Writer` // Buffering data/writes is fine as long as a call to `Flush` ensures that everything is written Write(payload P) (int, error) // Flush is called after each block of messages is processed. Can be a nop depending on the implementation. Flush() error }
FormattedDataWriter specifies a method for writing a `Payload` to an `io.Writer` It will be used from a "factory" function. Meaning a new writer will be created for each block
func NewCSVWriter ¶
func NewCSVWriter[P Payload]( header []string, flattenFn func(payload P) ([][]string, error), ) FormattedDataWriter[P]
type LocalFSStore ¶
type NatsMessage ¶
type NewLineDelimitedJSON ¶
type NewLineDelimitedJSON[P Payload] struct { // contains filtered or unexported fields }
func (*NewLineDelimitedJSON[P]) Flush ¶
func (j *NewLineDelimitedJSON[P]) Flush() error
func (*NewLineDelimitedJSON[P]) InitNew ¶
func (j *NewLineDelimitedJSON[P]) InitNew(out io.Writer) error
func (*NewLineDelimitedJSON[P]) Write ¶
func (j *NewLineDelimitedJSON[P]) Write(m P) (int, error)
type Options ¶
type Options[P Payload, K DestKey] struct { NATSStreamName string // which stream should jetcapture bind to NATSConsumerName string // which consumer should jetcapture bind to Compression Compression // apply compression to the resulting files Suffix string // add a suffix BufferToDisk bool // should jetcapture buffer to disk using temp files, or keep blocks in memory MaxAge time.Duration // what is the max duration for a single block MaxMessages int // rough limit to the number of messages in a block before a new one is created TempDir string // override the default OS temp dir MessageDecoder func(*nats.Msg) (P, K, error) WriterFactory func() FormattedDataWriter[P] Store BlockStore[K] OnStoreComplete func(K, string, int64, time.Duration, error) // optional callback for metrics capture }
func DefaultOptions ¶
type OverrideUploadOptions ¶ added in v0.2.4
type OverrideUploadOptions[K DestKey] func(options *azblob.UploadStreamOptions, destKey K)
OverrideUploadOptions is an optional function to override various upload option (e.g. AccessTier)