bus

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2020 License: MIT Imports: 15 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigureTLS

func ConfigureTLS(nsqCfg *nsq.Config, tlsCfg *TLSConfig)

ConfigureTLS configures the given NSQ configuration for TLS connections.

func CreateNSQConfig

func CreateNSQConfig(tlsCfg *TLSConfig) *nsq.Config

CreateNSQConfig creates and configures a TLS enabled (if given TLS config != nil) NSQ configuration.

func LoadCertificate

func LoadCertificate(path string) (*tls.Certificate, error)

LoadCertificate reads file, divides into key and certificates

func TTL

func TTL(ttl time.Duration) crOption

TTL specifies the maximum age of messages to accept. If a message is received that is older than the given ttl, it will be dropped.

func Timeout

func Timeout(timeout time.Duration, timeoutFunction OnTimeout) crOption

Timeout guards the event handler with a timeout, timeout 0 means no timeout. The optional timeoutFunction is called in the case of timeout while handling the event.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

A Consumer wraps the base configuration for the nsq connection

func NewConsumer

func NewConsumer(log *zap.Logger, tlsCfg *TLSConfig, lookupds ...string) (*Consumer, error)

NewConsumer returns a consumer and stores the addresses of the lookupd's.

func (*Consumer) MustRegister

func (c *Consumer) MustRegister(topic, channel string) *ConsumerRegistration

func (*Consumer) Register

func (c *Consumer) Register(topic, channel string) (*ConsumerRegistration, error)

func (*Consumer) With

func (c *Consumer) With(opts ...Option) *Consumer

type ConsumerRegistration

type ConsumerRegistration struct {
	// contains filtered or unexported fields
}

func (*ConsumerRegistration) Consume

func (cr *ConsumerRegistration) Consume(paramProto interface{}, recv Receiver, concurrent int, opts ...crOption) error

Consume a message

func (*ConsumerRegistration) Output

func (cr *ConsumerRegistration) Output(num int, msg string) error

type Level

type Level int

Level specifies the severity of a given log message

const (
	Debug Level = iota
	Info
	Warning
	Error
)

Log levels

type OnTimeout

type OnTimeout func(err TimeoutError) error

OnTimeout function that is called in the case of timeout while handling the event

type Option

type Option func(registration *Consumer) *Consumer

func LogLevel

func LogLevel(v Level) Option

LogLevel maps between our loglevel and nsq loglevels

type Publisher

type Publisher interface {
	Publish(topic string, data interface{}) error
	CreateTopic(topic string) error
}

A Publisher is used for event publishing to topics. The fields Publish and CreateTopics can be overwritten to mock this publisher.

func NewPublisher

func NewPublisher(zlog *zap.Logger, publisherCfg *PublisherConfig) (Publisher, error)

NewPublisher creates a new publisher to produce events for topics.

type PublisherConfig

type PublisherConfig struct {
	TCPAddress   string
	HTTPEndpoint string
	TLS          *TLSConfig
	NSQ          *nsq.Config
}

A PublisherConfig represents the config of an NSQ publisher.

func (*PublisherConfig) ConfigureNSQ

func (p *PublisherConfig) ConfigureNSQ()

ConfigureTLS configures the publisher regarding NSQ.

type Receiver

type Receiver func(interface{}) error

A Receiver is a callback when you receive messages from the bus.

type TLSConfig

type TLSConfig struct {
	CACertFile     string
	ClientCertFile string
}

A TLSConfig represents the TLS config of an NSQ publisher/consumer.

func (*TLSConfig) Inactive

func (cfg *TLSConfig) Inactive() bool

Inactive a TLSConfig considered inactive if neither a ca-cert nor a client-cert is present

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

func (TimeoutError) Error

func (t TimeoutError) Error() string

func (TimeoutError) Event

func (t TimeoutError) Event() interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL