Overview
Package coder contains coder representation and utilities. Coders describe how to serialize and deserialize pipeline data and may be provided by users.
Index
- Variables
- func DecodeBool(r io.Reader) (bool, error)
- func DecodeByte(r io.Reader) (byte, error)
- func DecodeBytes(r io.Reader) ([]byte, error)
- func DecodeDouble(r io.Reader) (float64, error)
- func DecodeEventTime(r io.Reader) (typex.EventTime, error)
- func DecodeInt32(r io.Reader) (int32, error)
- func DecodeStringUTF8(r io.Reader) (string, error)
- func DecodeUint32(r io.Reader) (uint32, error)
- func DecodeUint64(r io.Reader) (uint64, error)
- func DecodeVarInt(r io.Reader) (int64, error)
- func DecodeVarUint64(r io.Reader) (uint64, error)
- func EncodeBool(v bool, w io.Writer) error
- func EncodeByte(v byte, w io.Writer) error
- func EncodeBytes(v []byte, w io.Writer) error
- func EncodeDouble(value float64, w io.Writer) error
- func EncodeEventTime(et typex.EventTime, w io.Writer) error
- func EncodeInt32(value int32, w io.Writer) error
- func EncodeStringUTF8(s string, w io.Writer) error
- func EncodeUint32(value uint32, w io.Writer) error
- func EncodeUint64(value uint64, w io.Writer) error
- func EncodeVarInt(value int64, w io.Writer) error
- func EncodeVarUint64(value uint64, w io.Writer) error
- func IsCoGBK(c *Coder) bool
- func IsKV(c *Coder) bool
- func IsW(c *Coder) bool
- func RegisterCoder(t reflect.Type, enc, dec interface{})
- func Types(list []*Coder) []typex.FullType
- type Coder
- func CoderFrom(c *CustomCoder) *Coder
- func NewBool() *Coder
- func NewBytes() *Coder
- func NewCoGBK(components []*Coder) *Coder
- func NewDouble() *Coder
- func NewKV(components []*Coder) *Coder
- func NewString() *Coder
- func NewVarInt() *Coder
- func NewW(c *Coder, w *WindowCoder) *Coder
- func SkipW(c *Coder) *Coder
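- type CustomCoder
- func LookupCustomCoder(t reflect.Type) *CustomCoder
- func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error)
- func (c *CustomCoder) Equals(o *CustomCoder) bool
- func (c *CustomCoder) String() string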
- type ElementDecoder
- type ElementEncoder
- type Kind
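- type WindowCoder
- func NewGlobalWindow() *WindowCoder
- func NewIntervalWindow() *WindowCoder
- func (w *WindowCoder) Equals(o *WindowCoder) bool
- func (w *WindowCoder) String() string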
- type WindowKind
Constants
This section is empty.
Variables
var ErrVarIntTooLong = errors.New("varint too long")
ErrVarIntTooLong indicates a data corruption issue that needs special handling by callers of the decode functions. TODO(herohde): have callers perform this special handling.
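A minimal sketch of where an error like ErrVarIntTooLong comes from, assuming the common base-128 varint layout (7 payload bits per byte, high bit set on every byte except the last, at most 10 bytes for a 64-bit value). decodeVarUint64 and decodeVarUint64Bytes are hypothetical helpers, not this package's code, and they read from an io.ByteReader rather than an io.Reader for simplicity.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// ErrVarIntTooLong is a local stand-in for the package variable of the same name.
var ErrVarIntTooLong = errors.New("varint too long")

// maxVarintLen64 is the most bytes a 64-bit varint can occupy: ceil(64/7) = 10.
const maxVarintLen64 = 10

// decodeVarUint64 is a hypothetical base-128 varint decoder. Each byte carries
// 7 payload bits, least significant group first; a set high bit (0x80) means
// another byte follows. It does not check the 10th byte for overflow.
func decodeVarUint64(r io.ByteReader) (uint64, error) {
	var value uint64
	for i := 0; i < maxVarintLen64; i++ {
		b, err := r.ReadByte()
		if err != nil {
			return 0, err // for example io.EOF on truncated input
		}
		value |= uint64(b&0x7f) << (7 * uint(i))
		if b&0x80 == 0 {
			return value, nil
		}
	}
	// Ten bytes consumed and the continuation bit is still set: this cannot
	// be a valid 64-bit varint, so the data is corrupt.
	return 0, ErrVarIntTooLong
}

// decodeVarUint64Bytes decodes a single varint from the start of b.
func decodeVarUint64Bytes(b []byte) (uint64, error) {
	return decodeVarUint64(bytes.NewReader(b))
}

func main() {
	v, err := decodeVarUint64Bytes([]byte{0xAC, 0x02})
	fmt.Println(v, err) // 300 <nil>

	// A caller can single out the corruption case and handle it specially.
	_, err = decodeVarUint64Bytes(bytes.Repeat([]byte{0xFF}, 11))
	if errors.Is(err, ErrVarIntTooLong) {
		fmt.Println("corrupt input:", err)
	}
}
```

Returning a sentinel error lets callers distinguish corrupt data from an ordinary end of stream.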
Functions
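func DecodeBool
func DecodeBool(r io.Reader) (bool, error)
DecodeBool decodes a boolean according to the Beam protocol.
func DecodeBytes
func DecodeBytes(r io.Reader) ([]byte, error)
DecodeBytes decodes a length-prefixed []byte according to the Beam protocol.
func DecodeDouble
func DecodeDouble(r io.Reader) (float64, error)
DecodeDouble decodes a float64 in big-endian format.
func DecodeEventTime
func DecodeEventTime(r io.Reader) (typex.EventTime, error)
DecodeEventTime decodes an EventTime written by EncodeEventTime.
func DecodeInt32
func DecodeInt32(r io.Reader) (int32, error)
DecodeInt32 decodes an int32 in big-endian format.
func DecodeStringUTF8
func DecodeStringUTF8(r io.Reader) (string, error)
DecodeStringUTF8 decodes a length-prefixed UTF-8 string.
func DecodeUint32
func DecodeUint32(r io.Reader) (uint32, error)
DecodeUint32 decodes a uint32 in big-endian format.
func DecodeUint64
func DecodeUint64(r io.Reader) (uint64, error)
DecodeUint64 decodes a uint64 in big-endian format.
func DecodeVarUint64
func DecodeVarUint64(r io.Reader) (uint64, error)
DecodeVarUint64 decodes a uint64 encoded as a variable-length integer (varint).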
func EncodeBool
func EncodeBool(v bool, w io.Writer) error
EncodeBool encodes a boolean according to the Beam protocol.
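A sketch of a boolean coder, assuming the Beam encoding is a single byte, 0x00 for false and 0x01 for true. The names encodeBool, decodeBool and roundTripBool are hypothetical, and rejecting other byte values on decode is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// encodeBool writes a single byte: 1 for true, 0 for false.
func encodeBool(v bool, w io.Writer) error {
	b := []byte{0}
	if v {
		b[0] = 1
	}
	_, err := w.Write(b)
	return err
}

// decodeBool reads exactly one byte and maps it back to a bool. Any value
// other than 0 or 1 is rejected (an assumption of this sketch).
func decodeBool(r io.Reader) (bool, error) {
	var b [1]byte
	if _, err := io.ReadFull(r, b[:]); err != nil {
		return false, err
	}
	switch b[0] {
	case 0:
		return false, nil
	case 1:
		return true, nil
	}
	return false, fmt.Errorf("invalid bool encoding: %#x", b[0])
}

// roundTripBool encodes v, then decodes it again, returning the encoded bytes too.
func roundTripBool(v bool) (bool, []byte, error) {
	var buf bytes.Buffer
	if err := encodeBool(v, &buf); err != nil {
		return false, nil, err
	}
	enc := append([]byte(nil), buf.Bytes()...)
	got, err := decodeBool(&buf)
	return got, enc, err
}

func main() {
	got, enc, err := roundTripBool(true)
	fmt.Println(got, enc, err) // true [1] <nil>

	_, err = decodeBool(bytes.NewReader([]byte{2}))
	fmt.Println(err) // invalid bool encoding: 0x2
}
```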
func EncodeBytes
func EncodeBytes(v []byte, w io.Writer) error
EncodeBytes encodes a []byte with a length prefix, per the Beam protocol.
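A minimal sketch of a length-prefixed []byte coder, assuming the prefix is the byte count written as an unsigned base-128 varint. It relies on binary.PutUvarint and binary.ReadUvarint from Go's encoding/binary; encodeBytes, decodeBytes and roundTripBytes are hypothetical names. Writing two values back to back shows why the prefix matters: the decoder knows exactly where each value ends.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// byteReader is satisfied by *bytes.Buffer, *bytes.Reader and *bufio.Reader.
type byteReader interface {
	io.Reader
	io.ByteReader
}

// encodeBytes writes len(v) as an unsigned varint, followed by v itself.
func encodeBytes(v []byte, w io.Writer) error {
	var prefix [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(prefix[:], uint64(len(v)))
	if _, err := w.Write(prefix[:n]); err != nil {
		return err
	}
	_, err := w.Write(v)
	return err
}

// decodeBytes reads the varint length prefix and then exactly that many bytes.
func decodeBytes(r byteReader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	// A corrupt prefix could request a huge allocation; production code
	// would want an upper bound here.
	out := make([]byte, size)
	if _, err := io.ReadFull(r, out); err != nil {
		return nil, err
	}
	return out, nil
}

// roundTripBytes encodes all values into one buffer, then decodes them back.
func roundTripBytes(values ...[]byte) ([]byte, [][]byte, error) {
	var buf bytes.Buffer
	for _, v := range values {
		if err := encodeBytes(v, &buf); err != nil {
			return nil, nil, err
		}
	}
	encoded := append([]byte(nil), buf.Bytes()...)
	var decoded [][]byte
	for range values {
		v, err := decodeBytes(&buf)
		if err != nil {
			return nil, nil, err
		}
		decoded = append(decoded, v)
	}
	return encoded, decoded, nil
}

func main() {
	enc, dec, err := roundTripBytes([]byte("beam"), []byte{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", enc)              // 04 62 65 61 6d 00
	fmt.Printf("%q %q\n", dec[0], dec[1]) // "beam" ""
}
```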
func EncodeDouble
func EncodeDouble(value float64, w io.Writer) error
EncodeDouble encodes a float64 in big-endian format.
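Reading "big-endian format" as the IEEE 754 bit pattern of the float64 written most significant byte first (an assumption), the double coder can be sketched with math.Float64bits and binary.BigEndian. The helper names below are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math"
)

// encodeDouble writes the IEEE 754 bits of value as 8 big-endian bytes.
func encodeDouble(value float64, w io.Writer) error {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], math.Float64bits(value))
	_, err := w.Write(b[:])
	return err
}

// decodeDouble reads exactly 8 bytes and reinterprets them as a float64.
func decodeDouble(r io.Reader) (float64, error) {
	var b [8]byte
	if _, err := io.ReadFull(r, b[:]); err != nil {
		return 0, err
	}
	return math.Float64frombits(binary.BigEndian.Uint64(b[:])), nil
}

// encodeDoubleBytes returns the encoding of value as a byte slice.
func encodeDoubleBytes(value float64) []byte {
	var buf bytes.Buffer
	_ = encodeDouble(value, &buf) // bytes.Buffer.Write always returns a nil error
	return buf.Bytes()
}

// decodeDoubleBytes decodes a float64 from the start of b.
func decodeDoubleBytes(b []byte) (float64, error) {
	return decodeDouble(bytes.NewReader(b))
}

func main() {
	enc := encodeDoubleBytes(1.0)
	fmt.Printf("% x\n", enc) // 3f f0 00 00 00 00 00 00
	v, err := decodeDoubleBytes(enc)
	fmt.Println(v, err) // 1 <nil>
}
```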
func EncodeEventTime
func EncodeEventTime(et typex.EventTime, w io.Writer) error
EncodeEventTime encodes an EventTime as a uint64. The encoding is milliseconds since the epoch, shifted so that the byte representations of negative values sort lexicographically before those of positive values.
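The ordering property can be checked with a small sketch. It assumes the shift amounts to adding 2^63 to the signed millisecond count, which for 64-bit values is the same as flipping the sign bit, and then writing the result as a big-endian uint64. encodeMillis and decodeMillis are hypothetical and work on plain int64 milliseconds instead of typex.EventTime.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

// signFlip is 1<<63. Adding it modulo 2^64 (equivalently, XOR-ing it in)
// maps math.MinInt64..math.MaxInt64 onto 0..math.MaxUint64 in order.
const signFlip = uint64(1) << 63

// encodeMillis writes millis since the epoch as an order-preserving
// big-endian uint64.
func encodeMillis(millis int64) []byte {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], uint64(millis)^signFlip)
	return b[:]
}

// decodeMillis reverses encodeMillis; b must hold at least 8 bytes.
func decodeMillis(b []byte) int64 {
	return int64(binary.BigEndian.Uint64(b) ^ signFlip)
}

func main() {
	times := []int64{math.MinInt64, -1000, -1, 0, 1, 1000, math.MaxInt64}
	for i := 1; i < len(times); i++ {
		prev, cur := encodeMillis(times[i-1]), encodeMillis(times[i])
		// bytes.Compare is a lexicographic byte comparison.
		fmt.Println(times[i-1], "<", times[i], bytes.Compare(prev, cur) < 0)
	}
	fmt.Println(decodeMillis(encodeMillis(-42))) // -42
}
```

Under this assumption, -1 ms encodes as 7f ff ff ff ff ff ff ff and 0 ms as 80 00 00 00 00 00 00 00, so a plain byte comparison orders them correctly. Without the shift, negative values would start with ff and sort after every positive value.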
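func EncodeInt32
func EncodeInt32(value int32, w io.Writer) error
EncodeInt32 encodes an int32 in big-endian format.
func EncodeStringUTF8
func EncodeStringUTF8(s string, w io.Writer) error
EncodeStringUTF8 encodes a UTF-8 string with a length prefix.
func EncodeUint32
func EncodeUint32(value uint32, w io.Writer) error
EncodeUint32 encodes a uint32 in big-endian format.
func EncodeUint64
func EncodeUint64(value uint64, w io.Writer) error
EncodeUint64 encodes a uint64 in big-endian format.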
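The fixed-width integer encodings can be sketched with binary.Write and binary.Read from encoding/binary, which take an io.Writer and io.Reader just like the functions above. The helpers are hypothetical; the point is the layout: 4 bytes for int32 and uint32, 8 bytes for uint64, most significant byte first, with int32 in two's complement.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

func encodeInt32(value int32, w io.Writer) error {
	return binary.Write(w, binary.BigEndian, value)
}

func decodeInt32(r io.Reader) (int32, error) {
	var v int32
	err := binary.Read(r, binary.BigEndian, &v) // reads exactly 4 bytes
	return v, err
}

func encodeUint32(value uint32, w io.Writer) error {
	return binary.Write(w, binary.BigEndian, value)
}

func decodeUint32(r io.Reader) (uint32, error) {
	var v uint32
	err := binary.Read(r, binary.BigEndian, &v) // reads exactly 4 bytes
	return v, err
}

func encodeUint64(value uint64, w io.Writer) error {
	return binary.Write(w, binary.BigEndian, value)
}

func decodeUint64(r io.Reader) (uint64, error) {
	var v uint64
	err := binary.Read(r, binary.BigEndian, &v) // reads exactly 8 bytes
	return v, err
}

// encodeTriple writes an int32, a uint32 and a uint64 back to back.
func encodeTriple(i int32, u32 uint32, u64 uint64) ([]byte, error) {
	var buf bytes.Buffer
	if err := encodeInt32(i, &buf); err != nil {
		return nil, err
	}
	if err := encodeUint32(u32, &buf); err != nil {
		return nil, err
	}
	if err := encodeUint64(u64, &buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeTriple reads the three values back in the same order.
func decodeTriple(b []byte) (int32, uint32, uint64, error) {
	r := bytes.NewReader(b)
	i, err := decodeInt32(r)
	if err != nil {
		return 0, 0, 0, err
	}
	u32, err := decodeUint32(r)
	if err != nil {
		return 0, 0, 0, err
	}
	u64, err := decodeUint64(r)
	if err != nil {
		return 0, 0, 0, err
	}
	return i, u32, u64, nil
}

func main() {
	b, err := encodeTriple(-2, 7, 0x0102030405060708)
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", b) // ff ff ff fe 00 00 00 07 01 02 03 04 05 06 07 08
	fmt.Println(decodeTriple(b))
}
```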
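func EncodeVarInt
func EncodeVarInt(value int64, w io.Writer) error
EncodeVarInt encodes an int64 as a variable-length integer (varint).
func EncodeVarUint64
func EncodeVarUint64(value uint64, w io.Writer) error
EncodeVarUint64 encodes a uint64 as a variable-length integer (varint).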
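A sketch of the varint encoders, assuming the common base-128 layout (the same one binary.Uvarint decodes, used below as a cross-check) and assuming EncodeVarInt reinterprets the int64 bits as a uint64 rather than zigzag-encoding them. Under those assumptions, values below 128 take one byte and every negative value takes the full 10 bytes. encodeVarUint64, encodeVarInt and varIntBytes are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeVarUint64 writes value 7 bits at a time, least significant group
// first, setting the high bit on every byte except the last.
func encodeVarUint64(value uint64, w io.Writer) error {
	var buf [binary.MaxVarintLen64]byte
	n := 0
	for value >= 0x80 {
		buf[n] = byte(value) | 0x80
		value >>= 7
		n++
	}
	buf[n] = byte(value)
	_, err := w.Write(buf[:n+1])
	return err
}

// encodeVarInt reinterprets the int64 as a uint64 (two's complement), an
// assumption of this sketch; negative inputs therefore use 10 bytes.
func encodeVarInt(value int64, w io.Writer) error {
	return encodeVarUint64(uint64(value), w)
}

// varIntBytes returns the encoding of value as a byte slice.
func varIntBytes(value int64) []byte {
	var b bytes.Buffer
	_ = encodeVarInt(value, &b) // bytes.Buffer.Write always returns a nil error
	return b.Bytes()
}

func main() {
	for _, v := range []int64{0, 1, 127, 128, 300, -1} {
		enc := varIntBytes(v)
		// binary.Uvarint decodes the same layout, as a cross-check.
		dec, _ := binary.Uvarint(enc)
		fmt.Printf("%5d -> % x (%d bytes), decodes to %d\n", v, enc, len(enc), int64(dec))
	}
}
```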
func RegisterCoder
func RegisterCoder(t reflect.Type, enc, dec interface{})
RegisterCoder registers a user-defined coder for a given type; that coder is used if there is no built-in Beam coder for the type. RegisterCoder must be called before beam.Init(), preferably in an init() function.
Coders are encoder and decoder pairs that operate on []byte values.
The coder used for a given type follows this ordering:
- Coders for known Beam types
- Coders registered for specific types
- Coders registered for interface types
- Default coder (JSON)
Types of kind Interface are handled specially by the registry so that they can be iterated over to check whether element types implement them.
Repeated registrations of the same type override prior ones.
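The lookup order above can be pictured as a small registry. This is a simplified, hypothetical model rather than the package's implementation: it leaves out the built-in Beam coders, stores a coder name instead of encoder/decoder functions, and checks the exact type first, then interface registrations in reverse registration order (as LookupCustomCoder describes), and finally falls back to JSON. All type names in it are made up for the example.

```go
package main

import (
	"fmt"
	"reflect"
)

// registry is a toy model of the documented lookup order.
type registry struct {
	byType     map[reflect.Type]string // coders for specific (non-interface) types
	interfaces []reflect.Type          // interface types in registration order
	byIface    map[reflect.Type]string // coders for interface types
}

func newRegistry() *registry {
	return &registry{
		byType:  map[reflect.Type]string{},
		byIface: map[reflect.Type]string{},
	}
}

// register records a coder for t. Registering the same type again overrides
// the earlier coder; in this sketch an interface keeps its original position.
func (r *registry) register(t reflect.Type, coder string) {
	if t.Kind() != reflect.Interface {
		r.byType[t] = coder
		return
	}
	if _, seen := r.byIface[t]; !seen {
		r.interfaces = append(r.interfaces, t)
	}
	r.byIface[t] = coder
}

// lookup checks the exact type first, then interfaces in reverse
// registration order, and finally falls back to JSON.
func (r *registry) lookup(t reflect.Type) string {
	if c, ok := r.byType[t]; ok {
		return c
	}
	for i := len(r.interfaces) - 1; i >= 0; i-- {
		if iface := r.interfaces[i]; t.Implements(iface) {
			return r.byIface[iface]
		}
	}
	return "json"
}

// lookupValue looks up the dynamic type of v.
func (r *registry) lookupValue(v interface{}) string {
	return r.lookup(reflect.TypeOf(v))
}

// Named is a hypothetical interface used to show reverse registration order.
type Named interface{ Name() string }

// Example element types (all hypothetical).
type Point struct{ X, Y int }
type Celsius float64
type Both struct{}
type Plain struct{ A int }

func (p Point) String() string   { return fmt.Sprintf("(%d,%d)", p.X, p.Y) }
func (c Celsius) String() string { return fmt.Sprintf("%.1fC", float64(c)) }
func (Both) String() string      { return "both" }
func (Both) Name() string        { return "both" }

// demoRegistry registers fmt.Stringer, then Named, then the concrete Point.
func demoRegistry() *registry {
	r := newRegistry()
	r.register(reflect.TypeOf((*fmt.Stringer)(nil)).Elem(), "stringer coder")
	r.register(reflect.TypeOf((*Named)(nil)).Elem(), "named coder")
	r.register(reflect.TypeOf(Point{}), "point coder")
	return r
}

func main() {
	r := demoRegistry()
	fmt.Println(r.lookupValue(Point{}))    // point coder: exact type wins
	fmt.Println(r.lookupValue(Celsius(0))) // stringer coder
	fmt.Println(r.lookupValue(Both{}))     // named coder: latest interface wins
	fmt.Println(r.lookupValue(Plain{}))    // json
}
```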
Types
type Coder
type Coder struct {
	Kind       Kind
	T          typex.FullType
	Components []*Coder     // WindowedValue, KV, CoGBK
	Custom     *CustomCoder // Custom
	Window     *WindowCoder // WindowedValue
	ID         string       // (optional) This coder's ID if translated from a pipeline proto.
}
Coder is a description of how to encode and decode values of a given type. Except for the Custom kind, coders are built in and must adhere to the (unwritten) Beam specification.
func CoderFrom
func CoderFrom(c *CustomCoder) *Coder
CoderFrom is a helper that creates a Coder from a CustomCoder.
func NewBytes
func NewBytes() *Coder
NewBytes returns a new []byte coder using the built-in scheme. For now, it always uses the nested (length-prefixed) encoding.
func NewDouble
func NewDouble() *Coder
NewDouble returns a new double (float64) coder using the built-in scheme.
func NewString
func NewString() *Coder
NewString returns a new string coder using the built-in scheme.
func NewVarInt
func NewVarInt() *Coder
NewVarInt returns a new int64 coder using the built-in scheme.
func NewW
func NewW(c *Coder, w *WindowCoder) *Coder
NewW returns a WindowedValue coder that wraps the element coder c and uses the window coder w for the elements' windows.
func SkipW
func SkipW(c *Coder) *Coder
SkipW returns the data coder used by a WindowedValue coder, or returns c unchanged if it is not a WindowedValue coder. This lets code traverse WindowedValues without additional conditional logic.
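A simplified illustration of SkipW using hypothetical mirror types that keep only the Kind and Components fields of Coder. It assumes that a WindowedValue coder stores its element coder as its first component; the describe helper is also hypothetical and only renders the coder tree for display.

```go
package main

import (
	"fmt"
	"strings"
)

// Kind and the constants below mirror a subset of the package's Kind values.
type Kind string

const (
	String        Kind = "string"
	VarInt        Kind = "varint"
	WindowedValue Kind = "W"
	KV            Kind = "KV"
)

// Coder is a hypothetical, stripped-down mirror of coder.Coder.
type Coder struct {
	Kind       Kind
	Components []*Coder
}

// SkipW returns the element coder of a WindowedValue coder, or c itself.
func SkipW(c *Coder) *Coder {
	if c.Kind == WindowedValue {
		return c.Components[0] // assumed: element coder is the first component
	}
	return c
}

// describe renders a coder tree such as W<KV<string,varint>>.
func describe(c *Coder) string {
	if len(c.Components) == 0 {
		return string(c.Kind)
	}
	parts := make([]string, len(c.Components))
	for i, comp := range c.Components {
		parts[i] = describe(comp)
	}
	return fmt.Sprintf("%s<%s>", c.Kind, strings.Join(parts, ","))
}

func main() {
	kv := &Coder{Kind: KV, Components: []*Coder{{Kind: String}, {Kind: VarInt}}}
	w := &Coder{Kind: WindowedValue, Components: []*Coder{kv}}
	fmt.Println(describe(w))         // W<KV<string,varint>>
	fmt.Println(describe(SkipW(w)))  // KV<string,varint>
	fmt.Println(describe(SkipW(kv))) // KV<string,varint>
}
```

Because SkipW is a no-op on non-windowed coders, callers can apply it unconditionally before inspecting the data coder.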
type CustomCoder
type CustomCoder struct {
	// Name is the coder name. Informational only.
	Name string
	// Type is the underlying concrete type that is being coded. It is
	// available to Enc and Dec. It must be a concrete type.
	Type reflect.Type
	// Enc is the encoding function: T -> []byte. It may optionally take a
	// reflect.Type parameter and return an error as well.
	Enc *funcx.Fn
	// Dec is the decoding function: []byte -> T. It may optionally take a
	// reflect.Type parameter and return an error as well.
	Dec *funcx.Fn
	ID  string // (optional) This coder's ID if translated from a pipeline proto.
}
CustomCoder contains possibly untyped encode/decode user functions that are type-bound at runtime. Universal coders can thus be used for many different types, but each CustomCoder instance will be bound to a specific type.
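For illustration, here is a hypothetical Point type with an encoder and decoder of the shapes described for Enc (T -> []byte) and Dec ([]byte -> T), both using the optional error return. A pair like this is what would be passed to RegisterCoder; that call appears only in a comment so the sketch runs without the Beam SDK. The 16-byte big-endian layout is an arbitrary choice of this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Point is a hypothetical user type with no built-in Beam coder.
type Point struct {
	X, Y int64
}

// encodePoint has the shape T -> ([]byte, error): two big-endian int64s.
func encodePoint(p Point) ([]byte, error) {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[0:8], uint64(p.X))
	binary.BigEndian.PutUint64(b[8:16], uint64(p.Y))
	return b, nil
}

// decodePoint has the shape []byte -> (T, error) and validates the length.
func decodePoint(b []byte) (Point, error) {
	if len(b) != 16 {
		return Point{}, fmt.Errorf("decodePoint: want 16 bytes, got %d", len(b))
	}
	return Point{
		X: int64(binary.BigEndian.Uint64(b[0:8])),
		Y: int64(binary.BigEndian.Uint64(b[8:16])),
	}, nil
}

// With the Beam SDK available, the pair would be registered before
// beam.Init(), for example:
//
//	func init() {
//		coder.RegisterCoder(reflect.TypeOf(Point{}), encodePoint, decodePoint)
//	}

func main() {
	enc, _ := encodePoint(Point{X: 3, Y: -4})
	p, err := decodePoint(enc)
	fmt.Println(len(enc), p, err) // 16 {3 -4} <nil>
}
```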
func LookupCustomCoder
func LookupCustomCoder(t reflect.Type) *CustomCoder
LookupCustomCoder returns the custom coder for the type, if any. It first checks for a coder registered for that specific type, and then iterates through registered interface coders in reverse registration order.
func NewCustomCoder
func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error)
NewCustomCoder creates a custom coder from an ID, a type, and the encode and decode functions that define a particular encoding strategy.
func (*CustomCoder) Equals
func (c *CustomCoder) Equals(o *CustomCoder) bool
Equals returns true iff the two custom coders are equal. It assumes that functions with the same name and types are identical.
func (*CustomCoder) String
func (c *CustomCoder) String() string
type ElementDecoder
ElementDecoder encapsulates the ability to decode an element from a reader.
type ElementEncoder
ElementEncoder encapsulates the ability to encode an element into a writer.
type Kind
type Kind string
Kind represents the type of coder used.
const (
	Custom        Kind = "Custom" // Implicitly length-prefixed
	Bytes         Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
	String        Kind = "string" // Implicitly length-prefixed as part of the encoding.
	Bool          Kind = "bool"
	VarInt        Kind = "varint"
	Double        Kind = "double"
	WindowedValue Kind = "W"
	KV            Kind = "KV"

	// CoGBK is currently equivalent to either
	//
	//     KV<X,Iterable<Y>>         (if GBK)
	//     KV<X,Iterable<KV<int,Y>>> (if CoGBK, using a tagged union encoding)
	//
	// It requires special handling in translation to the model pipeline in the latter case
	// to add the incoming index for each input.
	//
	// TODO(BEAM-490): once this JIRA is done, this coder should become the new thing.
	CoGBK Kind = "CoGBK"
)
Tags for the various Beam encoding strategies. https://beam.apache.org/documentation/programming-guide/#coders documents the usage of coders in the Beam environment.
type WindowCoder
type WindowCoder struct {
	Kind WindowKind
}
WindowCoder represents a Window coder.
func NewGlobalWindow
func NewGlobalWindow() *WindowCoder
NewGlobalWindow returns a window coder for the global window.
func NewIntervalWindow
func NewIntervalWindow() *WindowCoder
NewIntervalWindow returns a window coder for interval windows.
func (*WindowCoder) Equals
func (w *WindowCoder) Equals(o *WindowCoder) bool
func (*WindowCoder) String
func (w *WindowCoder) String() string
type WindowKind
type WindowKind string
WindowKind represents a kind of window coder.
const (
	GlobalWindow   WindowKind = "GWC"
	IntervalWindow WindowKind = "IWC"
)