pqarrow

package
v18.0.0-...-6131e10
Warning: This package is not in the latest version of its module.
Published: Dec 31, 2024 | License: Apache-2.0, BSD-3-Clause | Imports: 32 | Imported by: 0

Documentation

Overview

Package pqarrow provides the implementation for connecting Arrow directly with the Parquet implementation. It isolates all of the explicitly Arrow-related code in this package, which provides the interfaces for reading and writing directly to and from Arrow Arrays, Tables, and Records.


Constants

This section is empty.

Variables

This section is empty.

Functions

func DecimalSize

func DecimalSize(precision int32) int32

DecimalSize returns the minimum number of bytes necessary to represent a decimal with the requested precision.

The logic is taken from the Apache Impala codebase. In the source, each return value is annotated with the maximum value that can be represented in two's complement with the returned number of bytes.
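The rule DecimalSize encodes can be sketched with the standard library: find the smallest byte count n for which the largest unscaled value of the given precision, 10^precision − 1, still fits in an n-byte two's-complement integer (at most 2^(8n−1) − 1). The helper `minDecimalBytes` below is a hypothetical re-derivation of that rule, not the pqarrow or Impala source.

```go
package main

import "math/big"

// minDecimalBytes is a hypothetical helper (not part of pqarrow) that returns
// the smallest n such that 10^precision - 1 <= 2^(8n-1) - 1, i.e. the largest
// unscaled decimal of that precision fits in an n-byte two's-complement integer.
func minDecimalBytes(precision int32) int32 {
	// Largest unscaled magnitude with `precision` decimal digits.
	maxVal := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(precision)), nil)
	maxVal.Sub(maxVal, big.NewInt(1))

	for n := int32(1); ; n++ {
		// Largest positive value of an n-byte signed integer: 2^(8n-1) - 1.
		limit := new(big.Int).Lsh(big.NewInt(1), uint(8*n-1))
		limit.Sub(limit, big.NewInt(1))
		if maxVal.Cmp(limit) <= 0 {
			return n
		}
	}
}
```

For example, precision 9 needs 4 bytes (999,999,999 ≤ 2,147,483,647), while precision 10 needs 5.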

func FromParquet

FromParquet generates an arrow Schema from a provided Parquet Schema.

func NewArrowWriteContext

func NewArrowWriteContext(ctx context.Context, props *ArrowWriterProperties) context.Context

NewArrowWriteContext creates a reusable context object that contains writer properties and other reusable buffers for writing. The resulting context should not be used to write multiple columns concurrently. If props is nil, DefaultWriterProps is used.

func ReadTable

ReadTable is a convenience function to quickly and easily read a parquet file into an arrow table.

The schema of the arrow table is generated from the schema of the Parquet file, including nested columns, lists, and so on, in the same fashion as the FromParquet function. ReadTable simply encapsulates creating a separate file.Reader and pqarrow.FileReader, for when you want to construct a table from the entire Parquet file rather than reading it piecemeal.

func ToParquet

func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error)

ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make decisions when determining the logical/physical types of the columns.

func WriteArrowToColumn

func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, leafFieldNullable bool) error

WriteArrowToColumn writes Apache Arrow columnar data directly to a ColumnChunkWriter. It returns a non-nil error if the array data type is not compatible with the concrete writer type.

leafArr is always a primitive (possibly dictionary-encoded) array. leafFieldNullable indicates whether the leaf array is considered nullable according to its schema in a Table or its parent array.

func WriteTable

func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error

WriteTable is a convenience function to create and write a full arrow.Table to a Parquet file. The schema and columns are determined by the schema of the table, and the file is written to the provided writer. chunkSize determines the number of rows per row group.

Types

type ArrowReadProperties

type ArrowReadProperties struct {
	// If Parallel is true, then functions which read multiple columns will read
	// those columns in parallel from the file with a number of readers equal
	// to the number of columns. Otherwise columns are read serially.
	Parallel bool
	// BatchSize is the size used for calls to NextBatch when reading whole columns
	BatchSize int64
	// contains filtered or unexported fields
}

ArrowReadProperties defines how to read a Parquet file into Arrow arrays.

func (*ArrowReadProperties) ForceLarge

func (props *ArrowReadProperties) ForceLarge(colIdx int) bool

func (*ArrowReadProperties) ReadDict

func (props *ArrowReadProperties) ReadDict(colIdx int) bool

func (*ArrowReadProperties) SetForceLarge

func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool)

SetForceLarge determines whether a particular column, if it is String or Binary, uses the LargeString/LargeBinary variants (with int64 offsets) instead of the default variants (with int32 offsets). This is useful when you know that a column contains more than 2GB of byte data, which int32 offsets cannot address.

Passing false uses the default variants, while passing true uses the large variants. If the given column index is not a string or binary column, this has no effect.

func (*ArrowReadProperties) SetReadDict

func (props *ArrowReadProperties) SetReadDict(colIdx int, readDict bool)

SetReadDict determines whether to read a particular column as dictionary encoded or not.

type ArrowWriterProperties

type ArrowWriterProperties struct {
	// contains filtered or unexported fields
}

ArrowWriterProperties are used to determine how to manipulate the arrow data when writing it to a parquet file.

func DefaultWriterProps

func DefaultWriterProps() ArrowWriterProperties

DefaultWriterProps returns the default properties for the arrow writer: memory.DefaultAllocator as the allocator and arrow.Second as the timestamp coercion unit (coerceTimestampUnit).

func NewArrowWriterProperties

func NewArrowWriterProperties(opts ...WriterOption) ArrowWriterProperties

NewArrowWriterProperties creates a new writer properties object by passing in a set of options to control the properties. Once created, an individual instance of ArrowWriterProperties is immutable.

type ColumnChunkReader

type ColumnChunkReader struct {
	// contains filtered or unexported fields
}

ColumnChunkReader is a reader that reads only a single column chunk from a single column in a single row group.

func (ColumnChunkReader) Read

func (ccr ColumnChunkReader) Read(ctx context.Context) (*arrow.Chunked, error)

type ColumnReader

type ColumnReader struct {
	// contains filtered or unexported fields
}

ColumnReader is used for reading batches of data from a specific column across multiple row groups to return a chunked arrow array.

func (*ColumnReader) NextBatch

func (c *ColumnReader) NextBatch(size int64) (*arrow.Chunked, error)

NextBatch returns a chunked array after reading `size` values, potentially across multiple row groups.

type ExtensionCustomParquetType

type ExtensionCustomParquetType interface {
	ParquetLogicalType() schema.LogicalType
}

ExtensionCustomParquetType is an interface that Arrow ExtensionTypes may implement to specify the target LogicalType to use when converting to Parquet.

The PrimitiveType is not configurable, and is determined by a fixed mapping from the extension's StorageType to a Parquet type (see getParquetType in pqarrow source).

type FileReader

type FileReader struct {
	Props    ArrowReadProperties
	Manifest *SchemaManifest
	// contains filtered or unexported fields
}

FileReader is the base object for reading a parquet file into arrow object types.

It provides utility functions for reading record batches, a table, subsets of columns / rowgroups, and so on.

func NewFileReader

func NewFileReader(rdr *file.Reader, props ArrowReadProperties, mem memory.Allocator) (*FileReader, error)

NewFileReader constructs a reader for converting to Arrow objects from an existing parquet file reader object.

It returns an error only if constructing the schema manifest from the Parquet file metadata fails.

func (*FileReader) GetColumn

func (fr *FileReader) GetColumn(ctx context.Context, i int) (*ColumnReader, error)

GetColumn returns a reader for pulling the data of leaf column index i across all row groups in the file.

func (*FileReader) GetFieldReader

func (fr *FileReader) GetFieldReader(ctx context.Context, i int, includedLeaves map[int]bool, rowGroups []int) (*ColumnReader, error)

GetFieldReader returns a reader for the entire Field of index i which could potentially include reading multiple columns from the underlying parquet file if that field is a nested field.

includedLeaves and rowGroups specify exactly which subset of leaf column indices and row groups to read.

func (*FileReader) GetFieldReaders

func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups []int) ([]*ColumnReader, *arrow.Schema, error)

GetFieldReaders retrieves readers for multiple fields at once, limited to the requested column indices and row groups. It returns a slice of the readers and the corresponding arrow.Schema for those columns.

func (*FileReader) GetRecordReader

func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups []int) (RecordReader, error)

GetRecordReader returns a record reader that reads only the requested column indexes and row groups.

If nil is passed for colIndices or rowGroups, all columns or all row groups, respectively, are read.

func (*FileReader) ParquetReader

func (fr *FileReader) ParquetReader() *file.Reader

ParquetReader returns the underlying parquet file reader that it was constructed with.

func (*FileReader) ReadColumn

func (fr *FileReader) ReadColumn(rowGroups []int, rdr *ColumnReader) (*arrow.Chunked, error)

ReadColumn reads data to create a chunked array only from the requested row groups.

func (*FileReader) ReadRowGroups

func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []int) (arrow.Table, error)

ReadRowGroups generates an arrow.Table from the file, reading only the requested columns and row groups rather than the entire file as ReadTable does.

func (*FileReader) ReadTable

func (fr *FileReader) ReadTable(ctx context.Context) (arrow.Table, error)

ReadTable reads the entire file into an arrow.Table.

func (*FileReader) RowGroup

func (fr *FileReader) RowGroup(idx int) RowGroupReader

RowGroup creates a reader that will *only* read from the requested row group

func (*FileReader) Schema

func (fr *FileReader) Schema() (*arrow.Schema, error)

Schema returns the arrow schema representation of the underlying file's schema.

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter is an object for writing Arrow directly to a parquet file.

func NewFileWriter

func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error)

NewFileWriter returns a writer for writing Arrow directly to a Parquet file. Unlike the ArrowColumnWriter and WriteArrow functions, which write Arrow data to an existing file.Writer, it creates a new file.Writer based on the provided schema.

func (*FileWriter) AppendKeyValueMetadata

func (fw *FileWriter) AppendKeyValueMetadata(key string, value string) error

AppendKeyValueMetadata appends a key/value pair to the existing key/value metadata.

func (*FileWriter) Close

func (fw *FileWriter) Close() error

Close flushes out the data and closes the file. It can be called multiple times; calls after the first have no effect.

func (*FileWriter) NewBufferedRowGroup

func (fw *FileWriter) NewBufferedRowGroup()

NewBufferedRowGroup starts a new row group that is buffered in memory, allowing columns or records to be written without immediately flushing them to disk. This lets you use WriteBuffered to write records and decide where to break row groups based on TotalBytesWritten rather than on the max row group length. When writing records, pair this with WriteBuffered; Write always writes each record as a row group of its own.

func (*FileWriter) NewRowGroup

func (fw *FileWriter) NewRowGroup()

NewRowGroup does what it says on the tin: it creates a new row group in the underlying file. It is equivalent to `AppendRowGroup` on a file.Writer.

func (*FileWriter) NumRows

func (fw *FileWriter) NumRows() int

NumRows returns the total number of rows that have been written so far.

func (*FileWriter) RowGroupNumRows

func (fw *FileWriter) RowGroupNumRows() (int, error)

RowGroupNumRows returns the number of rows written to the current row group. Returns an error if they are unequal between columns that have been written so far.

func (*FileWriter) RowGroupTotalBytesWritten

func (fw *FileWriter) RowGroupTotalBytesWritten() int64

RowGroupTotalBytesWritten returns the total number of bytes written and flushed out in the current row group.

func (*FileWriter) RowGroupTotalCompressedBytes

func (fw *FileWriter) RowGroupTotalCompressedBytes() int64

RowGroupTotalCompressedBytes returns the total number of bytes after compression that have been written to the current row group so far.

func (*FileWriter) Write

func (fw *FileWriter) Write(rec arrow.Record) error

Write writes an arrow Record Batch to the file, using MaxRowGroupLength from the writer properties to decide whether the record is split across more than one row group. At least one row group is created per record, so every call to Write adds a new row group.

Performance-wise, Write may be preferable to WriteBuffered when dealing with:

* a highly restricted memory environment
* very large records with many rows (potentially close to the max row group length)

func (*FileWriter) WriteBuffered

func (fw *FileWriter) WriteBuffered(rec arrow.Record) error

WriteBuffered either appends to the existing row group or creates a new one, based on the record length and the max row group length.

It also lets you break row groups manually by checking RowGroupTotalBytesWritten and calling NewBufferedRowGroup, whereas Write always creates at least one row group per record.

Performance-wise, WriteBuffered may be preferable to Write when dealing with:

* a loose memory environment (meaning you have a lot of memory to utilize)
* records that have only a small number of rows (perhaps fewer than ~1K)

WriteBuffered uses more memory than Write because the data for the whole row group is kept in memory until it is written: Parquet files must have an entire column chunk written before the next column can be written.

func (*FileWriter) WriteColumnChunked

func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error

WriteColumnChunked writes the provided data to the underlying file, using offset and size to write a subset of the chunked column. It starts from the current column of the underlying row group writer, so a file can be built progressively, column by column, from Arrow data without first assembling a record or table.

func (*FileWriter) WriteColumnData

func (fw *FileWriter) WriteColumnData(data arrow.Array) error

WriteColumnData writes the entire array to the file as the next column. Like WriteColumnChunked, it starts from the current column of the row group writer, so the file can be built progressively by columns without a full record or table.

func (*FileWriter) WriteTable

func (fw *FileWriter) WriteTable(tbl arrow.Table, chunkSize int64) error

WriteTable writes an arrow table to the underlying file using chunkSize to determine the size to break at for making row groups. Writing a table will always create a new row group for each chunk of chunkSize rows in the table. Calling this with 0 rows will still write a 0 length Row Group to the file.

type RecordReader

type RecordReader interface {
	array.RecordReader
	arrio.Reader
}

RecordReader is a Record Batch Reader that meets the interfaces for both array.RecordReader and arrio.Reader to allow easy progressive reading of record batches from the parquet file. Ideal for streaming.

type RowGroupReader

type RowGroupReader struct {
	// contains filtered or unexported fields
}

RowGroupReader is a reader for getting data only from a single row group of the file rather than having to repeatedly pass the index to functions on the reader.

func (RowGroupReader) Column

func (rgr RowGroupReader) Column(idx int) ColumnChunkReader

Column creates a reader for just the requested column chunk in only this row group.

func (RowGroupReader) ReadTable

func (rgr RowGroupReader) ReadTable(ctx context.Context, colIndices []int) (arrow.Table, error)

ReadTable returns an arrow.Table consisting only of the requested columns from this row group.

type SchemaField

type SchemaField struct {
	Field     *arrow.Field
	Children  []SchemaField
	ColIndex  int
	LevelInfo file.LevelInfo
}

SchemaField is a holder that defines a specific logical field in the schema which could potentially refer to multiple physical columns in the underlying parquet file if it is a nested type.

ColIndex is only populated (not -1) when it is a leaf column.

func (*SchemaField) IsLeaf

func (s *SchemaField) IsLeaf() bool

IsLeaf returns true if the SchemaField is a leaf column, i.e., ColIndex != -1.

type SchemaManifest

type SchemaManifest struct {
	OriginSchema *arrow.Schema
	SchemaMeta   *arrow.Metadata

	ColIndexToField map[int]*SchemaField
	ChildToParent   map[*SchemaField]*SchemaField
	Fields          []SchemaField
	// contains filtered or unexported fields
}

SchemaManifest represents a full manifest for mapping a Parquet schema to an arrow Schema.

func NewSchemaManifest

func NewSchemaManifest(sc *schema.Schema, meta metadata.KeyValueMetadata, props *ArrowReadProperties) (*SchemaManifest, error)

NewSchemaManifest creates a manifest for mapping a parquet schema to a given arrow schema.

The metadata passed in should be the file-level key/value metadata from the Parquet file, or nil. If it contains the "ARROW:schema" key, the stored Arrow schema is used to determine types.

func (*SchemaManifest) GetColumnField

func (sm *SchemaManifest) GetColumnField(index int) (*SchemaField, error)

GetColumnField returns the corresponding Field for a given column index.

func (*SchemaManifest) GetFieldIndices

func (sm *SchemaManifest) GetFieldIndices(indices []int) ([]int, error)

GetFieldIndices coalesces a list of leaf column indices into the list of field indices (relative to the equivalent arrow.Schema) that correspond to the column root (the first node below the Parquet schema's root group) of each referenced leaf.

For example, for leaves `a.b.c`, `a.b.d.e`, and `i.j.k` (column_indices=[0,1,3]) the roots are `a` and `i` (return=[0,2]).

	root
	-- a  <------
	-- -- b  |  |
	-- -- -- c  |
	-- -- -- d  |
	-- -- -- -- e
	-- f
	-- -- g
	-- -- -- h
	-- i  <---
	-- -- j  |
	-- -- -- k

func (*SchemaManifest) GetParent

func (sm *SchemaManifest) GetParent(field *SchemaField) *SchemaField

GetParent gets the parent field for a given field if it is a nested column, otherwise returns nil if there is no parent field.

type WriterOption

type WriterOption func(*config)

WriterOption is a convenience for building up arrow writer properties

func WithAllocator

func WithAllocator(mem memory.Allocator) WriterOption

WithAllocator specifies the allocator to be used by the writer whenever allocating buffers and memory.

func WithCoerceTimestamps

func WithCoerceTimestamps(unit arrow.TimeUnit) WriterOption

WithCoerceTimestamps enables coercing of timestamp units to a specific time unit when constructing the schema and writing data so that regardless of the unit used by the datatypes being written, they will be converted to the desired time unit.

func WithDeprecatedInt96Timestamps

func WithDeprecatedInt96Timestamps(enabled bool) WriterOption

WithDeprecatedInt96Timestamps enables converting Arrow timestamps to int96 columns when constructing the schema. Because int96 is the Impala convention rather than part of the current Parquet standard, it is technically deprecated for Parquet files, but it is sometimes needed.

func WithNoMapLogicalType

func WithNoMapLogicalType() WriterOption

func WithStoreSchema

func WithStoreSchema() WriterOption

WithStoreSchema enables writing a binary serialized arrow schema to the file metadata, so that certain read options (like "read_dictionary") can be set automatically.

If called, the arrow schema is serialized and base64 encoded before being added to the metadata of the parquet file with the key "ARROW:schema". If the key exists when opening a file for read with pqarrow.FileReader, the schema will be used to choose types and options when constructing the arrow schema of the resulting data.

func WithTruncatedTimestamps

func WithTruncatedTimestamps(allow bool) WriterOption

WithTruncatedTimestamps, when called with true, turns off the error that would otherwise be returned when coercing a timestamp unit would lose data, such as when converting from nanoseconds to seconds.
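The data-loss check that WithTruncatedTimestamps disables can be sketched with the standard library: converting a nanosecond value to a coarser unit loses data whenever it is not an exact multiple of the conversion factor. The helper `coerceNanos` below is hypothetical and only illustrates the idea; it is not pqarrow's implementation, and it does not address rounding of negative values.

```go
package main

import "fmt"

const (
	nanosPerMilli  = int64(1_000_000)
	nanosPerSecond = int64(1_000_000_000)
)

// coerceNanos converts a nanosecond timestamp to a coarser unit given by
// factor (for example nanosPerSecond). Unless allowTruncate is true, it
// returns an error when the conversion would drop a non-zero remainder.
func coerceNanos(v, factor int64, allowTruncate bool) (int64, error) {
	if !allowTruncate && v%factor != 0 {
		return 0, fmt.Errorf("coercing %d ns by factor %d would lose data", v, factor)
	}
	return v / factor, nil // Go integer division truncates toward zero
}
```

For example, 1,500,000,000 ns converts cleanly to 1500 ms but not to whole seconds unless truncation is allowed.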
