parquet

package module
v0.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2019 License: MIT Imports: 13 Imported by: 0

README

Parquet

Parquet generates a parquet reader and writer based on a struct. The struct can be defined by you or it can be generated by reading an existing parquet file.

We (Parsyl) will respond to pull requests and issues to the best of our abilities. However, sometimes we will have higher priorities and the response might not be immediate.

NOTE: If you generate the code based on a parquet file there are quite a few limitations. The PageType of each PageHeader must be DATA_PAGE and the Codec (defined in ColumnMetaData) must be PLAIN or SNAPPY. Also, the parquet file's schema must consist of the currently supported types. But wait, there's more! Some of the encodings, like DELTA_BINARY_PACKED, BIT_PACKED, PLAIN_DICTIONARY, and DELTA_BYTE_ARRAY are also not supported. I would guess there are other parquet options that will cause problems since there are so many possibilities.

Installation

go get -u github.com/parsyl/parquet/...

This will also install parquet's only two dependencies: thift and snappy

Usage

First define a struct for the data to be written to parquet:

type Person struct {
  	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

Next, add a go:generate comment somewhere (in this example all code lives in main.go):

// go:generate parquetgen -input main.go -type Person -package main

Generate the code for the reader and writer:

$ go generate

A new file (parquet.go) has now been written that defines ParquetWriter and ParquetReader. Next, make use of the writer and reader:

package main

import (
    "bytes"
    "encoding/json"
)

func main() {
    var buf bytes.Buffer
    w, err := NewParquetWriter(&buf)
    if err != nil {
        log.Fatal(err)
    }

    w.Add(Person{ID: 1, Age: getAge(30)})
    w.Add(Person{ID: 2})

    // Each call to write creates a new parquet row group.
    if err := w.Write(); err != nil {
        log.Fatal(err)
    }

    // Close must be called when you are done.  It writes
    // the parquet metadata at the end of the file.
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }

    r, err := NewParquetReader(bytes.NewReader(buf.Bytes()))
    if err != nil {
        log.Fatal(err)
    }

    enc := json.NewEncoder(os.Stdout)
    for r.Next() {
        var p Person
        r.Scan(&p)
        enc.Encode(p)
    }

    if err := r.Error(); err != nil {
        log.Fatal(err)
    }
}

func getAge(a int32) *int32 { return &a }

NewParquetWriter has a couple of optional arguments available: MaxPageSize, Uncompressed, and Snappy. For example, the following sets the page size (number of rows in a page before a new one is created) and sets the page data compression to snappy:

w, err := NewParquetWriter(&buf, MaxPageSize(10000), Snappy)

See this for a complete example of how to generate the code based on an existing struct.

See this for a complete example of how to generate the code based on an existing parquet file.

Supported Types

The struct used to define the parquet data can have the following types:

int32
uint32
int64
uint64
float32
float64
string
bool

Each of these types may be a pointer to indicate that the data is optional. The struct can also embed another struct:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being
	Username string `parquet:"username"`
}

Nested and repeated structs are supported too:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being    Being
	Username string `parquet:"username"`
	Friends  []Being
}

If you want a field to be excluded from parquet you can tag it with a dash or make it unexported like so:

type Being struct {
  	ID  int32  `parquet:"id"`
	Password string`parquet:"-"` //will not be written to parquet
	age int32                    //will not be written to parquet
}

Parquetgen

Parquetgen is the command that go generate should call in order to generate the code for your custom type. It also can print the page headers and file metadata from a parquet file:

$ parquetgen --help
Usage of parquetgen:
  -ignore
        ignore unsupported fields in -type, otherwise log.Fatal is called when an unsupported type is encountered (default true)
  -import string
        import statement of -type if it doesn't live in -package
  -input string
        path to the go file that defines -type
  -metadata
        print the metadata of a parquet file (-parquet) and exit
  -output string
        name of the file that is produced, defaults to parquet.go (default "parquet.go")
  -package string
        package of the generated code
  -pageheaders
        print the page headers of a parquet file (-parquet) and exit (also prints the metadata)
  -parquet string
        path to a parquet file (if you are generating code based on an existing parquet file or printing the file metadata or page headers)
  -struct-output string
        name of the file that is produced, defaults to parquet.go (default "generated_struct.go")
  -type string
        name of the struct that will used for writing and reading

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BoolType

func BoolType(se *sch.SchemaElement)

BoolType is a fieldFunc

func Float32Type

func Float32Type(se *sch.SchemaElement)

Float32Type is a fieldFunc

func Float64Type

func Float64Type(se *sch.SchemaElement)

Float64Type is a fieldFunc

func GetBools

func GetBools(r io.Reader, n int, pageSizes []int) ([]bool, error)

GetBools reads a byte array and turns each bit into a bool

func Int32Type

func Int32Type(se *sch.SchemaElement)

Int32Type is a fieldFunc

func Int64Type

func Int64Type(se *sch.SchemaElement)

Int64Type is a fieldFunc

func OptionalFieldSnappy added in v0.1.0

func OptionalFieldSnappy(r *OptionalField)

OptionalFieldSnappy sets the compression for a column to snappy It is an optional arg to NewOptionalField

func OptionalFieldUncompressed added in v0.1.0

func OptionalFieldUncompressed(o *OptionalField)

OptionalFieldUncompressed sets the compression to none It is an optional arg to NewOptionalField

func PageHeader(r io.Reader) (*sch.PageHeader, error)

PageHeader reads the page header from a column page

func PageHeaders added in v0.3.0

func PageHeaders(footer *sch.FileMetaData, r io.ReadSeeker) ([]sch.PageHeader, error)

PageHeader reads all the page headers without reading the actual data. It is used by parquetgen to print the page headers.

func PageHeadersAtOffset added in v0.3.0

func PageHeadersAtOffset(r io.ReadSeeker, o, n int64) ([]sch.PageHeader, error)

PageHeadersAtOffset seeks to the given offset, then reads the PageHeader without reading the data.

func ReadMetaData added in v0.1.0

func ReadMetaData(r io.ReadSeeker) (*sch.FileMetaData, error)

ReadMetaData reads the FileMetaData from the end of a parquet file

func RepetitionOptional

func RepetitionOptional(se *sch.SchemaElement)

RepetitionOptional...

func RepetitionRepeated added in v0.3.0

func RepetitionRepeated(se *sch.SchemaElement)

RepetitionRepeated...

func RepetitionRequired

func RepetitionRequired(se *sch.SchemaElement)

RepetitionRequired...

func RequiredFieldSnappy added in v0.1.0

func RequiredFieldSnappy(r *RequiredField)

RequiredFieldSnappy sets the compression for a column to snappy It is an optional arg to NewRequiredField

func RequiredFieldUncompressed added in v0.1.0

func RequiredFieldUncompressed(r *RequiredField)

RequiredFieldUncompressed sets the compression to none It is an optional arg to NewRequiredField

func StringType

func StringType(se *sch.SchemaElement)

StringType is a fieldFunc

func Uint32Type

func Uint32Type(se *sch.SchemaElement)

Uint32Type is a fieldFunc

func Uint64Type

func Uint64Type(se *sch.SchemaElement)

Uint64Type is a fieldFunc

Types

type Field

type Field struct {
	Name           string
	Path           []string
	Types          []int
	Type           FieldFunc
	RepetitionType FieldFunc
}

Field holds the type information for a parquet column

type FieldFunc

type FieldFunc func(*sch.SchemaElement)

FieldFunc is used to set some of the metadata for each column

type MaxLevel added in v0.3.0

type MaxLevel struct {
	Def uint8
	Rep uint8
}

MaxLevel holds the maximum definition and repeptition level for a given field.

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata keeps track of the things that need to be kept track of in order to write the FileMetaData at the end of the parquet file.

func New

func New(fields ...Field) *Metadata

New returns a Metadata struct and reads the first row group into memory.

func (*Metadata) Footer

func (m *Metadata) Footer(w io.Writer) error

Footer writes the FileMetaData at the end of the file.

func (*Metadata) NextDoc added in v0.3.0

func (m *Metadata) NextDoc()

NextDoc keeps track of how many documents have been added to this parquet file. The final value of m.docs is used for the FileMetaData.NumRows

func (*Metadata) Pages added in v0.1.0

func (m *Metadata) Pages() (map[string][]Page, error)

Pages maps each column name to its Pages

func (*Metadata) ReadFooter

func (m *Metadata) ReadFooter(r io.ReadSeeker) error

ReadFooter reads the parquet metadata

func (*Metadata) RowGroups added in v0.0.3

func (m *Metadata) RowGroups() []RowGroup

RowGroups returns a summary of each schema.RowGroup

func (*Metadata) Rows

func (m *Metadata) Rows() int64

func (*Metadata) StartRowGroup

func (m *Metadata) StartRowGroup(fields ...Field)

StartRowGroup is called when starting a new row group

func (*Metadata) WritePageHeader

func (m *Metadata) WritePageHeader(w io.Writer, pth []string, dataLen, compressedLen, defCount, count int, defLen, repLen int64, comp sch.CompressionCodec, stats Stats) error

WritePageHeader is called in order to finish writing to a column chunk.

type OptionalField added in v0.0.8

type OptionalField struct {
	Defs []uint8
	Reps []uint8

	MaxLevels MaxLevel

	RepetitionType FieldFunc
	Types          []int
	// contains filtered or unexported fields
}

OptionalField is any exported field in a struct that is a pointer.

func NewOptionalField added in v0.0.8

func NewOptionalField(pth []string, types []int, opts ...func(*OptionalField)) OptionalField

NewOptionalField...

func (*OptionalField) DoRead added in v0.0.8

func (f *OptionalField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, error)

DoRead is called by all optional fields. It reads the definition levels and uses them to interpret the raw data.

func (*OptionalField) DoWrite added in v0.0.8

func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error

DoWrite is called by all optional field types to write the definition levels and raw data to the io.Writer

func (*OptionalField) Name added in v0.0.8

func (f *OptionalField) Name() string

Name returns the column name of this field

func (*OptionalField) Path added in v0.3.0

func (f *OptionalField) Path() []string

Path returns the path of this field

func (*OptionalField) Values added in v0.0.8

func (f *OptionalField) Values() int

Values reads the definition levels and uses them to return the values from the page data.

type Page added in v0.1.0

type Page struct {
	// N is the number of values in the ColumnChunk
	N      int
	Size   int
	Offset int64
	Codec  sch.CompressionCodec
}

Page keeps track of metadata for each ColumnChunk

type RequiredField added in v0.0.8

type RequiredField struct {
	// contains filtered or unexported fields
}

RequiredField writes the raw data for required columns

func NewRequiredField added in v0.0.8

func NewRequiredField(pth []string, opts ...func(*RequiredField)) RequiredField

NewRequiredField creates a new required field.

func (*RequiredField) DoRead added in v0.0.8

func (f *RequiredField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, error)

DoRead reads the actual raw data.

func (*RequiredField) DoWrite added in v0.0.8

func (f *RequiredField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error

DoWrite writes the actual raw data.

func (*RequiredField) Name added in v0.0.8

func (f *RequiredField) Name() string

Name returns the column name of this field

func (*RequiredField) Path added in v0.3.0

func (f *RequiredField) Path() []string

Path returns the path of this field

type RowGroup added in v0.0.8

type RowGroup struct {
	Rows int64
	// contains filtered or unexported fields
}

RowGroup wraps schema.RowGroup and adds accounting functions that are used to keep track of number of rows written, byte size, etc.

func (*RowGroup) Columns added in v0.0.8

func (r *RowGroup) Columns() []*sch.ColumnChunk

Columns returns the Columns of the row group.

type Stats added in v0.3.0

type Stats interface {
	NullCount() *int64
	DistinctCount() *int64
	Min() []byte
	Max() []byte
}

Stats is passed in by each column's call to DoWrite

Directories

Path Synopsis
cmd
examples
internal
gen
rle

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL