Documentation ¶
Index ¶
- Variables
- func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)
- type Consumer
- type ConsumerOption
- type DrainList
- type Dryer
- type NatsReceiver
- type OpenerReceiverCloser
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SubjectQueuePool
- type Subscriber
- type TeleObservability
- func (t *TeleObservability) InboundContextDecorators() []func(context.Context, binding.Message) context.Context
- func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))
- func (t TeleObservability) RecordReceivedMalformedEvent(_ context.Context, _ error)
- func (t TeleObservability) RecordRequestEvent(ctx context.Context, _ event.Event) (context.Context, func(error, *event.Event))
- func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))
Constants ¶
This section is empty.
Variables ¶
var ErrEmptySubject = errors.New("empty subject list")
Functions ¶
func NewSender ¶
func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)
NewSender creates a new protocol.Sender responsible for opening and closing the STAN connection
Types ¶
type Consumer ¶
type Consumer struct { NatsReceiver Conn *nats.Conn Subject string Subscriber Subscriber // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueuePoolSubscriber ¶
func WithQueuePoolSubscriber(queue string, subject ...string) ConsumerOption
WithQueuePoolSubscriber create subject list pool for specific queue
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing
type DrainList ¶
type DrainList []Dryer
DrainList simple cleaner which on drain error note call left drains
type NatsReceiver ¶
func NewReceiver ¶
func NewReceiver(ch <-chan *nats.Msg) NatsReceiver
type OpenerReceiverCloser ¶
type OpenerReceiverCloser interface { protocol.Opener protocol.ReceiveCloser }
type Protocol ¶
type Protocol struct { Conn *nats.Conn Consumer OpenerReceiverCloser Sender protocol.SendCloser // contains filtered or unexported fields }
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)
type ProtocolOption ¶
ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
func WithConsumerOptions ¶
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
type RegularSubscriber ¶
type RegularSubscriber struct { }
RegularSubscriber creates regular subscriptions
type Sender ¶
func NewSenderFromConn ¶
func NewSenderFromConn(conn *nats.Conn, subject string, opts ...cn.SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the STAN connection to the caller
type SubjectQueuePool ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)
}
The Subscriber interface allows us to configure how the subscription is created
type TeleObservability ¶
type TeleObservability struct { *tel.Telemetry Metrics metrics.MetricsReader }
TeleObservability implement cloudevents client.ObservabilityService with OpenTracing propagation This flow idempotent and not tight coupled to NATS and can easily treat any opentracing flow which provides correct context values
Producer component handled in RecordSendingEvent and has specific context requirements
func (*TeleObservability) InboundContextDecorators ¶
func (*TeleObservability) RecordCallingInvoker ¶
func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))
RecordCallingInvoker consumer middleware expect special data inside containing opentracing.SpanReference which receiver should put inside ˚ consumer represent invoker model for opentracing, from tracing objectives this mean that it begin new span either and that span should be return and used by others
func (TeleObservability) RecordReceivedMalformedEvent ¶
func (t TeleObservability) RecordReceivedMalformedEvent(_ context.Context, _ error)
func (TeleObservability) RecordRequestEvent ¶
func (*TeleObservability) RecordSendingEvent ¶
func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))
RecordSendingEvent producer interceptor required context argument to be polluted with tel context creates new tracing brunch from provided inside context OpenTracing or tel data