Documentation ¶
Overview ¶
Package goavro is a library that encodes and decodes Avro data. It provides an interface to encode data directly to io.Writer streams and to decode data from io.Reader streams. Goavro fully adheres to version 1.7.7 of the Avro specification and data encoding.
Index ¶
- Constants
- func IsCompressionCodecSupported(someCodec string) bool
- type Codec
- type CodecSetter
- type Datum
- type Decoder
- type Encoder
- type ErrCodecBuild
- type ErrDecoder
- type ErrEncoder
- type ErrInvalidName
- type ErrReader
- type ErrReaderBlockCount
- type ErrReaderInit
- type ErrSchemaParse
- type ErrWriterInit
- type Reader
- type ReaderSetter
- type Record
- type RecordSetter
- type Writer
- type WriterSetter
- func BlockSize(blockSize int64) WriterSetter
- func BlockTick(blockTick time.Duration) WriterSetter
- func BufferToWriter(w io.Writer) WriterSetter
- func Compression(someCompressionCodec string) WriterSetter
- func Sync(someSync []byte) WriterSetter
- func ToWriter(w io.Writer) WriterSetter
- func UseCodec(codec Codec) WriterSetter
- func WriterSchema(someSchema string) WriterSetter
Constants ¶
const (
	CompressionNull    = "null"
	CompressionDeflate = "deflate"
	CompressionSnappy  = "snappy"
)
Compression codecs that Reader and Writer instances can process.
const DefaultWriterBlockSize = 10
DefaultWriterBlockSize specifies the default number of datum items in a block when writing.
Variables ¶
This section is empty.
Functions ¶
func IsCompressionCodecSupported ¶
IsCompressionCodecSupported returns true if and only if the specified codec string is supported by this library.
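A check of this kind can be sketched as a simple switch. The sketch below is standalone and assumes the supported set is exactly the three constants listed in the Constants section; `isSupported` and the lowercase constants are hypothetical local names, not goavro identifiers.

```go
package main

import "fmt"

// Local copies of the codec names listed in the Constants section.
const (
	compressionNull    = "null"
	compressionDeflate = "deflate"
	compressionSnappy  = "snappy"
)

// isSupported is a hypothetical stand-in for IsCompressionCodecSupported.
// It assumes the supported set is exactly the three constants above.
func isSupported(codec string) bool {
	switch codec {
	case compressionNull, compressionDeflate, compressionSnappy:
		return true
	}
	return false
}

func main() {
	for _, name := range []string{"snappy", "lz4"} {
		fmt.Printf("%q supported: %t\n", name, isSupported(name))
	}
}
```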
Types ¶
type Codec ¶
type Codec interface {
	Decoder
	Encoder
	Schema() string
	NewWriter(...WriterSetter) (*Writer, error)
}
The Codec interface supports both Decode and Encode operations.
func NewCodec ¶
func NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error)
NewCodec creates a new object that supports both the Decode and Encode methods. It requires an Avro schema, expressed as a JSON string.
codec, err := goavro.NewCodec(someJSONSchema)
if err != nil {
	return nil, err
}

// Decoding data uses codec created above, and an io.Reader,
// definition not shown:
datum, err := codec.Decode(r)
if err != nil {
	return nil, err
}

// Encoding data uses codec created above, an io.Writer,
// definition not shown, and some data:
err = codec.Encode(w, datum)
if err != nil {
	return nil, err
}

// Encoding data using bufio.Writer to buffer the writes
// during data encoding:
func encodeWithBufferedWriter(c goavro.Codec, w io.Writer, datum interface{}) error {
	bw := bufio.NewWriter(w)
	err := c.Encode(bw, datum)
	if err != nil {
		return err
	}
	return bw.Flush()
}

err = encodeWithBufferedWriter(codec, w, datum)
if err != nil {
	return nil, err
}
type CodecSetter ¶
CodecSetter functions are those used to modify a newly instantiated Codec.
type Datum ¶
type Datum struct {
	Value interface{}
	Err   error
}
Datum binds together a piece of data and any error resulting from either reading or writing that datum.
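One way to consume values of this shape is to range over a channel and check Err before using Value. The sketch below is standalone: `datum` is a local type that mirrors the two fields of Datum, and `produce` and `collect` are hypothetical helpers. How goavro itself delivers Datum values is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// datum mirrors the two fields of goavro's Datum for illustration only.
type datum struct {
	Value interface{}
	Err   error
}

// produce is a hypothetical source that emits two good values and one failure.
func produce() <-chan datum {
	ch := make(chan datum, 3)
	ch <- datum{Value: "first"}
	ch <- datum{Err: errors.New("short read")}
	ch <- datum{Value: "third"}
	close(ch)
	return ch
}

// collect separates the successful values from the errors.
func collect(in <-chan datum) (values []interface{}, errs []error) {
	for d := range in {
		if d.Err != nil {
			errs = append(errs, d.Err)
			continue
		}
		values = append(values, d.Value)
	}
	return values, errs
}

func main() {
	values, errs := collect(produce())
	fmt.Println("values:", values)
	fmt.Println("errors:", errs)
}
```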
type ErrCodecBuild ¶
ErrCodecBuild is returned when an error occurs while building a Codec.
func (ErrCodecBuild) Error ¶
func (e ErrCodecBuild) Error() string
type ErrDecoder ¶
ErrDecoder is returned when the decoder encounters an error.
func (ErrDecoder) Error ¶
func (e ErrDecoder) Error() string
type ErrEncoder ¶
ErrEncoder is returned when the encoder encounters an error.
func (ErrEncoder) Error ¶
func (e ErrEncoder) Error() string
type ErrInvalidName ¶
type ErrInvalidName struct {
	Message string
}
ErrInvalidName is returned when a Codec cannot be created due to invalid name format.
func (ErrInvalidName) Error ¶
func (e ErrInvalidName) Error() string
type ErrReaderBlockCount ¶
type ErrReaderBlockCount struct {
	Err error
}
ErrReaderBlockCount is returned when a reader detects an error while attempting to read the block count and block size.
func (*ErrReaderBlockCount) Error ¶
func (e *ErrReaderBlockCount) Error() string
type ErrReaderInit ¶
ErrReaderInit is returned when an error occurs during Reader initialization.
func (ErrReaderInit) Error ¶
func (e ErrReaderInit) Error() string
type ErrSchemaParse ¶
ErrSchemaParse is returned when a Codec cannot be created due to an error while reading or parsing the schema.
func (ErrSchemaParse) Error ¶
func (e ErrSchemaParse) Error() string
type ErrWriterInit ¶
ErrWriterInit is returned when an error occurs during Writer initialization.
func (*ErrWriterInit) Error ¶
func (e *ErrWriterInit) Error() string
Error converts the error instance to a string.
type Reader ¶
type Reader struct {
	CompressionCodec string
	DataSchema       string
	Sync             []byte
	// contains filtered or unexported fields
}
Reader structure contains data necessary to read Avro files.
func NewReader ¶
func NewReader(setters ...ReaderSetter) (*Reader, error)
NewReader returns an object to read data from an io.Reader using the Avro Object Container Files format.
func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	fr, err := goavro.NewReader(goavro.FromReader(conn))
	if err != nil {
		log.Fatal("cannot create Reader: ", err)
	}
	defer func() {
		if err := fr.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	for fr.Scan() {
		datum, err := fr.Read()
		if err != nil {
			log.Println("cannot read datum: ", err)
			continue
		}
		fmt.Println("RECORD: ", datum)
	}
}
type ReaderSetter ¶
ReaderSetter functions are those used to instantiate a new Reader.
func BufferFromReader ¶
func BufferFromReader(r io.Reader) ReaderSetter
BufferFromReader wraps the specified `io.Reader` using a `bufio.Reader` to read from a file.
func FromReader ¶
func FromReader(r io.Reader) ReaderSetter
FromReader specifies the `io.Reader` to use when reading a file.
type Record ¶
type Record struct {
	Name   string
	Fields []*recordField
	// contains filtered or unexported fields
}
Record is an abstract data type used to hold data corresponding to an Avro record. Wherever an Avro schema specifies a record, this library's Decode method will return a Record initialized to the record's values read from the io.Reader. Likewise, when using Encode to convert data to an Avro record, it is necessary to create a Record instance and pass it to the Encode method.
func NewRecord ¶
func NewRecord(setters ...RecordSetter) (*Record, error)
NewRecord will create a Record instance corresponding to the specified schema.
func recordExample(codec goavro.Codec, w io.Writer, recordSchema string) error {
	// To encode a Record, you need to instantiate a Record instance
	// that adheres to the schema the Encoder expects.
	someRecord, err := goavro.NewRecord(goavro.RecordSchema(recordSchema))
	if err != nil {
		return err
	}
	// Once you have a Record, you can set the values of the various fields.
	someRecord.Set("username", "Aquaman")
	someRecord.Set("comment", "The Atlantic is oddly cold this morning!")
	// Feel free to fully qualify the field name if you'd like
	someRecord.Set("com.example.timestamp", int64(1082196484))

	// Once the fields of the Record have the correct data, you can encode it
	err = codec.Encode(w, someRecord)
	return err
}
func (Record) GetFieldSchema ¶
GetFieldSchema returns the schema of the specified Record field.
type RecordSetter ¶
RecordSetter functions are those used to instantiate a new Record.
func RecordEnclosingNamespace ¶
func RecordEnclosingNamespace(someNamespace string) RecordSetter
RecordEnclosingNamespace specifies the enclosing namespace of the record to create. For instance, if the enclosing namespace is `com.example`, and the record name is `Foo`, then the full record name will be `com.example.Foo`.
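The name resolution described above can be sketched as follows. `fullName` is a hypothetical helper that follows the Avro specification's naming rule that a name already containing a dot is treated as a full name; it is not goavro's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// fullName is a hypothetical helper. Following the Avro specification's
// naming rules, a name that already contains a dot is treated as a full
// name; otherwise the enclosing namespace, if any, is prefixed.
func fullName(name, enclosingNamespace string) string {
	if strings.Contains(name, ".") || enclosingNamespace == "" {
		return name
	}
	return enclosingNamespace + "." + name
}

func main() {
	fmt.Println(fullName("Foo", "com.example"))           // com.example.Foo
	fmt.Println(fullName("org.other.Foo", "com.example")) // org.other.Foo
}
```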
func RecordSchema ¶
func RecordSchema(recordSchemaJSON string) RecordSetter
RecordSchema specifies the schema of the record to create. Schema must be a JSON string.
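For reference, the sketch below shows what a record schema expressed as a JSON string might look like, and parses it with encoding/json to list its fields. The schema itself is an illustrative assumption whose names were chosen to match the field names used in the examples on this page; `recordSchema` and `fieldNames` are hypothetical and not part of goavro.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// recordSchema holds the parts of an Avro record schema that this sketch inspects.
type recordSchema struct {
	Type      string `json:"type"`
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	Fields    []struct {
		Name string          `json:"name"`
		Type json.RawMessage `json:"type"`
	} `json:"fields"`
}

// exampleSchema is an illustrative schema; its names are assumptions.
const exampleSchema = `{
  "type": "record",
  "name": "comments",
  "namespace": "com.example",
  "fields": [
    {"name": "username", "type": "string"},
    {"name": "comment", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}`

// fieldNames parses a record schema and returns its field names in order.
func fieldNames(schemaJSON string) ([]string, error) {
	var s recordSchema
	if err := json.Unmarshal([]byte(schemaJSON), &s); err != nil {
		return nil, fmt.Errorf("cannot parse schema: %w", err)
	}
	if s.Type != "record" {
		return nil, fmt.Errorf("expected type \"record\", got %q", s.Type)
	}
	names := make([]string, 0, len(s.Fields))
	for _, f := range s.Fields {
		names = append(names, f.Name)
	}
	return names, nil
}

func main() {
	names, err := fieldNames(exampleSchema)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(names)
}
```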
type Writer ¶
type Writer struct {
	CompressionCodec string
	Sync             []byte
	// contains filtered or unexported fields
}
Writer structure contains data necessary to write Avro files.
func NewWriter ¶
func NewWriter(setters ...WriterSetter) (*Writer, error)
NewWriter returns an object to write data to an io.Writer using the Avro Object Container Files format.
func serveClient(conn net.Conn, codec goavro.Codec) {
	fw, err := goavro.NewWriter(
		goavro.BlockSize(100),              // flush data every 100 items
		goavro.BlockTick(10*time.Second),   // but at least every 10 seconds
		goavro.Compression(goavro.CompressionSnappy),
		goavro.ToWriter(conn),
		goavro.UseCodec(codec))
	if err != nil {
		log.Fatal("cannot create Writer: ", err)
	}
	defer fw.Close()

	// create a record that matches the schema we want to encode
	someRecord, err := goavro.NewRecord(goavro.RecordSchema(recordSchema))
	if err != nil {
		log.Fatal(err)
	}
	// identify field name to set datum for
	someRecord.Set("username", "Aquaman")
	someRecord.Set("comment", "The Atlantic is oddly cold this morning!")
	// you can fully qualify the field name
	someRecord.Set("com.example.timestamp", int64(1082196484))
	fw.Write(someRecord)

	// create another record
	someRecord, err = goavro.NewRecord(goavro.RecordSchema(recordSchema))
	if err != nil {
		log.Fatal(err)
	}
	someRecord.Set("username", "Batman")
	someRecord.Set("comment", "Who are all of these crazies?")
	someRecord.Set("com.example.timestamp", int64(1427383430))
	fw.Write(someRecord)
}
type WriterSetter ¶
WriterSetter functions are those used to instantiate a new Writer.
func BlockSize ¶
func BlockSize(blockSize int64) WriterSetter
BlockSize specifies the default number of data items to be grouped in a block, compressed, and written to the stream.
It is a valid use case to set both BlockTick and BlockSize. For example, if BlockTick is set to time.Minute and BlockSize is set to 20, but only 13 items are written to the Writer in a minute, those 13 items will be grouped in a block, compressed, and written to the stream without waiting for the additional 7 items to complete the BlockSize.
By default, BlockSize is set to DefaultWriterBlockSize.
func BlockTick ¶
func BlockTick(blockTick time.Duration) WriterSetter
BlockTick specifies the interval at which the Writer flushes pending blocks to the stream.
It is a valid use case to set both BlockTick and BlockSize. For example, if BlockTick is set to time.Minute and BlockSize is set to 20, but only 13 items are written to the Writer in a minute, those 13 items will be grouped in a block, compressed, and written to the stream without waiting for the additional 7 items to complete the BlockSize.
By default, BlockTick is set to 0 and is ignored. In that case the Writer fills its internal queue with BlockSize items before flushing them to the stream.
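The interaction between BlockSize and BlockTick can be sketched with a select loop over an input channel and a ticker. This is a standalone illustration of size-or-time flushing, not goavro's code: `batch` and its parameters are hypothetical, and compression and the actual stream write are replaced by a `flush` callback.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical sketch of size-or-time flushing, not goavro's code.
// It groups items from in into blocks of up to blockSize items and also
// flushes any partial block every tick. A tick of 0 disables the timer.
func batch(in <-chan int, blockSize int, tick time.Duration, flush func([]int)) {
	var tickC <-chan time.Time // nil channel: never ready when tick is 0
	if tick > 0 {
		t := time.NewTicker(tick)
		defer t.Stop()
		tickC = t.C
	}
	block := make([]int, 0, blockSize)
	emit := func() {
		if len(block) > 0 {
			flush(block)
			block = make([]int, 0, blockSize)
		}
	}
	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit() // input closed: flush whatever remains
				return
			}
			block = append(block, item)
			if len(block) >= blockSize {
				emit()
			}
		case <-tickC:
			emit()
		}
	}
}

func main() {
	in := make(chan int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		batch(in, 20, 50*time.Millisecond, func(b []int) {
			fmt.Println("flushed block of", len(b), "items")
		})
	}()
	for i := 0; i < 13; i++ {
		in <- i
	}
	time.Sleep(200 * time.Millisecond) // let the tick flush the partial block
	close(in)
	<-done
}
```

With the tick disabled, the same loop flushes only full blocks plus a final partial block when the input is closed, which corresponds to the default behavior described above.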
func BufferToWriter ¶
func BufferToWriter(w io.Writer) WriterSetter
BufferToWriter specifies which io.Writer is the target of the Writer stream, and creates a bufio.Writer around that io.Writer. It is invalid to specify both BufferToWriter and ToWriter. Exactly one of these must be called for a given Writer initialization.
func Compression ¶
func Compression(someCompressionCodec string) WriterSetter
Compression is used to set the compression codec of a new Writer instance.
func Sync ¶
func Sync(someSync []byte) WriterSetter
Sync is used to set the sync marker bytes of a new instance. It checks to ensure the byte slice is 16 bytes long, but does not check that it has been set to something other than the zero value. Usually you can elide the `Sync` call and allow it to create a random byte sequence.
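The behavior described above, a length check with a random default, can be sketched with crypto/rand. `syncMarker` is a hypothetical helper, and the error text is an assumption; like the documented check, it accepts an all-zero marker.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
)

const syncLength = 16 // the sync marker is 16 bytes long

// syncMarker is a hypothetical helper: it returns a copy of the supplied
// marker after checking its length, or a random marker when none is supplied.
// It does not reject an all-zero marker.
func syncMarker(supplied []byte) ([]byte, error) {
	if supplied == nil {
		marker := make([]byte, syncLength)
		if _, err := rand.Read(marker); err != nil {
			return nil, err
		}
		return marker, nil
	}
	if len(supplied) != syncLength {
		return nil, errors.New("sync marker must be 16 bytes long")
	}
	return append([]byte(nil), supplied...), nil
}

func main() {
	marker, err := syncMarker(nil)
	if err != nil {
		fmt.Println("cannot create sync marker:", err)
		return
	}
	fmt.Printf("random marker: %x\n", marker)

	if _, err := syncMarker([]byte("too short")); err != nil {
		fmt.Println("rejected:", err)
	}
}
```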
func ToWriter ¶
func ToWriter(w io.Writer) WriterSetter
ToWriter specifies which io.Writer is the target of the Writer stream. It is invalid to specify both BufferToWriter and ToWriter. Exactly one of these must be called for a given Writer initialization.
func UseCodec ¶
func UseCodec(codec Codec) WriterSetter
UseCodec specifies that a Writer should reuse an existing Codec rather than creating a new one and needlessly recompiling the same schema.
func WriterSchema ¶
func WriterSchema(someSchema string) WriterSetter
WriterSchema is used to set the Avro schema of a new instance. If a codec has already been compiled for the schema, it is faster to use the UseCodec method instead of WriterSchema.