edatstan

package module
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2021 License: MIT Imports: 15 Imported by: 0

README

edat-stan - Streaming NATS for edat

Installation

go get -u github.com/stackus/edat-stan

Usage Example

import "github.com/stackus/edat-stan"

conn, _ := stan.Connect(clusterID, clientID, options)

// Create a consumer and use it in a message subscriber
consumer := edatstan.NewConsumer(conn, groupID)
subscriber := msg.NewSubscriber(consumer)

// Create a producer and use it in a message publisher
producer := edatstan.NewProducer(conn)
publisher := msg.NewPublisher(producer)

Prerequisites

Go 1.15

Features

  • Message Consumer: NewConsumer(stan.Conn, groupID, ...options)
  • Message Producer: NewProducer(stan.Conn, ...options)

TODOs

  • Documentation
  • Tests, tests, and more tests

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthMessage        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowMessage          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group")
)
View Source
var DefaultAckWait = time.Second * 30
View Source
var DefaultSerializer = ProtoSerializer{}

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conn stan.Conn, groupID string, options ...ConsumerOption) *Consumer

func (*Consumer) Close

func (c *Consumer) Close(context.Context) error

func (*Consumer) Listen

func (c *Consumer) Listen(ctx context.Context, channel string, subscription msg.ReceiveMessageFunc) error

type ConsumerOption

type ConsumerOption func(*Consumer)

func WithConsumerActWait

func WithConsumerActWait(ackWait time.Duration) ConsumerOption

func WithConsumerLogger

func WithConsumerLogger(logger log.Logger) ConsumerOption

func WithConsumerSerializer

func WithConsumerSerializer(serializer Serializer) ConsumerOption

func WithConsumerSubscriptionOptions

func WithConsumerSubscriptionOptions(option ...stan.SubscriptionOption) ConsumerOption

type Msg

type Msg struct {
	Id      string            `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Headers map[string]string `` /* 155-byte string literal not displayed */
	Payload []byte            `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
}

func (*Msg) Descriptor

func (*Msg) Descriptor() ([]byte, []int)

func (*Msg) Equal

func (this *Msg) Equal(that interface{}) bool

func (*Msg) GetHeaders

func (m *Msg) GetHeaders() map[string]string

func (*Msg) GetId

func (m *Msg) GetId() string

func (*Msg) GetPayload

func (m *Msg) GetPayload() []byte

func (*Msg) GoString

func (this *Msg) GoString() string

func (*Msg) Marshal

func (m *Msg) Marshal() (dAtA []byte, err error)

func (*Msg) MarshalTo

func (m *Msg) MarshalTo(dAtA []byte) (int, error)

func (*Msg) MarshalToSizedBuffer

func (m *Msg) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Msg) ProtoMessage

func (*Msg) ProtoMessage()

func (*Msg) Reset

func (m *Msg) Reset()

func (*Msg) Size

func (m *Msg) Size() (n int)

func (*Msg) String

func (this *Msg) String() string

func (*Msg) Unmarshal

func (m *Msg) Unmarshal(dAtA []byte) error

func (*Msg) XXX_DiscardUnknown

func (m *Msg) XXX_DiscardUnknown()

func (*Msg) XXX_Marshal

func (m *Msg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Msg) XXX_Merge

func (m *Msg) XXX_Merge(src proto.Message)

func (*Msg) XXX_Size

func (m *Msg) XXX_Size() int

func (*Msg) XXX_Unmarshal

func (m *Msg) XXX_Unmarshal(b []byte) error

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conn stan.Conn, options ...ProducerOption) *Producer

func (*Producer) Close

func (p *Producer) Close(context.Context) error

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, channel string, message msg.Message) error

type ProducerOption

type ProducerOption func(*Producer)

func WithProducerLogger

func WithProducerLogger(logger log.Logger) ProducerOption

func WithProducerSerializer

func WithProducerSerializer(serializer Serializer) ProducerOption

type ProtoSerializer

type ProtoSerializer struct{}

func (ProtoSerializer) Deserialize

func (ProtoSerializer) Deserialize(message *stan.Msg) (msg.Message, error)

func (ProtoSerializer) Serialize

func (ProtoSerializer) Serialize(message msg.Message) ([]byte, error)

type Serializer

type Serializer interface {
	Serialize(message msg.Message) ([]byte, error)
	Deserialize(message *stan.Msg) (msg.Message, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL