nsq

package
v0.0.0-...-e31961d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2022 License: GPL-3.0 Imports: 8 Imported by: 0

README

NSQ

Wrapper of nsqio library

Please note that this library only wrapped the existing consumers from nsq/nsqio backend and mapped them. The nsq.Handler also overrided with HandlerFunc in this library.

Design

The library flow control is based on buffered channel for each topic and channel. This means every consumer for different topic and channel might has different size of buffered channel and number of concurrency.

The worker model will replace ConcurrenHandler for handling multiple message concurrently. This is because the library want to control the flow of the message by using buffered channel as the main communication channel for worker.

nsq throttling design

Throttling

By design, the handler that registered by this library is not directly exposed to the consumer handler. This means the handler not directly asking for message from nsq.

The message is being sent into the handler from a go channel that is dedicated for specific topic and channel, and the message can be consumed from that go channel only. By using this mechanism, the rate of message consumed by the concurrent handler can be controlled when something wrong happened.

Message Retrieval Throttling

This throttling is on by default.

The message retrieval is throttled when the number of message in the channel is more than half of its size.

For example, if the length of buffer is 10, and the message already exceeding 5. The consumer will pause the message consumption until the number of message in the buffer is going back to less than half of the buffer.

Message Processing Throttling

This throttling can be enabled by using Throttling middleware

The message processing is throttled when the number of message in the channel is mor ethan half of its size.

For example if the length of buffer is 10, and the message already exceeding 5. The consumer will slow down the message processing, this throttling is being handled by the Throttling middleware in this library. If the throttle middleware is set, then the library will seek throttled status in the message.

How To Use The Library

To use this library, the consumer must be created using nsq/nsqio.

TODO

  • DNS: make it possible to specify a single addresss with host or single/multiple address with IP. If a single host is given, then resolve to host.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTopicWithChannelNotFound for error when channel and topic is not found
	ErrTopicWithChannelNotFound = errors.New("nsq: topic and channel not found")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer for nsq

func WrapConsumers

func WrapConsumers(config ConsumerConfig, backends ...ConsumerBackend) (*Consumer, error)

WrapConsumers of gonsq

func (*Consumer) Backends

func (c *Consumer) Backends() map[string]map[string]bool

Backends return information regarding topic and channel that avaialbe

func (*Consumer) Handle

func (c *Consumer) Handle(topic, channel string, handler HandlerFunc)

Handle the consumer

func (*Consumer) Start

func (c *Consumer) Start() error

Start the consumer.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop all the nsq consumer

func (*Consumer) Use

func (c *Consumer) Use(middleware ...MiddlewareFunc)

Use the middleware use should be called before handle function this function will avoid to add the same middleware twice if the same middleware is used, it will skip the addition

type ConsumerBackend

type ConsumerBackend interface {
	Topic() string
	Channel() string
	Stop()
	AddHandler(handler gonsq.Handler)
	AddConcurrentHandlers(handler gonsq.Handler, concurrency int)
	ConnectToNSQLookupds(addresses []string) error
	ChangeMaxInFlight(n int)
	Concurrency() int
	BufferMultiplier() int
}

ConsumerBackend for NSQ

type ConsumerConfig

type ConsumerConfig struct {
	LookupdsAddr []string
	// Concurrency is the number of worker intended for handling message from nsq
	Concurrency int
	// BufferMultiplier is the multiplier factor of concurrency to set the size of buffer when consuming message
	// the size of buffer multiplier is number of message being consumed before the buffer will be half full.
	// For example, 20(default value) buffer multiplier means the worker is able to consume more than 10 message,
	// before the buffer is half full from the nsqd message consumption.
	// To fill this configuration correctly, it is needed to observe the consumption rate of the message and the handling rate of the worker.
	BufferMultiplier int
}

ConsumerConfig to supply WrapConsumers

func (*ConsumerConfig) Validate

func (cg *ConsumerConfig) Validate() error

Validate consumer configuration

type HandlerFunc

type HandlerFunc func(ctx context.Context, message *Message) error

HandlerFunc for nsq

func Metrics

func Metrics(handler HandlerFunc) HandlerFunc

Metrics middleware for nsq metrics that might be missleading: - throttled - message_in_buffer The metrics might be missleading because the message is not processed in ordered manner.

type Info

type Info struct {
	WorkerTotal     int
	WorkerCurrent   int
	MessageInBuffer int
	ThrottleFlag    int
	Throttled       int
}

Info for message

type Message

type Message struct {
	Topic   string
	Channel string
	Message *gonsq.Message
	Info    *Info
}

Message for nsq

func (*Message) Finish

func (m *Message) Finish()

Finish call gonsq message finish

func (*Message) ID

func (m *Message) ID() gonsq.MessageID

ID return message id from gonsq message.ID

func (*Message) Requeue

func (m *Message) Requeue(delay time.Duration)

Requeue call gonsq message requeue

func (*Message) RequeueWithoutBackoff

func (m *Message) RequeueWithoutBackoff(delay time.Duration)

RequeueWithoutBackoff call gonsq message requeueWithoutBackoff

type MiddlewareFunc

type MiddlewareFunc func(handler HandlerFunc) HandlerFunc

MiddlewareFunc for nsq middleware

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer for nsq

func WrapProducer

func WrapProducer(backend ProducerBackend, topics ...string) *Producer

WrapProducer is a function to wrap the nsq producer

func (*Producer) MultiPublish

func (p *Producer) MultiPublish(topic string, body [][]byte) error

MultiPublish message to nsqd

func (*Producer) Publish

func (p *Producer) Publish(topic string, body []byte) error

Publish message to nsqd

type ProducerBackend

type ProducerBackend interface {
	Ping() error
	Publish(topic string, body []byte) error
	MultiPublish(topic string, body [][]byte) error
	Stop()
}

ProducerBackend for NSQ

type ThrottleMiddleware

type ThrottleMiddleware struct {
	// TimeDelay means the duration of time to pause message consumption
	TimeDelay time.Duration
}

ThrottleMiddleware implement MiddlewareFunc

func (*ThrottleMiddleware) Throttle

func (tm *ThrottleMiddleware) Throttle(handler HandlerFunc) HandlerFunc

Throttle middleware for nsq. This middleware check whether there is some information about throttling in the message.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL