dynparquet

package
v0.0.0-...-e9ab6f3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 25 Imported by: 6

Documentation

Index

Constants

View Source
const (
	// The size of the column indices in parquet files.
	ColumnIndexSize = 16
)
View Source
const (
	DynamicColumnsKey = "dynamic_columns"
)

Variables

View Source
var (
	SampleDefinition          = samples.SampleDefinition
	NewTestSamples            = samples.NewTestSamples
	PrehashedSampleDefinition = samples.PrehashedSampleDefinition
	SampleDefinitionWithFloat = samples.SampleDefinitionWithFloat
	NewNestedSampleSchema     = samples.NewNestedSampleSchema
	LabelColumn               = samples.LabelColumn
	GenerateTestSamples       = samples.GenerateTestSamples
)
View Source
var ErrMalformedDynamicColumns = errors.New("malformed dynamic columns string")
View Source
var ErrNoDynamicColumns = errors.New("no dynamic columns metadata found, it must be present")

Functions

func DefinitionFromParquetFile

func DefinitionFromParquetFile(file *parquet.File) (*schemapb.Schema, error)

DefinitionFromParquetFile converts a parquet file into a schemapb.Schema.

func FieldByName

func FieldByName(fields []parquet.Field, name string) parquet.Field

func FindChildIndex

func FindChildIndex(fields []parquet.Field, name string) int

func FindHashedColumn

func FindHashedColumn(col string, fields []arrow.Field) int

FindHashedColumn finds the index of the column in the given fields that has been prehashed.

func HashArray

func HashArray(arr arrow.Array) []uint64

func HashedColumnName

func HashedColumnName(col string) string

func IsHashedColumn

func IsHashedColumn(col string) bool

func MergeDeduplicatedDynCols

func MergeDeduplicatedDynCols(dyn []string) []string

MergeDeduplicatedDynCols is a light wrapper over sorting the deduplicated dynamic column names provided in dyn. It is extracted as a public method since this merging determines the order in which dynamic columns are stored and components from other packages sometimes need to figure out the physical sort order between dynamic columns.

func MergeDynamicColumnSets

func MergeDynamicColumnSets(sets []map[string][]string) map[string][]string

func ParquetSchemaFromV2Definition

func ParquetSchemaFromV2Definition(def *schemav2pb.Schema) *parquet.Schema

func PrehashColumns

func PrehashColumns(schema *Schema, r arrow.Record) arrow.Record

PrehashColumns prehashes the columns in the given record that have been marked as prehashed in the given schema.

func RemoveHashedColumns

func RemoveHashedColumns(r arrow.Record) arrow.Record

RemoveHashedColumns removes the hashed columns from the record.

func SortingColumnsFromDef

func SortingColumnsFromDef(def *schemav2pb.Schema) ([]parquet.SortingColumn, error)

func ToSnakeCase

func ToSnakeCase(str string) string

func ValuesForIndex

func ValuesForIndex(row parquet.Row, index int) []parquet.Value

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer represents a batch of rows with a concrete set of dynamic column names representing how its parquet schema was created off of a dynamic parquet schema.

func ToBuffer

func ToBuffer(s Samples, schema *Schema) (*Buffer, error)

func (*Buffer) Clone

func (b *Buffer) Clone() (*Buffer, error)

func (*Buffer) ColumnChunks

func (b *Buffer) ColumnChunks() []parquet.ColumnChunk

ColumnChunks returns the list of parquet.ColumnChunk of the buffer. Each chunk contains all the pages associated with the corresponding column of this row group. Implements the parquet.RowGroup interface.

func (*Buffer) DynamicColumns

func (b *Buffer) DynamicColumns() map[string][]string

DynamicColumns returns the concrete dynamic column names of the buffer. It implements the DynamicRowGroup interface.

func (*Buffer) DynamicRows

func (b *Buffer) DynamicRows() DynamicRowReader

DynamicRows returns an iterator for the rows in the buffer. It implements the DynamicRowGroup interface.

func (*Buffer) NumRows

func (b *Buffer) NumRows() int64

NumRows returns the number of rows in the buffer. Implements the parquet.RowGroup interface.

func (*Buffer) Reset

func (b *Buffer) Reset()

func (*Buffer) Rows

func (b *Buffer) Rows() parquet.Rows

Rows returns an iterator for the rows in the buffer. It implements the parquet.RowGroup interface.

func (*Buffer) Schema

func (b *Buffer) Schema() *parquet.Schema

Schema returns the concrete parquet.Schema of the buffer. Implements the parquet.RowGroup interface.

func (*Buffer) Size

func (b *Buffer) Size() int64

func (*Buffer) Sort

func (b *Buffer) Sort()

func (*Buffer) SortingColumns

func (b *Buffer) SortingColumns() []parquet.SortingColumn

SortingColumns returns the concrete slice of parquet.SortingColumns of the buffer. Implements the parquet.RowGroup interface.

func (*Buffer) String

func (b *Buffer) String() string

func (*Buffer) WriteRowGroup

func (b *Buffer) WriteRowGroup(rg parquet.RowGroup) (int64, error)

WriteRowGroup writes the rows of the given row group to the buffer.

func (*Buffer) WriteRows

func (b *Buffer) WriteRows(rows []parquet.Row) (int, error)

WriteRows writes the given rows to the buffer.

type ColumnDefinition

type ColumnDefinition struct {
	Name          string
	StorageLayout parquet.Node
	Dynamic       bool
	PreHash       bool
}

ColumnDefinition describes a column in a dynamic parquet schema.

type DynamicRow

type DynamicRow struct {
	Row            parquet.Row
	Schema         *parquet.Schema
	DynamicColumns map[string][]string
	// contains filtered or unexported fields
}

func NewDynamicRow

func NewDynamicRow(row parquet.Row, schema *parquet.Schema, dyncols map[string][]string, fields []parquet.Field) *DynamicRow

type DynamicRowGroup

type DynamicRowGroup interface {
	parquet.RowGroup
	fmt.Stringer
	// DynamicColumns returns the concrete dynamic column names that were used
	// to create its concrete parquet schema with a dynamic parquet schema.
	DynamicColumns() map[string][]string
	// DynamicRows returns an iterator over the rows in the row group.
	DynamicRows() DynamicRowReader
}

DynamicRowGroup is a parquet.RowGroup that can describe the concrete dynamic columns.

func Concat

func Concat(fields []parquet.Field, drg ...DynamicRowGroup) DynamicRowGroup

type DynamicRowGroupMergeAdapter

type DynamicRowGroupMergeAdapter struct {
	// contains filtered or unexported fields
}

DynamicRowGroupMergeAdapter maps a RowBatch with a Schema with a subset of dynamic columns to a Schema with a superset of dynamic columns. It implements the parquet.RowGroup interface.

func NewDynamicRowGroupMergeAdapter

func NewDynamicRowGroupMergeAdapter(
	schema *parquet.Schema,
	sortingColumns []parquet.SortingColumn,
	mergedDynamicColumns map[string][]string,
	originalRowGroup parquet.RowGroup,
) *DynamicRowGroupMergeAdapter

NewDynamicRowGroupMergeAdapter returns a *DynamicRowGroupMergeAdapter, which maps the columns of the original row group to the columns in the superset schema provided. This allows row groups that have non-conflicting dynamic schemas to be merged into a single row group with a superset parquet schema. The provided schema must not conflict with the original row group's schema; it must be strictly a superset. This property is not checked; it is assumed to be true for performance reasons.

func (*DynamicRowGroupMergeAdapter) ColumnChunks

func (a *DynamicRowGroupMergeAdapter) ColumnChunks() []parquet.ColumnChunk

ColumnChunks returns the leaf columns of the group. Each column is searched for in the original batch; if it is not found, a column chunk filled with nulls is returned in its place.

func (*DynamicRowGroupMergeAdapter) NumRows

func (a *DynamicRowGroupMergeAdapter) NumRows() int64

Returns the number of rows in the group.

func (*DynamicRowGroupMergeAdapter) Rows

func (a *DynamicRowGroupMergeAdapter) Rows() parquet.Rows

Returns a reader exposing the rows of the row group.

func (*DynamicRowGroupMergeAdapter) Schema

func (a *DynamicRowGroupMergeAdapter) Schema() *parquet.Schema

Returns the schema of rows in the group. The schema is the configured merged, superset schema.

func (*DynamicRowGroupMergeAdapter) SortingColumns

func (a *DynamicRowGroupMergeAdapter) SortingColumns() []parquet.SortingColumn

Returns the list of sorting columns describing how rows are sorted in the group.

The method will return an empty slice if the rows are not sorted.

type DynamicRowReader

type DynamicRowReader interface {
	parquet.RowSeeker
	ReadRows(*DynamicRows) (int, error)
	Close() error
}

DynamicRowReader is an iterator over the rows in a DynamicRowGroup.

type DynamicRowSorter

type DynamicRowSorter struct {
	// contains filtered or unexported fields
}

func NewDynamicRowSorter

func NewDynamicRowSorter(schema *Schema, rows *DynamicRows) *DynamicRowSorter

func (*DynamicRowSorter) Len

func (d *DynamicRowSorter) Len() int

func (*DynamicRowSorter) Less

func (d *DynamicRowSorter) Less(i, j int) bool

func (*DynamicRowSorter) Swap

func (d *DynamicRowSorter) Swap(i, j int)

type DynamicRows

type DynamicRows struct {
	Rows           []parquet.Row
	Schema         *parquet.Schema
	DynamicColumns map[string][]string
	// contains filtered or unexported fields
}

func NewDynamicRows

func NewDynamicRows(
	rows []parquet.Row, schema *parquet.Schema, dynamicColumns map[string][]string, fields []parquet.Field,
) *DynamicRows

func (*DynamicRows) Get

func (r *DynamicRows) Get(i int) *DynamicRow

func (*DynamicRows) GetCopy

func (r *DynamicRows) GetCopy(i int) *DynamicRow

func (*DynamicRows) IsSorted

func (r *DynamicRows) IsSorted(schema *Schema) bool

type MergeOption

type MergeOption func(m *mergeOption)

func WithAlreadySorted

func WithAlreadySorted() MergeOption

func WithDynamicCols

func WithDynamicCols(cols map[string][]string) MergeOption

type MergedRowGroup

type MergedRowGroup struct {
	parquet.RowGroup
	DynCols map[string][]string
	// contains filtered or unexported fields
}

MergedRowGroup allows wrapping any parquet.RowGroup to implement the DynamicRowGroup interface by specifying the concrete dynamic column names the RowGroup's schema contains.

func (*MergedRowGroup) DynamicColumns

func (r *MergedRowGroup) DynamicColumns() map[string][]string

DynamicColumns returns the concrete dynamic column names that were used to create its concrete parquet schema with a dynamic parquet schema. Implements the DynamicRowGroup interface.

func (*MergedRowGroup) DynamicRows

func (r *MergedRowGroup) DynamicRows() DynamicRowReader

DynamicRows returns an iterator over the rows in the row group. Implements the DynamicRowGroup interface.

func (*MergedRowGroup) String

func (r *MergedRowGroup) String() string

type NilColumnChunk

type NilColumnChunk struct {
	// contains filtered or unexported fields
}

NilColumnChunk is a column chunk that contains a single page with all null values of the given type, given length and column index of the parent schema. It implements the parquet.ColumnChunk interface.

func NewNilColumnChunk

func NewNilColumnChunk(typ parquet.Type, columnIndex, numValues int) *NilColumnChunk

NewNilColumnChunk creates a new column chunk configured with the given type, column index and number of values in the page.

func (*NilColumnChunk) BloomFilter

func (c *NilColumnChunk) BloomFilter() parquet.BloomFilter

BloomFilter returns the bloomfilter of the column chunk. Since the NilColumnChunk is a virtual column chunk only for in-memory purposes, it returns nil. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) Column

func (c *NilColumnChunk) Column() int

Column returns the index of the column chunk within the parent schema. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) ColumnIndex

func (c *NilColumnChunk) ColumnIndex() (parquet.ColumnIndex, error)

ColumnIndex returns the column index of the column chunk. Since the NilColumnChunk is a virtual column chunk only for in-memory purposes, it returns nil. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) NumValues

func (c *NilColumnChunk) NumValues() int64

NumValues returns the number of values in the column chunk. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) OffsetIndex

func (c *NilColumnChunk) OffsetIndex() (parquet.OffsetIndex, error)

OffsetIndex returns the offset index of the column chunk. Since the NilColumnChunk is a virtual column chunk only for in-memory purposes, it returns nil. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) Pages

func (c *NilColumnChunk) Pages() parquet.Pages

Pages returns an iterator for all pages within the column chunk. This iterator will only ever return a single page filled with all null values of the configured amount. Implements the parquet.ColumnChunk interface.

func (*NilColumnChunk) Type

func (c *NilColumnChunk) Type() parquet.Type

Type returns the type of the column chunk. Implements the parquet.ColumnChunk interface.

type ParquetWriter

type ParquetWriter interface {
	Schema() *parquet.Schema
	Write(rows []any) (int, error)
	WriteRows(rows []parquet.Row) (int, error)
	Flush() error
	Close() error
	Reset(writer io.Writer)
}

type PooledBuffer

type PooledBuffer struct {
	*Buffer
	// contains filtered or unexported fields
}

type PooledParquetSchema

type PooledParquetSchema struct {
	Schema *parquet.Schema
	// contains filtered or unexported fields
}

type PooledWriter

type PooledWriter struct {
	ParquetWriter
	// contains filtered or unexported fields
}

type Sample

type Sample = samples.Sample

type Samples

type Samples = samples.Samples

type Schema

type Schema struct {
	UniquePrimaryIndex bool
	// contains filtered or unexported fields
}

Schema is a dynamic parquet schema. It extends a parquet schema with the ability that any column definition that is dynamic will have columns dynamically created as their column name is seen for the first time.

func NewSampleSchema

func NewSampleSchema() *Schema

func SchemaFromDefinition

func SchemaFromDefinition(msg proto.Message) (*Schema, error)

func SchemaFromParquetFile

func SchemaFromParquetFile(file *parquet.File) (*Schema, error)

SchemaFromParquetFile converts a parquet file into a dynparquet.Schema.

func (*Schema) Cmp

func (s *Schema) Cmp(a, b *DynamicRow) int

func (*Schema) ColumnByName

func (s *Schema) ColumnByName(name string) (ColumnDefinition, bool)

func (*Schema) ColumnDefinitionsForSortingColumns

func (s *Schema) ColumnDefinitionsForSortingColumns() []ColumnDefinition

func (*Schema) Columns

func (s *Schema) Columns() []ColumnDefinition

func (*Schema) Definition

func (s *Schema) Definition() proto.Message

func (*Schema) FindColumn

func (s *Schema) FindColumn(column string) (ColumnDefinition, bool)

FindColumn returns a column definition for the column passed.

func (*Schema) FindDynamicColumn

func (s *Schema) FindDynamicColumn(dynamicColumnName string) (ColumnDefinition, bool)

FindDynamicColumn returns a dynamic column definition for the column passed.

func (*Schema) FindDynamicColumnForConcreteColumn

func (s *Schema) FindDynamicColumnForConcreteColumn(column string) (ColumnDefinition, bool)

FindDynamicColumnForConcreteColumn returns a column definition for the column passed. So "labels.label1" would return the column definition for the dynamic column "labels" if it exists.

func (*Schema) GetBuffer

func (s *Schema) GetBuffer(dynamicColumns map[string][]string) (*PooledBuffer, error)

func (*Schema) GetDynamicParquetSchema

func (s *Schema) GetDynamicParquetSchema(dynamicColumns map[string][]string) (*PooledParquetSchema, error)

GetDynamicParquetSchema returns a parquet schema of all the columns and the given dynamic columns. The difference with GetParquetSortingSchema is that all columns are included in the parquet schema.

func (*Schema) GetParquetSortingSchema

func (s *Schema) GetParquetSortingSchema(dynamicColumns map[string][]string) (*PooledParquetSchema, error)

GetParquetSortingSchema returns a parquet schema of the sorting columns and the given dynamic columns. The difference with GetDynamicParquetSchema is that non-sorting columns are elided.

func (*Schema) GetWriter

func (s *Schema) GetWriter(w io.Writer, dynamicColumns map[string][]string, sorting bool) (*PooledWriter, error)

func (*Schema) MergeDynamicRowGroups

func (s *Schema) MergeDynamicRowGroups(rowGroups []DynamicRowGroup, options ...MergeOption) (DynamicRowGroup, error)

MergeDynamicRowGroups merges the given dynamic row groups into a single dynamic row group. It merges the parquet schema in a non-conflicting way by merging all the concrete dynamic column names and generating a superset parquet schema that all given dynamic row groups are compatible with.

func (*Schema) Name

func (s *Schema) Name() string

func (*Schema) NewBuffer

func (s *Schema) NewBuffer(dynamicColumns map[string][]string) (*Buffer, error)

NewBuffer returns a new buffer with a concrete parquet schema generated using the given concrete dynamic column names.

func (*Schema) NewBufferV2

func (s *Schema) NewBufferV2(dynamicColumns ...*schemav2pb.Node) (*Buffer, error)

func (*Schema) NewWriter

func (s *Schema) NewWriter(w io.Writer, dynamicColumns map[string][]string, sorting bool, options ...parquet.WriterOption) (ParquetWriter, error)

NewWriter returns a new parquet writer with a concrete parquet schema generated using the given concrete dynamic column names.

func (*Schema) ParquetSchema

func (s *Schema) ParquetSchema() *parquet.Schema

func (Schema) ParquetSortingColumns

func (s Schema) ParquetSortingColumns(
	dynamicColumns map[string][]string,
) []parquet.SortingColumn

ParquetSortingColumns returns the parquet sorting columns for the dynamic sorting columns with the concrete dynamic column names given in the argument.

func (*Schema) PutBuffer

func (s *Schema) PutBuffer(b *PooledBuffer)

func (*Schema) PutPooledParquetSchema

func (s *Schema) PutPooledParquetSchema(ps *PooledParquetSchema)

func (*Schema) PutWriter

func (s *Schema) PutWriter(w *PooledWriter)

func (*Schema) ResetBuffers

func (s *Schema) ResetBuffers()

func (*Schema) ResetWriters

func (s *Schema) ResetWriters()

func (*Schema) RowLessThan

func (s *Schema) RowLessThan(a, b *DynamicRow) bool

func (*Schema) SerializeBuffer

func (s *Schema) SerializeBuffer(w io.Writer, buffer *Buffer) error

func (*Schema) SortingColumns

func (s *Schema) SortingColumns() []SortingColumn

type SerializedBuffer

type SerializedBuffer struct {
	// contains filtered or unexported fields
}

func NewSerializedBuffer

func NewSerializedBuffer(f *parquet.File) (*SerializedBuffer, error)

func ReaderFromBytes

func ReaderFromBytes(buf []byte) (*SerializedBuffer, error)

func (*SerializedBuffer) DynamicColumns

func (b *SerializedBuffer) DynamicColumns() map[string][]string

func (*SerializedBuffer) DynamicRowGroup

func (b *SerializedBuffer) DynamicRowGroup(i int) DynamicRowGroup

func (*SerializedBuffer) DynamicRows

func (b *SerializedBuffer) DynamicRows() DynamicRowReader

func (*SerializedBuffer) MultiDynamicRowGroup

func (b *SerializedBuffer) MultiDynamicRowGroup() DynamicRowGroup

MultiDynamicRowGroup returns all the row groups wrapped in a single multi row group.

func (*SerializedBuffer) NumRowGroups

func (b *SerializedBuffer) NumRowGroups() int

func (*SerializedBuffer) NumRows

func (b *SerializedBuffer) NumRows() int64

func (*SerializedBuffer) ParquetFile

func (b *SerializedBuffer) ParquetFile() *parquet.File

func (*SerializedBuffer) Reader

func (b *SerializedBuffer) Reader() *parquet.GenericReader[any]

func (*SerializedBuffer) String

func (b *SerializedBuffer) String() string

type SortingColumn

type SortingColumn interface {
	parquet.SortingColumn
	ColumnName() string
}

SortingColumn describes a column to sort by in a dynamic parquet schema.

func Ascending

func Ascending(column string) SortingColumn

Ascending constructs a SortingColumn value which dictates to sort by the column in ascending order.

func Descending

func Descending(column string) SortingColumn

Descending constructs a SortingColumn value which dictates to sort by the column in descending order.

func NullsFirst

func NullsFirst(sortingColumn SortingColumn) SortingColumn

NullsFirst wraps the SortingColumn passed as argument so that it instructs the row group to place null values first in the column.

type StorageLayout

type StorageLayout interface {
	GetTypeInt32() int32
	GetRepeated() bool
	GetNullable() bool
	GetEncodingInt32() int32
	GetCompressionInt32() int32
}

func StorageLayoutWrapper

func StorageLayoutWrapper(_ *schemav2pb.StorageLayout) StorageLayout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL