Documentation ¶
Overview ¶
Package binding defines interfaces for protocol bindings.
NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.
The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.
Protocol Bindings ¶
A protocol binding implements at least Message, Sender and Receiver, and usually Encoder.
Receiver: receives protocol messages and wraps them to implement the Message interface.
Message: interface that defines the visitors for an encoded event in structured mode, binary mode or event mode. A method is provided to read the Encoding of the message
Sender: converts arbitrary Message implementations to a protocol-specific form and sends them. A protocol Sender should preserve the spec-version and structured/binary mode of sent messages as far as possible. This package provides generic Sender wrappers to pre-process messages into a specific spec-version or structured/binary mode when the user requires that.
Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.
Intermediaries ¶
Intermediaries can forward Messages from a Receiver to a Sender without knowledge of the underlying protocols. The Message interface allows structured messages to be forwarded without decoding and re-encoding. It also allows any Message to be fully decoded and examined as needed.
Example (Implementing) ¶
Example of implementing a transport including a simple message type, and a transport sender and receiver.
package main import ( "bytes" "context" "encoding/json" "io" "io/ioutil" "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/binding/format" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" ) // ExMessage is a json.RawMessage, a byte slice containing a JSON encoded event. // It implements binding.MockStructuredMessage // // Note: a good binding implementation should provide an easy way to convert // between the Message implementation and the "native" message format. // In this case it's as simple as: // // native = ExMessage(impl) // impl = json.RawMessage(native) // // For example in a HTTP binding it should be easy to convert between // the HTTP binding.Message implementation and net/http.Request and // Response types. There are no interfaces for this conversion as it // requires the use of unknown types. type ExMessage json.RawMessage func (m ExMessage) GetParent() binding.Message { return nil } func (m ExMessage) Encoding() binding.Encoding { return binding.EncodingStructured } func (m ExMessage) Structured(ctx context.Context, b binding.StructuredEncoder) error { return b.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m)) } func (m ExMessage) Binary(context.Context, binding.BinaryEncoder) error { return binding.ErrNotBinary } func (m ExMessage) Finish(error) error { return nil } var _ binding.Message = (*ExMessage)(nil) // ExSender sends by writing JSON encoded events to an io.Writer // ExSender supports transcoding // ExSender implements directly StructuredEncoder & EventEncoder type ExSender struct { encoder *json.Encoder transformers binding.TransformerFactories } func NewExSender(w io.Writer, factories ...binding.TransformerFactory) binding.Sender { return &ExSender{encoder: json.NewEncoder(w), transformers: factories} } func (s *ExSender) Send(ctx context.Context, m binding.Message) error { // Encode tries the various encodings, starting with provided root encoder factories. // If a sender doesn't support a specific encoding, a null root encoder factory could be provided. _, err := binding.Encode( ctx, m, s, nil, s.transformers, ) return err } func (s *ExSender) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { if f == format.JSON { b, err := ioutil.ReadAll(event) if err != nil { return err } return s.encoder.Encode(json.RawMessage(b)) } else { return binding.ErrNotStructured } } func (s *ExSender) Close(context.Context) error { return nil } var _ binding.Sender = (*ExSender)(nil) var _ binding.StructuredEncoder = (*ExSender)(nil) // ExReceiver receives by reading JSON encoded events from an io.Reader type ExReceiver struct{ decoder *json.Decoder } func NewExReceiver(r io.Reader) binding.Receiver { return &ExReceiver{json.NewDecoder(r)} } func (r *ExReceiver) Receive(context.Context) (binding.Message, error) { var rm json.RawMessage err := r.decoder.Decode(&rm) // This is just a byte copy. return ExMessage(rm), err } func (r *ExReceiver) Close(context.Context) error { return nil } // NewExTransport returns a transport.Transport which is implemented by // an ExSender and an ExReceiver func NewExTransport(r io.Reader, w io.Writer) transport.Transport { return binding.NewTransportAdapter(NewExSender(w), NewExReceiver(r), []func(ctx context.Context) context.Context{}) } // Example of implementing a transport including a simple message type, // and a transport sender and receiver. func main() {}
Output:
Example (Using) ¶
This example shows how to use a transport in sender, receiver, and intermediary processes.
The sender and receiver use the client.Client API to send and receive messages. the transport. Only the intermediary example actually uses the transport APIs for efficiency and reliability in forwarding events.
package main import ( "context" "fmt" "io" "strconv" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" ) const count = 3 // Example ends after this many events. // The sender uses the cloudevents.Client API, not the transport APIs directly. func runSender(w io.Writer) error { c, err := client.New(NewExTransport(nil, w), client.WithoutTracePropagation()) if err != nil { return err } for i := 0; i < count; i++ { e := cloudevents.New() e.SetType("example.com/event") e.SetSource("example.com/source") e.SetID(strconv.Itoa(i)) if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil { return err } if _, _, err := c.Send(context.TODO(), e); err != nil { return err } } return nil } // The receiver uses the cloudevents.Client API, not the transport APIs directly. func runReceiver(r io.Reader) error { i := 0 process := func(e cloudevents.Event) error { fmt.Printf("%s\n", e) i++ if i == count { return io.EOF } return nil } c, err := client.New(NewExTransport(r, nil), client.WithoutTracePropagation()) if err != nil { return err } return c.StartReceiver(context.TODO(), process) } // The intermediary receives events and forwards them to another // process using ExReceiver and ExSender directly. // // By forwarding a transport.Message instead of a cloudevents.Event, // it allows the transports to avoid un-necessary decoding of // structured events, and to exchange delivery status between reliable // transports. Even transports using different protocols can ensure // reliable delivery. func runIntermediary(r io.Reader, w io.WriteCloser) error { defer w.Close() for { receiver := NewExReceiver(r) sender := NewExSender(w) for i := 0; i < count; i++ { if m, err := receiver.Receive(context.TODO()); err != nil { return err } else if err := sender.Send(context.TODO(), m); err != nil { return err } } } } // This example shows how to use a transport in sender, receiver, // and intermediary processes. // // The sender and receiver use the client.Client API to send and // receive messages. the transport. Only the intermediary example // actually uses the transport APIs for efficiency and reliability in // forwarding events. func main() { r1, w1 := io.Pipe() // The sender-to-intermediary pipe r2, w2 := io.Pipe() // The intermediary-to-receiver pipe done := make(chan error) go func() { done <- runReceiver(r2) }() go func() { done <- runIntermediary(r1, w2) }() go func() { done <- runSender(w1) }() for i := 0; i < 2; i++ { if err := <-done; err != nil && err != io.EOF { fmt.Println(err) } } }
Output: Validation: valid Context Attributes, specversion: 1.0 type: example.com/event source: example.com/source id: 0 Data, "hello 0" Validation: valid Context Attributes, specversion: 1.0 type: example.com/event source: example.com/source id: 1 Data, "hello 1" Validation: valid Context Attributes, specversion: 1.0 type: example.com/event source: example.com/source id: 2 Data, "hello 2"
Index ¶
- Constants
- Variables
- func EventContextToBinaryEncoder(c cloudevents.EventContext, b BinaryEncoder) (err error)
- func GetOrDefaultFromCtx(ctx context.Context, key string, def interface{}) interface{}
- func WithForceBinary(ctx context.Context) context.Context
- func WithForceStructured(ctx context.Context) context.Context
- func WithPreferredEventEncoding(ctx context.Context, enc Encoding) context.Context
- func WithSkipDirectBinaryEncoding(ctx context.Context, skip bool) context.Context
- func WithSkipDirectStructuredEncoding(ctx context.Context, skip bool) context.Context
- type BinaryEncoder
- type BindingTransport
- func (t *BindingTransport) HasConverter() bool
- func (t *BindingTransport) HasTracePropagation() bool
- func (t *BindingTransport) Send(ctx context.Context, e ce.Event) (context.Context, *ce.Event, error)
- func (t *BindingTransport) SetConverter(transport.Converter)
- func (t *BindingTransport) SetReceiver(r transport.Receiver)
- func (t *BindingTransport) StartReceiver(ctx context.Context) error
- type ChanReceiver
- type ChanSender
- type Closer
- type Encoding
- func Encode(ctx context.Context, message Message, structuredEncoder StructuredEncoder, ...) (Encoding, error)
- func RunDirectEncoding(ctx context.Context, message Message, structuredEncoder StructuredEncoder, ...) (Encoding, error)
- func ToEvent(ctx context.Context, message Message, transformers ...TransformerFactory) (e ce.Event, encoding Encoding, err error)
- type EventMessage
- func (m EventMessage) Binary(ctx context.Context, b BinaryEncoder) (err error)
- func (m EventMessage) Encoding() Encoding
- func (EventMessage) Finish(error) error
- func (m EventMessage) GetParent() Message
- func (m *EventMessage) SetEvent(e ce.Event) error
- func (m EventMessage) Structured(ctx context.Context, builder StructuredEncoder) error
- type EventTransformer
- type ExactlyOnceMessage
- type Message
- type MessageWrapper
- type ReceiveCloser
- type Receiver
- type Requester
- type SendCloser
- type Sender
- type StructuredEncoder
- type TransformerFactories
- type TransformerFactory
Examples ¶
Constants ¶
const ( SKIP_DIRECT_STRUCTURED_ENCODING = "SKIP_DIRECT_STRUCTURED_ENCODING" SKIP_DIRECT_BINARY_ENCODING = "SKIP_DIRECT_BINARY_ENCODING" PREFERRED_EVENT_ENCODING = "PREFERRED_EVENT_ENCODING" )
Variables ¶
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")
var ErrNotBinary = errors.New("message is not in binary mode")
ErrNotBinary returned by Message.Binary for non-binary messages.
var ErrNotStructured = errors.New("message is not in structured mode")
ErrNotStructured returned by Message.Structured for non-structured messages.
var ErrUnknownEncoding = errors.New("unknown Message encoding")
Error to specify that or the Message is not an event or it is encoded with an unknown encoding
Functions ¶
func EventContextToBinaryEncoder ¶ added in v1.1.0
func EventContextToBinaryEncoder(c cloudevents.EventContext, b BinaryEncoder) (err error)
func GetOrDefaultFromCtx ¶ added in v1.1.0
func WithForceBinary ¶ added in v1.1.0
Force binary encoding during the encoding process
func WithForceStructured ¶ added in v1.1.0
Force structured encoding during the encoding process
func WithPreferredEventEncoding ¶ added in v1.1.0
Define the preferred encoding from event to message during the encoding process
func WithSkipDirectBinaryEncoding ¶ added in v1.1.0
Skip direct binary to binary encoding during the encoding process
Types ¶
type BinaryEncoder ¶ added in v0.10.0
type BinaryEncoder interface { // Method invoked at the beginning of the visit. Useful to perform initial memory allocations Start(ctx context.Context) error // Set a standard attribute. // // The value can either be the correct golang type for the attribute, or a canonical // string encoding. See package cloudevents/types SetAttribute(attribute spec.Attribute, value interface{}) error // Set an extension attribute. // // The value can either be the correct golang type for the attribute, or a canonical // string encoding. See package cloudevents/types SetExtension(name string, value interface{}) error // SetData receives an io.Reader for the data attribute. // io.Reader could be empty, meaning that message payload is empty SetData(data io.Reader) error // End method is invoked only after the whole encoding process ends successfully. // If it fails, it's never invoked. It can be used to finalize the message. End() error }
BinaryEncoder is used to visit a binary Message and generate a new representation.
Protocols that supports binary encoding should implement this interface to implement direct binary -> binary transfer and event -> binary.
Start() and End() methods are invoked every time this BinaryEncoder implementation is used to visit a Message
type BindingTransport ¶ added in v1.1.0
type BindingTransport struct { Sender Sender Receiver Receiver SenderContextDecorators []func(context.Context) context.Context // contains filtered or unexported fields }
BindingTransport implements transport.Transport using a Sender and Receiver.
func NewTransportAdapter ¶ added in v1.1.0
func (*BindingTransport) HasConverter ¶ added in v1.1.0
func (t *BindingTransport) HasConverter() bool
func (*BindingTransport) HasTracePropagation ¶ added in v1.1.0
func (t *BindingTransport) HasTracePropagation() bool
func (*BindingTransport) SetConverter ¶ added in v1.1.0
func (t *BindingTransport) SetConverter(transport.Converter)
func (*BindingTransport) SetReceiver ¶ added in v1.1.0
func (t *BindingTransport) SetReceiver(r transport.Receiver)
func (*BindingTransport) StartReceiver ¶ added in v1.1.0
func (t *BindingTransport) StartReceiver(ctx context.Context) error
type ChanReceiver ¶ added in v0.10.0
type ChanReceiver <-chan Message
ChanReceiver implements Receiver by receiving from a channel.
type ChanSender ¶ added in v0.10.0
type ChanSender chan<- Message
ChanSender implements Sender by sending on a channel.
type Encoding ¶ added in v1.1.0
type Encoding int
Encoding enum specifies the type of encodings supported by binding interfaces
const ( // Binary encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message EncodingBinary Encoding = iota // Structured encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message EncodingStructured // Message is an instance of EventMessage or it contains it nested (through MessageWrapper) EncodingEvent // When the encoding is unknown (which means that the message is a non-event) EncodingUnknown )
func Encode ¶ added in v1.1.0
func Encode( ctx context.Context, message Message, structuredEncoder StructuredEncoder, binaryEncoder BinaryEncoder, transformers TransformerFactories, ) (Encoding, error)
This is the full algorithm to encode a Message using transformers: 1. It first tries direct encoding using RunEncoders 2. If no direct encoding is possible, it goes through ToEvent to generate an event representation 3. Using the encoders previously defined You can tweak the encoding process using the context decorators WithForceStructured, WithForceStructured, etc. This function guarantees that transformers are invoked only one time during the encoding process. Returns: * EncodingStructured, nil if message was structured and correctly translated to Event * EncodingBinary, nil if message was binary and correctly translated to Event * EncodingStructured, err if message was structured but error happened during translation * BinaryEncoding, err if message was binary but error happened during translation * EncodingUnknown, ErrUnknownEncoding if message is not recognized
func RunDirectEncoding ¶ added in v1.1.0
func RunDirectEncoding( ctx context.Context, message Message, structuredEncoder StructuredEncoder, binaryEncoder BinaryEncoder, factories TransformerFactories, ) (Encoding, error)
Invokes the encoders. createRootStructuredEncoder and createRootBinaryEncoder could be null if the protocol doesn't support it
Returns: * EncodingStructured, nil if message was structured and correctly translated to Event * EncodingBinary, nil if message was binary and correctly translated to Event * EncodingStructured, err if message was structured but error happened during translation * BinaryEncoding, err if message was binary but error happened during translation * EncodingUnknown, ErrUnknownEncoding if message is not recognized
func ToEvent ¶ added in v0.11.0
func ToEvent(ctx context.Context, message Message, transformers ...TransformerFactory) (e ce.Event, encoding Encoding, err error)
Translates a Message with a valid Structured or Binary representation to an Event The TransformerFactories **aren't invoked** during the transformation to event, but after the event instance is generated
type EventMessage ¶
EventMessage type-converts a cloudevents.Event object to implement Message. This allows local cloudevents.Event objects to be sent directly via Sender.Send()
s.Send(ctx, binding.EventMessage(e))
func (EventMessage) Binary ¶ added in v0.11.0
func (m EventMessage) Binary(ctx context.Context, b BinaryEncoder) (err error)
func (EventMessage) Encoding ¶ added in v1.1.0
func (m EventMessage) Encoding() Encoding
func (EventMessage) Finish ¶
func (EventMessage) Finish(error) error
func (EventMessage) GetParent ¶ added in v1.1.0
func (m EventMessage) GetParent() Message
func (EventMessage) Structured ¶
func (m EventMessage) Structured(ctx context.Context, builder StructuredEncoder) error
type EventTransformer ¶ added in v1.1.0
type ExactlyOnceMessage ¶
type ExactlyOnceMessage interface { Message // Received is called by a forwarding QoS2 Sender when it gets // acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC) // // The receiver must call settle(nil) when it get's the ack-of-ack // (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the // transfer fails. // // Finally the Sender calls Finish() to indicate the message can be // discarded. // // If sending fails, or if the sender does not support QoS 2, then // Finish() may be called without any call to Received() Received(settle func(error)) }
ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.
type Message ¶
type Message interface { // Return the type of the message Encoding. // The encoding should be preferably computed when the message is constructed. Encoding() Encoding // Structured transfers a structured-mode event to a StructuredEncoder. // Returns ErrNotStructured if message is not in structured mode. // // Returns a different err if something wrong happened while trying to read the structured event // In this case, the caller must Finish the message with appropriate error // // This allows Senders to avoid re-encoding messages that are // already in suitable structured form. Structured(context.Context, StructuredEncoder) error // Binary transfers a binary-mode event to an BinaryEncoder. // Returns ErrNotBinary if message is not in binary mode. // // Returns a different err if something wrong happened while trying to read the binary event // In this case, the caller must Finish the message with appropriate error // // Allows Senders to forward a binary message without allocating an // intermediate Event. Binary(context.Context, BinaryEncoder) error // Finish *must* be called when message from a Receiver can be forgotten by // the receiver. Sender.Send() calls Finish() when the message is sent. A QoS // 1 sender should not call Finish() until it gets an acknowledgment of // receipt on the underlying transport. For QoS 2 see ExactlyOnceMessage. // // Passing a non-nil err indicates sending or processing failed. // A non-nil return indicates that the message was not accepted // by the receivers peer. Finish(error) error }
Message is the interface to a binding-specific message containing an event.
Reliable Delivery ¶
There are 3 reliable qualities of service for messages:
0/at-most-once/unreliable: messages can be dropped silently.
1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.
2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.
The Message interface supports QoS 0 and 1, the ExactlyOnceMessage interface supports QoS 2
The Structured and Binary methods provide optional optimized transfer of an event to a Sender, they may not be implemented by all Message instances. A Sender should try each method of interest and fall back to ToEvent() if none are supported.
func WithFinish ¶ added in v0.10.0
WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.
type MessageWrapper ¶ added in v1.1.0
type MessageWrapper interface { Message // Method to get the wrapped message GetWrappedMessage() Message }
Message Wrapper interface is used to walk through a decorated Message and unwrap it.
type ReceiveCloser ¶ added in v0.10.0
ReceiveCloser is a Receiver that can be closed.
type Receiver ¶
type Receiver interface { // Receive blocks till a message is received or ctx expires. // // A non-nil error means the receiver is closed. // io.EOF means it closed cleanly, any other value indicates an error. Receive(ctx context.Context) (Message, error) }
Receiver receives messages.
type Requester ¶ added in v0.10.0
type Requester interface { // Request sends m like Sender.Send() but also arranges to receive a response. // The returned Receiver is used to receive the response. Request(ctx context.Context, m Message) (Receiver, error) }
Requester sends a message and receives a response
Optional interface that may be implemented by protocols that support request/response correlation.
type SendCloser ¶ added in v0.10.0
SendCloser is a Sender that can be closed.
type Sender ¶
type Sender interface { // Send a message. // // Send returns when the "outbound" message has been sent. The Sender may // still be expecting acknowledgment or holding other state for the message. // // m.Finish() is called when sending is finished: expected acknowledgments (or // errors) have been received, the Sender is no longer holding any state for // the message. m.Finish() may be called during or after Send(). // // To support optimized forwading of structured-mode messages, Send() // should use the encoding returned by m.Structured() if there is one. // Otherwise m.Event() can be encoded as per the binding's rules. Send(ctx context.Context, m Message) error }
Sender sends messages.
type StructuredEncoder ¶ added in v0.11.0
type StructuredEncoder interface { // Event receives an io.Reader for the whole event. SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error }
StructuredEncoder is used to visit a structured Message and generate a new representation.
Protocols that supports structured encoding should implement this interface to implement direct structured -> structured transfer and event -> binary.
type TransformerFactories ¶ added in v0.11.0
type TransformerFactories []TransformerFactory
Utility type alias to manage multiple TransformerFactory
func (TransformerFactories) BinaryTransformer ¶ added in v0.11.0
func (t TransformerFactories) BinaryTransformer(encoder BinaryEncoder) BinaryEncoder
func (TransformerFactories) EventTransformer ¶ added in v0.11.0
func (t TransformerFactories) EventTransformer() EventTransformer
func (TransformerFactories) StructuredTransformer ¶ added in v0.11.0
func (t TransformerFactories) StructuredTransformer(encoder StructuredEncoder) StructuredEncoder
type TransformerFactory ¶ added in v0.11.0
type TransformerFactory interface { // Can return nil if the transformation doesn't support structured encoding directly StructuredTransformer(encoder StructuredEncoder) StructuredEncoder // Can return nil if the transformation doesn't support binary encoding directly BinaryTransformer(encoder BinaryEncoder) BinaryEncoder // Can return nil if the transformation doesn't support events EventTransformer() EventTransformer }
Implements a transformation process while transferring the event from the Message implementation to the provided encoder
A transformer could optionally not provide an implementation for binary and/or structured encodings, returning nil to the respective factory method.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package format formats structured events.
|
Package format formats structured events. |
Package spec provides spec-version metadata.
|
Package spec provides spec-version metadata. |
Package test contains test data and generic tests for testing bindings.
|
Package test contains test data and generic tests for testing bindings. |