Documentation
¶
Index ¶
- Constants
- type ConsumerComponent
- type ConsumerComponentOption
- func WithConsumerAppName(appName string) ConsumerComponentOption
- func WithConsumerBrokers(brokers []string) ConsumerComponentOption
- func WithConsumerLogger(logger *logrus.Entry) ConsumerComponentOption
- func WithConsumerTLSDir(tlsDir string) ConsumerComponentOption
- func WithConsumerTopics(topics []string) ConsumerComponentOption
- func WithSASLMechanism(protocol, username, password string) (ConsumerComponentOption, error)
- type ProducerComponent
- type ProducerComponentOption
- func WithProducerBatchSize(batchSize int) ProducerComponentOption
- func WithProducerBatchTimeout(batchTimeout time.Duration) ProducerComponentOption
- func WithProducerBrokers(brokers []string) ProducerComponentOption
- func WithProducerLogger(logger *logrus.Entry) ProducerComponentOption
- func WithProducerSASLMechanism(protocol, username, password string) (ProducerComponentOption, error)
- func WithProducerTLSDir(tlsDir string) ProducerComponentOption
Constants ¶
const ( HeaderCorrelationID = "correlation-id" HeaderOriginatorID = "originator-id" HeaderProtoName = "proto-name" )
const ( ConsumerComponentName = "kafka-consumer" ProducerComponentName = "kafka-producer" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerComponent ¶
type ConsumerComponent struct { Consumer *kafka.Reader // contains filtered or unexported fields }
func NewConsumerComponent ¶
func NewConsumerComponent(opts ...ConsumerComponentOption) *ConsumerComponent
NewConsumerComponent returns a new ConsumerComponent
func (*ConsumerComponent) Health ¶
func (c *ConsumerComponent) Health() error
Health implements the Component interface.
func (*ConsumerComponent) Name ¶
func (c *ConsumerComponent) Name() string
Name implements the Component interface.
func (*ConsumerComponent) Start ¶
func (c *ConsumerComponent) Start() error
Start implements the Component interface.
func (*ConsumerComponent) Stop ¶
func (c *ConsumerComponent) Stop() error
Stop implements the Component interface.
type ConsumerComponentOption ¶
type ConsumerComponentOption func(*ConsumerComponent)
ConsumerComponentOption represents an option for the ConsumerComponent
func WithConsumerAppName ¶
func WithConsumerAppName(appName string) ConsumerComponentOption
WithConsumerAppName sets the app name for the ConsumerComponent
func WithConsumerBrokers ¶
func WithConsumerBrokers(brokers []string) ConsumerComponentOption
WithConsumerBrokers sets the brokers for the ConsumerComponent
func WithConsumerLogger ¶
func WithConsumerLogger(logger *logrus.Entry) ConsumerComponentOption
WithConsumerLogger sets the logger for the ConsumerComponent
func WithConsumerTLSDir ¶
func WithConsumerTLSDir(tlsDir string) ConsumerComponentOption
WithConsumerTLSDir sets the location of the TLS directory for the ConsumerComponent
func WithConsumerTopics ¶
func WithConsumerTopics(topics []string) ConsumerComponentOption
WithConsumerTopics sets the topics for the ConsumerComponent
func WithSASLMechanism ¶
func WithSASLMechanism(protocol, username, password string) (ConsumerComponentOption, error)
WithSASLMechanism sets the sasl mechanism for the ConsumerComponent
type ProducerComponent ¶
type ProducerComponent struct { Producer *kafka.Writer // contains filtered or unexported fields }
ProducerComponent represents a Kafka producer component
func NewProducerComponent ¶
func NewProducerComponent(opts ...ProducerComponentOption) *ProducerComponent
NewProducerComponent returns a new ProducerComponent
func (*ProducerComponent) Health ¶
func (c *ProducerComponent) Health() error
Health implements the Component interface.
func (*ProducerComponent) Name ¶
func (c *ProducerComponent) Name() string
Name implements the Component interface.
func (*ProducerComponent) Start ¶
func (c *ProducerComponent) Start() error
Start implements the Component interface.
func (*ProducerComponent) Stop ¶
func (c *ProducerComponent) Stop() error
Stop implements the Component interface.
type ProducerComponentOption ¶
type ProducerComponentOption func(*ProducerComponent)
ProducerComponentOption represents an option for the ProducerComponent
func WithProducerBatchSize ¶
func WithProducerBatchSize(batchSize int) ProducerComponentOption
WithProducerBatchSize sets the batching size for the ProducerComponent
func WithProducerBatchTimeout ¶
func WithProducerBatchTimeout(batchTimeout time.Duration) ProducerComponentOption
WithProducerBatchTimeout sets the batching timeout for the ProducerComponent
func WithProducerBrokers ¶
func WithProducerBrokers(brokers []string) ProducerComponentOption
WithProducerBrokers sets the brokers for the ProducerComponent
func WithProducerLogger ¶
func WithProducerLogger(logger *logrus.Entry) ProducerComponentOption
WithProducerLogger sets the logger for the ProducerComponent
func WithProducerSASLMechanism ¶
func WithProducerSASLMechanism(protocol, username, password string) (ProducerComponentOption, error)
WithProducerSASLMechanism sets the sasl mechanism for the ProducerComponent
func WithProducerTLSDir ¶
func WithProducerTLSDir(tlsDir string) ProducerComponentOption
WithProducerTLSDir sets the location of the TLS files for the ProducerComponent