protonats

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2021 License: MIT Imports: 20 Imported by: 0

README

= Cloud Events NATS protocol

Updated nats protocol for https://github.com/cloudevents/sdk-go/tree/main/protocol/nats/v2

Feature:

* OpenTracing features with "github.com/d7561985/tel" send trace `to` NATS, read tracing span `from` NATS
* Producer uses `context.TopicFrom` feature for overwrite default subject
* Consumer subject pool for group
* Protocol Consumer and Sander struct members are Interfaces and easily could be replaced


== Trace feature enable

We use `TeleObservability` correctly read span from NATS and pack it correctly.
But this is only like middleware.

All engine pack/unpack under `adapter`

[source,go]
----
package main

import (
	"github.com/d7561985/tel"
	"github.com/d7561985/tel/monitoring/metrics"
	cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
	"github.com/d7561985/protonats"
)

func main()  {
	t := tel.New(tel.GetConfigFromEnv())

	p, err := protonats.NewProtocol(env.NATSServer, "-", "", cenats.NatsOptions())

	metricsss := metrics.NewCollectorMetricsReader()
     ce, err := cloudevents.NewClient(p,
            client.WithObservabilityService(&protonats.TeleObservability{
                Metrics:   metricsss,
                Telemetry: &t,
            }),
        )
}
----

== Consumer Subject Group pool

Use option for protocol - `WithConsumerOptions`

[source,go]
----
    p, err := protonats.NewProtocol(env.NATSServer, "-", "",
		cenats.NatsOptions(),
		protonats.WithConsumerOptions(
			protonats.WithQueuePoolSubscriber("MyQueue",
				"MySubject1", "MySubject2",
			),
		),
	)
----

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptySubject = errors.New("empty subject list")

Functions

func NewSender

func NewSender(url, subject string, natsOpts []nats.Option, opts ...cn.SenderOption) (protocol.SendCloser, error)

NewSender creates a new protocol.Sender responsible for opening and closing the STAN connection

Types

type Consumer

type Consumer struct {
	NatsReceiver

	Conn       *nats.Conn
	Subject    string
	Subscriber Subscriber
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(url, subject string, natsOpts []nats.Option, opts ...ConsumerOption) (*Consumer, error)

func NewConsumerFromConn

func NewConsumerFromConn(conn *nats.Conn, subject string, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) OpenInbound

func (c *Consumer) OpenInbound(ctx context.Context) error

type ConsumerOption

type ConsumerOption func(*Consumer) error

func WithQueuePoolSubscriber

func WithQueuePoolSubscriber(queue string, subject ...string) ConsumerOption

WithQueuePoolSubscriber create subject list pool for specific queue

func WithQueueSubscriber

func WithQueueSubscriber(queue string) ConsumerOption

WithQueueSubscriber configures the Consumer to join a queue group when subscribing

type DrainList

type DrainList []Dryer

DrainList simple cleaner which on drain error note call left drains

func (DrainList) Drain

func (d DrainList) Drain() error

type Dryer

type Dryer interface {
	Drain() error
}

type NatsReceiver

type NatsReceiver interface {
	protocol.Receiver
}

func NewReceiver

func NewReceiver(ch <-chan *nats.Msg) NatsReceiver

type OpenerReceiverCloser

type OpenerReceiverCloser interface {
	protocol.Opener
	protocol.ReceiveCloser
}

type Protocol

type Protocol struct {
	Conn *nats.Conn

	Consumer OpenerReceiverCloser

	Sender protocol.SendCloser
	// contains filtered or unexported fields
}

Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.

func NewProtocol

func NewProtocol(url, sendSubject, receiveSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error)

NewProtocol creates a new NATS protocol.

func NewProtocolFromConn

func NewProtocolFromConn(conn *nats.Conn, sendSubject, receiveSubject string, opts ...ProtocolOption) (*Protocol, error)

func (*Protocol) Close

func (p *Protocol) Close(ctx context.Context) error

Close implements Closer.Close

func (*Protocol) OpenInbound

func (p *Protocol) OpenInbound(ctx context.Context) error

func (*Protocol) Receive

func (p *Protocol) Receive(ctx context.Context) (binding.Message, error)

Receive implements Receiver.Receive

func (*Protocol) Send

func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error

Send implements Sender.Send

type ProtocolOption

type ProtocolOption func(*Protocol) error

ProtocolOption is the function signature required to be considered an nats.ProtocolOption.

func WithConsumerOptions

func WithConsumerOptions(opts ...ConsumerOption) ProtocolOption

type QueueSubscriber

type QueueSubscriber struct {
	Queue string
}

QueueSubscriber creates queue subscriptions

func (*QueueSubscriber) Subscribe

func (s *QueueSubscriber) Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)

Subscribe implements Subscriber.Subscribe

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

func (*Receiver) Receive

func (r *Receiver) Receive(ctx context.Context) (binding.Message, error)

type RegularSubscriber

type RegularSubscriber struct {
}

RegularSubscriber creates regular subscriptions

func (*RegularSubscriber) Subscribe

func (s *RegularSubscriber) Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)

Subscribe implements Subscriber.Subscribe

type Sender

type Sender struct {
	*cn.Sender
}

func NewSenderFromConn

func NewSenderFromConn(conn *nats.Conn, subject string, opts ...cn.SenderOption) (*Sender, error)

NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the STAN connection to the caller

func (*Sender) Send

func (s *Sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error)

type SubjectQueuePool

type SubjectQueuePool struct {
	Queue    string
	Subjects []string
}

func (*SubjectQueuePool) Subscribe

func (s *SubjectQueuePool) Subscribe(conn *nats.Conn, _ string, cn chan *nats.Msg) (Dryer, error)

type Subscriber

type Subscriber interface {
	Subscribe(conn *nats.Conn, subject string, cn chan *nats.Msg) (Dryer, error)
}

The Subscriber interface allows us to configure how the subscription is created

type TeleObservability

type TeleObservability struct {
	*tel.Telemetry
	Metrics metrics.MetricsReader
}

TeleObservability implement cloudevents client.ObservabilityService with OpenTracing propagation This flow idempotent and not tight coupled to NATS and can easily treat any opentracing flow which provides correct context values

Producer component handled in RecordSendingEvent and has specific context requirements

func (*TeleObservability) InboundContextDecorators

func (t *TeleObservability) InboundContextDecorators() []func(context.Context, binding.Message) context.Context

func (*TeleObservability) RecordCallingInvoker

func (t *TeleObservability) RecordCallingInvoker(_ctx context.Context, e *event.Event) (context.Context, func(errOrResult error))

RecordCallingInvoker consumer middleware expect special data inside containing opentracing.SpanReference which receiver should put inside ˚ consumer represent invoker model for opentracing, from tracing objectives this mean that it begin new span either and that span should be return and used by others

func (TeleObservability) RecordReceivedMalformedEvent

func (t TeleObservability) RecordReceivedMalformedEvent(_ context.Context, _ error)

func (TeleObservability) RecordRequestEvent

func (t TeleObservability) RecordRequestEvent(ctx context.Context, _ event.Event) (context.Context, func(error, *event.Event))

func (*TeleObservability) RecordSendingEvent

func (t *TeleObservability) RecordSendingEvent(_ctx context.Context, e event.Event) (context.Context, func(errOrResult error))

RecordSendingEvent producer interceptor required context argument to be polluted with tel context creates new tracing brunch from provided inside context OpenTracing or tel data

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL