parquet

package
v0.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NamingStrategyDtErrored   = "errors/yyyy/MM/dd"
	NamingStrategyDtSeparated = "yyyy/MM/dd"
)

Variables

This section is empty.

Functions

func DefaultS3KeyNamingStrategy

func DefaultS3KeyNamingStrategy(modelId mdl.ModelId, datetime time.Time, prefixCallback S3PrefixNamingStrategy) string

func RegisterS3PrefixNamingStrategy

func RegisterS3PrefixNamingStrategy(name string, strategy S3PrefixNamingStrategy)

func WithKeyNamingStrategy

func WithKeyNamingStrategy(strategy S3KeyNamingStrategy)

func WithS3BucketNamingStrategy

func WithS3BucketNamingStrategy(strategy S3BucketNamingStrategy)

Types

type File

type File struct {
	Bucket string
	Key    string
}

type FileRecorder

type FileRecorder interface {
	DeleteRecordedFiles(ctx context.Context) error
	Files() []File
	RecordFile(bucket string, key string)
	RenameRecordedFiles(ctx context.Context, newPrefix string) error
}

func NewNopRecorder

func NewNopRecorder() FileRecorder

func NewS3FileRecorder

func NewS3FileRecorder(ctx context.Context, config cfg.Config, logger log.Logger, name string) (FileRecorder, error)

func NewS3FileRecorderWithInterfaces

func NewS3FileRecorderWithInterfaces(logger log.Logger, s3Client gosoS3.Client) FileRecorder

type Partition

type Partition struct {
	Timestamp     time.Time
	Elements      []Partitionable
	StartedAt     time.Time
	LastUpdatedAt time.Time
}

type Partitionable

type Partitionable interface {
	GetPartitionTimestamp() time.Time
}

type Partitioner

type Partitioner interface {
	Flush()
	Ingest(data Partitionable)
	Out() <-chan *Partition
	Size() int
	Start()
	Stop()
	Trim(size int)
}

func NewPartitioner

func NewPartitioner(settings *PartitionerSettings) Partitioner

func NewPartitionerWithInterfaces

func NewPartitionerWithInterfaces(clock clock.Clock, settings *PartitionerSettings) Partitioner

type PartitionerSettings

type PartitionerSettings struct {
	// at what granularity do we divide the data into partitions? Needs to be at least 1 second.
	PartitionInterval time.Duration `cfg:"partition_interval" default:"900s" validate:"min=1000000000"`
	// how long do we buffer elements before we write them out even when the partition
	// is not yet full. Needs to be at least 1 second.
	BufferInterval time.Duration `cfg:"buffer_interval" default:"900s" validate:"min=1000000000"`
	// how many elements can a partition have before we have to flush it (to avoid excessive memory usage)
	MaxPartitionSize int `cfg:"max_partition_size" default:"50000" validate:"min=1"`
}

type Progress

type Progress struct {
	Current   int
	FileCount int
}

type ReadResult

type ReadResult map[string]interface{}

type ReadResults

type ReadResults []ReadResult

type Reader

type Reader interface {
	ReadDate(ctx context.Context, datetime time.Time, target interface{}) error
	ReadDateAsync(ctx context.Context, datetime time.Time, target interface{}, callback ResultCallback) error
	ReadFileIntoTarget(ctx context.Context, file string, target interface{}, batchSize int, offset int64) error
}

func NewReader

func NewReader(ctx context.Context, config cfg.Config, logger log.Logger, settings *ReaderSettings) (Reader, error)

func NewReaderWithInterfaces

func NewReaderWithInterfaces(
	logger log.Logger,
	s3Client gosoS3.Client,
	modelId mdl.ModelId,
	prefixNaming S3PrefixNamingStrategy,
	recorder FileRecorder,
) Reader

type ReaderSettings

type ReaderSettings struct {
	ClientName     string `cfg:"client_name" default:"default"`
	ModelId        mdl.ModelId
	NamingStrategy string
	Recorder       FileRecorder
}

type ResultCallback

type ResultCallback func(progress Progress, results interface{}) (bool, error)

type S3BucketNamingStrategy

type S3BucketNamingStrategy func(appId cfg.AppId) string

type S3KeyNamingStrategy

type S3KeyNamingStrategy func(modelId mdl.ModelId, datetime time.Time, prefixCallback S3PrefixNamingStrategy) string

type S3PrefixNamingStrategy

type S3PrefixNamingStrategy func(modelId mdl.ModelId, datetime time.Time) string

type Writer

type Writer interface {
	Write(ctx context.Context, datetime time.Time, items interface{}) error
	WriteToKey(ctx context.Context, key string, items interface{}) error
}

func NewWriter

func NewWriter(ctx context.Context, config cfg.Config, logger log.Logger, settings *WriterSettings) (Writer, error)

func NewWriterWithInterfaces

func NewWriterWithInterfaces(
	logger log.Logger,
	s3Client gosoS3.Client,
	modelId mdl.ModelId,
	prefixNaming S3PrefixNamingStrategy,
	tags map[string]string,
	recorder FileRecorder,
) Writer

type WriterSettings

type WriterSettings struct {
	ClientName     string `cfg:"client_name" default:"default"`
	ModelId        mdl.ModelId
	NamingStrategy string
	Recorder       FileRecorder
	Tags           map[string]string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL