grmqx

package
v1.30.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: MIT Imports: 20 Imported by: 3

Documentation

Index

Constants

View Source
// Default timing values exported by the package. Both are time.Duration
// constants; where they are applied is not visible in this listing.
const (
	// DefaultHeartbeat is the default heartbeat value: 3 seconds.
	DefaultHeartbeat time.Duration = 3 * time.Second
	// DefaultDialTimeout is the default dial timeout value: 5 seconds.
	DefaultDialTimeout time.Duration = 5 * time.Second
)
View Source
// RequestIdHeader is the header key "x-request-id".
// NOTE(review): presumably the key used by the request-id middlewares to
// carry the request identifier in message headers — confirm in the source.
const RequestIdHeader = "x-request-id"

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger) consumer.Middleware

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

func JoinDeclarations

func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

func PublisherLog

func PublisherLog(logger log.Logger) publisher.Middleware

func PublisherMetrics

func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

func TopologyFromConsumers

func TopologyFromConsumers(consumers ...Consumer) topology.Declarations

Types

type BatchConsumer

// BatchConsumer is the declarative configuration of a consumer that
// processes messages in batches. Field descriptions below are taken from the
// `schema` tags; `validate:"required"` marks fields that validation requires.
type BatchConsumer struct {
	// Queue is the queue name (required).
	Queue              string       `validate:"required" schema:"Наименование очереди"`
	// Dlq requests creation of a DLQ queue.
	Dlq                bool         `schema:"Создать очередь DLQ"`
	// BatchSize is the number of messages in one batch (required).
	BatchSize          int          `validate:"required" schema:"Количество сообщений в пачке"`
	// PurgeIntervalInMs is the processing interval (required); the name
	// indicates milliseconds.
	PurgeIntervalInMs  int          `validate:"required" schema:"Интервал обработки"`
	// DisableAutoDeclare: the tag text is elided in this rendering.
	// NOTE(review): presumably turns off automatic topology declaration for
	// this consumer — confirm against the source.
	DisableAutoDeclare bool         `` /* 182-byte string literal not displayed */
	// Binding holds the topology settings (optional pointer).
	Binding            *Binding     `schema:"Настройки топологии"`
	// RetryPolicy holds the reprocessing (retry) policy (optional pointer).
	RetryPolicy        *RetryPolicy `schema:"Политика повторной обработки"`
}

func (BatchConsumer) ConsumerConfig

func (b BatchConsumer) ConsumerConfig() Consumer

func (BatchConsumer) DefaultConsumer

func (b BatchConsumer) DefaultConsumer(handler BatchHandlerAdapter, restMiddlewares ...consumer.Middleware) consumer.Consumer

type BatchHandler

// BatchHandler is an opaque handler type; all of its state is unexported.
// NOTE(review): presumably accumulates deliveries and hands them to a
// BatchHandlerAdapter in batches — the implementation is not shown here.
type BatchHandler struct {
	// contains filtered or unexported fields
}

func NewBatchHandler

func NewBatchHandler(
	adapter BatchHandlerAdapter,
	purgeInterval time.Duration,
	maxSize int,
) *BatchHandler

func (*BatchHandler) Close

func (r *BatchHandler) Close()

func (*BatchHandler) Handle

func (r *BatchHandler) Handle(ctx context.Context, delivery *consumer.Delivery)

type BatchHandlerAdapter

// BatchHandlerAdapter is implemented by code that processes a batch of
// messages.
type BatchHandlerAdapter interface {
	// Handle receives the batch as a slice of BatchItem. It returns nothing,
	// so any outcome must be communicated by other means (not shown here).
	Handle(batch []BatchItem)
}

type BatchHandlerAdapterFunc

// BatchHandlerAdapterFunc is a function type with the same signature as
// BatchHandlerAdapter.Handle. It declares a Handle method of its own, which
// lets a plain function be used as a BatchHandlerAdapter.
type BatchHandlerAdapterFunc func(batch []BatchItem)

func (BatchHandlerAdapterFunc) Handle

func (b BatchHandlerAdapterFunc) Handle(batch []BatchItem)

type BatchItem

// BatchItem is one element of a batch: a delivery together with a context.
type BatchItem struct {
	// Context is the context stored alongside the delivery.
	// NOTE(review): presumably the context passed to BatchHandler.Handle — confirm.
	Context  context.Context
	// Delivery is the consumed message.
	Delivery *consumer.Delivery
}

type Binding

// Binding describes the topology settings of a queue. All fields are required
// by their validation tags.
type Binding struct {
	// Exchange is the exchange name.
	Exchange     string `validate:"required" schema:"Точка обмена"`
	// ExchangeType is the exchange type; validation accepts only "direct",
	// "fanout" or "topic".
	ExchangeType string `validate:"required,oneof=direct fanout topic" schema:"Тип точки обмена"`
	// RoutingKey is the routing key.
	RoutingKey   string `validate:"required" schema:"Ключ маршрутизации"`
}

type Client

// Client is an opaque client type; all of its state is unexported.
// Its exported methods are Close, Healthcheck, Upgrade and UpgradeAndServe.
type Client struct {
	// contains filtered or unexported fields
}

func New

func New(logger log.Logger) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

func (*Client) Upgrade

func (c *Client) Upgrade(ctx context.Context, config Config) error

func (*Client) UpgradeAndServe

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

type Config

// Config groups a connection URL with the publishers, consumers and topology
// declarations; it is the value accepted by Client.Upgrade and
// Client.UpgradeAndServe.
type Config struct {
	// Url is the connection URL string.
	Url          string
	// Publishers is the list of publishers.
	Publishers   []*publisher.Publisher
	// Consumers is the list of consumers.
	Consumers    []consumer.Consumer
	// Declarations holds the topology declarations.
	Declarations topology.Declarations
}

func NewConfig

func NewConfig(url string, opts ...ConfigOption) Config

type ConfigOption

// ConfigOption is a function that receives a *Config. NewConfig accepts a
// variadic list of these.
// NOTE(review): presumably each option mutates the Config being built — confirm.
type ConfigOption func(c *Config)

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ConfigOption

func WithDeclarations

func WithDeclarations(declarations topology.Declarations) ConfigOption

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

type Connection

// Connection holds the broker connection parameters. Field descriptions are
// taken from the `schema` tags.
type Connection struct {
	// Host is the broker host (required).
	Host     string `validate:"required" schema:"Хост"`
	// Port is the broker port (required).
	Port     int    `validate:"required" schema:"Порт"`
	// Username is the login.
	Username string `schema:"Логин"`
	// Password is the password.
	Password string `schema:"Пароль"`
	// Vhost is the virtual host.
	Vhost    string `schema:"Виртуальный хост"`
}

func (Connection) Url

func (c Connection) Url() string

type Consumer

// Consumer is the declarative configuration of a single-message consumer.
// Field descriptions below are taken from the `schema` tags.
type Consumer struct {
	// Queue is the queue name (required).
	Queue              string       `validate:"required" schema:"Наименование очереди"`
	// Dlq requests creation of a DLQ queue.
	Dlq                bool         `schema:"Создать очередь DLQ"`
	// PrefetchCount is the number of prefetched messages; per the tag, the
	// default is 1.
	PrefetchCount      int          `schema:"Количество предзагруженных сообщений,по умолчанию - 1"`
	// Concurrency: the tag text is elided in this rendering.
	// NOTE(review): presumably the number of concurrent handlers — confirm.
	Concurrency        int          `` /* 168-byte string literal not displayed */
	// DisableAutoDeclare: the tag text is elided in this rendering.
	// NOTE(review): presumably turns off automatic topology declaration for
	// this consumer — confirm against the source.
	DisableAutoDeclare bool         `` /* 182-byte string literal not displayed */
	// Binding holds the topology settings (optional pointer).
	Binding            *Binding     `schema:"Настройки топологии"`
	// RetryPolicy holds the reprocessing (retry) policy (optional pointer).
	RetryPolicy        *RetryPolicy `schema:"Политика повторной обработки"`
}

func (Consumer) DefaultConsumer

func (c Consumer) DefaultConsumer(handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer

type LogObserver

// LogObserver embeds grmq.NoopObserver and declares its own ClientError,
// ClientReady, ConsumerError, PublisherError, PublishingFlow, ShutdownDone
// and ShutdownStarted methods. It is constructed by NewLogObserver from a
// context and a logger.
// NOTE(review): presumably each of those methods writes a log entry — confirm.
type LogObserver struct {
	grmq.NoopObserver
	// contains filtered or unexported fields
}

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

func (LogObserver) ClientError

func (l LogObserver) ClientError(err error)

func (LogObserver) ClientReady

func (l LogObserver) ClientReady()

func (LogObserver) ConsumerError

func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)

func (LogObserver) PublisherError

func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)

func (LogObserver) PublishingFlow

func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)

func (LogObserver) ShutdownDone

func (l LogObserver) ShutdownDone()

func (LogObserver) ShutdownStarted

func (l LogObserver) ShutdownStarted()

type Publisher

// Publisher is the declarative configuration of a publisher.
type Publisher struct {
	// Exchange is the exchange name.
	Exchange   string `schema:"Точка обмена"`
	// RoutingKey: the tag text is elided in this rendering.
	// NOTE(review): presumably the routing key used when publishing — confirm.
	RoutingKey string `` /* 182-byte string literal not displayed */
}

func (Publisher) DefaultPublisher

func (p Publisher) DefaultPublisher(restMiddlewares ...publisher.Middleware) *publisher.Publisher

type PublisherMetricStorage

// PublisherMetricStorage is the metrics sink accepted by PublisherMetrics.
// Every method is keyed by exchange and routing key.
type PublisherMetricStorage interface {
	// ObservePublishDuration records a publish duration t.
	ObservePublishDuration(exchange string, routingKey string, t time.Duration)
	// ObservePublishMsgSize records a published message size.
	// NOTE(review): the unit is presumably bytes — confirm.
	ObservePublishMsgSize(exchange string, routingKey string, size int)
	// IncPublishError counts one publish error.
	IncPublishError(exchange string, routingKey string)
}

type RetryConfig

// RetryConfig is one retry step: a delay and an attempt count. Both fields are
// required by their validation tags.
type RetryConfig struct {
	// DelayInMs is the delay in milliseconds.
	DelayInMs   int `validate:"required" schema:"Задержка в миллисекундах"`
	// MaxAttempts is the number of attempts; per the tag, -1 means infinite.
	MaxAttempts int `validate:"required" schema:"Количество попыток,-1 = бесконечно"`
}

type RetryPolicy

// RetryPolicy is the reprocessing policy referenced by Consumer and
// BatchConsumer.
type RetryPolicy struct {
	// FinallyMoveToDlq: the tag text is elided in this rendering.
	// NOTE(review): presumably moves the message to the DLQ once all retries
	// are exhausted — confirm against the source.
	FinallyMoveToDlq bool          `` /* 173-byte string literal not displayed */
	// Retries is the list of retry settings.
	// NOTE(review): presumably applied in order — confirm.
	Retries          []RetryConfig `schema:"Настройки"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL