Documentation ¶
Index ¶
- Variables
- func ListFiles(bucket, pathPrefix, pathPattern string, client s3iface.S3API, ...) ([]*aws_s3.Object, error)
- func NewChunkedReader(fetcher *s3Fetcher, stats *stats.SourceStats) (io.ReaderAt, error)
- func NewGzipReader(fetcher *s3Fetcher, downloader *s3manager.Downloader, stats *stats.SourceStats) (io.ReaderAt, error)
- func NewS3Fetcher(ctx context.Context, client s3iface.S3API, bucket string, key string) (*s3Fetcher, error)
- func SkipObject(file *aws_s3.Object, pathPattern, splitter string, isObj IsObj) bool
- type CSVReader
- func (r *CSVReader) IsObj(file *aws_s3.Object) bool
- func (r *CSVReader) ParseCSVRows(csvReader *csv.Reader, filePath string, lastModified time.Time, ...) ([]abstract.ChangeItem, error)
- func (r *CSVReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *CSVReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *CSVReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *CSVReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *CSVReader) TotalRowCount(ctx context.Context) (uint64, error)
- type GenericParserReader
- func (r *GenericParserReader) IsObj(file *aws_s3.Object) bool
- func (r *GenericParserReader) ParseFile(ctx context.Context, filePath string, s3Reader *S3Reader) ([]abstract.ChangeItem, error)
- func (r *GenericParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *GenericParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *GenericParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *GenericParserReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *GenericParserReader) TotalRowCount(ctx context.Context) (uint64, error)
- type IsObj
- type JSONLineReader
- func (r *JSONLineReader) IsObj(file *aws_s3.Object) bool
- func (r *JSONLineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *JSONLineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *JSONLineReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *JSONLineReader) TotalRowCount(ctx context.Context) (uint64, error)
- type JSONParserReader
- func (r *JSONParserReader) IsObj(file *aws_s3.Object) bool
- func (r *JSONParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *JSONParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *JSONParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *JSONParserReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *JSONParserReader) TotalRowCount(ctx context.Context) (uint64, error)
- type LineReader
- func (r *LineReader) IsObj(file *aws_s3.Object) bool
- func (r *LineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *LineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *LineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *LineReader) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *LineReader) TotalRowCount(ctx context.Context) (uint64, error)
- type Reader
- type ReaderParquet
- func (r *ReaderParquet) IsObj(file *aws_s3.Object) bool
- func (r *ReaderParquet) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
- func (r *ReaderParquet) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
- func (r *ReaderParquet) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
- func (r *ReaderParquet) RowCount(ctx context.Context, obj *aws_s3.Object) (uint64, error)
- func (r *ReaderParquet) TotalRowCount(ctx context.Context) (uint64, error)
- type RowCounter
- type S3Reader
Constants ¶
This section is empty.
Variables ¶
var (
	FileNameSystemCol  = "__file_name"
	RowIndexSystemCol  = "__row_index"
	EstimateFilesLimit = 10
)
var (
RestColumnName = "rest"
)
Functions ¶
func ListFiles ¶
func ListFiles(bucket, pathPrefix, pathPattern string, client s3iface.S3API, logger log.Logger, maxResults *int, isObj IsObj) ([]*aws_s3.Object, error)
ListFiles lists all files matching the pathPattern in a bucket. A fast circuit breaker is built in for schema resolution where we do not need the full list of objects.
func NewChunkedReader ¶
func NewChunkedReader(fetcher *s3Fetcher, stats *stats.SourceStats) (io.ReaderAt, error)
func NewGzipReader ¶
func NewGzipReader(fetcher *s3Fetcher, downloader *s3manager.Downloader, stats *stats.SourceStats) (io.ReaderAt, error)
func NewS3Fetcher ¶
func SkipObject ¶
SkipObject returns true if an object should be skipped. An object is skipped if the file type does not match the one covered by the reader or if the object's name/path is not included in the path pattern.
Types ¶
type CSVReader ¶
type CSVReader struct {
// contains filtered or unexported fields
}
func NewCSVReader ¶
func (*CSVReader) ParseCSVRows ¶
func (r *CSVReader) ParseCSVRows(csvReader *csv.Reader, filePath string, lastModified time.Time, lineCounter *uint64) ([]abstract.ChangeItem, error)
ParseCSVRows reads and parses line by line the fetched data block from S3. If EOF or batchSize limit is reached the extracted changeItems are returned.
func (*CSVReader) ParsePassthrough ¶
func (r *CSVReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*CSVReader) ResolveSchema ¶
type GenericParserReader ¶
type GenericParserReader struct {
// contains filtered or unexported fields
}
func NewGenericParserReader ¶
func (*GenericParserReader) ParseFile ¶
func (r *GenericParserReader) ParseFile(ctx context.Context, filePath string, s3Reader *S3Reader) ([]abstract.ChangeItem, error)
func (*GenericParserReader) ParsePassthrough ¶
func (r *GenericParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*GenericParserReader) Read ¶
func (r *GenericParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
func (*GenericParserReader) ResolveSchema ¶
func (r *GenericParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
func (*GenericParserReader) TotalRowCount ¶
func (r *GenericParserReader) TotalRowCount(ctx context.Context) (uint64, error)
type JSONLineReader ¶
type JSONLineReader struct {
// contains filtered or unexported fields
}
func NewJSONLineReader ¶
func NewJSONLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*JSONLineReader, error)
func (*JSONLineReader) ParsePassthrough ¶
func (r *JSONLineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*JSONLineReader) Read ¶
func (r *JSONLineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
func (*JSONLineReader) ResolveSchema ¶
func (r *JSONLineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
func (*JSONLineReader) TotalRowCount ¶
func (r *JSONLineReader) TotalRowCount(ctx context.Context) (uint64, error)
type JSONParserReader ¶
type JSONParserReader struct {
// contains filtered or unexported fields
}
func NewJSONParserReader ¶
func NewJSONParserReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*JSONParserReader, error)
func (*JSONParserReader) ParsePassthrough ¶
func (r *JSONParserReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*JSONParserReader) Read ¶
func (r *JSONParserReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
func (*JSONParserReader) ResolveSchema ¶
func (r *JSONParserReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
func (*JSONParserReader) TotalRowCount ¶
func (r *JSONParserReader) TotalRowCount(ctx context.Context) (uint64, error)
type LineReader ¶
type LineReader struct {
	ColumnNames []string
	// contains filtered or unexported fields
}
func NewLineReader ¶
func NewLineReader(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*LineReader, error)
func (*LineReader) ParsePassthrough ¶
func (r *LineReader) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*LineReader) Read ¶
func (r *LineReader) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
func (*LineReader) ResolveSchema ¶
func (r *LineReader) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
func (*LineReader) TotalRowCount ¶
func (r *LineReader) TotalRowCount(ctx context.Context) (uint64, error)
type Reader ¶
type Reader interface {
	Read(ctx context.Context, filePath string, pusher pusher.Pusher) error
	// ParsePassthrough is used in the parsqueue pusher for replications.
	// Since actual parsing in the S3 parsers is a rather complex process, tailored to each format, this method
	// is just meant as a simple passthrough to fulfill the parsqueue signature contract and forwards the already parsed CI elements for pushing.
	ParsePassthrough(chunk pusher.Chunk) []abstract.ChangeItem
	IsObj(file *aws_s3.Object) bool
	ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
}
type ReaderParquet ¶
type ReaderParquet struct {
// contains filtered or unexported fields
}
func NewParquet ¶
func NewParquet(src *s3.S3Source, lgr log.Logger, sess *session.Session, metrics *stats.SourceStats) (*ReaderParquet, error)
func (*ReaderParquet) ParsePassthrough ¶
func (r *ReaderParquet) ParsePassthrough(chunk chunk_pusher.Chunk) []abstract.ChangeItem
func (*ReaderParquet) Read ¶
func (r *ReaderParquet) Read(ctx context.Context, filePath string, pusher chunk_pusher.Pusher) error
func (*ReaderParquet) ResolveSchema ¶
func (r *ReaderParquet) ResolveSchema(ctx context.Context) (*abstract.TableSchema, error)
func (*ReaderParquet) TotalRowCount ¶
func (r *ReaderParquet) TotalRowCount(ctx context.Context) (uint64, error)
type RowCounter ¶
type S3Reader ¶
type S3Reader struct {
// contains filtered or unexported fields
}
S3Reader is a wrapper holding io.ReaderAt implementations. In the case of non-gzipped files it will perform HTTP Range requests to the S3 bucket. In the case of gzipped files it will read a chunk of the in-memory decompressed file. New instances must be created with the NewS3Reader function. It is safe for concurrent use.
func NewS3Reader ¶
func (*S3Reader) LastModified ¶
LastModified is a proxy call to the underlying fetcher method