Documentation ¶
Index ¶
- Constants
- func ConsumerLog(logger log.Logger) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func GetHeaderValue(headers []kafka.Header, key string) string
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PlainAuth(auth *Auth) sasl.Mechanism
- func PublisherLog(logger log.Logger) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- type Auth
- type Client
- type Config
- type ConfigOption
- type ConsumerConfig
- type KafkaClient
- type LogObserver
- type NoopObserver
- type Observer
- type PublisherConfig
- func (p PublisherConfig) DefaultPublisher(logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher
- func (p PublisherConfig) GetBatchSizePerPartition() int
- func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
- func (p PublisherConfig) GetMaxMessageSizePerPartition() int64
- func (p PublisherConfig) GetRequiredAckLevel() kafka.RequiredAcks
- func (p PublisherConfig) GetWriteTimeout() time.Duration
- type PublisherMetricStorage
Constants ¶
View Source
const RequestIdHeader = "x-request-id"
Variables ¶
This section is empty.
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger) consumer.Middleware
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
func GetHeaderValue ¶
func NewResultHandler ¶
func PublisherLog ¶
func PublisherLog(logger log.Logger) publisher.Middleware
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
Types ¶
type ConfigOption ¶
type ConfigOption func(c *Config)
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
type ConsumerConfig ¶
type ConsumerConfig struct { Addresses []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"` Topic string `validate:"required" schema:"Топик"` GroupId string `validate:"required" schema:"Идентификатор консьюмера"` Concurrency int `schema:"Кол-во обработчиков, по умолчанию 1"` MaxBatchSizeMb int `` /* 133-byte string literal not displayed */ CommitIntervalSec *int `` /* 141-byte string literal not displayed */ Auth *Auth `schema:"Параметры аутентификации"` }
func (ConsumerConfig) DefaultConsumer ¶
func (c ConsumerConfig) DefaultConsumer( logger log.Logger, handler consumer.Handler, restMiddlewares ...consumer.Middleware, ) consumer.Consumer
func (ConsumerConfig) GetCommitInterval ¶ added in v1.37.1
func (c ConsumerConfig) GetCommitInterval() time.Duration
func (ConsumerConfig) GetMaxBatchSizeMb ¶ added in v1.37.1
func (c ConsumerConfig) GetMaxBatchSizeMb() int
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
type LogObserver ¶
type LogObserver struct { NoopObserver // contains filtered or unexported fields }
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
type NoopObserver ¶
type NoopObserver struct{}
func (NoopObserver) ClientError ¶
func (n NoopObserver) ClientError(err error)
func (NoopObserver) ClientReady ¶
func (n NoopObserver) ClientReady()
func (NoopObserver) ShutdownDone ¶
func (n NoopObserver) ShutdownDone()
func (NoopObserver) ShutdownStarted ¶
func (n NoopObserver) ShutdownStarted()
type Observer ¶
type Observer interface { ClientReady() ClientError(err error) ShutdownStarted() ShutdownDone() }
type PublisherConfig ¶
type PublisherConfig struct { Addresses []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"` Topic string `` /* 160-byte string literal not displayed */ MaxMsgSizeMbPerPartition int64 `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"` BatchSizePerPartition int `` /* 142-byte string literal not displayed */ BatchTimeoutPerPartitionMs *int `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"` WriteTimeoutSec *int `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"` RequiredAckLevel int `` /* 179-byte string literal not displayed */ Auth *Auth `schema:"Параметры аутентификации"` }
func (PublisherConfig) DefaultPublisher ¶
func (p PublisherConfig) DefaultPublisher(logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher
func (PublisherConfig) GetBatchSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchSizePerPartition() int
func (PublisherConfig) GetBatchTimeoutPerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration
func (PublisherConfig) GetMaxMessageSizePerPartition ¶ added in v1.38.1
func (p PublisherConfig) GetMaxMessageSizePerPartition() int64
func (PublisherConfig) GetRequiredAckLevel ¶
func (p PublisherConfig) GetRequiredAckLevel() kafka.RequiredAcks
func (PublisherConfig) GetWriteTimeout ¶ added in v1.37.1
func (p PublisherConfig) GetWriteTimeout() time.Duration
Source Files ¶
Click to show internal directories.
Click to hide internal directories.