parquet

Version: v0.3.1
Published: Aug 8, 2019 License: MIT Imports: 13 Imported by: 0

README

Parquet

Parquet generates a parquet reader and writer based on a struct. The struct can be defined by you or it can be generated by reading an existing parquet file.

We (Parsyl) will respond to pull requests and issues to the best of our abilities. However, sometimes we will have higher priorities and the response might not be immediate.

NOTE: Generating the code from an existing parquet file comes with quite a few limitations. The PageType of each PageHeader must be DATA_PAGE, and the Codec (defined in ColumnMetaData) must be UNCOMPRESSED or SNAPPY. The parquet file's schema must consist only of the currently supported types. Some encodings, including DELTA_BINARY_PACKED, BIT_PACKED, PLAIN_DICTIONARY, and DELTA_BYTE_ARRAY, are not supported either. Other parquet options will probably cause problems as well, since there are so many possibilities.

Installation

go get -u github.com/parsyl/parquet/...

This will also install parquet's only dependency: thrift

Usage

First define a struct for the data to be written to parquet:

type Person struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

Next, add a go:generate comment somewhere (in this example all code lives in main.go):

//go:generate parquetgen -input main.go -type Person -package main

Generate the code for the reader and writer:

$ go generate

A new file (parquet.go) has now been written that defines ParquetWriter and ParquetReader. Next, make use of the writer and reader:

package main

import (
    "bytes"
    "encoding/json"
    "log"
    "os"
)

func main() {
    var buf bytes.Buffer
    w, err := NewParquetWriter(&buf)
    if err != nil {
        log.Fatal(err)
    }

    w.Add(Person{ID: 1, Age: getAge(30)})
    w.Add(Person{ID: 2})

    // Each call to Write creates a new parquet row group.
    if err := w.Write(); err != nil {
        log.Fatal(err)
    }

    // Close must be called when you are done.  It writes
    // the parquet metadata at the end of the file.
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }

    r, err := NewParquetReader(bytes.NewReader(buf.Bytes()))
    if err != nil {
        log.Fatal(err)
    }

    enc := json.NewEncoder(os.Stdout)
    for r.Next() {
        var p Person
        r.Scan(&p)
        enc.Encode(p)
    }

    if err := r.Error(); err != nil {
        log.Fatal(err)
    }
}

func getAge(a int32) *int32 { return &a }

NewParquetWriter accepts three optional arguments: MaxPageSize, Uncompressed, and Snappy. For example, the following sets the page size (the number of rows in a page before a new one is created) and sets the page data compression to snappy:

    w, err := NewParquetWriter(&buf, MaxPageSize(10000), Snappy)
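
The call above reads like Go's functional-options pattern. The following is a minimal sketch of that pattern, not the generated code: the writer type, its fields, the option type, and the default values are all hypothetical stand-ins that only show how a constructor could apply arguments such as MaxPageSize(10000) and Snappy.

```go
package main

import "fmt"

// writer is a hypothetical stand-in for the generated ParquetWriter.
type writer struct {
	maxPageSize int
	compression string
}

// option mutates a writer. The generated code may use a different type.
type option func(*writer)

// MaxPageSize returns an option that sets the number of rows per page.
func MaxPageSize(n int) option {
	return func(w *writer) { w.maxPageSize = n }
}

// Snappy and Uncompressed are options themselves, so callers pass
// them without parentheses.
func Snappy(w *writer)       { w.compression = "snappy" }
func Uncompressed(w *writer) { w.compression = "uncompressed" }

// newWriter sets (assumed) defaults first, then applies each option in order.
func newWriter(opts ...option) *writer {
	w := &writer{maxPageSize: 1000, compression: "uncompressed"}
	for _, opt := range opts {
		opt(w)
	}
	return w
}

func main() {
	w := newWriter(MaxPageSize(10000), Snappy)
	fmt.Println(w.maxPageSize, w.compression)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same setting.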

See the examples directory for complete examples of generating the code based on an existing struct and based on an existing parquet file.

Supported Types

The struct used to define the parquet data can have the following types:

int32
uint32
int64
uint64
float32
float64
string
bool

Each of these types may be a pointer to indicate that the data is optional. The struct can also embed another struct:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being
	Username string `parquet:"username"`
}

Nested and repeated structs are supported too:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being    Being
	Username string `parquet:"username"`
	Friends  []Being
}

If you want a field to be excluded from parquet you can tag it with a dash or make it unexported like so:

type Being struct {
	ID       int32  `parquet:"id"`
	Password string `parquet:"-"` // will not be written to parquet
	age      int32                // will not be written to parquet
}
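
To make the exclusion rules concrete, here is a small sketch that lists the column names such a struct would produce. It uses the reflect package purely for illustration; this is not a description of how parquetgen inspects a type, and the fallback to the Go field name for untagged fields is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

type Being struct {
	ID       int32  `parquet:"id"`
	Password string `parquet:"-"`
	age      int32
}

// columns returns the parquet column names of the exported,
// non-excluded fields of the struct value v.
func columns(v interface{}) []string {
	t := reflect.TypeOf(v)
	var out []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.PkgPath != "" {
			continue // unexported field
		}
		name := f.Tag.Get("parquet")
		if name == "-" {
			continue // explicitly excluded
		}
		if name == "" {
			name = f.Name // assumption: untagged fields keep their Go name
		}
		out = append(out, name)
	}
	return out
}

func main() {
	fmt.Println(columns(Being{}))
}
```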

Parquetgen

Parquetgen is the command that go generate should call in order to generate the code for your custom type. It can also print the page headers and file metadata from a parquet file:

$ parquetgen --help
Usage of parquetgen:
  -ignore
        ignore unsupported fields in -type, otherwise log.Fatal is called when an unsupported type is encountered (default true)
  -import string
        import statement of -type if it doesn't live in -package
  -input string
        path to the go file that defines -type
  -metadata
        print the metadata of a parquet file (-parquet) and exit
  -output string
        name of the file that is produced, defaults to parquet.go (default "parquet.go")
  -package string
        package of the generated code
  -pageheaders
        print the page headers of a parquet file (-parquet) and exit (also prints the metadata)
  -parquet string
        path to a parquet file (if you are generating code based on an existing parquet file or printing the file metadata or page headers)
  -struct-output string
        name of the file that is produced, defaults to parquet.go (default "generated_struct.go")
  -type string
        name of the struct that will used for writing and reading

Documentation

Functions

func BoolType

func BoolType(se *sch.SchemaElement)

func Float32Type

func Float32Type(se *sch.SchemaElement)

func Float64Type

func Float64Type(se *sch.SchemaElement)

func GetBools

func GetBools(r io.Reader, n int, pageSizes []int) ([]bool, error)

GetBools reads a byte array and turns each bit into a bool
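
As an illustration of turning bits into bools, the sketch below unpacks the first n bits of a byte slice. It is not the body of GetBools: the helper name unpackBools is hypothetical, and reading each byte from its least significant bit upward is an assumption about the bit order.

```go
package main

import "fmt"

// unpackBools converts the first n bits of data into bools, reading
// each byte from its least significant bit upward (assumed bit order).
func unpackBools(data []byte, n int) ([]bool, error) {
	if n < 0 || n > len(data)*8 {
		return nil, fmt.Errorf("need %d bits, have %d", n, len(data)*8)
	}
	out := make([]bool, n)
	for i := range out {
		out[i] = data[i/8]&(1<<uint(i%8)) != 0
	}
	return out, nil
}

func main() {
	// 0x05 is 00000101 in binary: bits 0 and 2 are set.
	vals, err := unpackBools([]byte{0x05}, 3)
	fmt.Println(vals, err)
}
```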

func Int32Type

func Int32Type(se *sch.SchemaElement)

func Int64Type

func Int64Type(se *sch.SchemaElement)

func OptionalFieldSnappy added in v0.1.0

func OptionalFieldSnappy(r *OptionalField)

OptionalFieldSnappy sets the compression for a column to snappy. It is an optional arg to NewOptionalField.

func OptionalFieldUncompressed added in v0.1.0

func OptionalFieldUncompressed(o *OptionalField)

OptionalFieldUncompressed sets the compression to none. It is an optional arg to NewOptionalField.

func PageHeader

func PageHeader(r io.Reader) (*sch.PageHeader, error)

PageHeader reads the page header from a column page

func PageHeaders added in v0.3.0

func PageHeaders(footer *sch.FileMetaData, r io.ReadSeeker) ([]sch.PageHeader, error)

func PageHeadersAtOffset added in v0.3.0

func PageHeadersAtOffset(r io.ReadSeeker, o, n int64) ([]sch.PageHeader, error)

func Pfloat32 added in v0.3.0

func Pfloat32(f float32) *float32

func Pfloat64 added in v0.3.0

func Pfloat64(f float64) *float64

func Pint32 added in v0.3.0

func Pint32(i int32) *int32

func Pint64 added in v0.3.0

func Pint64(i int64) *int64

func Puint32 added in v0.3.0

func Puint32(i uint32) *uint32

func Puint64 added in v0.3.0

func Puint64(i uint64) *uint64
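
Judging by their signatures, the P* helpers take a value and return a pointer to it, which is convenient when filling optional (pointer) fields. The sketch below uses a local pint32 with the same signature as Pint32; its body is an assumption, and the Person struct is copied from the README.

```go
package main

import "fmt"

type Person struct {
	ID  int32
	Age *int32
}

// pint32 mirrors the signature of Pint32. The body is an assumption:
// it returns a pointer to a copy of its argument.
func pint32(i int32) *int32 { return &i }

func main() {
	people := []Person{
		{ID: 1, Age: pint32(30)},
		{ID: 2}, // Age is left nil, so the value is absent
	}
	for _, p := range people {
		if p.Age == nil {
			fmt.Println(p.ID, "age: null")
			continue
		}
		fmt.Println(p.ID, "age:", *p.Age)
	}
}
```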

func ReadMetaData added in v0.1.0

func ReadMetaData(r io.ReadSeeker) (*sch.FileMetaData, error)

ReadMetaData reads the FileMetaData from the end of a parquet file

func RepetitionOptional

func RepetitionOptional(se *sch.SchemaElement)

func RepetitionRepeated added in v0.3.0

func RepetitionRepeated(se *sch.SchemaElement)

func RepetitionRequired

func RepetitionRequired(se *sch.SchemaElement)

func RequiredFieldSnappy added in v0.1.0

func RequiredFieldSnappy(r *RequiredField)

RequiredFieldSnappy sets the compression for a column to snappy. It is an optional arg to NewRequiredField.

func RequiredFieldUncompressed added in v0.1.0

func RequiredFieldUncompressed(r *RequiredField)

RequiredFieldUncompressed sets the compression to none. It is an optional arg to NewRequiredField.

func StringType

func StringType(se *sch.SchemaElement)

func Uint32Type

func Uint32Type(se *sch.SchemaElement)

func Uint64Type

func Uint64Type(se *sch.SchemaElement)

Types

type Field

type Field struct {
	Name           string
	Path           []string
	Types          []int
	Type           FieldFunc
	RepetitionType FieldFunc
}

Field holds the type information for a parquet column

type FieldFunc

type FieldFunc func(*sch.SchemaElement)

FieldFunc is used to set some of the metadata for each column

type IntStats added in v0.3.0

type IntStats struct {
	// contains filtered or unexported fields
}

RequiredField writes the raw data for required columns

func NewIntStats added in v0.3.0

func NewIntStats(len int) IntStats

func (IntStats) Statistics added in v0.3.0

func (i IntStats) Statistics(min, max int64) *sch.Statistics

type MaxLevel added in v0.3.0

type MaxLevel struct {
	Def uint8
	Rep uint8
}
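
MaxLevel has no published description, so the following is only a sketch of the usual parquet rule for maximum definition and repetition levels: each optional or repeated element on a column's path adds one definition level, and each repeated element also adds one repetition level. The repetition type and the levels function are hypothetical names, and this package may compute the values differently.

```go
package main

import "fmt"

type repetition int

const (
	required repetition = iota
	optional
	repeated
)

// maxLevel mirrors the shape of MaxLevel.
type maxLevel struct {
	Def uint8
	Rep uint8
}

// levels walks a column path from the root to the leaf and counts
// the maximum definition and repetition levels.
func levels(path []repetition) maxLevel {
	var m maxLevel
	for _, r := range path {
		switch r {
		case optional:
			m.Def++
		case repeated:
			m.Def++
			m.Rep++
		}
	}
	return m
}

func main() {
	// Friends []Being with Age *int32: Friends is repeated, Age is optional.
	fmt.Println(levels([]repetition{repeated, optional}))
}
```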

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata tracks the state needed to write the FileMetaData at the end of the parquet file.

func New

func New(fields ...Field) *Metadata

New returns a Metadata struct and reads the first row group into memory.

func (*Metadata) Footer

func (m *Metadata) Footer(w io.Writer) error

Footer writes the FileMetaData at the end of the file.

func (*Metadata) NextDoc added in v0.3.0

func (m *Metadata) NextDoc()

NextDoc keeps track of how many documents have been added to this parquet file. The final value of m.docs is used for FileMetaData.NumRows.

func (*Metadata) Pages added in v0.1.0

func (m *Metadata) Pages() (map[string][]Page, error)

Pages maps each column name to its Pages

func (*Metadata) ReadFooter

func (m *Metadata) ReadFooter(r io.ReadSeeker) error

ReadFooter reads the parquet metadata

func (*Metadata) RowGroups added in v0.0.3

func (m *Metadata) RowGroups() []RowGroup

RowGroups returns a summary of each schema.RowGroup

func (*Metadata) Rows

func (m *Metadata) Rows() int64

func (*Metadata) StartRowGroup

func (m *Metadata) StartRowGroup(fields ...Field)

StartRowGroup is called when starting a new row group

func (*Metadata) WritePageHeader

func (m *Metadata) WritePageHeader(w io.Writer, pth []string, dataLen, compressedLen, defCount, count int, defLen, repLen int64, comp sch.CompressionCodec, stats Stats) error

WritePageHeader is called when no more data is written to a column chunk

type OptionalField added in v0.0.8

type OptionalField struct {
	Defs []uint8
	Reps []uint8

	MaxLevels MaxLevel

	RepetitionType FieldFunc
	Types          []int
	// contains filtered or unexported fields
}

func NewOptionalField added in v0.0.8

func NewOptionalField(pth []string, types []int, opts ...func(*OptionalField)) OptionalField

func (*OptionalField) DoRead added in v0.0.8

func (f *OptionalField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, error)

DoRead is called by all optional fields. It reads the definition levels and uses them to interpret the raw data.

func (*OptionalField) DoWrite added in v0.0.8

func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error

DoWrite is called by all optional field types to write the definition levels and raw data to the io.Writer
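
The relationship between definition levels and raw data can be sketched for a flat optional int32 column, where a level of 0 means the value is null and 1 means it is present. The split and merge helpers below are hypothetical and work on in-memory slices only; the real DoWrite and DoRead also handle encoding, compression, and page headers.

```go
package main

import "fmt"

// split separates optional values into definition levels
// (0 = null, 1 = present) and the non-null values.
func split(in []*int32) (defs []uint8, vals []int32) {
	for _, p := range in {
		if p == nil {
			defs = append(defs, 0)
			continue
		}
		defs = append(defs, 1)
		vals = append(vals, *p)
	}
	return defs, vals
}

// merge is the inverse of split: it uses the definition levels to
// decide which positions receive the next stored value.
func merge(defs []uint8, vals []int32) []*int32 {
	out := make([]*int32, len(defs))
	next := 0
	for i, d := range defs {
		if d == 1 && next < len(vals) {
			v := vals[next]
			out[i] = &v
			next++
		}
	}
	return out
}

func main() {
	a, b := int32(30), int32(41)
	defs, vals := split([]*int32{&a, nil, &b})
	fmt.Println(defs, vals)

	back := merge(defs, vals)
	fmt.Println(*back[0], back[1] == nil, *back[2])
}
```

Only the non-null values are stored, which is why a reader needs the definition levels to put them back in the right rows.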

func (*OptionalField) Key added in v0.3.0

func (f *OptionalField) Key() string

func (*OptionalField) Name added in v0.0.8

func (f *OptionalField) Name() string

Name returns the column name of this field

func (*OptionalField) NilCount added in v0.3.0

func (f *OptionalField) NilCount() *int64

func (*OptionalField) Path added in v0.3.0

func (f *OptionalField) Path() []string

func (*OptionalField) Values added in v0.0.8

func (f *OptionalField) Values() int

Values reads the definition levels and uses them to return the values from the page data.

type Page added in v0.1.0

type Page struct {
	// N is the number of values in the ColumnChunk
	N      int
	Size   int
	Offset int64
	Codec  sch.CompressionCodec
}

Page keeps track of metadata for each ColumnChunk

type RequiredField added in v0.0.8

type RequiredField struct {
	// contains filtered or unexported fields
}

func NewRequiredField added in v0.0.8

func NewRequiredField(pth []string, opts ...func(*RequiredField)) RequiredField

NewRequiredField creates a new required field.

func (*RequiredField) DoRead added in v0.0.8

func (f *RequiredField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, error)

func (*RequiredField) DoWrite added in v0.0.8

func (f *RequiredField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error

DoWrite writes the actual raw data.

func (*RequiredField) Key added in v0.3.0

func (f *RequiredField) Key() string

func (*RequiredField) Name added in v0.0.8

func (f *RequiredField) Name() string

func (*RequiredField) Path added in v0.3.0

func (f *RequiredField) Path() []string

type RowGroup added in v0.0.8

type RowGroup struct {
	Rows int64
	// contains filtered or unexported fields
}

RowGroup wraps schema.RowGroup and adds accounting functions that are used to keep track of number of rows written, byte size, etc.

func (*RowGroup) Columns added in v0.0.8

func (r *RowGroup) Columns() []*sch.ColumnChunk

Columns returns the Columns of the row group.

type Stats added in v0.3.0

type Stats interface {
	NullCount() *int64
	DistinctCount() *int64
	Min() []byte
	Max() []byte
}

Stats is passed in by each column's call to DoWrite
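
A type satisfies Stats by providing the four methods listed above. The sketch below is a hypothetical int32 implementation written against a local copy of the interface. Encoding Min and Max as 4-byte little-endian values and returning nil for an untracked distinct count are assumptions, not behavior confirmed by this package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// stats copies the method set of the Stats interface shown above.
type stats interface {
	NullCount() *int64
	DistinctCount() *int64
	Min() []byte
	Max() []byte
}

// int32Stats is a hypothetical collector for an optional int32 column.
type int32Stats struct {
	min, max int32
	nulls    int64
	seen     bool
}

// add records one value; nil counts as a null.
func (s *int32Stats) add(v *int32) {
	if v == nil {
		s.nulls++
		return
	}
	if !s.seen || *v < s.min {
		s.min = *v
	}
	if !s.seen || *v > s.max {
		s.max = *v
	}
	s.seen = true
}

func (s *int32Stats) NullCount() *int64     { n := s.nulls; return &n }
func (s *int32Stats) DistinctCount() *int64 { return nil } // not tracked in this sketch
func (s *int32Stats) Min() []byte           { return s.encode(s.min) }
func (s *int32Stats) Max() []byte           { return s.encode(s.max) }

// encode returns v as 4 little-endian bytes, or nil if no value was seen.
func (s *int32Stats) encode(v int32) []byte {
	if !s.seen {
		return nil
	}
	b := make([]byte, 4)
	binary.LittleEndian.PutUint32(b, uint32(v))
	return b
}

func main() {
	s := &int32Stats{}
	a, b := int32(7), int32(-2)
	for _, v := range []*int32{&a, nil, &b} {
		s.add(v)
	}
	var st stats = s
	fmt.Println(*st.NullCount(), st.Min(), st.Max())
}
```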

type UintStats added in v0.3.0

type UintStats struct {
	// contains filtered or unexported fields
}

func NewUintStats added in v0.3.0

func NewUintStats(len int) UintStats

func (UintStats) Statistics added in v0.3.0

func (i UintStats) Statistics(min, max uint64) *sch.Statistics

Directories

Path Synopsis
cmd
examples
internal
gen
rle
