Documentation ¶
Index ¶
- Constants
- func DefaultS3KeyNamingStrategy(modelId mdl.ModelId, datetime time.Time, prefixCallback S3PrefixNamingStrategy) string
- func RegisterS3PrefixNamingStrategy(name string, strategy S3PrefixNamingStrategy)
- func WithKeyNamingStrategy(strategy S3KeyNamingStrategy)
- func WithS3BucketNamingStrategy(strategy S3BucketNamingStrategy)
- type File
- type FileRecorder
- type Partition
- type Partitionable
- type Partitioner
- type PartitionerSettings
- type Progress
- type ReadResult
- type ReadResults
- type Reader
- type ReaderSettings
- type ResultCallback
- type S3BucketNamingStrategy
- type S3KeyNamingStrategy
- type S3PrefixNamingStrategy
- type Writer
- type WriterSettings
Constants ¶
View Source
const ( NamingStrategyDtErrored = "errors/yyyy/MM/dd" NamingStrategyDtSeparated = "yyyy/MM/dd" )
Variables ¶
This section is empty.
Functions ¶
func RegisterS3PrefixNamingStrategy ¶
func RegisterS3PrefixNamingStrategy(name string, strategy S3PrefixNamingStrategy)
func WithKeyNamingStrategy ¶
func WithKeyNamingStrategy(strategy S3KeyNamingStrategy)
func WithS3BucketNamingStrategy ¶
func WithS3BucketNamingStrategy(strategy S3BucketNamingStrategy)
Types ¶
type FileRecorder ¶
type FileRecorder interface { DeleteRecordedFiles(ctx context.Context) error Files() []File RecordFile(bucket string, key string) RenameRecordedFiles(ctx context.Context, newPrefix string) error }
func NewNopRecorder ¶
func NewNopRecorder() FileRecorder
func NewS3FileRecorder ¶
func NewS3FileRecorderWithInterfaces ¶
func NewS3FileRecorderWithInterfaces(logger log.Logger, s3Client gosoS3.Client) FileRecorder
type Partitionable ¶
type Partitioner ¶
type Partitioner interface { Flush() Ingest(data Partitionable) Out() <-chan *Partition Size() int Start() Stop() Trim(size int) }
func NewPartitioner ¶
func NewPartitioner(settings *PartitionerSettings) Partitioner
func NewPartitionerWithInterfaces ¶
func NewPartitionerWithInterfaces(clock clock.Clock, settings *PartitionerSettings) Partitioner
type PartitionerSettings ¶
type PartitionerSettings struct { // at what granularity do we divide the data into partitions? Needs to be at least 1 second. PartitionInterval time.Duration `cfg:"partition_interval" default:"900s" validate:"min=1000000000"` // how long do we buffer elements before we write them out even when the partition // is not yet full. Needs to be at least 1 second. BufferInterval time.Duration `cfg:"buffer_interval" default:"900s" validate:"min=1000000000"` // how many elements can a partition have before we have to flush it (to avoid excessive memory usage) MaxPartitionSize int `cfg:"max_partition_size" default:"50000" validate:"min=1"` }
type ReadResult ¶
type ReadResult map[string]interface{}
type ReadResults ¶
type ReadResults []ReadResult
type Reader ¶
type Reader interface { ReadDate(ctx context.Context, datetime time.Time, target interface{}) error ReadDateAsync(ctx context.Context, datetime time.Time, target interface{}, callback ResultCallback) error ReadFileIntoTarget(ctx context.Context, file string, target interface{}, batchSize int, offset int64) error }
func NewReaderWithInterfaces ¶
func NewReaderWithInterfaces( logger log.Logger, s3Client gosoS3.Client, modelId mdl.ModelId, prefixNaming S3PrefixNamingStrategy, recorder FileRecorder, ) Reader
type ReaderSettings ¶
type ReaderSettings struct { ClientName string `cfg:"client_name" default:"default"` ModelId mdl.ModelId NamingStrategy string Recorder FileRecorder }
type ResultCallback ¶
type S3BucketNamingStrategy ¶
type S3KeyNamingStrategy ¶
type S3PrefixNamingStrategy ¶
type Writer ¶
type Writer interface { Write(ctx context.Context, datetime time.Time, items interface{}) error WriteToKey(ctx context.Context, key string, items interface{}) error }
func NewWriterWithInterfaces ¶
type WriterSettings ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.