Documentation ¶
Index ¶
- Variables
- func NormalizeMainTopicName(e Event) string
- type ErrorBatchHandler
- type Event
- type Header
- type Message
- func (m *Message) GetData() []byte
- func (m *Message) GetHeaderByKey(key []byte) []byte
- func (m *Message) GetHeaders() []*Header
- func (m *Message) GetRaw() sarama.ConsumerMessage
- func (m *Message) GetSinceTime() time.Duration
- func (m *Message) GetTopicName() string
- func (m *Message) SetHeaderByKey(key []byte, val []byte)
- func (m *Message) Unmarshal(evtType reflect.Type) (Event, error)
- type PublisherKafkaConfig
- type Topic
- type TopicOption
Constants ¶
This section is empty.
Variables ¶
var (
ErrorWithoutRetry = errors.New("error without retry")
)
var KafkaDefaultVersion = sarama.V3_4_0_0
Functions ¶
func NormalizeMainTopicName ¶
func NormalizeMainTopicName(e Event) string
NormalizeMainTopicName returns the main topic name of an event when consuming.
Types ¶
type ErrorBatchHandler ¶
type ErrorBatchHandler struct {
Indexes []int
}
ErrorBatchHandler is returned when some messages in a batch fail. An empty Indexes means all messages passed.
func (ErrorBatchHandler) Error ¶
func (e ErrorBatchHandler) Error() string
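As a rough sketch of how a caller might inspect this error, the example below redeclares ErrorBatchHandler locally so it compiles on its own. It assumes Indexes holds the positions of the failed messages in the batch, which the documentation implies but does not state. The Error message text, processBatch, and failedIndexes are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorBatchHandler mirrors the documented type so this sketch is self-contained.
type ErrorBatchHandler struct {
	Indexes []int
}

// Error uses a placeholder message; the package's real message text is not documented here.
func (e ErrorBatchHandler) Error() string {
	return fmt.Sprintf("batch failed at indexes %v", e.Indexes)
}

// processBatch is a hypothetical batch handler that treats empty payloads as failures.
// Assumption: Indexes lists the positions of the failed messages.
func processBatch(payloads []string) error {
	var failed []int
	for i, p := range payloads {
		if p == "" {
			failed = append(failed, i)
		}
	}
	if len(failed) == 0 {
		return nil
	}
	return ErrorBatchHandler{Indexes: failed}
}

// failedIndexes returns the failed positions if err is (or wraps) an ErrorBatchHandler,
// and nil otherwise.
func failedIndexes(err error) []int {
	var batchErr ErrorBatchHandler
	if errors.As(err, &batchErr) {
		return batchErr.Indexes
	}
	return nil
}
```

Using errors.As rather than a type assertion keeps the check working if the handler's error is wrapped before it reaches the caller.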
type Event ¶
type Event interface {
	// GetTopicName should return main topic name
	GetTopicName() string
	// GetPartitionValue should return value to
	GetPartitionValue() string
}
Event is the abstraction that all event structs implement.
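A minimal sketch of a struct satisfying Event might look like the following. The interface is redeclared locally so the example compiles on its own. OrderCreated, its fields, and the topic name are hypothetical. Because the GetPartitionValue comment is cut off in the documentation, using the return value as a partitioning key is an assumption, not documented behavior.

```go
package main

import "strconv"

// Event mirrors the documented interface so this sketch is self-contained.
type Event interface {
	GetTopicName() string
	GetPartitionValue() string
}

// OrderCreated is a hypothetical event type used only for illustration.
type OrderCreated struct {
	OrderID int64
	UserID  int64
}

// GetTopicName returns the main topic name for this event (hypothetical name).
func (e OrderCreated) GetTopicName() string {
	return "order.created"
}

// GetPartitionValue returns the user ID as a string.
// Assumption: the value is used to choose a partition, so events for one user stay together.
func (e OrderCreated) GetPartitionValue() string {
	return strconv.FormatInt(e.UserID, 10)
}

// Compile-time check that OrderCreated implements Event.
var _ Event = OrderCreated{}
```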
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
func NewMessage(msg *sarama.ConsumerMessage, marshaler marshaler.Marshaler) *Message
NewMessage creates a new message.
func (*Message) GetHeaderByKey ¶
func (*Message) GetRaw ¶
func (m *Message) GetRaw() sarama.ConsumerMessage
func (*Message) GetSinceTime ¶
func (m *Message) GetSinceTime() time.Duration
GetSinceTime returns the time remaining before the message is retried.
func (*Message) GetTopicName ¶
func (m *Message) GetTopicName() string
GetTopicName returns the topic of the message.
func (*Message) SetHeaderByKey ¶
type PublisherKafkaConfig ¶
type PublisherKafkaConfig struct {
	// BrokerAddrs list of broker addresses
	BrokerAddrs []string `json:"broker_addrs" mapstructure:"broker_addrs" yaml:"broker_addrs"`
	// KafkaConfig is kafka config
	KafkaConfig *sarama.Config
}
PublisherKafkaConfig is the configuration of the Kafka producer.
type TopicOption ¶
type TopicOption func(topic *Topic)
TopicOption ...
func WithPending ¶
func WithPending(pending time.Duration) TopicOption
WithPending sets the pending duration used for retry jobs.
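TopicOption follows the functional options pattern. The sketch below shows how such an option could be applied. The fields of Topic are not shown in the documentation, so the Name and Pending fields, the body of WithPending, and the newTopic constructor are all assumptions made for illustration and are not the package's implementation.

```go
package main

import "time"

// Topic is a stand-in: the real Topic's fields are not documented here.
type Topic struct {
	Name    string
	Pending time.Duration
}

// TopicOption matches the documented signature.
type TopicOption func(topic *Topic)

// WithPending is assumed to record the delay a message waits before a retry.
func WithPending(pending time.Duration) TopicOption {
	return func(topic *Topic) {
		topic.Pending = pending
	}
}

// newTopic is a hypothetical constructor that applies each option in order.
func newTopic(name string, opts ...TopicOption) *Topic {
	t := &Topic{Name: name}
	for _, opt := range opts {
		opt(t)
	}
	return t
}
```

With this shape, a call such as newTopic("orders.retry", WithPending(30*time.Second)) would yield a topic whose Pending field is 30 seconds, and callers that pass no options get the zero value.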