Documentation ¶
Overview ¶
Package pqarrow connects Arrow directly with the Parquet implementation. It isolates all explicitly Arrow-related code in this package, which provides the interfaces for reading and writing directly to and from Arrow Arrays, Tables, and Records.
Index ¶
- func DecimalSize(precision int32) int32
- func FromParquet(sc *schema.Schema, props *ArrowReadProperties, kv metadata.KeyValueMetadata) (*arrow.Schema, error)
- func NewArrowWriteContext(ctx context.Context, props *ArrowWriterProperties) context.Context
- func ReadTable(ctx context.Context, r parquet.ReaderAtSeeker, props *parquet.ReaderProperties, ...) (arrow.Table, error)
- func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, ...) (*schema.Schema, error)
- func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, ...) error
- func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, ...) error
- type ArrowReadProperties
- type ArrowWriterProperties
- type ColumnChunkReader
- type ColumnReader
- type ExtensionCustomParquetType
- type FileReader
- func (fr *FileReader) GetColumn(ctx context.Context, i int) (*ColumnReader, error)
- func (fr *FileReader) GetFieldReader(ctx context.Context, i int, includedLeaves map[int]bool, rowGroups []int) (*ColumnReader, error)
- func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups []int) ([]*ColumnReader, *arrow.Schema, error)
- func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups []int) (RecordReader, error)
- func (fr *FileReader) ParquetReader() *file.Reader
- func (fr *FileReader) ReadColumn(rowGroups []int, rdr *ColumnReader) (*arrow.Chunked, error)
- func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int) (arrow.Table, error)
- func (fr *FileReader) ReadTable(ctx context.Context) (arrow.Table, error)
- func (fr *FileReader) RowGroup(idx int) RowGroupReader
- func (fr *FileReader) Schema() (*arrow.Schema, error)
- type FileWriter
- func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error
- func (fw *FileWriter) Close() error
- func (fw *FileWriter) NewBufferedRowGroup()
- func (fw *FileWriter) NewRowGroup()
- func (fw *FileWriter) NumRows() int
- func (fw *FileWriter) RowGroupNumRows() (int, error)
- func (fw *FileWriter) RowGroupTotalBytesWritten() int64
- func (fw *FileWriter) RowGroupTotalCompressedBytes() int64
- func (fw *FileWriter) Write(rec arrow.Record) error
- func (fw *FileWriter) WriteBuffered(rec arrow.Record) error
- func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error
- func (fw *FileWriter) WriteColumnData(data arrow.Array) error
- func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error
- type RecordReader
- type RowGroupReader
- type SchemaField
- type SchemaManifest
- type WriterOption
- func WithAllocator(mem memory.Allocator) WriterOption
- func WithCoerceTimestamps(unit arrow.TimeUnit) WriterOption
- func WithDeprecatedInt96Timestamps(enabled bool) WriterOption
- func WithNoMapLogicalType() WriterOption
- func WithStoreSchema() WriterOption
- func WithTruncatedTimestamps(allow bool) WriterOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecimalSize ¶
func DecimalSize(precision int32) int32
DecimalSize returns the minimum number of bytes necessary to represent a decimal with the requested precision.
Taken from the Apache Impala codebase. The comments next to the return values in the source are the maximum value that can be represented in 2's complement with the returned number of bytes.
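The relationship between precision and byte width can be sketched with the standard library alone. The helper below, minDecimalBytes, is a hypothetical name and not the pqarrow implementation; it assumes that a decimal of precision p must hold values up to 10^p - 1 and that an n-byte two's complement integer holds values up to 2^(8n-1) - 1.

```go
package main

import (
	"fmt"
	"math/big"
)

// minDecimalBytes is a hypothetical helper (not part of pqarrow).
// It returns the smallest byte width n such that 10^precision - 1, the largest
// value with that many decimal digits, fits in a signed two's complement
// integer of n bytes, whose maximum is 2^(8n-1) - 1.
func minDecimalBytes(precision int) int {
	one := big.NewInt(1)
	// Largest unscaled value with `precision` decimal digits.
	maxVal := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(precision)), nil)
	maxVal.Sub(maxVal, one)

	for n := 1; ; n++ {
		// Largest positive value representable in n bytes.
		limit := new(big.Int).Lsh(one, uint(8*n-1))
		limit.Sub(limit, one)
		if maxVal.Cmp(limit) <= 0 {
			return n
		}
	}
}

func main() {
	for _, p := range []int{2, 4, 9, 18, 38} {
		fmt.Printf("precision %d -> %d bytes\n", p, minDecimalBytes(p))
	}
}
```

Computing the width on demand keeps the sketch short; a precomputed lookup table returns the same widths without big-integer arithmetic.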
func FromParquet ¶
func FromParquet(sc *schema.Schema, props *ArrowReadProperties, kv metadata.KeyValueMetadata) (*arrow.Schema, error)
FromParquet generates an arrow Schema from a provided Parquet Schema
func NewArrowWriteContext ¶
func NewArrowWriteContext(ctx context.Context, props *ArrowWriterProperties) context.Context
NewArrowWriteContext is for creating a re-usable context object that contains writer properties and other re-usable buffers for writing. The resulting context should not be used to write multiple columns concurrently. If nil is passed, then DefaultWriterProps will be used.
func ReadTable ¶
func ReadTable(ctx context.Context, r parquet.ReaderAtSeeker, props *parquet.ReaderProperties, arrProps ArrowReadProperties, mem memory.Allocator) (arrow.Table, error)
ReadTable is a convenience function to quickly and easily read a parquet file into an arrow table.
The schema of the arrow table is generated based on the schema of the parquet file, including nested columns/lists/etc., in the same fashion as the FromParquet function. This encapsulates the logic of creating a separate file.Reader and pqarrow.FileReader in a single function for when you just want to construct a table from the entire parquet file rather than reading it piecemeal.
func ToParquet ¶
func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error)
ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make decisions when determining the logical/physical types of the columns.
func WriteArrowToColumn ¶
func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error
WriteArrowToColumn writes Apache Arrow columnar data directly to a ColumnChunkWriter. It returns a non-nil error if the array data type is not compatible with the concrete writer type.
leafArr is always a primitive (possibly dictionary-encoded) type. leafFieldNullable indicates whether the leaf array is considered nullable according to its schema in a Table or its parent array.
func WriteTable ¶
func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error
WriteTable is a convenience function to create and write a full arrow.Table to a parquet file. The schema and columns are determined by the schema of the table, and the file is written out to the provided writer. chunkSize determines the size of the row groups.
Types ¶
type ArrowReadProperties ¶
type ArrowReadProperties struct {
	// If Parallel is true, then functions which read multiple columns will read
	// those columns in parallel from the file with a number of readers equal
	// to the number of columns. Otherwise columns are read serially.
	Parallel bool
	// BatchSize is the size used for calls to NextBatch when reading whole columns
	BatchSize int64
	// contains filtered or unexported fields
}
ArrowReadProperties are the properties that define how to read a parquet file into arrow arrays.
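The effect of the Parallel flag can be pictured as one goroutine per column. The following is a minimal sketch of that pattern only; readColumn and readColumns are hypothetical stand-ins, and nothing here calls into pqarrow.

```go
package main

import (
	"fmt"
	"sync"
)

// readColumn is a hypothetical stand-in for reading one column's data.
func readColumn(idx int) string {
	return fmt.Sprintf("column-%d", idx)
}

// readColumns reads numCols columns. When parallel is true it starts one
// goroutine per column; otherwise the columns are read serially.
func readColumns(numCols int, parallel bool) []string {
	out := make([]string, numCols)
	if !parallel {
		for i := range out {
			out[i] = readColumn(i)
		}
		return out
	}
	var wg sync.WaitGroup
	for i := 0; i < numCols; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			out[i] = readColumn(i) // each goroutine writes only its own slot
		}(i)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(readColumns(3, true))
}
```

Both modes produce the same result; only the scheduling differs, so the flag trades extra concurrent readers for shorter wall-clock time.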
func (*ArrowReadProperties) ForceLarge ¶
func (props *ArrowReadProperties) ForceLarge(colIdx int) bool
func (*ArrowReadProperties) ReadDict ¶
func (props *ArrowReadProperties) ReadDict(colIdx int) bool
func (*ArrowReadProperties) SetForceLarge ¶
func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool)
SetForceLarge determines whether a particular column, if it is String or Binary, will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32 offsets. This is specifically useful if you know that particular columns contain more than 2GB worth of byte data which would prevent use of int32 offsets.
Passing false will use the default variants while passing true will use the large variant. If the passed column index is not a string or binary column, then this will have no effect.
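The 2GB limit follows from the offset width: the final offset of a String or Binary array equals the total number of data bytes, so it must fit in an int32 for the default variants. A minimal sketch of that check, using a hypothetical helper named needsLargeOffsets:

```go
package main

import (
	"fmt"
	"math"
)

// needsLargeOffsets is a hypothetical helper (not part of pqarrow). It reports
// whether a column holding totalBytes of string/binary data exceeds what
// int32 offsets can address, in which case int64 offsets are required.
func needsLargeOffsets(totalBytes int64) bool {
	return totalBytes > math.MaxInt32
}

func main() {
	fmt.Println(needsLargeOffsets(1 << 20)) // 1 MiB of data
	fmt.Println(needsLargeOffsets(3 << 30)) // 3 GiB of data
}
```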
func (*ArrowReadProperties) SetReadDict ¶
func (props *ArrowReadProperties) SetReadDict(colIdx int, readDict bool)
SetReadDict determines whether to read a particular column as dictionary encoded or not.
type ArrowWriterProperties ¶
type ArrowWriterProperties struct {
// contains filtered or unexported fields
}
ArrowWriterProperties are used to determine how to manipulate the arrow data when writing it to a parquet file.
func DefaultWriterProps ¶
func DefaultWriterProps() ArrowWriterProperties
DefaultWriterProps returns the default properties for the arrow writer, which are to use memory.DefaultAllocator and coerceTimestampUnit: arrow.Second.
func NewArrowWriterProperties ¶
func NewArrowWriterProperties(opts ...WriterOption) ArrowWriterProperties
NewArrowWriterProperties creates a new writer properties object by passing in a set of options to control the properties. Once created, an individual instance of ArrowWriterProperties is immutable.
type ColumnChunkReader ¶
type ColumnChunkReader struct {
// contains filtered or unexported fields
}
ColumnChunkReader is a reader that reads only a single column chunk from a single column in a single row group
type ColumnReader ¶
type ColumnReader struct {
// contains filtered or unexported fields
}
ColumnReader is used for reading batches of data from a specific column across multiple row groups to return a chunked arrow array.
type ExtensionCustomParquetType ¶
type ExtensionCustomParquetType interface {
ParquetLogicalType() schema.LogicalType
}
ExtensionCustomParquetType is an interface that Arrow ExtensionTypes may implement to specify the target LogicalType to use when converting to Parquet.
The PrimitiveType is not configurable, and is determined by a fixed mapping from the extension's StorageType to a Parquet type (see getParquetType in pqarrow source).
type FileReader ¶
type FileReader struct {
	Props    ArrowReadProperties
	Manifest *SchemaManifest
	// contains filtered or unexported fields
}
FileReader is the base object for reading a parquet file into arrow object types.
It provides utility functions for reading record batches, a table, subsets of columns / rowgroups, and so on.
func NewFileReader ¶
func NewFileReader(rdr *file.Reader, props ArrowReadProperties, mem memory.Allocator) (*FileReader, error)
NewFileReader constructs a reader for converting to Arrow objects from an existing parquet file reader object.
Only returns an error if there is some error constructing the schema manifest from the parquet file metadata.
func (*FileReader) GetColumn ¶
func (fr *FileReader) GetColumn(ctx context.Context, i int) (*ColumnReader, error)
GetColumn returns a reader for pulling the data of leaf column index i across all row groups in the file.
func (*FileReader) GetFieldReader ¶
func (fr *FileReader) GetFieldReader(ctx context.Context, i int, includedLeaves map[int]bool, rowGroups []int) (*ColumnReader, error)
GetFieldReader returns a reader for the entire Field of index i which could potentially include reading multiple columns from the underlying parquet file if that field is a nested field.
includedLeaves and rowGroups specify precisely which leaf indexes and row groups to read.
func (*FileReader) GetFieldReaders ¶
func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups []int) ([]*ColumnReader, *arrow.Schema, error)
GetFieldReaders is for retrieving readers for multiple fields at one time for only the list of column indexes and rowgroups requested. It returns a slice of the readers and the corresponding arrow.Schema for those columns.
func (*FileReader) GetRecordReader ¶
func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups []int) (RecordReader, error)
GetRecordReader returns a record reader that reads only the requested column indexes and row groups.
For both cases, if you pass nil for column indexes or rowgroups it will default to reading all of them.
func (*FileReader) ParquetReader ¶
func (fr *FileReader) ParquetReader() *file.Reader
ParquetReader returns the underlying parquet file reader that it was constructed with
func (*FileReader) ReadColumn ¶
func (fr *FileReader) ReadColumn(rowGroups []int, rdr *ColumnReader) (*arrow.Chunked, error)
ReadColumn reads data to create a chunked array only from the requested row groups.
func (*FileReader) ReadRowGroups ¶
func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int) (arrow.Table, error)
ReadRowGroups generates an arrow.Table from the file, reading only the requested columns and row groups rather than the entire file as ReadTable does.
func (*FileReader) RowGroup ¶
func (fr *FileReader) RowGroup(idx int) RowGroupReader
RowGroup creates a reader that will *only* read from the requested row group
type FileWriter ¶
type FileWriter struct {
// contains filtered or unexported fields
}
FileWriter is an object for writing Arrow directly to a parquet file.
func NewFileWriter ¶
func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error)
NewFileWriter returns a writer for writing Arrow directly to a parquet file. Unlike the ArrowColumnWriter and WriteArrow functions, which write Arrow to an existing file.Writer, this creates a new file.Writer based on the schema provided.
func (*FileWriter) AppendKeyValueMetadata ¶
func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error
AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata
func (*FileWriter) Close ¶
func (fw *FileWriter) Close() error
Close flushes out the data and closes the file. It can be called multiple times; subsequent calls after the first have no effect.
func (*FileWriter) NewBufferedRowGroup ¶
func (fw *FileWriter) NewBufferedRowGroup()
NewBufferedRowGroup starts a new memory-buffered row group to allow writing columns / records without immediately flushing them to disk. This allows using WriteBuffered to write records and decide where to break your row group based on RowGroupTotalBytesWritten rather than on the max row group length. If using Records, this should be paired with WriteBuffered, while Write will always write a new record as a row group in and of itself.
func (*FileWriter) NewRowGroup ¶
func (fw *FileWriter) NewRowGroup()
NewRowGroup creates a new row group in the underlying file. Equivalent to `AppendRowGroup` on a file.Writer.
func (*FileWriter) NumRows ¶
func (fw *FileWriter) NumRows() int
NumRows returns the total number of rows that have been written so far.
func (*FileWriter) RowGroupNumRows ¶
func (fw *FileWriter) RowGroupNumRows() (int, error)
RowGroupNumRows returns the number of rows written to the current row group. Returns an error if they are unequal between columns that have been written so far.
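The consistency rule described here can be sketched as a comparison of per-column row counts. The function below is a hypothetical model, not the pqarrow code; returning 0 when no columns have been written is an assumption of this sketch.

```go
package main

import "fmt"

// rowGroupNumRows is a hypothetical model of the check: it returns the shared
// row count of the columns written so far, or an error when the counts differ.
// Returning 0 for no columns is an assumption of this sketch.
func rowGroupNumRows(rowsPerColumn []int) (int, error) {
	if len(rowsPerColumn) == 0 {
		return 0, nil
	}
	want := rowsPerColumn[0]
	for i, got := range rowsPerColumn[1:] {
		if got != want {
			return 0, fmt.Errorf("column %d has %d rows, expected %d", i+1, got, want)
		}
	}
	return want, nil
}

func main() {
	fmt.Println(rowGroupNumRows([]int{5, 5, 5}))
	fmt.Println(rowGroupNumRows([]int{5, 4}))
}
```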
func (*FileWriter) RowGroupTotalBytesWritten ¶
func (fw *FileWriter) RowGroupTotalBytesWritten() int64
RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in the current row group.
func (*FileWriter) RowGroupTotalCompressedBytes ¶
func (fw *FileWriter) RowGroupTotalCompressedBytes() int64
RowGroupTotalCompressedBytes returns the total number of bytes after compression that have been written to the current row group so far.
func (*FileWriter) Write ¶
func (fw *FileWriter) Write(rec arrow.Record) error
Write an arrow Record Batch to the file, respecting the MaxRowGroupLength in the writer properties to determine whether the record is broken up into more than one row group. At the very least a single row group is created per record, so calling Write always results in a new row group added.
Performance-wise Write might be more favorable than WriteBuffered if you're dealing with:
- a highly-restricted memory environment
- very large records with lots of rows (potentially close to the max row group length)
func (*FileWriter) WriteBuffered ¶
func (fw *FileWriter) WriteBuffered(rec arrow.Record) error
WriteBuffered will either append to an existing row group or create a new one based on the record length and max row group length.
Additionally, it lets you manually break your row group by checking RowGroupTotalBytesWritten and calling NewBufferedRowGroup, while Write will always create at least 1 row group for the record.
Performance-wise WriteBuffered might be more favorable than Write if you're dealing with:
- a loose memory environment (meaning you have a lot of memory to utilize)
- records that have only a small number of rows (roughly under 1K)
More memory is utilized compared to Write as the whole row group data is kept in memory before it's written since Parquet files must have an entire column written before writing the next column.
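The difference between the two write paths shows up in how records map to row groups. The sketch below models only the row counts, under the assumptions that every record is non-empty and maxLen is positive; rowGroupsUnbuffered and rowGroupsBuffered are hypothetical names, not pqarrow functions.

```go
package main

import "fmt"

// rowGroupsUnbuffered models Write: every record starts a new row group and is
// split further when it has more than maxLen rows.
func rowGroupsUnbuffered(recordLens []int, maxLen int) []int {
	var groups []int
	for _, n := range recordLens {
		for n > 0 {
			take := n
			if take > maxLen {
				take = maxLen
			}
			groups = append(groups, take)
			n -= take
		}
	}
	return groups
}

// rowGroupsBuffered models WriteBuffered: records are appended to the current
// row group until it holds maxLen rows, and only then is a new one started.
func rowGroupsBuffered(recordLens []int, maxLen int) []int {
	var groups []int
	for _, n := range recordLens {
		for n > 0 {
			// Open a row group if none exists or the current one is full.
			if len(groups) == 0 || groups[len(groups)-1] == maxLen {
				groups = append(groups, 0)
			}
			last := len(groups) - 1
			take := maxLen - groups[last]
			if take > n {
				take = n
			}
			groups[last] += take
			n -= take
		}
	}
	return groups
}

func main() {
	records := []int{300, 300, 300}
	fmt.Println(rowGroupsUnbuffered(records, 1000))
	fmt.Println(rowGroupsBuffered(records, 1000))
}
```

With many small records the buffered path produces fewer, larger row groups, at the cost of holding the open row group in memory.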
func (*FileWriter) WriteColumnChunked ¶
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error
WriteColumnChunked writes the data provided to the underlying file, using the provided offset and size to allow writing subsets of data from the chunked column. It uses the current column in the underlying row group writer as the starting point, allowing a file to be built progressively, column by column, from arrow data without first needing a record or table.
func (*FileWriter) WriteColumnData ¶
func (fw *FileWriter) WriteColumnData(data arrow.Array) error
WriteColumnData writes the entire array to the file as the next column. Like WriteColumnChunked, it is based on the current column of the row group writer, allowing progressive building of the file by columns without needing a full record or table to write.
func (*FileWriter) WriteTable ¶
func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error
WriteTable writes an arrow table to the underlying file using chunkSize to determine the size to break at for making row groups. Writing a table will always create a new row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will still write a 0 length Row Group to the file.
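The chunking rule, including the zero-row case, can be modelled with plain integers. rowGroupSizes is a hypothetical helper that assumes chunkSize is positive; it does not call pqarrow.

```go
package main

import "fmt"

// rowGroupSizes is a hypothetical model of the splitting rule: a table with
// numRows rows becomes row groups of at most chunkSize rows, and a table with
// zero rows still yields a single empty row group.
func rowGroupSizes(numRows, chunkSize int64) []int64 {
	if numRows == 0 {
		return []int64{0}
	}
	var sizes []int64
	for offset := int64(0); offset < numRows; offset += chunkSize {
		size := chunkSize
		if remaining := numRows - offset; remaining < size {
			size = remaining
		}
		sizes = append(sizes, size)
	}
	return sizes
}

func main() {
	fmt.Println(rowGroupSizes(10, 4))
	fmt.Println(rowGroupSizes(0, 4))
}
```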
type RecordReader ¶
type RecordReader interface {
	array.RecordReader
	arrio.Reader
}
RecordReader is a Record Batch Reader that meets the interfaces for both array.RecordReader and arrio.Reader to allow easy progressive reading of record batches from the parquet file. Ideal for streaming.
type RowGroupReader ¶
type RowGroupReader struct {
// contains filtered or unexported fields
}
RowGroupReader is a reader for getting data only from a single row group of the file rather than having to repeatedly pass the index to functions on the reader.
func (RowGroupReader) Column ¶
func (rgr RowGroupReader) Column(idx int) ColumnChunkReader
Column creates a reader for just the requested column chunk in only this row group.
type SchemaField ¶
type SchemaField struct {
	Field     *arrow.Field
	Children  []SchemaField
	ColIndex  int
	LevelInfo file.LevelInfo
}
SchemaField is a holder that defines a specific logical field in the schema which could potentially refer to multiple physical columns in the underlying parquet file if it is a nested type.
ColIndex is only populated (not -1) when it is a leaf column.
func (*SchemaField) IsLeaf ¶
func (s *SchemaField) IsLeaf() bool
IsLeaf returns true if the SchemaField is a leaf column, i.e. ColIndex != -1.
type SchemaManifest ¶
type SchemaManifest struct {
	OriginSchema    *arrow.Schema
	SchemaMeta      *arrow.Metadata
	ColIndexToField map[int]*SchemaField
	ChildToParent   map[*SchemaField]*SchemaField
	Fields          []SchemaField
	// contains filtered or unexported fields
}
SchemaManifest represents a full manifest for mapping a Parquet schema to an arrow Schema.
func NewSchemaManifest ¶
func NewSchemaManifest(sc *schema.Schema, meta metadata.KeyValueMetadata, props *ArrowReadProperties) (*SchemaManifest, error)
NewSchemaManifest creates a manifest for mapping a parquet schema to a given arrow schema.
The metadata passed in should be the file level key value metadata from the parquet file or nil. If the ARROW:schema was in the metadata, then it is utilized to determine types.
func (*SchemaManifest) GetColumnField ¶
func (sm *SchemaManifest) GetColumnField(index int) (*SchemaField, error)
GetColumnField returns the corresponding Field for a given column index.
func (*SchemaManifest) GetFieldIndices ¶
func (sm *SchemaManifest) GetFieldIndices(indices []int) ([]int, error)
GetFieldIndices coalesces a list of leaf column indices into the field indices (relative to the equivalent arrow.Schema) of the column root (the first node below the parquet schema's root group) of each referenced leaf.
For example, for leaves `a.b.c`, `a.b.d.e`, and `i.j.k` (indices=[0,1,3]) the roots are `a` and `i` (return=[0,2]).
root
-- a  <------
-- -- b  |  |
-- -- -- c  |
-- -- -- d  |
-- -- -- -- e
-- f
-- -- g
-- -- -- h
-- i  <---
-- -- j  |
-- -- -- k
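The coalescing step for the example tree can be sketched as a lookup from leaf index to root field index with duplicates dropped. The leafToRoot table and rootFieldIndices function are hypothetical and hard-code the tree above (leaves c=0, e=1, h=2, k=3; roots a=0, f=1, i=2); returning roots in order of first appearance is an assumption of this sketch.

```go
package main

import "fmt"

// leafToRoot hard-codes the example tree: leaf column index -> index of the
// top-level field that contains it.
var leafToRoot = map[int]int{
	0: 0, // a.b.c   -> a
	1: 0, // a.b.d.e -> a
	2: 1, // f.g.h   -> f
	3: 2, // i.j.k   -> i
}

// rootFieldIndices is a hypothetical illustration of the coalescing step: it
// maps each leaf to its root field and keeps each root only once, in order of
// first appearance.
func rootFieldIndices(leaves []int) ([]int, error) {
	seen := make(map[int]bool)
	var roots []int
	for _, leaf := range leaves {
		root, ok := leafToRoot[leaf]
		if !ok {
			return nil, fmt.Errorf("leaf column index %d not found", leaf)
		}
		if !seen[root] {
			seen[root] = true
			roots = append(roots, root)
		}
	}
	return roots, nil
}

func main() {
	roots, err := rootFieldIndices([]int{0, 1, 3})
	fmt.Println(roots, err)
}
```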
func (*SchemaManifest) GetParent ¶
func (sm *SchemaManifest) GetParent(field *SchemaField) *SchemaField
GetParent gets the parent field for a given field if it is a nested column, otherwise returns nil if there is no parent field.
type WriterOption ¶
type WriterOption func(*config)
WriterOption is a convenience for building up arrow writer properties
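WriterOption follows the functional options pattern: each option is a function that mutates an unexported config before the properties are built. The sketch below shows the pattern in isolation; the config fields, option names, and default value are hypothetical stand-ins, not pqarrow's unexported types.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the unexported settings struct.
type config struct {
	coerceUnit  string
	storeSchema bool
}

// option mirrors the shape of a functional option: a function over *config.
type option func(*config)

// withCoerceUnit and withStoreSchema are hypothetical options for this sketch.
func withCoerceUnit(unit string) option {
	return func(c *config) { c.coerceUnit = unit }
}

func withStoreSchema() option {
	return func(c *config) { c.storeSchema = true }
}

// newConfig applies the options in order on top of the defaults and returns
// the result by value, so callers cannot mutate it through the options later.
func newConfig(opts ...option) config {
	cfg := config{coerceUnit: "s"} // default assumed for this sketch
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withCoerceUnit("ms"), withStoreSchema()))
}
```

The pattern lets new settings be added later without changing the constructor's signature.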
func WithAllocator ¶
func WithAllocator(mem memory.Allocator) WriterOption
WithAllocator specifies the allocator to be used by the writer whenever allocating buffers and memory.
func WithCoerceTimestamps ¶
func WithCoerceTimestamps(unit arrow.TimeUnit) WriterOption
WithCoerceTimestamps enables coercing of timestamp units to a specific time unit when constructing the schema and writing data so that regardless of the unit used by the datatypes being written, they will be converted to the desired time unit.
func WithDeprecatedInt96Timestamps ¶
func WithDeprecatedInt96Timestamps(enabled bool) WriterOption
WithDeprecatedInt96Timestamps enables conversion of arrow timestamps to int96 columns when constructing the schema. int96 timestamps are technically deprecated in parquet files, but because int96 is the Impala standard they are sometimes still needed.
func WithNoMapLogicalType ¶
func WithNoMapLogicalType() WriterOption
func WithStoreSchema ¶
func WithStoreSchema() WriterOption
WithStoreSchema enables writing a binary serialized arrow schema to the file in metadata to enable certain read options (like "read_dictionary") to be set automatically.
If called, the arrow schema is serialized and base64 encoded before being added to the metadata of the parquet file with the key "ARROW:schema". If the key exists when opening a file for read with pqarrow.FileReader, the schema will be used to choose types and options when constructing the arrow schema of the resulting data.
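The metadata round trip can be sketched with a plain map standing in for the parquet key/value metadata. The placeholder bytes, the storeSchema and loadSchema helpers, and the use of standard (padded) base64 are assumptions of this sketch; only the "ARROW:schema" key and the base64 step come from the description above.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// schemaKey is the metadata key named in the description above.
const schemaKey = "ARROW:schema"

// storeSchema is a hypothetical helper: it base64-encodes already serialized
// schema bytes and stores them under schemaKey. Standard padded base64 is an
// assumption of this sketch.
func storeSchema(meta map[string]string, serialized []byte) {
	meta[schemaKey] = base64.StdEncoding.EncodeToString(serialized)
}

// loadSchema is a hypothetical helper: it reports whether the key is present
// and, if so, returns the decoded schema bytes.
func loadSchema(meta map[string]string) ([]byte, bool, error) {
	enc, ok := meta[schemaKey]
	if !ok {
		return nil, false, nil
	}
	raw, err := base64.StdEncoding.DecodeString(enc)
	return raw, true, err
}

func main() {
	meta := map[string]string{}
	storeSchema(meta, []byte("placeholder schema bytes"))
	raw, found, err := loadSchema(meta)
	fmt.Println(string(raw), found, err)
}
```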
func WithTruncatedTimestamps ¶
func WithTruncatedTimestamps(allow bool) WriterOption
WithTruncatedTimestamps called with true turns off the error that would be returned if coercing a timestamp unit would cause a loss of data such as converting from nanoseconds to seconds.
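The interaction between coercion and truncation can be illustrated for the nanoseconds-to-seconds case mentioned above. coerceNanosToSeconds and errTruncated are hypothetical names for this sketch and are not part of pqarrow; the example values are non-negative.

```go
package main

import (
	"errors"
	"fmt"
)

// errTruncated is a hypothetical error for this sketch.
var errTruncated = errors.New("coercing timestamp would lose data")

// coerceNanosToSeconds converts a nanosecond timestamp to seconds. When
// allowTruncated is false, a value that is not a whole number of seconds is
// reported as an error instead of being silently truncated.
func coerceNanosToSeconds(ns int64, allowTruncated bool) (int64, error) {
	const nanosPerSecond = 1_000_000_000
	if !allowTruncated && ns%nanosPerSecond != 0 {
		return 0, errTruncated
	}
	return ns / nanosPerSecond, nil
}

func main() {
	fmt.Println(coerceNanosToSeconds(2_000_000_000, false))
	fmt.Println(coerceNanosToSeconds(1_500_000_000, false))
	fmt.Println(coerceNanosToSeconds(1_500_000_000, true))
}
```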