Versions in this module Expand all Collapse all v0 v0.14.0 Apr 24, 2024 Changes in this version type DispatchOptions + func AddInterceptors(interceptors ...ConsumerHandlerInterceptor) DispatchOptions + type Dispatcher struct + Interceptors []ConsumerDispatchInterceptor + Logger MessageLogger + func (d *Dispatcher) AddHandler(fn MessageHandlerFunc, opts ...DispatchOptions) error + func (d *Dispatcher) Dispatch(msgCtx *MessageContext) (err error) v0.13.0 Feb 27, 2024 Changes in this version + const ConfigKafkaBindingPrefix + const ConfigKafkaDefaultBindingPrefix + const ConfigKafkaPrefix + const ErrorCodeAutoAddPartitionsFailed + const ErrorCodeAutoCreateTopicFailed + const ErrorCodeBindingInternal + const ErrorCodeBrokerNotReachable + const ErrorCodeConsumerExists + const ErrorCodeIllegalLifecycleState + const ErrorCodeIllegalState + const ErrorCodeProducerExists + const ErrorSubTypeCodeBindingInternal + const ErrorSubTypeCodeConnectivity + const ErrorSubTypeCodeConsumerGeneral + const ErrorSubTypeCodeDecoding + const ErrorSubTypeCodeEncoding + const ErrorSubTypeCodeIllegalConsumerUsage + const ErrorSubTypeCodeIllegalProducerUsage + const ErrorSubTypeCodeProducerGeneral + const ErrorSubTypeCodeProvisioning + const ErrorTypeCodeBinding + const ErrorTypeCodeConsumer + const ErrorTypeCodeProducer + const FxGroup + const HeaderContentType + const MIMETypeBinary + const MIMETypeJson + const MIMETypeText + const Reserved + var ErrorCategoryKafka = NewErrorCategory(Reserved, errors.New("error type: kafka")) + var ErrorStartClosedBinding = NewKafkaError(ErrorCodeIllegalLifecycleState, "error: cannot start closed binding") + var ErrorSubTypeBindingInternal = NewErrorSubType(ErrorSubTypeCodeBindingInternal, errors.New("error sub-type: internal")) + var ErrorSubTypeConnectivity = NewErrorSubType(ErrorSubTypeCodeConnectivity, errors.New("error sub-type: connectivity")) + var ErrorSubTypeConsumerGeneral = NewErrorSubType(ErrorSubTypeCodeConsumerGeneral, errors.New("error sub-type: consumer")) + var ErrorSubTypeDecoding = NewErrorSubType(ErrorSubTypeCodeDecoding, errors.New("error sub-type: decoding")) + var ErrorSubTypeEncoding = NewErrorSubType(ErrorSubTypeCodeEncoding, errors.New("error sub-type: encoding")) + var ErrorSubTypeIllegalConsumerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalConsumerUsage, ...) + var ErrorSubTypeIllegalProducerUsage = NewErrorSubType(ErrorSubTypeCodeIllegalProducerUsage, ...) + var ErrorSubTypeProducerGeneral = NewErrorSubType(ErrorSubTypeCodeProducerGeneral, errors.New("error sub-type: producer")) + var ErrorSubTypeProvisioning = NewErrorSubType(ErrorSubTypeCodeProvisioning, errors.New("error sub-type: provisioning")) + var ErrorTypeBinding = NewErrorType(ErrorTypeCodeBinding, errors.New("error type: binding")) + var ErrorTypeConsumer = NewErrorType(ErrorTypeCodeConsumer, errors.New("error type: consumer")) + var ErrorTypeProducer = NewErrorType(ErrorTypeCodeProducer, errors.New("error type: producer")) + var Module = &bootstrap.Module + func BindingName(name string) func(cfg *bindingConfig) + func LogLevel(level log.LoggingLevel) func(cfg *bindingConfig) + func NewKafkaError(code int64, text string, causes ...interface{}) *CodedError + func Use() + type AckMode string + const AckModeModeAll + const AckModeModeLocal + const AckModeModeNone + func (m *AckMode) UnmarshalText(data []byte) error + type Binder interface + Consume func(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error) + ListTopics func() []string + Produce func(topic string, options ...ProducerOptions) (Producer, error) + Subscribe func(topic string, options ...ConsumerOptions) (Subscriber, error) + func ProvideKafkaBinder(di binderDI) Binder + type BinderLifecycle interface + Done func() <-chan struct{} + Initialize func(ctx context.Context) error + Shutdown func(ctx context.Context) error + Start func(ctx context.Context) error + type BinderOption struct + ApplicationConfig bootstrap.ApplicationConfig + ConsumerInterceptors []ConsumerDispatchInterceptor + HandlerInterceptors []ConsumerHandlerInterceptor + ProducerInterceptors []ProducerMessageInterceptor + Properties KafkaProperties + TLSCertsManager certs.Manager + type BinderOptions func(opt *BinderOption) + type BinderProperties struct + HeartbeatCurveFactor float64 + HeartbeatCurveMidpoint float64 + InitialHeartbeat utils.Duration + WatchdogHeartbeat utils.Duration + type BindingLifecycle interface + Close func() error + Closed func() bool + Start func(ctx context.Context) error + type BindingProperties struct + Consumer ConsumerProperties + Producer ProducerProperties + type ConsumerDispatchFinalizer interface + Finalize func(msgCtx *MessageContext, err error) (*MessageContext, error) + type ConsumerDispatchInterceptor interface + Intercept func(msgCtx *MessageContext) (*MessageContext, error) + type ConsumerGroupProperties struct + Backoff *utils.Duration + JoinTimeout *utils.Duration + MaxRetry *int + type ConsumerHandlerInterceptor interface + AfterHandling func(ctx context.Context, msg *Message, err error) (context.Context, error) + BeforeHandling func(ctx context.Context, msg *Message) (context.Context, error) + type ConsumerOptions func(cfg *bindingConfig) + func WithConsumerProperties(p *ConsumerProperties) ConsumerOptions + type ConsumerProperties struct + Backoff *utils.Duration + Group ConsumerGroupProperties + LogLevel *log.LoggingLevel + type DispatchOptions func(h *handler) + func FilterOnHeader(header string, matcher matcher.StringMatcher) DispatchOptions + type Encoder interface + Encode func(v interface{}) ([]byte, error) + MIMEType func() string + type GroupConsumer interface + AddHandler func(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error + Group func() string + Topic func() string + type Headers map[string]string + type HealthIndicator struct + func NewHealthIndicator(binder Binder) *HealthIndicator + func (i *HealthIndicator) Health(_ context.Context, opts health.Options) health.Health + func (i *HealthIndicator) Name() string + type KafkaProperties struct + Binder BinderProperties + Brokers utils.CommaSeparatedSlice + ClientId string + Metadata Metadata + Net Net + func BindKafkaProperties(ctx *bootstrap.ApplicationContext) KafkaProperties + type LoggerOptions func(opt *loggerOption) + type Message struct + Headers Headers + Payload interface{} + type MessageContext struct + Message Message + RawMessage interface{} + Source interface{} + Topic string + type MessageFilterFunc func(ctx context.Context, msg *Message) (shouldHandle bool) + type MessageHandlerFunc interface + type MessageLogger interface + LogReceivedMessage func(ctx context.Context, msg interface{}) + LogSentMessage func(ctx context.Context, msg interface{}) + WithLevel func(level log.LoggingLevel) MessageLogger + type MessageMetadata struct + Key []byte + Offset int + Partition int + Timestamp time.Time + type MessageOptions func(config *messageConfig) + func WithEncoder(valueEncoder Encoder) MessageOptions + func WithKey(key interface{}) MessageOptions + type Metadata struct + RefreshFrequency utils.Duration + type Net struct + Sasl SASL + Tls TLS + type Producer interface + ReadyCh func() <-chan struct{} + Send func(ctx context.Context, message interface{}, options ...MessageOptions) error + Topic func() string + type ProducerMessageFinalizer interface + Finalize func(msgCtx *MessageContext, partition int32, offset int64, err error) (*MessageContext, error) + type ProducerMessageInterceptor interface + Intercept func(msgCtx *MessageContext) (*MessageContext, error) + type ProducerOptions func(cfg *bindingConfig) + func AckTimeout(timeout time.Duration) ProducerOptions + func KeyEncoder(enc Encoder) ProducerOptions + func Partitions(partitionCount int, replicationFactor int) ProducerOptions + func RequireAllAck() ProducerOptions + func RequireLocalAck() ProducerOptions + func RequireNoAck() ProducerOptions + func WithProducerProperties(p *ProducerProperties) ProducerOptions + type ProducerProperties struct + AckMode *AckMode + AckTimeout *utils.Duration + Backoff *utils.Duration + LogLevel *log.LoggingLevel + MaxRetry *int + Provisioning ProvisioningProperties + type ProvisioningProperties struct + AllowLowerPartitions *bool + AutoAddPartitions *bool + AutoCreateTopic *bool + PartitionCount *int32 + ReplicationFactor *int16 + type SASL struct + Enable bool + Handshake bool + Password string + User string + type SaramaBinder interface + Client func() sarama.Client + type SaramaKafkaBinder struct + func NewBinder(ctx context.Context, opts ...BinderOptions) *SaramaKafkaBinder + func (b *SaramaKafkaBinder) Client() sarama.Client + func (b *SaramaKafkaBinder) CloseProducer(ctx context.Context, topic string) + func (b *SaramaKafkaBinder) Consume(topic string, group string, options ...ConsumerOptions) (GroupConsumer, error) + func (b *SaramaKafkaBinder) Done() <-chan struct{} + func (b *SaramaKafkaBinder) Initialize(ctx context.Context) (err error) + func (b *SaramaKafkaBinder) ListTopics() (topics []string) + func (b *SaramaKafkaBinder) Produce(topic string, options ...ProducerOptions) (Producer, error) + func (b *SaramaKafkaBinder) Shutdown(ctx context.Context) error + func (b *SaramaKafkaBinder) Start(ctx context.Context) (err error) + func (b *SaramaKafkaBinder) Subscribe(topic string, options ...ConsumerOptions) (Subscriber, error) + type Subscriber interface + AddHandler func(handlerFunc MessageHandlerFunc, opts ...DispatchOptions) error + Partitions func() []int32 + Topic func() string + type TLS struct + Certs certs.SourceProperties + Enable bool