Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultConsumer() (instance *consumer, err error)
- func DefaultProducer() (instance *producer, err error)
- func GetAsyncProducer(name string) (instance *producer, err error)
- func GetConsumer(name string, typ ConsumerType) (instance *consumer, err error)
- func GetGroupConsumer(name string) (instance *consumer, err error)
- func GetPartitionConsumer(name string) (instance *consumer, err error)
- func GetProducer(name string, typ ProducerType) (instance *producer, err error)
- func GetSyncProducer(name string) (instance *producer, err error)
- func NewConsumer(option ConsumerOption) (*consumer, error)
- func NewProducer(option ProducerOption) (*producer, error)
- type ConsumerOption
- type ConsumerType
- type DefaultGroupHandler
- type KafkaComponent
- type KafkaConf
- type KafkaConf_Connection
- type KafkaConf_Connection_Consumer
- type KafkaConf_Connection_Net
- type KafkaConf_Connection_Net_TLS
- type KafkaConf_Connection_Producer
- type KafkaOperatorInterface
- type ProducerMessage
- type ProducerOption
- type ProducerPartitioner
- type ProducerType
Constants ¶
View Source
const ( CONSUMER_OFFSET_OLDEST = sarama.OffsetNewest CONSUMER_OFFSET_NEWEST = sarama.OffsetOldest )
View Source
const KafkaConfigName = "KafkaConf"
Variables ¶
View Source
var Component = &KafkaComponent{}
Functions ¶
func DefaultConsumer ¶
func DefaultConsumer() (instance *consumer, err error)
func DefaultProducer ¶
func DefaultProducer() (instance *producer, err error)
func GetAsyncProducer ¶
func GetConsumer ¶
func GetConsumer(name string, typ ConsumerType) (instance *consumer, err error)
func GetGroupConsumer ¶
func GetPartitionConsumer ¶
func GetProducer ¶
func GetProducer(name string, typ ProducerType) (instance *producer, err error)
GetProducer 获取生产者
func GetSyncProducer ¶
func NewConsumer ¶
func NewConsumer(option ConsumerOption) (*consumer, error)
func NewProducer ¶
func NewProducer(option ProducerOption) (*producer, error)
Types ¶
type ConsumerOption ¶
type ConsumerOption struct { ID string Type ConsumerType Config *sarama.Config Brokers []string Topic string Group string Partitions []int32 Offset int64 Size int64 Wait int64 Multi bool }
func (*ConsumerOption) GetID ¶
func (i *ConsumerOption) GetID() (clientId, consumerId string)
type ConsumerType ¶
type ConsumerType string
const ( CONSUMER_TYPE_PARTITION ConsumerType = "partition" CONSUMER_TYPE_GROUP ConsumerType = "group" )
type DefaultGroupHandler ¶
type DefaultGroupHandler struct {
Messages chan *sarama.ConsumerMessage
}
func (*DefaultGroupHandler) Cleanup ¶
func (i *DefaultGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
func (*DefaultGroupHandler) ConsumeClaim ¶
func (i *DefaultGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*DefaultGroupHandler) Setup ¶
func (i *DefaultGroupHandler) Setup(sarama.ConsumerGroupSession) error
type KafkaComponent ¶ added in v0.3.2
type KafkaComponent struct{}
func (*KafkaComponent) Inject ¶ added in v0.3.3
func (i *KafkaComponent) Inject(instance any) bool
func (*KafkaComponent) InjectConf ¶ added in v0.4.0
func (i *KafkaComponent) InjectConf(config cComponents.ConfigInterface) bool
func (*KafkaComponent) Listen ¶ added in v0.4.0
func (i *KafkaComponent) Listen() []*cComponents.ConfigListener
func (*KafkaComponent) Load ¶ added in v0.3.3
func (i *KafkaComponent) Load()
type KafkaConf ¶
type KafkaConf struct { Connections map[string]*KafkaConf_Connection `json:"connections"` HealthCheck int64 `json:"health_check"` }
func (*KafkaConf) ConfigName ¶
type KafkaConf_Connection ¶
type KafkaConf_Connection struct { Brokers []string `json:"brokers"` Topic string `json:"topic"` Net *KafkaConf_Connection_Net `json:"net"` Producer *KafkaConf_Connection_Producer `json:"producer"` Consumer *KafkaConf_Connection_Consumer `json:"consumer"` Version [4]uint `json:"version"` }
type KafkaConf_Connection_Producer ¶
type KafkaConf_Connection_Producer struct { MaxMessageBytes int `json:"max_message_bytes"` RequiredAck bool `json:"required_ack"` Partitioner ProducerPartitioner `json:"partitioner"` Partition int32 `json:"partition"` }
type KafkaOperatorInterface ¶
type ProducerMessage ¶
type ProducerMessage struct { Key string Value string Headers []sarama.RecordHeader }
type ProducerOption ¶
type ProducerOption struct { ID string Type ProducerType Config *sarama.Config Brokers []string Topic string Partition int32 }
func (*ProducerOption) GetID ¶
func (i *ProducerOption) GetID() (clientId, producerId string)
type ProducerPartitioner ¶
type ProducerPartitioner string
const ( PRODUCER_PARTITIONER_HASH ProducerPartitioner = "hash" PRODUCER_PARTITIONER_RANDOM ProducerPartitioner = "random" PRODUCER_PARTITIONER_MANUAL ProducerPartitioner = "manual" )
type ProducerType ¶
type ProducerType string
const ( PRODUCER_TYPE_SYNC ProducerType = "sync" PRODUCER_TYPE_ASYNC ProducerType = "async" )
Click to show internal directories.
Click to hide internal directories.