reader

package
v0.0.0-rc3
Published: Sep 25, 2024 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	FileNameSystemCol = "__file_name"
	RowIndexSystemCol = "__row_index"

	EstimateFilesLimit = 10
)
var (
	RestColumnName = "rest"
)

Functions

func ListFiles

func ListFiles(bucket, pathPrefix, pathPattern string, client s3iface.S3API, logger log.Logger, maxResults *int, isObj IsObj) ([]*aws_s3.Object, error)

ListFiles lists all files in a bucket that match pathPattern. A fast circuit breaker (via maxResults) is built in for schema resolution, where the full list of objects is not needed.
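A minimal usage sketch follows; the reader and logging import paths are placeholders for this module's actual paths, and the permissive IsObj predicate is illustrative:

package example

import (
	aws_s3 "github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"

	"example.com/yourmodule/s3/reader"      // placeholder: substitute this module's import path
	"go.ytsaurus.tech/library/go/core/log" // assumed logging dependency providing log.Logger
)

// listCSVObjects lists up to 100 objects under data/2024/ that match *.csv,
// accepting every object type via a permissive predicate.
func listCSVObjects(client s3iface.S3API, lgr log.Logger) ([]*aws_s3.Object, error) {
	maxResults := 100 // circuit breaker: stop listing once this many matches are found
	isObj := reader.IsObj(func(file *aws_s3.Object) bool { return true })
	return reader.ListFiles("my-bucket", "data/2024/", "*.csv", client, lgr, &maxResults, isObj)
}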

func NewChunkedReader

func NewChunkedReader(fetcher *s3Fetcher, stats *stats.SourceStats) (io.ReaderAt, error)

func NewGzipReader

func NewGzipReader(fetcher *s3Fetcher, downloader *s3manager.Downloader, stats *stats.SourceStats) (io.ReaderAt, error)

func NewS3Fetcher

func NewS3Fetcher(ctx context.Context, client s3iface.S3API, bucket string, key string) (*s3Fetcher, error)

func SkipObject

func SkipObject(file *aws_s3.Object, pathPattern, splitter string, isObj IsObj) bool

SkipObject returns true if an object should be skipped. An object is skipped if its file type does not match the one handled by the reader, or if its name/path does not match the path pattern.
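For example, a listing loop might filter objects like this (sketch; the "/" splitter argument is an assumption):

// Sketch: skip anything that is not a CSV object or does not match the pattern.
// Assumed imports: strings, aws "github.com/aws/aws-sdk-go/aws".
isCSV := func(f *aws_s3.Object) bool {
	return strings.HasSuffix(aws.StringValue(f.Key), ".csv")
}
for _, obj := range files {
	if reader.SkipObject(obj, "*.csv", "/", isCSV) { // "/" as the path splitter is an assumption
		continue
	}
	// process obj ...
}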

Types

type CSVReader

type CSVReader struct {
	// contains filtered or unexported fields
}

func NewCSVReader

func NewCSVReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*CSVReader, error)

func (*CSVReader) IsObj

func (r *CSVReader) IsObj(file *aws_s3.Object) bool

func (*CSVReader) ParseCSVRows

func (r *CSVReader) ParseCSVRows(csvReader *csv.Reader, filePath string, lastModified time.Time, lineCounter *uint64) ([]abstract.ChangeItem, error)

ParseCSVRows reads and parses, line by line, the data block fetched from S3. Once EOF or the batchSize limit is reached, the extracted ChangeItems are returned.
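A sketch of driving it directly, where r is a *CSVReader and block and lastModified are assumed to come from the surrounding Read logic:

// Sketch: parse one fetched block into ChangeItems.
// Assumed imports: bytes, encoding/csv.
var lineCounter uint64 // tracks absolute line numbers across successive blocks
csvR := csv.NewReader(bytes.NewReader(block)) // block: bytes previously fetched from S3
items, err := r.ParseCSVRows(csvR, "data/2024/part-0000.csv", lastModified, &lineCounter)
if err != nil {
	return err
}
// items holds up to batchSize abstract.ChangeItem values, ready for pushing.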

func (*CSVReader) ParsePassthrough

func (r *CSVReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*CSVReader) Read

func (r *CSVReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*CSVReader) ResolveSchema

func (r *CSVReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*CSVReader) RowCount

func (r *CSVReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*CSVReader) TotalRowCount

func (r *CSVReader) TotalRowCount(ctx context.Context) (uint64, error)

type GenericParserReader

type GenericParserReader struct {
	// contains filtered or unexported fields
}

func NewGenericParserReader

func NewGenericParserReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats, parser parsers.Parser) (*GenericParserReader, error)

func (*GenericParserReader) IsObj

func (r *GenericParserReader) IsObj(file *aws_s3.Object) bool

func (*GenericParserReader) ParseFile

func (r *GenericParserReader) ParseFile(ctx context.Context, filePath string, s3Reader *S3Reader) ([]abstract.ChangeItem, error)

func (*GenericParserReader) ParsePassthrough

func (r *GenericParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*GenericParserReader) Read

func (r *GenericParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*GenericParserReader) ResolveSchema

func (r *GenericParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*GenericParserReader) RowCount

func (r *GenericParserReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*GenericParserReader) TotalRowCount

func (r *GenericParserReader) TotalRowCount(ctx context.Context) (uint64, error)

type IsObj

type IsObj func(file *aws_s3.Object) bool

type JSONLineReader

type JSONLineReader struct {
	// contains filtered or unexported fields
}

func NewJSONLineReader

func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*JSONLineReader, error)

func (*JSONLineReader) IsObj

func (r *JSONLineReader) IsObj(file *aws_s3.Object) bool

func (*JSONLineReader) ParsePassthrough

func (r *JSONLineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*JSONLineReader) Read

func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*JSONLineReader) ResolveSchema

func (r *JSONLineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*JSONLineReader) RowCount

func (r *JSONLineReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*JSONLineReader) TotalRowCount

func (r *JSONLineReader) TotalRowCount(ctx context.Context) (uint64, error)

type JSONParserReader

type JSONParserReader struct {
	// contains filtered or unexported fields
}

func NewJSONParserReader

func NewJSONParserReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*JSONParserReader, error)

func (*JSONParserReader) IsObj

func (r *JSONParserReader) IsObj(file *aws_s3.Object) bool

func (*JSONParserReader) ParsePassthrough

func (r *JSONParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*JSONParserReader) Read

func (r *JSONParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*JSONParserReader) ResolveSchema

func (r *JSONParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*JSONParserReader) RowCount

func (r *JSONParserReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*JSONParserReader) TotalRowCount

func (r *JSONParserReader) TotalRowCount(ctx context.Context) (uint64, error)

type LineReader

type LineReader struct {
	ColumnNames []string
	// contains filtered or unexported fields
}

func NewLineReader

func NewLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*LineReader, error)

func (*LineReader) IsObj

func (r *LineReader) IsObj(file *aws_s3.Object) bool

func (*LineReader) ParsePassthrough

func (r *LineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*LineReader) Read

func (r *LineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*LineReader) ResolveSchema

func (r *LineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*LineReader) RowCount

func (r *LineReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*LineReader) TotalRowCount

func (r *LineReader) TotalRowCount(ctx context.Context) (uint64, error)

type Reader

type Reader interface {
	Read(ctx context.Context, filePath string, pusher pusher.Pusher) error
	// ParsePassthrough is used in the parsqueue pusher for replications.
	// Since actual parsing in the S3 parsers is a rather complex process, tailored to each format, this method
	// is just meant as a simple passthrough to fulfill the parsqueue signature contract: it forwards the already parsed ChangeItem elements for pushing.
	ParsePassthrough(chunk pusher.Chunk) []abstract.ChangeItem
	IsObj(file *aws_s3.Object) bool
	ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
}
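Given that contract, a conforming ParsePassthrough reduces to an identity over the chunk's already-parsed items; the receiver type and field name below are hypothetical:

// Sketch: no parsing happens here; the already parsed ChangeItems carried by
// the chunk are simply handed back for the parsqueue pusher to push.
func (r *myFormatReader) ParsePassthrough(chunk pusher.Chunk) []abstract.ChangeItem {
	return chunk.Items // illustrative field name: items parsed earlier by Read
}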

func New

func New(
	src *s3.S3Source,
	lgr log.Logger,
	sess *session.Session,
	metrics *stats.SourceStats,
) (Reader, error)
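New presumably selects the concrete reader (CSV, JSON lines, parquet, ...) from the source's configured input format. A typical construction path looks like this sketch, where src, lgr, sess, metrics, ctx, and pusher are assumed to be prepared by the caller:

// Sketch: build a reader from the S3 source config, resolve the schema,
// then stream one file's rows into a pusher.
rdr, err := reader.New(src, lgr, sess, metrics)
if err != nil {
	return err
}
schema, err := rdr.ResolveSchema(ctx)
if err != nil {
	return err
}
_ = schema // e.g. used to create the target table
if err := rdr.Read(ctx, "data/2024/part-0000.csv", pusher); err != nil {
	return err
}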

type ReaderParquet

type ReaderParquet struct {
	// contains filtered or unexported fields
}

func NewParquet

func NewParquet(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*ReaderParquet, error)

func (*ReaderParquet) IsObj

func (r *ReaderParquet) IsObj(file *aws_s3.Object) bool

func (*ReaderParquet) ParsePassthrough

func (r *ReaderParquet) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem

func (*ReaderParquet) Read

func (r *ReaderParquet) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error

func (*ReaderParquet) ResolveSchema

func (r *ReaderParquet) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)

func (*ReaderParquet) RowCount

func (r *ReaderParquet) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)

func (*ReaderParquet) TotalRowCount

func (r *ReaderParquet) TotalRowCount(ctx context.Context) (uint64, error)

type RowCounter

type RowCounter interface {
	TotalRowCount(ctx context.Context) (uint64, error)
	RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
}

type S3Reader

type S3Reader struct {
	// contains filtered or unexported fields
}

S3Reader is a wrapper holding io.ReaderAt implementations. For non-gzipped files it performs HTTP Range requests against the S3 bucket; for gzipped files it reads chunks of the decompressed file held in memory. New instances must be created with the NewS3Reader function. It is safe for concurrent use.

func NewS3Reader

func NewS3Reader(ctx context.Context, client s3iface.S3API, downloader *s3manager.Downloader, bucket string, key string, metrics *stats.SourceStats) (*S3Reader, error)
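A random-access sketch; the bucket, key, and prepared client, downloader, and metrics are illustrative:

// Sketch: open an object for random access and read its first 4 KiB.
// Plain objects are served via HTTP Range requests; gzipped objects are read
// from the in-memory decompressed copy (see the type description above).
// Assumed import: io.
s3r, err := reader.NewS3Reader(ctx, client, downloader, "my-bucket", "data/file.parquet", metrics)
if err != nil {
	return err
}
buf := make([]byte, 4096)
n, err := s3r.ReadAt(buf, 0)
if err != nil && err != io.EOF {
	return err
}
_ = buf[:n] // first n bytes of the object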

func (*S3Reader) LastModified

func (r *S3Reader) LastModified() time.Time

LastModified is a proxy call to the underlying fetcher method.

func (*S3Reader) ReadAt

func (r *S3Reader) ReadAt(p []byte, off int64) (int, error)

ReadAt is a proxy call to the underlying reader implementation.

func (*S3Reader) Size

func (r *S3Reader) Size() int64

Size is a proxy call to the underlying fetcher method.
