goavro

v0.0.0-...-47784ac
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2015 License: Apache-2.0 Imports: 15 Imported by: 0

README

goavro

Description

Goavro is a Go library that implements encoding and decoding of Avro data. It provides an interface to encode data directly to io.Writer streams, and to decode data from io.Reader streams. Goavro fully adheres to version 1.7.7 of the Avro specification.


Usage

Documentation is available via GoDoc.

Please see the example programs in the examples directory for reference.

Although the Avro specification defines the terms reader and writer as library components which read and write Avro data, Go places a particularly strong emphasis on what a Reader and a Writer are. Namely, it is bad form to define an interface which shares the same name but uses a different method signature. In other words, all Reader interfaces should effectively mirror an io.Reader, and all Writer interfaces should mirror an io.Writer. Adherence to this standard is essential to keep libraries easy to use.

An io.Reader reads data from the stream specified at object creation time into the parameterized slice of bytes and returns both the number of bytes read and an error. An Avro reader also reads from a stream, but it is possible to create an Avro reader that can read from one stream, then read from another, using the same compiled schema. In other words, an Avro reader puts the schema first, whereas an io.Reader puts the stream first.

To support an Avro reader that can read from multiple streams, its API must be different from, and incompatible with, the io.Reader interface from the Go standard library. Instead, an Avro reader looks more like the Unmarshal functionality provided by the Go encoding/json library.

Codec interface

Creating a goavro.Codec is fast, but it should be done only once for each Avro schema to be processed. Once a Codec is created, it may be used multiple times to either decode or encode data.

The Codec interface exposes two methods, one to encode data and one to decode data. They encode directly into an io.Writer, and decode directly from an io.Reader.

A particular Codec can work with only one Avro schema. However, there is no practical limit to how many Codecs may be created and used in a program. Internally a goavro.codec is merely a namespace and two function pointers to decode and encode data. Because codecs maintain no state, the same Codec can be concurrently used on different io streams as desired.

    func (c *codec) Decode(r io.Reader) (interface{}, error)
    func (c *codec) Encode(w io.Writer, datum interface{}) error
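
Because a codec of this kind holds no per-stream state, sharing it between goroutines needs no locking. The following is a minimal sketch of that idea using only the standard library. `miniCodec`, `encodeLong`, and `encodeConcurrently` are hypothetical names, not part of goavro, and the sketch handles only the Avro long type.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
)

// miniCodec is a hypothetical codec: a function value and nothing else.
type miniCodec struct {
	encode func(io.Writer, int64) error
}

// encodeLong writes v as an Avro long (a zig-zag varint).
func encodeLong(w io.Writer, v int64) error {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutVarint(buf[:], v)
	_, err := w.Write(buf[:n])
	return err
}

// encodeConcurrently encodes every value in its own goroutine and into
// its own buffer, while all goroutines share the same codec value.
func encodeConcurrently(c miniCodec, values []int64) ([][]byte, error) {
	out := make([][]byte, len(values))
	errs := make([]error, len(values))
	var wg sync.WaitGroup
	for i, v := range values {
		wg.Add(1)
		go func(i int, v int64) {
			defer wg.Done()
			var buf bytes.Buffer
			errs[i] = c.encode(&buf, v)
			out[i] = buf.Bytes()
		}(i, v)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	c := miniCodec{encode: encodeLong}
	blocks, err := encodeConcurrently(c, []int64{0, -1, 64})
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, b := range blocks {
		fmt.Printf("% x\n", b)
	}
}
```

Each goroutine writes to a different io.Writer; concurrent writes to one shared stream would still need coordination by the caller.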

Creating a Codec

Below is an example of creating a Codec from a provided JSON schema. Codecs do not maintain any internal state, so one Codec may be used many times, on multiple io.Readers and io.Writers, and concurrently if desired.

    someRecordSchemaJson := `{"type":"record","name":"Foo","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string","default":"happy"}]}`
    codec, err := goavro.NewCodec(someRecordSchemaJson)
    if err != nil {
        return nil, err
    }

Decoding data

Below is a simplified example of decoding binary data read from an io.Reader into a single datum using a previously compiled Codec. The Decode method of the Codec interface may be called multiple times, each time on the same or on a different io.Reader.

    // uses codec created above, and an io.Reader, definition not shown
    datum, err := codec.Decode(r)
    if err != nil {
        return nil, err
    }

Encoding data

Below is a simplified example of encoding a single datum into the Avro binary format using a previously compiled Codec. The Encode method of the Codec interface may be called multiple times, each time on the same or on a different io.Writer.

    // uses codec created above, an io.Writer, definition not shown,
    // and some data
    err := codec.Encode(w, datum)
    if err != nil {
        return nil, err
    }

Another example, this time leveraging bufio.Writer:

    // Encoding data using bufio.Writer to buffer the writes
    // during data encoding:

    func encodeWithBufferedWriter(c goavro.Codec, w io.Writer, datum interface{}) error {
        bw := bufio.NewWriter(w)
        err := c.Encode(bw, datum)
        if err != nil {
            return err
        }
        return bw.Flush()
    }

    err := encodeWithBufferedWriter(codec, w, datum)
    if err != nil {
        return nil, err
    }

Reader and Writer helper types

The Codec interface provides means to encode and decode any Avro data, but a number of additional helper types are provided to handle streaming of Avro data.

See the example programs examples/file/reader.go and examples/file/writer.go for more context:

This example wraps the provided io.Reader in a bufio.Reader and dumps the data to standard output.

func dumpReader(r io.Reader) {
	fr, err := goavro.NewReader(goavro.BufferFromReader(r))
	if err != nil {
		log.Fatal("cannot create Reader: ", err)
	}
	defer func() {
		if err := fr.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	for fr.Scan() {
		datum, err := fr.Read()
		if err != nil {
			log.Println("cannot read datum: ", err)
			continue
		}
		fmt.Println(datum)
	}
}

This example buffers the provided io.Writer in a bufio.Writer, and writes some data to the stream.

func makeSomeData(w io.Writer) error {
    recordSchema := `
    {
      "type": "record",
      "name": "example",
      "fields": [
        {
          "type": "string",
          "name": "username"
        },
        {
          "type": "string",
          "name": "comment"
        },
        {
          "type": "long",
          "name": "timestamp"
        }
      ]
    }
    `
	fw, err := goavro.NewWriter(
		goavro.BlockSize(13), // example; default is 10
		goavro.Compression(goavro.CompressionSnappy), // default is CompressionNull
		goavro.WriterSchema(recordSchema),
		goavro.ToWriter(w))
	if err != nil {
		log.Fatal("cannot create Writer: ", err)
	}
	defer fw.Close()

	// make a record instance using the same schema
	someRecord, err := goavro.NewRecord(goavro.RecordSchema(recordSchema))
	if err != nil {
		log.Fatal(err)
	}
	// identify field name to set datum for
	someRecord.Set("username", "Aquaman")
	someRecord.Set("comment", "The Atlantic is oddly cold this morning!")
	// you can fully qualify the field name
	someRecord.Set("com.example.timestamp", int64(1082196484))
	fw.Write(someRecord)

	// make another record
	someRecord, err = goavro.NewRecord(goavro.RecordSchema(recordSchema))
	if err != nil {
		log.Fatal(err)
	}
	someRecord.Set("username", "Batman")
	someRecord.Set("comment", "Who are all of these crazies?")
	someRecord.Set("com.example.timestamp", int64(1427383430))
	fw.Write(someRecord)
	return nil
}

Limitations

Goavro is a fully featured encoder and decoder of binary Avro data. It fully supports recursive data structures, unions, and namespacing. A few features have yet to be implemented.

Aliases

The Avro specification allows an implementation to optionally map a writer's schema to a reader's schema using aliases. Although goavro can compile schemas with aliases, it does not yet implement this feature.

JSON Encoding

The Avro Data Serialization format describes two encodings: binary and JSON. Goavro only implements binary encoding of data streams, because that is what most applications need.

Most applications will use the binary encoding, as it is smaller and faster. But, for debugging and web-based applications, the JSON encoding may sometimes be appropriate.

Note that data schemas are always encoded using JSON, as per the specification.
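
To make the size difference concrete, here is a small sketch that encodes the same values both ways using only the standard library. The helpers `avroLong` and `avroString` are hypothetical and not part of goavro; they follow the binary rules of the Avro specification, where a long is a zig-zag varint and a string is its byte length as a long followed by the UTF-8 bytes.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// avroLong returns the Avro binary encoding of v: a zig-zag varint.
func avroLong(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	return buf[:binary.PutVarint(buf, v)]
}

// avroString returns the Avro binary encoding of s: the byte length as
// a long, followed by the UTF-8 bytes of s.
func avroString(s string) []byte {
	return append(avroLong(int64(len(s))), s...)
}

func main() {
	binStr := avroString("foo")
	jsonStr, _ := json.Marshal("foo")
	fmt.Printf("string binary: % x (%d bytes)\n", binStr, len(binStr))
	fmt.Printf("string JSON:   %s (%d bytes)\n", jsonStr, len(jsonStr))

	binLong := avroLong(1082196484)
	jsonLong, _ := json.Marshal(int64(1082196484))
	fmt.Printf("long binary: % x (%d bytes)\n", binLong, len(binLong))
	fmt.Printf("long JSON:   %s (%d bytes)\n", jsonLong, len(jsonLong))
}
```

For these inputs the binary forms take 4 and 5 bytes, against 5 and 10 bytes for the JSON forms.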

Kafka Streams

Kafka is the reason goavro was written. Similar to Avro Object Container Files being a layer of abstraction above Avro Data Serialization format, Kafka's use of Avro is a layer of abstraction that also sits above Avro Data Serialization format, but has its own schema. Like Avro Object Container Files, this has been implemented but removed until the API can be improved.

License

Goavro license

Copyright 2015 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Google Snappy license

Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
  • Neither the name of Google Inc. nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

Third Party Dependencies

Google Snappy

Goavro links with Google Snappy to provide Snappy compression and decompression support.

Documentation

Overview

Package goavro is a library that encodes and decodes Avro data. It provides an interface to encode data directly to io.Writer streams, and to decode data from io.Reader streams. Goavro fully adheres to version 1.7.7 of the Avro specification and data encoding.


Constants

const (
	CompressionNull    = "null"
	CompressionDeflate = "deflate"
	CompressionSnappy  = "snappy"
)

Compression codecs that Reader and Writer instances can process.
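
As a rough illustration of what a compression codec does to a data block, the sketch below round-trips a block through the "null" and "deflate" codecs using only the standard library. The functions `compressBlock` and `decompressBlock` are hypothetical and do not reflect goavro's internals. Snappy is deliberately left out of the sketch because it needs a third-party package, so the sketch reports it as unsupported even though goavro itself supports it.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// compressBlock applies the named codec to one block of encoded data.
func compressBlock(codec string, block []byte) ([]byte, error) {
	switch codec {
	case "null":
		return block, nil // stored as-is
	case "deflate":
		var buf bytes.Buffer
		zw, err := flate.NewWriter(&buf, flate.DefaultCompression)
		if err != nil {
			return nil, err
		}
		if _, err := zw.Write(block); err != nil {
			return nil, err
		}
		if err := zw.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	default:
		return nil, fmt.Errorf("codec not handled by this sketch: %q", codec)
	}
}

// decompressBlock reverses compressBlock for the same codec name.
func decompressBlock(codec string, block []byte) ([]byte, error) {
	switch codec {
	case "null":
		return block, nil
	case "deflate":
		zr := flate.NewReader(bytes.NewReader(block))
		defer zr.Close()
		return io.ReadAll(zr)
	default:
		return nil, fmt.Errorf("codec not handled by this sketch: %q", codec)
	}
}

func main() {
	block := bytes.Repeat([]byte("Aquaman "), 32)
	packed, err := compressBlock("deflate", block)
	if err != nil {
		fmt.Println(err)
		return
	}
	plain, err := decompressBlock("deflate", packed)
	fmt.Println(len(block), len(packed), bytes.Equal(block, plain), err)
}
```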

const DefaultWriterBlockSize = 10

DefaultWriterBlockSize specifies the default number of datum items in a block when writing.


Functions

func IsCompressionCodecSupported

func IsCompressionCodecSupported(someCodec string) bool

IsCompressionCodecSupported returns true if and only if the specified codec string is supported by this library.

Types

type Codec

type Codec interface {
	Decoder
	Encoder
	Schema() string
	NewWriter(...WriterSetter) (*Writer, error)
}

The Codec interface supports both Decode and Encode operations.

func NewCodec

func NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error)

NewCodec creates a new object that supports both the Decode and Encode methods. It requires an Avro schema, expressed as a JSON string. The codec is created in a new Symtab.

  codec, err := goavro.NewCodec(someJSONSchema)
  if err != nil {
      return nil, err
  }

  // Decoding data uses codec created above, and an io.Reader,
  // definition not shown:
  datum, err := codec.Decode(r)
  if err != nil {
      return nil, err
  }

  // Encoding data uses codec created above, an io.Writer,
  // definition not shown, and some data:
  err := codec.Encode(w, datum)
  if err != nil {
      return nil, err
  }

  // Encoding data using bufio.Writer to buffer the writes
  // during data encoding:

  func encodeWithBufferedWriter(c Codec, w io.Writer, datum interface{}) error {
      bw := bufio.NewWriter(w)
      err := c.Encode(bw, datum)
      if err != nil {
          return err
      }
      return bw.Flush()
  }

  err := encodeWithBufferedWriter(codec, w, datum)
  if err != nil {
      return nil, err
  }

type CodecSetter

type CodecSetter func(Codec) error

CodecSetter functions are those which are used to modify a newly instantiated Codec.

type Datum

type Datum struct {
	Value interface{}
	Err   error
}

Datum binds together a piece of data and any error resulting from either reading or writing that datum.

type Decoder

type Decoder interface {
	Decode(io.Reader) (interface{}, error)
}

Decoder interface specifies structures that may be decoded.

type Encoder

type Encoder interface {
	Encode(io.Writer, interface{}) error
}

Encoder interface specifies structures that may be encoded.

type ErrCodecBuild

type ErrCodecBuild struct {
	Message string
	Err     error
}

ErrCodecBuild is returned when an error occurs while building a Codec.

func (ErrCodecBuild) Error

func (e ErrCodecBuild) Error() string

type ErrDecoder

type ErrDecoder struct {
	Message string
	Err     error
}

ErrDecoder is returned when the decoder encounters an error.

func (ErrDecoder) Error

func (e ErrDecoder) Error() string

type ErrEncoder

type ErrEncoder struct {
	Message string
	Err     error
}

ErrEncoder is returned when the encoder encounters an error.

func (ErrEncoder) Error

func (e ErrEncoder) Error() string

type ErrInvalidName

type ErrInvalidName struct {
	Message string
}

ErrInvalidName is returned when a Codec cannot be created due to invalid name format.

func (ErrInvalidName) Error

func (e ErrInvalidName) Error() string

type ErrReader

type ErrReader struct {
	Message string
	Err     error
}

ErrReader is returned when the reader encounters an error.

func (ErrReader) Error

func (e ErrReader) Error() string

type ErrReaderBlockCount

type ErrReaderBlockCount struct {
	Err error
}

ErrReaderBlockCount is returned when a reader detects an error while attempting to read the block count and block size.

func (*ErrReaderBlockCount) Error

func (e *ErrReaderBlockCount) Error() string

type ErrReaderInit

type ErrReaderInit struct {
	Message string
	Err     error
}

ErrReaderInit is returned when an error occurs during Reader initialization.

func (ErrReaderInit) Error

func (e ErrReaderInit) Error() string

type ErrSchemaParse

type ErrSchemaParse struct {
	Message string
	Err     error
}

ErrSchemaParse is returned when a Codec cannot be created due to an error while reading or parsing the schema.

func (ErrSchemaParse) Error

func (e ErrSchemaParse) Error() string

type ErrWriterInit

type ErrWriterInit struct {
	Message string
	Err     error
}

ErrWriterInit is returned when an error is created during Writer initialization.

func (*ErrWriterInit) Error

func (e *ErrWriterInit) Error() string

Error converts the error instance to a string.

type Reader

type Reader struct {
	CompressionCodec string
	DataSchema       string
	Sync             []byte
	// contains filtered or unexported fields
}

Reader structure contains data necessary to read Avro files.

func NewReader

func NewReader(setters ...ReaderSetter) (*Reader, error)

NewReader returns an object to read data from an io.Reader using the Avro Object Container Files format.

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:8080")
    if err != nil {
        log.Fatal(err)
    }
    fr, err := goavro.NewReader(goavro.FromReader(conn))
    if err != nil {
        log.Fatal("cannot create Reader: ", err)
    }
    defer func() {
        if err := fr.Close(); err != nil {
            log.Fatal(err)
        }
    }()

    for fr.Scan() {
        datum, err := fr.Read()
        if err != nil {
            log.Println("cannot read datum: ", err)
            continue
        }
        fmt.Println("RECORD: ", datum)
    }
}

func (*Reader) Close

func (fr *Reader) Close() error

Close releases resources and returns any Reader errors.

func (*Reader) Read

func (fr *Reader) Read() (interface{}, error)

Read returns the next element from the Reader.

func (*Reader) Scan

func (fr *Reader) Scan() bool

Scan returns true if more data is ready to be read.

type ReaderSetter

type ReaderSetter func(*Reader) error

ReaderSetter functions are those which are used to instantiate a new Reader.

func BufferFromReader

func BufferFromReader(r io.Reader) ReaderSetter

BufferFromReader wraps the specified `io.Reader` using a `bufio.Reader` to read from a file.

func FromReader

func FromReader(r io.Reader) ReaderSetter

FromReader specifies the `io.Reader` to use when reading a file.

type Record

type Record struct {
	Name   string
	Fields []*recordField
	// contains filtered or unexported fields
}

Record is an abstract data type used to hold data corresponding to an Avro record. Wherever an Avro schema specifies a record, this library's Decode method will return a Record initialized to the record's values read from the io.Reader. Likewise, when using Encode to convert data to an Avro record, it is necessary to create a Record instance and pass it to the Encode method.

func NewRecord

func NewRecord(setters ...RecordSetter) (*Record, error)

NewRecord will create a Record instance corresponding to the specified schema.

func recordExample(codec goavro.Codec, w io.Writer, recordSchema string) error {
    // To encode a Record, you need to instantiate a Record instance
    // that adheres to the schema the Encoder expects.
    someRecord, err := goavro.NewRecord(goavro.RecordSchema(recordSchema))
    if err != nil {
        return err
    }
    // Once you have a Record, you can set the values of the various fields.
    someRecord.Set("username", "Aquaman")
    someRecord.Set("comment", "The Atlantic is oddly cold this morning!")
    // Feel free to fully qualify the field name if you'd like.
    someRecord.Set("com.example.timestamp", int64(1082196484))

    // Once the fields of the Record have the correct data, you can encode it.
    return codec.Encode(w, someRecord)
}

func (Record) Get

func (r Record) Get(fieldName string) (interface{}, error)

Get returns the datum of the specified Record field.

func (Record) GetFieldSchema

func (r Record) GetFieldSchema(fieldName string) (interface{}, error)

GetFieldSchema returns the schema of the specified Record field.

func (Record) Set

func (r Record) Set(fieldName string, value interface{}) error

Set updates the datum of the specified Record field.

func (Record) String

func (r Record) String() string

String returns a string representation of the Record.

type RecordSetter

type RecordSetter func(*Record) error

RecordSetter functions are those which are used to instantiate a new Record.

func RecordEnclosingNamespace

func RecordEnclosingNamespace(someNamespace string) RecordSetter

RecordEnclosingNamespace specifies the enclosing namespace of the record to create. For instance, if the enclosing namespace is `com.example`, and the record name is `Foo`, then the full record name will be `com.example.Foo`.
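
The name resolution described above can be sketched in a few lines. The function `fullName` is hypothetical and is not goavro's implementation. It follows the rule from the Avro specification that a name which already contains a dot is treated as a full name, so the enclosing namespace is ignored for it.

```go
package main

import (
	"fmt"
	"strings"
)

// fullName qualifies name with enclosingNamespace unless name is
// already fully qualified or no namespace is given.
func fullName(name, enclosingNamespace string) string {
	if strings.Contains(name, ".") || enclosingNamespace == "" {
		return name
	}
	return enclosingNamespace + "." + name
}

func main() {
	fmt.Println(fullName("Foo", "com.example"))           // com.example.Foo
	fmt.Println(fullName("org.other.Foo", "com.example")) // org.other.Foo
	fmt.Println(fullName("Foo", ""))                      // Foo
}
```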

func RecordSchema

func RecordSchema(recordSchemaJSON string) RecordSetter

RecordSchema specifies the schema of the record to create. Schema must be a JSON string.

type Symtab

type Symtab map[string]*codec // map full name to codec

func NewSymtab

func NewSymtab() Symtab

func (Symtab) NewCodec

func (st Symtab) NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error)

NewCodec creates the codec in the specified Symtab.

type Writer

type Writer struct {
	CompressionCodec string
	Sync             []byte
	// contains filtered or unexported fields
}

Writer structure contains data necessary to write Avro files.

func NewWriter

func NewWriter(setters ...WriterSetter) (*Writer, error)

NewWriter returns an object to write data to an io.Writer using the Avro Object Container Files format.

func serveClient(conn net.Conn, codec goavro.Codec) {
    fw, err := goavro.NewWriter(
        goavro.BlockSize(100),                 // flush data every 100 items
        goavro.BlockTick(10 * time.Second),    // but at least every 10 seconds
        goavro.Compression(goavro.CompressionSnappy),
        goavro.ToWriter(conn),
        goavro.UseCodec(codec))
    if err != nil {
        log.Fatal("cannot create Writer: ", err)
    }
    defer fw.Close()

    // create a record that matches the schema we want to encode
    someRecord, err := goavro.NewRecord(goavro.RecordSchema(recordSchema))
    if err != nil {
        log.Fatal(err)
    }
    // identify field name to set datum for
    someRecord.Set("username", "Aquaman")
    someRecord.Set("comment", "The Atlantic is oddly cold this morning!")
    // you can fully qualify the field name
    someRecord.Set("com.example.timestamp", int64(1082196484))
    fw.Write(someRecord)

    // create another record
    someRecord, err = goavro.NewRecord(goavro.RecordSchema(recordSchema))
    if err != nil {
        log.Fatal(err)
    }
    someRecord.Set("username", "Batman")
    someRecord.Set("comment", "Who are all of these crazies?")
    someRecord.Set("com.example.timestamp", int64(1427383430))
    fw.Write(someRecord)
}

func (*Writer) Close

func (fw *Writer) Close() error

Close is called when the open file is no longer needed. It flushes the bytes to the io.Writer if the file is being written.

func (*Writer) Write

func (fw *Writer) Write(datum interface{})

Write places a datum into the pipeline to be written to the Writer.

type WriterSetter

type WriterSetter func(*Writer) error

WriterSetter functions are those which are used to instantiate a new Writer.

func BlockSize

func BlockSize(blockSize int64) WriterSetter

BlockSize specifies the default number of data items to be grouped in a block, compressed, and written to the stream.

It is a valid use case to set both BlockTick and BlockSize. For example, if BlockTick is set to time.Minute and BlockSize is set to 20, but only 13 items are written to the Writer in a minute, those 13 items will be grouped in a block, compressed, and written to the stream without waiting for the additional 7 items to complete the BlockSize.

By default, BlockSize is set to DefaultWriterBlockSize.

func BlockTick

func BlockTick(blockTick time.Duration) WriterSetter

BlockTick specifies the interval at which the Writer flushes pending blocks to the stream.

It is a valid use case to set both BlockTick and BlockSize. For example, if BlockTick is set to time.Minute and BlockSize is set to 20, but only 13 items are written to the Writer in a minute, those 13 items will be grouped in a block, compressed, and written to the stream without waiting for the additional 7 items to complete the BlockSize.

By default, BlockTick is set to 0 and is ignored. This causes the blocker to fill up its internal queue of data to BlockSize items before flushing them to the stream.
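
The interplay of BlockSize and BlockTick can be sketched with a single select loop. This is not goavro's implementation: `blocker` and `demo` are hypothetical names, the items are plain ints, and the tick is delivered by hand through a channel so that the example is deterministic rather than dependent on a real timer.

```go
package main

import (
	"fmt"
	"time"
)

// blocker groups items into blocks. A block is flushed when it holds
// blockSize items or when tick fires, whichever comes first. A nil tick
// channel never fires, which models a BlockTick of 0.
func blocker(in <-chan int, tick <-chan time.Time, blockSize int, flush func([]int)) {
	block := make([]int, 0, blockSize)
	emit := func() {
		if len(block) > 0 {
			flush(block)
			block = make([]int, 0, blockSize)
		}
	}
	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // flush whatever is left on close
				return
			}
			block = append(block, item)
			if len(block) == blockSize {
				emit()
			}
		case <-tick:
			emit()
		}
	}
}

// demo writes 13 items, fires one tick, then writes 20 more items, and
// returns the sizes of the blocks that were flushed.
func demo() []int {
	in := make(chan int)
	tick := make(chan time.Time)
	done := make(chan struct{})
	var sizes []int
	go func() {
		defer close(done)
		blocker(in, tick, 20, func(b []int) { sizes = append(sizes, len(b)) })
	}()
	for i := 0; i < 13; i++ {
		in <- i
	}
	tick <- time.Now()
	for i := 0; i < 20; i++ {
		in <- i
	}
	close(in)
	<-done
	return sizes
}

func main() {
	fmt.Println(demo()) // prints [13 20]
}
```

In a real program the tick channel would typically come from a time.Ticker.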

func BufferToWriter

func BufferToWriter(w io.Writer) WriterSetter

BufferToWriter specifies which io.Writer is the target of the Writer stream, and creates a bufio.Writer around that io.Writer. It is invalid to specify both BufferToWriter and ToWriter. Exactly one of these must be called for a given Writer initialization.

func Compression

func Compression(someCompressionCodec string) WriterSetter

Compression is used to set the compression codec of a new Writer instance.

func Sync

func Sync(someSync []byte) WriterSetter

Sync is used to set the sync marker bytes of a new instance. It checks to ensure the byte slice is 16 bytes long, but does not check that it has been set to something other than the zero value. Usually you can elide the `Sync` call and allow it to create a random byte sequence.
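
A short sketch of what generating and checking a sync marker might look like follows. `newSyncMarker` and `checkSyncMarker` are hypothetical helpers, not goavro functions. Like the check described above, `checkSyncMarker` verifies only the length, so an all-zero marker passes.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

const syncLength = 16 // sync markers are 16 bytes long

// newSyncMarker returns 16 random bytes.
func newSyncMarker() ([]byte, error) {
	marker := make([]byte, syncLength)
	if _, err := rand.Read(marker); err != nil {
		return nil, err
	}
	return marker, nil
}

// checkSyncMarker validates the length only, not the content.
func checkSyncMarker(b []byte) error {
	if len(b) != syncLength {
		return fmt.Errorf("sync marker must be %d bytes; received %d", syncLength, len(b))
	}
	return nil
}

func main() {
	marker, err := newSyncMarker()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("% x\n", marker)
	fmt.Println(checkSyncMarker(marker))
	fmt.Println(checkSyncMarker([]byte("too short")))
}
```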

func ToWriter

func ToWriter(w io.Writer) WriterSetter

ToWriter specifies which io.Writer is the target of the Writer stream. It is invalid to specify both BufferToWriter and ToWriter. Exactly one of these must be called for a given Writer initialization.

func UseCodec

func UseCodec(codec Codec) WriterSetter

UseCodec specifies that a Writer should reuse an existing Codec rather than creating a new one and needlessly recompiling the same schema.

func WriterSchema

func WriterSchema(someSchema string) WriterSetter

WriterSchema is used to set the Avro schema of a new instance. If a codec has already been compiled for the schema, it is faster to use the UseCodec method instead of WriterSchema.
