Documentation ¶
Index ¶
- Constants
- func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)
- func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App
- func SetDefaultLogger(newLogger *zap.Logger)
- func SubjectToDestKey(msg *nats.Msg) string
- type AzureBlobStore
- type BlockStore
- type BuildURLBase
- type CSVWriter
- type Capture
- type Compression
- type DestKey
- type FormattedDataWriter
- type LocalFSStore
- type NatsMessage
- type NewLineDelimitedJSON
- type Options
- type Payload
Constants ¶
View Source
const ( None Compression = "none" GZip = "gzip" Snappy = "snappy" )
View Source
const (
DefaultMaxAge = time.Minute * 15
)
Variables ¶
This section is empty.
Functions ¶
func NatsToNats ¶
func NatsToNats[K DestKey](resolve func(msg *nats.Msg) K) func(msg *nats.Msg) (*NatsMessage, K, error)
func NewAppSkeleton ¶
func NewAppSkeleton[P Payload, K DestKey](setup func(c *cli.Context, options *Options[P, K]) error) *cli.App
NewAppSkeleton returns an opinionated, partially filled out cli app struct. The caller is responsible for setting any additional app properties (i.e. name), adding cli flags, and completing the options struct (via the setup callback) See /apps/simple/main.go for an example
func SetDefaultLogger ¶
func SubjectToDestKey ¶
func SubjectToDestKey(msg *nats.Msg) string
Types ¶
type AzureBlobStore ¶
type AzureBlobStore[K DestKey] struct { // contains filtered or unexported fields }
func NewAzureBlobStore ¶
func NewAzureBlobStore[K DestKey]( credential azcore.TokenCredential, buildURLBase BuildURLBase[K], ) (*AzureBlobStore[K], error)
type BlockStore ¶
type BlockStore[K DestKey] interface { // Write takes a block and writes it out to a "final" destination as specified by the deskKey, dir and file name. // returns a stringified version of the destination // TODO(jonathan): should the block be an io.ReadCloser? Write(ctx context.Context, block io.Reader, destKey K, dir, fileName string) (string, error) }
func SingleDirStore ¶
func SingleDirStore[K DestKey](path string) BlockStore[K]
type BuildURLBase ¶
BuildURLBase should return a URL that serves as the base for the block For example: https://capture.blob.core.windows.net/backup/from-stream-foo/
type Compression ¶
type Compression string
type DestKey ¶
type DestKey interface { comparable }
type FormattedDataWriter ¶
type FormattedDataWriter[P Payload] interface { InitNew(out io.Writer) error Write(payload P) (int, error) Flush() error }
func NewCSVWriter ¶
func NewCSVWriter[P Payload]( header []string, flattenFn func(payload P) ([][]string, error), ) FormattedDataWriter[P]
type LocalFSStore ¶
type NatsMessage ¶
type NewLineDelimitedJSON ¶
type NewLineDelimitedJSON[P Payload] struct { // contains filtered or unexported fields }
func (*NewLineDelimitedJSON[P]) Flush ¶
func (j *NewLineDelimitedJSON[P]) Flush() error
func (*NewLineDelimitedJSON[P]) InitNew ¶
func (j *NewLineDelimitedJSON[P]) InitNew(out io.Writer) error
func (*NewLineDelimitedJSON[P]) Write ¶
func (j *NewLineDelimitedJSON[P]) Write(m P) (int, error)
type Options ¶
type Options[P Payload, K DestKey] struct { NATSStreamName string NATSConsumerName string Compression Compression Suffix string BufferToDisk bool MaxAge time.Duration MaxMessages int TempDir string MessageDecoder func(*nats.Msg) (P, K, error) WriterFactory func() FormattedDataWriter[P] Store BlockStore[K] }
func DefaultOptions ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.