Documentation ¶
Index ¶
- Constants
- func ConsumerLog(logger log.Logger) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PublisherLog(logger log.Logger) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- func TopologyFromConsumers(consumers ...Consumer) topology.Declarations
- type BatchConsumer
- type BatchHandler
- type BatchHandlerAdapter
- type BatchHandlerAdapterFunc
- type BatchItem
- type Binding
- type Client
- type Config
- type ConfigOption
- type Connection
- type Consumer
- type LogObserver
- func (l LogObserver) ClientError(err error)
- func (l LogObserver) ClientReady()
- func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)
- func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)
- func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
- func (l LogObserver) ShutdownDone()
- func (l LogObserver) ShutdownStarted()
- type Publisher
- type PublisherMetricStorage
- type RetryConfig
- type RetryPolicy
Constants ¶
View Source
const ( DefaultHeartbeat = 3 * time.Second DefaultDialTimeout = 5 * time.Second )
View Source
const (
RequestIdHeader = "x-request-id"
)
Variables ¶
This section is empty.
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger) consumer.Middleware
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
func JoinDeclarations ¶
func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations
func NewResultHandler ¶
func PublisherLog ¶
func PublisherLog(logger log.Logger) publisher.Middleware
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
func TopologyFromConsumers ¶
func TopologyFromConsumers(consumers ...Consumer) topology.Declarations
Types ¶
type BatchConsumer ¶
type BatchConsumer struct { Queue string `validate:"required" schema:"Наименование очереди"` Dlq bool `schema:"Создать очередь DLQ"` BatchSize int `validate:"required" schema:"Количество сообщений в пачке"` PurgeIntervalInMs int `validate:"required" schema:"Интервал обработки"` DisableAutoDeclare bool `` /* 182-byte string literal not displayed */ Binding *Binding `schema:"Настройки топологии"` RetryPolicy *RetryPolicy `schema:"Политика повторной обработки"` }
func (BatchConsumer) ConsumerConfig ¶
func (b BatchConsumer) ConsumerConfig() Consumer
func (BatchConsumer) DefaultConsumer ¶
func (b BatchConsumer) DefaultConsumer(handler BatchHandlerAdapter, restMiddlewares ...consumer.Middleware) consumer.Consumer
type BatchHandler ¶
type BatchHandler struct {
// contains filtered or unexported fields
}
func NewBatchHandler ¶
func NewBatchHandler( adapter BatchHandlerAdapter, purgeInterval time.Duration, maxSize int, ) *BatchHandler
func (*BatchHandler) Close ¶
func (r *BatchHandler) Close()
type BatchHandlerAdapter ¶
type BatchHandlerAdapter interface {
Handle(batch []BatchItem)
}
type BatchHandlerAdapterFunc ¶
type BatchHandlerAdapterFunc func(batch []BatchItem)
func (BatchHandlerAdapterFunc) Handle ¶
func (b BatchHandlerAdapterFunc) Handle(batch []BatchItem)
type Config ¶
type Config struct { Url string Publishers []*publisher.Publisher Consumers []consumer.Consumer Declarations topology.Declarations }
func NewConfig ¶
func NewConfig(url string, opts ...ConfigOption) Config
type ConfigOption ¶
type ConfigOption func(c *Config)
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
func WithDeclarations ¶
func WithDeclarations(declarations topology.Declarations) ConfigOption
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
type Connection ¶
type Connection struct { Host string `validate:"required" schema:"Хост"` Port int `validate:"required" schema:"Порт"` Username string `schema:"Логин"` Password string `schema:"Пароль"` Vhost string `schema:"Виртуальный хост"` }
func (Connection) Url ¶
func (c Connection) Url() string
type Consumer ¶
type Consumer struct { Queue string `validate:"required" schema:"Наименование очереди"` Dlq bool `schema:"Создать очередь DLQ"` PrefetchCount int `schema:"Количество предзагруженных сообщений,по умолчанию - 1"` Concurrency int `` /* 168-byte string literal not displayed */ DisableAutoDeclare bool `` /* 182-byte string literal not displayed */ Binding *Binding `schema:"Настройки топологии"` RetryPolicy *RetryPolicy `schema:"Политика повторной обработки"` }
func (Consumer) DefaultConsumer ¶
type LogObserver ¶
type LogObserver struct { grmq.NoopObserver // contains filtered or unexported fields }
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
func (LogObserver) ConsumerError ¶
func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)
func (LogObserver) PublisherError ¶
func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)
func (LogObserver) PublishingFlow ¶
func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
type Publisher ¶
type Publisher struct { Exchange string `schema:"Точка обмена"` RoutingKey string `` /* 182-byte string literal not displayed */ }
func (Publisher) DefaultPublisher ¶
func (p Publisher) DefaultPublisher(restMiddlewares ...publisher.Middleware) *publisher.Publisher
type PublisherMetricStorage ¶
type RetryConfig ¶
type RetryPolicy ¶
type RetryPolicy struct { FinallyMoveToDlq bool `` /* 173-byte string literal not displayed */ Retries []RetryConfig `schema:"Настройки"` }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.