bodkin

package module
v0.3.5

Warning: this package is not in the latest version of its module.

Published: Nov 27, 2024 · License: Apache-2.0 · Imports: 17 · Imported by: 1

README

Bodkin 🏹

Go library for generating schemas and decoding generic map values and native Go structures to Apache Arrow.

The goal is to provide a useful toolkit that makes it easier to use Arrow, and by extension Parquet, especially on data whose schema is evolving or not strictly defined: for example, data retrieved from a third-party API that does not maintain its OpenAPI spec.

Bodkin enables you to use your data to define and evolve your Arrow Schema.

Features

Arrow schema generation from data type inference
  • Converts a structured input (JSON string or []byte, Go struct or map[string]any) into an Apache Arrow schema
    • Supports nested types
  • Automatically evolves the Arrow schema with new fields as new inputs are provided
  • Option to merge a newly inferred schema at an existing path for composability (bodkin.UnifyAtPath)
  • Converts schema field types when unifying schemas to accept evolving input data (bodkin.WithTypeConversion)
  • Tracks changes to the schema
  • Exports/imports a serialized Arrow schema to/from a file or []byte to transmit or persist the schema definition
Custom data loader
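To give a feel for what inference from data involves, here is a hypothetical stdlib-only sketch (not Bodkin's code) that maps values decoded by encoding/json into Arrow-style type names. The function name inferType, the sorted field order, and treating the first array element as representative are all assumptions. One detail it does reflect faithfully: encoding/json decodes every JSON number as float64, which is consistent with integer-looking fields such as count appearing as float64 in the schemas printed in the Usage examples.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// inferType returns an Arrow-style type name for a value produced by
// encoding/json, or "" when the type cannot be determined (null or empty array).
// Hypothetical sketch; not Bodkin's actual inference logic.
func inferType(v any) string {
	switch t := v.(type) {
	case bool:
		return "bool"
	case float64: // encoding/json decodes all JSON numbers as float64
		return "float64"
	case string:
		return "utf8"
	case []any:
		if len(t) == 0 {
			return "" // element type unknown
		}
		elem := inferType(t[0]) // assumption: first element is representative
		if elem == "" {
			return ""
		}
		return "list<item: " + elem + ", nullable>"
	case map[string]any:
		keys := make([]string, 0, len(t))
		for k := range t {
			keys = append(keys, k)
		}
		sort.Strings(keys) // deterministic field order for this sketch
		var parts []string
		for _, k := range keys {
			if ft := inferType(t[k]); ft != "" { // skip undeterminable fields
				parts = append(parts, k+": "+ft)
			}
		}
		return "struct<" + strings.Join(parts, ", ") + ">"
	default: // nil (JSON null) or unsupported
		return ""
	}
}

func main() {
	var m map[string]any
	in := `{"count": 89, "previous": null, "results": [{"id": 7594}]}`
	if err := json.Unmarshal([]byte(in), &m); err != nil {
		panic(err)
	}
	fmt.Println(inferType(m))
}
```

Fields that come back with an empty type name are the ones a real implementation has to report rather than guess.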

🚀 Install

Using Bodkin is easy. First, use go get to install the latest version of the library.

go get -u github.com/loicalleyne/bodkin@latest

💡 Usage

You can import bodkin using:

import "github.com/loicalleyne/bodkin"

Create a new Bodkin, provide some structured data, and print out the resulting Arrow schema's string representation and any field evaluation errors:

var jsonS1 string = `{
    "count": 89,
    "next": "https://sub.domain.com/api/search/?models=thurblig&page=3",
    "previous": null,
    "results": [{"id":7594}],
    "arrayscalar":[],
    "datefield":"1979-01-01",
    "timefield":"01:02:03"
    }`
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u.Unify(jsonS1)
s, _ := u.OriginSchema()
fmt.Printf("original input %v\n", s.String())
for _, e := range u.Err() {
	fmt.Printf("%v : [%s]\n", e.Issue, e.Dotpath)
}
// original input schema:
//   fields: 5
//     - results: type=list<item: struct<id: float64>, nullable>, nullable
//     - datefield: type=date32, nullable
//     - timefield: type=time64[ns], nullable
//     - count: type=float64, nullable
//     - next: type=utf8, nullable
// could not determine type of unpopulated field : [$previous]
// could not determine element type of empty array : [$arrayscalar]

Provide some more structured data, then print out the new merged schema and the list of changes:

var jsonS2 string = `{
"count": 89.5,
"next": "https://sub.domain.com/api/search/?models=thurblig&page=3",
"previous": "https://sub.domain.com/api/search/?models=thurblig&page=2",
"results": [{"id":7594,"scalar":241.5,"nested":{"strscalar":"str1","nestedarray":[123,456]}}],
"arrayscalar":["str"],
"datetime":"2024-10-24 19:03:09",
"event_time":"2024-10-24T19:03:09+00:00",
"datefield":"2024-10-24T19:03:09+00:00",
"timefield":"1970-01-01"
}`
u.Unify(jsonS2)
schema, _ := u.Schema()
fmt.Printf("\nunified %v\n", schema.String())
fmt.Println(u.Changes())
// unified schema:
//   fields: 9
//     - count: type=float64, nullable
//     - next: type=utf8, nullable
//     - results: type=list<item: struct<id: float64, scalar: float64, nested: struct<strscalar: utf8, nestedarray: list<item: float64, nullable>>>, nullable>, nullable
//     - datefield: type=timestamp[ms, tz=UTC], nullable
//     - timefield: type=utf8, nullable
//     - previous: type=utf8, nullable
//     - datetime: type=timestamp[ms, tz=UTC], nullable
//     - arrayscalar: type=list<item: utf8, nullable>, nullable
//     - event_time: type=timestamp[ms, tz=UTC], nullable
// changes:
// added $previous : utf8
// added $datetime : timestamp[ms, tz=UTC]
// changed $datefield : from date32 to timestamp[ms, tz=UTC]
// added $results.results.elem.scalar : float64
// added $results.results.elem.nested : struct<strscalar: utf8, nestedarray: list<item: float64, nullable>>
// added $arrayscalar : list<item: utf8, nullable>
// added $event_time : timestamp[ms, tz=UTC]
// changed $timefield : from time64[ns] to utf8
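The change list can be modeled as a merge of two flat schemas (field name to type name) that logs each addition and conversion. In this hypothetical sketch, the upgrade rule is an assumption chosen only to reproduce the two conversions shown above (date32 widening to a timestamp, and any other conflict falling back to utf8); the rules Bodkin actually applies under WithTypeConversion may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// upgrade picks a type able to hold values of both a and b.
// Assumed rule for illustration: date32 widens to the timestamp type,
// any other conflict falls back to utf8.
func upgrade(a, b string) string {
	const ts = "timestamp[ms, tz=UTC]"
	switch {
	case a == b:
		return a
	case (a == "date32" && b == ts) || (a == ts && b == "date32"):
		return ts
	default:
		return "utf8"
	}
}

// unify merges next into cur in place and returns a log of the changes made.
func unify(cur, next map[string]string) []string {
	names := make([]string, 0, len(next))
	for n := range next {
		names = append(names, n)
	}
	sort.Strings(names) // stable log order
	var changes []string
	for _, n := range names {
		old, ok := cur[n]
		switch {
		case !ok: // new field
			cur[n] = next[n]
			changes = append(changes, fmt.Sprintf("added $%s : %s", n, next[n]))
		case old != next[n]: // conflicting type
			if up := upgrade(old, next[n]); up != old {
				cur[n] = up
				changes = append(changes, fmt.Sprintf("changed $%s : from %s to %s", n, old, up))
			}
		}
	}
	return changes
}

func main() {
	cur := map[string]string{"count": "float64", "datefield": "date32", "timefield": "time64[ns]"}
	next := map[string]string{"count": "float64", "datefield": "timestamp[ms, tz=UTC]",
		"timefield": "date32", "previous": "utf8"}
	for _, c := range unify(cur, next) {
		fmt.Println(c)
	}
}
```

A field already widened to utf8 stays utf8 on later conflicts, so it is not logged again.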

Bodkin also works with nested Go structs and slices:

	stu := Student{
		Name: "StudentName",
		Age:  25,
		ID:   123456,
		Day:  123,
	}
	sch := School{
		Name: "SchoolName",
		Address: AddressType{
			Country: "CountryName",
		},
	}
	e := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
	e.Unify(stu) // NewBodkin takes only options; input is passed to Unify
	sc, err := e.OriginSchema()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("original input %v\n", sc.String())
// original input schema:
//   fields: 5
//     - ID: type=int64, nullable
//     - Day: type=int32, nullable
//     - School: type=struct<Name: utf8, Address: struct<Street: utf8, City: utf8, Region: utf8, Country: utf8>>, nullable
//     - Name: type=utf8, nullable
//     - Age: type=int32, nullable
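	e.Unify(sch)
	sc, err = e.Schema() // Schema returns the merged schema; OriginSchema only the first input's
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("unified %v\n", sc.String())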
// unified schema:
//   fields: 5
//     - ID: type=int64, nullable
//     - Day: type=int32, nullable
//     - School: type=struct<Name: utf8, Address: struct<Street: utf8, City: utf8, Region: utf8, Country: utf8>>, nullable
//     - Name: type=utf8, nullable
//     - Age: type=int32, nullable

Export your schema to a file, then import the file to retrieve the schema; or export/import to/from a []byte.

_ = u.ExportSchemaFile("./test.schema")
imp, _ := u.ImportSchemaFile("./test.schema")
fmt.Printf("imported %v\n", imp.String())

bs, _ := u.ExportSchemaBytes()
sc, _ := u.ImportSchemaBytes(bs)
fmt.Printf("imported %v\n", sc.String())

Use a Bodkin Reader to load data to Arrow Records

u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
u.Unify(jsonS1) // feed data for schema generation
rdr, _ := u.NewReader() // inferred schema in Bodkin used to create Reader
rec, _ := rdr.ReadToRecord([]byte(jsonS1)) // Reader loads data and returns Arrow Record

Provide a Bodkin Reader with an io.Reader to load many records

import "github.com/loicalleyne/bodkin/reader"
...
u := bodkin.NewBodkin(bodkin.WithInferTimeUnits(), bodkin.WithTypeConversion())
// Create a Reader attached to the Bodkin; ff is the io.Reader supplying the data (e.g. an *os.File) ...
if _, err := u.NewReader(reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024)); err != nil {
	log.Fatal(err)
}
for u.Reader.Next() {
	rec := u.Reader.Record()
	...
}
// ... or create a stand-alone Reader if you have an existing *arrow.Schema
rdr, _ := reader.NewReader(schema, 0, reader.WithIOReader(ff, reader.DefaultDelimiter), reader.WithChunk(1024))
for rdr.Next() {
	rec := rdr.Record()
	...
}

Use the generated Arrow schema with Arrow's built-in JSON reader to decode JSON data into Arrow records

jr := array.NewJSONReader(strings.NewReader(jsonS2), schema)
defer jr.Release()
for jr.Next() {
    rec := jr.Record()
    rj, _ := rec.MarshalJSON()
    fmt.Printf("\nmarshaled record:\n%v\n", string(rj))
}
if err := jr.Err(); err != nil {
    log.Fatal(err)
}
// marshaled record:
// [{"arrayscalar":["str"],"count":89.5,"datefield":"2024-10-24 19:03:09Z","datetime":"2024-10-24 19:03:09Z","event_time":"2024-10-24 19:03:09Z","next":"https://sub.domain.com/api/search/?models=thurblig\u0026page=3","previous":"https://sub.domain.com/api/search/?models=thurblig\u0026page=2","results":[{"id":7594,"nested":{"nestedarray":[123,456],"strscalar":"str1"},"scalar":241.5}],"timefield":"1970-01-01"}
// ]

💫 Show your support

Give a ⭐️ if this project helped you! Feedback and PRs welcome.

License

Bodkin is released under the Apache 2.0 license. See LICENCE.txt.

Documentation

Overview

Package bodkin is a Go library for generating schemas and decoding generic map values and native Go structures to Apache Arrow. The goal is to provide a useful toolkit to make it easier to use Arrow, and by extension Parquet, with data whose shape is evolving or not strictly defined.

Index

Constants

This section is empty.

Variables

var (
	ErrUndefinedInput            = errors.New("nil input")
	ErrInvalidInput              = errors.New("invalid input")
	ErrNoLatestSchema            = errors.New("no second input has been provided")
	ErrUndefinedFieldType        = errors.New("could not determine type of unpopulated field")
	ErrUndefinedArrayElementType = errors.New("could not determine element type of empty array")
	ErrNotAnUpgradableType       = errors.New("is not an upgradable type")
	ErrPathNotFound              = errors.New("path not found")
	ErrFieldTypeChanged          = errors.New("changed")
	ErrFieldAdded                = errors.New("added")
)

Schema evaluation/evolution errors.

UpgradableTypes are scalar types that can be upgraded to a more flexible type.

Functions

This section is empty.

Types

type Bodkin

type Bodkin struct {
	Reader *reader.DataReader
	// contains filtered or unexported fields
}

Bodkin is a collection of field paths describing the columns of one or more structured inputs.

func NewBodkin

func NewBodkin(opts ...Option) *Bodkin

NewBodkin returns a new Bodkin configured with the given options. Structured input, supplied afterwards through Unify, must be a JSON byte slice or string, a Go struct with exported fields, or a map[string]any. Any unpopulated fields, empty objects or empty slices in JSON or map[string]any inputs are skipped, as their types cannot be evaluated and converted.

func (*Bodkin) Changes

func (u *Bodkin) Changes() error

Changes returns, as an error value, the list of field additions and field type conversions made during the lifetime of the Bodkin.

func (*Bodkin) Count (added in v0.2.0)

func (u *Bodkin) Count() int

Count returns the number of datums evaluated for the schema so far.

func (*Bodkin) CountPaths (added in v0.2.0)

func (u *Bodkin) CountPaths() int

CountPaths returns the number of evaluated field paths.

func (*Bodkin) CountPending (added in v0.2.0)

func (u *Bodkin) CountPending() int

CountPending returns the number of unevaluated field paths.

func (*Bodkin) Err

func (u *Bodkin) Err() []Field

Err returns, as a []Field, the fields that could not be evaluated so far.

func (*Bodkin) ExportSchemaBytes (added in v0.2.5)

func (u *Bodkin) ExportSchemaBytes() ([]byte, error)

ExportSchemaBytes exports a serialized Arrow Schema.

func (*Bodkin) ExportSchemaFile (added in v0.2.5)

func (u *Bodkin) ExportSchemaFile(exportPath string) error

ExportSchemaFile exports a serialized Arrow Schema to a file.

func (*Bodkin) ImportSchemaBytes (added in v0.2.5)

func (u *Bodkin) ImportSchemaBytes(dat []byte) (*arrow.Schema, error)

ImportSchemaBytes imports a serialized Arrow Schema.

func (*Bodkin) ImportSchemaFile (added in v0.2.5)

func (u *Bodkin) ImportSchemaFile(importPath string) (*arrow.Schema, error)

ImportSchemaFile imports a serialized Arrow Schema from a file.

func (*Bodkin) LastSchema

func (u *Bodkin) LastSchema() (*arrow.Schema, error)

LastSchema returns the Arrow schema generated from the structure/types of the most recent input. Any unpopulated fields, empty objects or empty slices are skipped. It returns ErrNoLatestSchema if Unify() has never been called, and a panic recovery error if the schema could not be created.

func (*Bodkin) MaxCount (added in v0.2.4)

func (u *Bodkin) MaxCount() int

MaxCount returns the maximum number of datums to be evaluated for the schema.

func (*Bodkin) NewReader (added in v0.3.0)

func (u *Bodkin) NewReader(opts ...reader.Option) (*reader.DataReader, error)

func (*Bodkin) Opts (added in v0.3.1)

func (u *Bodkin) Opts() []Option

func (*Bodkin) OriginSchema

func (u *Bodkin) OriginSchema() (*arrow.Schema, error)

OriginSchema returns the original Arrow schema generated from the structure/types of the initial input, and a panic recovery error if the schema could not be created.

func (*Bodkin) Paths (added in v0.2.0)

func (u *Bodkin) Paths() []Field

Paths returns the fields, identified by their dotpaths, that have been successfully evaluated so far.

func (*Bodkin) ResetCount (added in v0.2.4)

func (u *Bodkin) ResetCount() int

ResetCount resets the count of datums evaluated for the schema so far.

func (*Bodkin) ResetMaxCount (added in v0.2.4)

func (u *Bodkin) ResetMaxCount() int

ResetMaxCount resets the maximum number of datums to be evaluated for the schema to maxInt64.

func (*Bodkin) Schema

func (u *Bodkin) Schema() (*arrow.Schema, error)

Schema returns the current merged Arrow schema generated from the structure/types of the input(s), and a panic recovery error if the schema could not be created. If the Bodkin has a Reader and the schema has been updated since the Reader was created, the Reader will be replaced with a new one matching the current schema. Any

func (*Bodkin) Unify

func (u *Bodkin) Unify(a any) error

Unify merges the structured input's column definitions with the schema built from previous inputs. Any unpopulated fields, empty objects or empty slices in JSON input are skipped.

func (*Bodkin) UnifyAtPath (added in v0.2.0)

func (u *Bodkin) UnifyAtPath(a any, mergeAt string) error

UnifyAtPath merges the structured input's column definitions with the schema built from previous inputs, using the specified valid path as the root. An error is returned if the mergeAt path is not found. Any unpopulated fields, empty objects or empty slices in JSON input are skipped.

func (*Bodkin) UnifyScan (added in v0.3.0)

func (u *Bodkin) UnifyScan() error

UnifyScan reads from the io.Reader provided with WithIOReader and merges each datum's column definitions with the schema built from previous inputs. Any unpopulated fields, empty objects or empty slices in JSON input are skipped.

type Field (added in v0.2.0)

type Field struct {
	Dotpath string     `json:"dotpath"`
	Type    arrow.Type `json:"arrow_type"`
	// Number of child fields if a nested type
	Childen int `json:"children,omitempty"`
	// Evaluation failure reason
	Issue error `json:"issue,omitempty"`
}

Field represents an element in the input data.

type Option

type Option func(config)

Option configures a Bodkin.

func WithIOReader (added in v0.3.0)

func WithIOReader(r io.Reader, delim byte) Option

WithIOReader provides an io.Reader for a Bodkin to use with UnifyScan(), along with a delimiter used to split datums in the data stream. The default delimiter is '\n' if none is provided.

func WithInferTimeUnits

func WithInferTimeUnits() Option

WithInferTimeUnits() enables scanning input string values for time, date and timestamp types.

Times use a format of HH:MM or HH:MM:SS[.zzz] where the fractions of a second cannot exceed the precision allowed by the time unit, otherwise unmarshalling will error.

Dates use YYYY-MM-DD format.

Timestamps use RFC3339Nano format, except without a timezone. All of the following are valid:

	YYYY-MM-DD
	YYYY-MM-DD[T]HH
	YYYY-MM-DD[T]HH:MM
	YYYY-MM-DD[T]HH:MM:SS[.zzzzzzzzzz]
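These formats can be approximated with time.Parse. In the hypothetical classify sketch below, the layout lists are assumptions: times are tried before dates and dates before timestamps, either T or a space separates date and time (the Usage example infers 2024-10-24 19:03:09 as a timestamp), and a trailing RFC 3339 offset is also accepted, as with event_time in that example. It does not model the time-unit precision check.

```go
package main

import (
	"fmt"
	"time"
)

var (
	// time.Parse also accepts a fractional second right after the seconds field.
	timeLayouts = []string{"15:04:05", "15:04"}
	dateLayouts = []string{"2006-01-02"}
	tsLayouts   = []string{
		time.RFC3339Nano, // with a zone offset, e.g. 2024-10-24T19:03:09+00:00
		"2006-01-02T15:04:05", "2006-01-02 15:04:05",
		"2006-01-02T15:04", "2006-01-02 15:04",
		"2006-01-02T15", "2006-01-02 15",
	}
)

// matches reports whether s parses with any of the layouts.
func matches(s string, layouts []string) bool {
	for _, l := range layouts {
		if _, err := time.Parse(l, s); err == nil {
			return true
		}
	}
	return false
}

// classify returns the Arrow-style type name inferred for a string value.
func classify(s string) string {
	switch {
	case matches(s, timeLayouts):
		return "time64"
	case matches(s, dateLayouts):
		return "date32"
	case matches(s, tsLayouts):
		return "timestamp"
	default:
		return "utf8"
	}
}

func main() {
	for _, s := range []string{"01:02:03", "1979-01-01", "2024-10-24 19:03:09", "str"} {
		fmt.Println(s, "=>", classify(s))
	}
}
```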

func WithMaxCount (added in v0.2.4)

func WithMaxCount(i int) Option

WithMaxCount enables capping the number of Unify evaluations.

func WithQuotedValuesAreStrings (added in v0.1.2)

func WithQuotedValuesAreStrings() Option

WithQuotedValuesAreStrings treats quoted input values as strings rather than inferring other types from them.

func WithTypeConversion

func WithTypeConversion() Option

WithTypeConversion enables upgrading the column types to fix compatibility conflicts.

Directories

Path      Synopsis
cmd
reader    Package reader contains helpers for reading data and loading to Arrow.
