Documentation ¶
Index ¶
- Variables
- func ExtractDistributedTracingExtension(ctx context.Context, event *cloudevents.Event) (opentracing.SpanContext, error)
- func InjectDistributedTracingExtension(ctx context.Context, event *cloudevents.Event)
- func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)
- func NewTeleObservability(t *tel.Telemetry, m metrics.MetricsReader, opts ...ObservabilityOption) client.ObservabilityService
- type Consumer
- type ConsumerOption
- type DrainList
- type Dryer
- type NatsReceiver
- type ObservabilityOption
- type OpenerReceiverCloser
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SpanAttrGetter
- type SpanNameFormatter
- type SubjectQueuePool
- type Subscriber
- type TeleObservability
- func (t *TeleObservability) GetSpanAttributes(e cloudevents.Event, method string) opentracing.Tags
- func (t *TeleObservability) InboundContextDecorators() []func(context.Context, binding.Message) context.Context
- func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))
- func (t *TeleObservability) RecordReceivedMalformedEvent(ctx context.Context, err error)
- func (t TeleObservability) RecordRequestEvent(ctx context.Context, _ event.Event) (context.Context, func(error, *event.Event))
- func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))
Constants ¶
This section is empty.
Variables ¶
var ErrEmptySubject = errors.New("empty subject list")
var (
ErrTraceStateExtension = errors.New("cloudevents extension not contain key: " + extensions.TraceStateExtension)
)
Functions ¶
func ExtractDistributedTracingExtension ¶ added in v0.2.0
func ExtractDistributedTracingExtension(ctx context.Context, event *cloudevents.Event) (opentracing.SpanContext, error)
ExtractDistributedTracingExtension extracts the tracecontext from the cloud event.
func InjectDistributedTracingExtension ¶ added in v0.2.0
func InjectDistributedTracingExtension(ctx context.Context, event *cloudevents.Event)
InjectDistributedTracingExtension injects the tracecontext from the context into the event as a DistributedTracingExtension
If a DistributedTracingExtension is present in the provided event, its current value is replaced with the tracecontext obtained from the context.
func NewSender ¶
func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)
NewSender creates a new protocol.Sender responsible for opening and closing the STAN connection
func NewTeleObservability ¶ added in v0.2.0
func NewTeleObservability(t *tel.Telemetry, m metrics.MetricsReader, opts ...ObservabilityOption) client.ObservabilityService
Types ¶
type Consumer ¶
type Consumer struct { NatsReceiver Conn *nats.Conn Subject string Subscriber Subscriber // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueuePoolSubscriber ¶
func WithQueuePoolSubscriber(queue string, subject ...string) ConsumerOption
WithQueuePoolSubscriber create subject list pool for specific queue
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing
type DrainList ¶
type DrainList []Dryer
DrainList simple cleaner which on drain error note call left drains
type NatsReceiver ¶
func NewReceiver ¶
func NewReceiver(ch <-chan *nats.Msg) NatsReceiver
type ObservabilityOption ¶ added in v0.2.0
type ObservabilityOption func(*TeleObservability)
func WithSpanAttributesGetter ¶ added in v0.2.0
func WithSpanAttributesGetter(attrGetter SpanAttrGetter) ObservabilityOption
WithSpanAttributesGetter appends the returned attributes from the function to the span.
func WithSpanNameFormatter ¶ added in v0.2.0
func WithSpanNameFormatter(nameFormatter SpanNameFormatter) ObservabilityOption
WithSpanNameFormatter replaces the default span name with the string returned from the function
type OpenerReceiverCloser ¶
type OpenerReceiverCloser interface { protocol.Opener protocol.ReceiveCloser }
type Protocol ¶
type Protocol struct { Conn *nats.Conn Consumer OpenerReceiverCloser Sender protocol.SendCloser // contains filtered or unexported fields }
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)
type ProtocolOption ¶
ProtocolOption is the function signature required to be considered an nats.ProtocolOption.
func WithConsumerOptions ¶
func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
type RegularSubscriber ¶
type RegularSubscriber struct { }
RegularSubscriber creates regular subscriptions
type Sender ¶
func NewSenderFromConn ¶
func NewSenderFromConn(conn *nats.Conn, subject string, opts ...cn.SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the STAN connection to the caller
type SpanAttrGetter ¶ added in v0.2.0
type SpanAttrGetter func(cloudevents.Event) opentracing.Tags
type SpanNameFormatter ¶ added in v0.2.0
type SpanNameFormatter func(cloudevents.Event) string
type SubjectQueuePool ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)
}
The Subscriber interface allows us to configure how the subscription is created
type TeleObservability ¶
type TeleObservability struct { *tel.Telemetry Metrics metrics.MetricsReader // contains filtered or unexported fields }
TeleObservability implement cloudevents client.ObservabilityService with OpenTracing propagation This flow idempotent and not tight coupled to NATS and can easily treat any opentracing flow which provides correct context values
Producer component handled in RecordSendingEvent and has specific context requirements
func (*TeleObservability) GetSpanAttributes ¶ added in v0.2.0
func (t *TeleObservability) GetSpanAttributes(e cloudevents.Event, method string) opentracing.Tags
GetSpanAttributes returns the attributes that are always added to the spans
func (*TeleObservability) InboundContextDecorators ¶
func (t *TeleObservability) InboundContextDecorators() []func(context.Context, binding.Message) context.Context
InboundContextDecorators returns a decorator function that allows enriching the context with the incoming parent trace. This method gets invoked automatically by passing the option 'WithObservabilityService' when creating the cloudevents client.
func (*TeleObservability) RecordCallingInvoker ¶
func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))
RecordCallingInvoker consumer middleware expect special data inside containing opentracing.SpanReference which receiver should put inside ˚ consumer represent invoker model for opentracing, from tracing objectives this mean that it begin new span either and that span should be return and used by others
func (*TeleObservability) RecordReceivedMalformedEvent ¶
func (t *TeleObservability) RecordReceivedMalformedEvent(ctx context.Context, err error)
RecordReceivedMalformedEvent if content is unpredictable
func (TeleObservability) RecordRequestEvent ¶
func (*TeleObservability) RecordSendingEvent ¶
func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))
RecordSendingEvent producer interceptor required context argument to be polluted with tel context creates new tracing brunch from provided inside context OpenTracing or tel data