coder

package
v2.22.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package coder contains coder representation and utilities. Coders describe how to serialize and deserialize pipeline data and may be provided by users.

Index

Constants

This section is empty.

Variables

View Source
var ErrVarIntTooLong = errors.New("varint too long")

ErrVarIntTooLong indicates a data corruption issue that needs special handling by callers of decode. TODO(herohde): have callers perform this special handling.

Functions

func DecodeDouble

func DecodeDouble(r io.Reader) (float64, error)

DecodeDouble decodes a float64 in big endian format.

func DecodeEventTime

func DecodeEventTime(r io.Reader) (typex.EventTime, error)

DecodeEventTime decodes an EventTime.

func DecodeInt32

func DecodeInt32(r io.Reader) (int32, error)

DecodeInt32 decodes an int32 in big endian format.

func DecodeUint32

func DecodeUint32(r io.Reader) (uint32, error)

DecodeUint32 decodes an uint32 in big endian format.

func DecodeUint64

func DecodeUint64(r io.Reader) (uint64, error)

DecodeUint64 decodes an uint64 in big endian format.

func DecodeVarInt

func DecodeVarInt(r io.Reader) (int64, error)

DecodeVarInt decodes an int64.

func DecodeVarUint64

func DecodeVarUint64(r io.Reader) (uint64, error)

DecodeVarUint64 decodes an uint64.

func EncodeDouble

func EncodeDouble(value float64, w io.Writer) error

EncodeDouble encodes a float64 in big endian format.

func EncodeEventTime

func EncodeEventTime(et typex.EventTime, w io.Writer) error

EncodeEventTime encodes an EventTime as an uint64. The encoding is millis-since-epoch, but shifted so that the byte representation of negative values are lexicographically ordered before the byte representation of positive values.

func EncodeInt32

func EncodeInt32(value int32, w io.Writer) error

EncodeInt32 encodes an int32 in big endian format.

func EncodeUint32

func EncodeUint32(value uint32, w io.Writer) error

EncodeUint32 encodes an uint32 in big endian format.

func EncodeUint64

func EncodeUint64(value uint64, w io.Writer) error

EncodeUint64 encodes an uint64 in big endian format.

func EncodeVarInt

func EncodeVarInt(value int64, w io.Writer) error

EncodeVarInt encodes an int64.

func EncodeVarUint64

func EncodeVarUint64(value uint64, w io.Writer) error

EncodeVarUint64 encodes an uint64.

func IsCoGBK

func IsCoGBK(c *Coder) bool

IsCoGBK returns true iff the coder is for a CoGBK type.

func IsKV

func IsKV(c *Coder) bool

IsKV returns true iff the coder is for key-value pairs.

func IsW

func IsW(c *Coder) bool

IsW returns true iff the coder is for a WindowedValue.

func RegisterCoder

func RegisterCoder(t reflect.Type, enc, dec interface{})

RegisterCoder registers a user defined coder for a given type, and will be used if there is no beam coder for that type. Must be called prior to beam.Init(), preferably in an init() function.

Coders are encoder and decoder pairs, and operate around []bytes.

The coder used for a given type follows this ordering:

  1. Coders for Known Beam types.
  2. Coders registered for specific types
  3. Coders registered for interfaces types
  4. Default coder (JSON)

Types of kind Interface, are handled specially by the registry, so they may be iterated over to check if element types implement them.

Repeated registrations of the same type overrides prior ones.

func Types

func Types(list []*Coder) []typex.FullType

Types returns a slice of types used by the supplied coders.

Types

type Coder

type Coder struct {
	Kind Kind
	T    typex.FullType

	Components []*Coder     // WindowedValue, KV, CoGBK
	Custom     *CustomCoder // Custom
	Window     *WindowCoder // WindowedValue
}

Coder is a description of how to encode and decode values of a given type. Except for the "custom" kind, they are built in and must adhere to the (unwritten) Beam specification.

func NewBool

func NewBool() *Coder

NewBool returns a new bool coder using the built-in scheme.

func NewBytes

func NewBytes() *Coder

NewBytes returns a new []byte coder using the built-in scheme. It is always nested, for now.

func NewCoGBK

func NewCoGBK(components []*Coder) *Coder

NewCoGBK returns a coder for CoGBK elements.

func NewDouble

func NewDouble() *Coder

NewDouble returns a new double coder using the built-in scheme.

func NewKV

func NewKV(components []*Coder) *Coder

NewKV returns a coder for key-value pairs.

func NewVarInt

func NewVarInt() *Coder

NewVarInt returns a new int64 coder using the built-in scheme.

func NewW

func NewW(c *Coder, w *WindowCoder) *Coder

NewW returns a WindowedValue coder for the window of elements.

func SkipW

func SkipW(c *Coder) *Coder

SkipW returns the data coder used by a WindowedValue, or returns the coder. This allows code to seamlessly traverse WindowedValues without additional conditional code.

func (*Coder) Equals

func (c *Coder) Equals(o *Coder) bool

Equals returns true iff the two coders are equal. It assumes that functions with the same name and types are identical.

func (*Coder) String

func (c *Coder) String() string

type CustomCoder

type CustomCoder struct {
	// Name is the coder name. Informational only.
	Name string
	// Type is the underlying concrete type that is being coded. It is
	// available to Enc and Dec. It must be a concrete type.
	Type reflect.Type

	// Enc is the encoding function : T -> []byte. It may optionally take a
	// reflect.Type parameter and return an error as well.
	Enc *funcx.Fn
	// Dec is the decoding function: []byte -> T. It may optionally take a
	// reflect.Type parameter and return an error as well.
	Dec *funcx.Fn
}

CustomCoder contains possibly untyped encode/decode user functions that are type-bound at runtime. Universal coders can thus be used for many different types, but each CustomCoder instance will be bound to a specific type.

func LookupCustomCoder

func LookupCustomCoder(t reflect.Type) *CustomCoder

LookupCustomCoder returns the custom coder for the type if any, first checking for a specific matching type, and then iterating through registered interface coders in reverse registration order.

func NewCustomCoder

func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error)

NewCustomCoder creates a coder for the supplied parameters defining a particular encoding strategy.

func (*CustomCoder) Equals

func (c *CustomCoder) Equals(o *CustomCoder) bool

Equals returns true iff the two custom coders are equal. It assumes that functions with the same name and types are identical.

func (*CustomCoder) String

func (c *CustomCoder) String() string

type ElementDecoder

type ElementDecoder interface {
	Decode(r io.Reader) (interface{}, error)
}

ElementDecoder encapsulates being able to decode an element from a reader.

type ElementEncoder

type ElementEncoder interface {
	Encode(element interface{}, w io.Writer) error
}

ElementEncoder encapsulates being able to encode an element into a writer.

type Kind

type Kind string

Kind represents the type of coder used.

const (
	Custom        Kind = "Custom" // Implicitly length-prefixed
	Bytes         Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
	Bool          Kind = "bool"
	VarInt        Kind = "varint"
	Double        Kind = "double"
	WindowedValue Kind = "W"
	KV            Kind = "KV"

	// CoGBK is currently equivalent to either
	//
	//     KV<X,Iterable<Y>>         (if GBK)
	//     KV<X,Iterable<KV<int,Y>>> (if CoGBK, using a tagged union encoding)
	//
	// It requires special handling in translation to the model pipeline in the latter case
	// to add the incoming index for each input.
	//
	// TODO(BEAM-490): once this JIRA is done, this coder should become the new thing.
	CoGBK Kind = "CoGBK"
)

Tags for the various Beam encoding strategies. https://beam.apache.org/documentation/programming-guide/#coders documents the usage of coders in the Beam environment.

type WindowCoder

type WindowCoder struct {
	Kind WindowKind
}

WindowCoder represents a Window coder.

func NewGlobalWindow

func NewGlobalWindow() *WindowCoder

NewGlobalWindow returns a window coder for the global window.

func NewIntervalWindow

func NewIntervalWindow() *WindowCoder

NewIntervalWindow returns a window coder for interval windows.

func (*WindowCoder) Equals

func (w *WindowCoder) Equals(o *WindowCoder) bool

func (*WindowCoder) String

func (w *WindowCoder) String() string

type WindowKind

type WindowKind string

WindowKind represents a kind of window coder.

const (
	GlobalWindow   WindowKind = "GWC"
	IntervalWindow WindowKind = "IWC"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL