Documentation ¶
Overview ¶
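Package hevent defines the event emitter and receiver interfaces for the hexa SDK, along with the OpenTelemetry attribute keys used for events.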
Index ¶
Constants ¶
const (
	// HexaEventID is id of the event we use as key in hexa Context.
	HexaEventID = "HEXA_EVENT_ID"
	// HexaEventHandlerActionName is action's name that event handler wants to do.
	HexaEventHandlerActionName = "HEXA_EVENT_HANDLER_ACTION_NAME"
	// HexaRootEventID is id of the root event for retry events.
	HexaRootEventID = "HEXA_ROOT_EVENT_ID"
	// HexaRootEventHandlerActionName is action's name of the root event for retry events.
	HexaRootEventHandlerActionName = "HEXA_ROOT_EVENT_HANDLER_ACTION_NAME"
)
const (
	HeaderKeyReplyChannel   = "_reply_channel"
	HeaderKeyPayloadEncoder = "_payload_encoder" // the message body.
)
const Version = "1.0.0"
Version is the current version of the package.
Variables ¶
var (
	MessagingActionName     = attribute.Key("messaging.action.name")
	MessagingRootActionName = attribute.Key("messaging.action.root.name")
)
OpenTelemetry attribute keys:
var (
MessagingWithError = attribute.Key("messaging.with_error")
)
Functions ¶
func SubscribeMulti ¶
func SubscribeMulti(r Receiver, options ...*SubscriptionOptions) error
SubscribeMulti subscribes to multiple channels.
Types ¶
type Decoder ¶
type Decoder interface {
	// Decode decodes payload to the provided value.
	Decode(val interface{}) error
}
Decoder is the event payload decoder.
type Emitter ¶
type Emitter interface {
	// Emit sends the event to the channel.
	// The context can be nil.
	// Don't forget to validate the event here.
	Emit(context.Context, *Event) (msgID string, err error)
	hexa.Shutdownable
}
Emitter is the interface to emit events.
type Encoder ¶
type Encoder interface {
	// Name returns the Encoder name.
	Name() string
	Encode(interface{}) ([]byte, error)
	Decoder([]byte) Decoder
}
Encoder encodes and decodes the event payload.
func NewJsonEncoder ¶
func NewJsonEncoder() Encoder
NewJsonEncoder returns a new instance of the JSON encoder.
func NewProtobufEncoder ¶
func NewProtobufEncoder() Encoder
NewProtobufEncoder returns a new instance of the protobuf encoder.
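To illustrate how the Encoder and Decoder interfaces fit together, here is a minimal sketch of a JSON-backed pair. The interfaces are redeclared locally so the example compiles on its own; `jsonEncoder`, `jsonDecoder`, `userCreated`, `roundTrip` and the name string `"json"` are hypothetical and are not taken from the package's own NewJsonEncoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder and Encoder mirror the documented interfaces,
// redeclared locally so the sketch is self-contained.
type Decoder interface {
	Decode(val interface{}) error
}

type Encoder interface {
	Name() string
	Encode(interface{}) ([]byte, error)
	Decoder([]byte) Decoder
}

// jsonDecoder (hypothetical) keeps the raw bytes and unmarshals on demand.
type jsonDecoder struct{ raw []byte }

func (d jsonDecoder) Decode(val interface{}) error { return json.Unmarshal(d.raw, val) }

// jsonEncoder (hypothetical) is an illustration, not the package's encoder.
type jsonEncoder struct{}

func (jsonEncoder) Name() string                         { return "json" } // assumed name
func (jsonEncoder) Encode(v interface{}) ([]byte, error) { return json.Marshal(v) }
func (jsonEncoder) Decoder(b []byte) Decoder             { return jsonDecoder{raw: b} }

// userCreated is a hypothetical payload type.
type userCreated struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// roundTrip encodes the payload, then decodes it back through the Decoder.
func roundTrip(enc Encoder, in userCreated) (userCreated, error) {
	b, err := enc.Encode(in)
	if err != nil {
		return userCreated{}, err
	}
	var out userCreated
	err = enc.Decoder(b).Decode(&out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonEncoder{}, userCreated{ID: 7, Name: "ada"})
	fmt.Println(out, err)
}
```

The split between Encode and Decoder lets a receiver defer decoding until the handler knows the concrete payload type.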
type Event ¶
type Event struct {
	Key          string // required; can be used to specify the partition number (see the pulsar docs)
	Channel      string
	ReplyChannel string // optional (set it if a reply is needed)
	// Payload is encoded with the protobuf, json, ... encoder, depending on the emitter's config.
	// Don't forget that the emitter's encoder and the event receiver's decoder must match.
	Payload interface{}
}
Event is the event to send.
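As a rough illustration of the Emit contract (validate the event, then return a message ID), the following sketch uses an in-memory emitter. `memEmitter`, the `msg-N` ID scheme and the rule that both Key and Channel must be non-empty are assumptions made for the example; the Event type is redeclared locally and the hexa.Shutdownable part of the interface is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event mirrors the documented struct, redeclared locally.
type Event struct {
	Key          string
	Channel      string
	ReplyChannel string
	Payload      interface{}
}

// errInvalidEvent is a hypothetical validation error.
var errInvalidEvent = errors.New("event key and channel are required")

// memEmitter (hypothetical) stores emitted events in memory.
type memEmitter struct {
	mu   sync.Mutex
	sent []*Event
}

// Emit validates the event, records it, and returns a generated ID.
// The context is ignored here because the docs allow it to be nil.
func (m *memEmitter) Emit(_ context.Context, e *Event) (string, error) {
	if e == nil || e.Key == "" || e.Channel == "" {
		return "", errInvalidEvent
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.sent = append(m.sent, e)
	return fmt.Sprintf("msg-%d", len(m.sent)), nil
}

func main() {
	em := &memEmitter{}
	id, err := em.Emit(context.Background(), &Event{Key: "user-7", Channel: "users.created"})
	fmt.Println(id, err)
}
```

An in-memory emitter like this can be handy in unit tests, where asserting on the recorded events is simpler than running a broker.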
type EventHandler ¶
type EventHandler func(HandlerContext, Message, error) error
EventHandler handles events. The pulsar and hestan implementations only log the returned error; in kafka, returning an error pushes the event to the retry or DLQ topic.
func RecoverMiddleware ¶
func RecoverMiddleware(h EventHandler) EventHandler
RecoverMiddleware is an event handler middleware that recovers from panics.
func WithMiddlewares ¶
func WithMiddlewares(h EventHandler, middlewares ...Middleware) EventHandler
WithMiddlewares wraps the handler with the given middlewares.
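The middleware pattern described above can be sketched as follows. The types are local stand-ins for the documented ones; `recoverMW` and `chain` are hypothetical helpers, and both the conversion of a panic into a returned error and the wrapping order (first middleware outermost) are assumptions, not confirmed behavior of RecoverMiddleware or WithMiddlewares.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the documented types.
type HandlerContext interface {
	context.Context
	Ack()
	Nack()
}

type Message struct{ CorrelationId string }

type EventHandler func(HandlerContext, Message, error) error

type Middleware func(EventHandler) EventHandler

// recoverMW (hypothetical) turns a panic raised by h into a returned error.
func recoverMW(h EventHandler) EventHandler {
	return func(c HandlerContext, m Message, err error) (out error) {
		defer func() {
			if r := recover(); r != nil {
				out = fmt.Errorf("recovered from panic: %v", r)
			}
		}()
		return h(c, m, err)
	}
}

// chain (hypothetical) wraps h so the first middleware runs outermost.
func chain(h EventHandler, mws ...Middleware) EventHandler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	panicky := func(HandlerContext, Message, error) error { panic("boom") }
	h := chain(panicky, recoverMW)
	// The handler context is unused by this handler, so nil is enough here.
	fmt.Println(h(nil, Message{}, nil))
}
```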
type HandlerContext ¶
type HandlerContext interface {
	context.Context
	// Ack gets the message and sends an ack.
	Ack()
	// Nack gets the message and sends a negative ack.
	Nack()
}
HandlerContext is the context passed to the message handler.
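A handler might use Ack and Nack along these lines. This is only a sketch: `recordingCtx`, `newRecordingCtx`, `errDecode` and `handle` are hypothetical, the types are local stand-ins, and whether a given driver expects the handler to acknowledge explicitly is not stated in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the documented types.
type HandlerContext interface {
	context.Context
	Ack()
	Nack()
}

type Message struct{ CorrelationId string }

// recordingCtx (hypothetical) records which acknowledgement was sent.
type recordingCtx struct {
	context.Context
	acked, nacked bool
}

func (c *recordingCtx) Ack()  { c.acked = true }
func (c *recordingCtx) Nack() { c.nacked = true }

func newRecordingCtx() *recordingCtx {
	return &recordingCtx{Context: context.Background()}
}

// errDecode is a hypothetical error passed to the handler.
var errDecode = errors.New("decode failed")

// handle nacks when it is given an error and acks otherwise.
func handle(c HandlerContext, m Message, err error) error {
	if err != nil {
		c.Nack()
		return err
	}
	c.Ack()
	return nil
}

func main() {
	ok, bad := newRecordingCtx(), newRecordingCtx()
	okErr := handle(ok, Message{}, nil)
	badErr := handle(bad, Message{}, errDecode)
	fmt.Println(okErr, ok.acked)
	fmt.Println(badErr, bad.nacked)
}
```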
type Message ¶
type Message struct {
	// Primary is not the RawMessage. It's the driver's
	// raw message.
	Primary       interface{}
	Headers       map[string][]byte
	CorrelationId string
	ReplyChannel  string
	Payload       Decoder
}
Message is the message provided to the event handler.
type Middleware ¶
type Middleware func(handler EventHandler) EventHandler
type RawMessage ¶
type RawMessage struct {
	Headers map[string][]byte `json:"header,omitempty"`
	Payload []byte            `json:"payload"`
}
RawMessage is the message sent by the emitter. It is converted to a Message and then passed to the event handler. Note: some event drivers (kafkabox and hafka) do not push the marshaled RawMessage as the event value; they send the RawMessage's headers in the headers section and its payload in the payload section of the event. Be careful if you define fields on RawMessage in addition to Headers and Payload.
func (RawMessage) Validate ¶
func (e RawMessage) Validate() error
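For drivers that do push the marshaled RawMessage as the event value, the struct tags shown above determine the wire shape. The sketch below redeclares RawMessage locally and round-trips it through encoding/json, which encodes `[]byte` values as base64 strings; `encode` and `decode` are hypothetical helpers, and using JSON for this step is an assumption based only on the struct tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RawMessage mirrors the documented struct, including its JSON tags.
type RawMessage struct {
	Headers map[string][]byte `json:"header,omitempty"`
	Payload []byte            `json:"payload"`
}

// encode marshals the raw message to JSON.
func encode(r RawMessage) ([]byte, error) { return json.Marshal(r) }

// decode unmarshals JSON back into a raw message.
func decode(b []byte) (RawMessage, error) {
	var r RawMessage
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	raw := RawMessage{
		Headers: map[string][]byte{"_reply_channel": []byte("replies")},
		Payload: []byte(`{"id":1}`),
	}
	b, err := encode(raw)
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(string(b))

	back, err := decode(b)
	fmt.Println(string(back.Payload), err)
}
```

Any extra field added to RawMessage would survive this JSON path but not the header/payload split used by kafkabox and hafka, which is the caveat the note above describes.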
type RawMessageConverter ¶
type RawMessageConverter interface {
	EventToRaw(c context.Context, e *Event) (*RawMessage, error)
	// RawMsgToMessage converts the raw message to a message.
	// primary is the primary driver's message that its receiver will get.
	RawMsgToMessage(c context.Context, raw *RawMessage, primary interface{}) (context.Context, Message, error)
}
func NewRawMessageConverter ¶
func NewRawMessageConverter(p hexa.ContextPropagator, e Encoder) RawMessageConverter
type Receiver ¶
type Receiver interface {
	// Subscribe subscribes to the provided channel.
	Subscribe(channel string, h EventHandler) error
	// SubscribeWithOptions subscribes using the provided options.
	SubscribeWithOptions(*SubscriptionOptions) error
	hexa.Runnable     // to start receiving events.
	hexa.Shutdownable // to close connections and shutdown the server.
}
type SubscriptionOptions ¶
type SubscriptionOptions struct {
	// Channel specify the channel name you will subscribe on.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	Channel string
	// Channels contains name of channels which we want to subscribe.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	Channels []string
	// ChannelsPattern is the pattern you will use to subscribe on all channels
	// which match with this pattern.
	// Either Channel,Channels or ChannelsPattern are required when subscribing.
	ChannelsPattern string
	// Handler is the event handler.
	Handler EventHandler
	// contains filtered or unexported fields
}
SubscriptionOptions contains options to subscribe to one or multiple channels.
func NewSubscriptionOptions ¶
func NewSubscriptionOptions(channel string, handler EventHandler) *SubscriptionOptions
NewSubscriptionOptions returns a new instance of the subscription options.
func (*SubscriptionOptions) Extra ¶
func (so *SubscriptionOptions) Extra() []interface{}
Extra returns the extra data of the subscription options.
func (*SubscriptionOptions) Validate ¶
func (so *SubscriptionOptions) Validate() error
func (*SubscriptionOptions) WithExtra ¶
func (so *SubscriptionOptions) WithExtra(extra ...interface{}) *SubscriptionOptions
WithExtra adds extra data to the subscription options.
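The rule in the SubscriptionOptions field comments (one of Channel, Channels or ChannelsPattern must be set) could be checked roughly as below. This is a sketch only: `subscriptionTarget`, `errNoTarget` and `validateTarget` are hypothetical, the rule is read here as "at least one", and the package's own Validate method may enforce different or additional constraints.

```go
package main

import (
	"errors"
	"fmt"
)

// subscriptionTarget (hypothetical) holds only the channel-selection
// fields of the documented SubscriptionOptions struct.
type subscriptionTarget struct {
	Channel         string
	Channels        []string
	ChannelsPattern string
}

// errNoTarget is a hypothetical validation error.
var errNoTarget = errors.New("one of Channel, Channels or ChannelsPattern is required")

// validateTarget reports an error when no channel selector is set.
// Treating the rule as "at least one" is an assumption of this sketch.
func validateTarget(t subscriptionTarget) error {
	if t.Channel == "" && len(t.Channels) == 0 && t.ChannelsPattern == "" {
		return errNoTarget
	}
	return nil
}

func main() {
	fmt.Println(validateTarget(subscriptionTarget{}))
	fmt.Println(validateTarget(subscriptionTarget{Channel: "users.created"}))
	fmt.Println(validateTarget(subscriptionTarget{ChannelsPattern: "users.*"}))
}
```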
Directories ¶
Path | Synopsis |
---|---|
examples | |
 | Package hestan (hexa stan) is an implementation of the NATS Streaming broker for the hexa SDK, using the stan client library of NATS. |
 | Package hexapulsar implements hexa events. |