retriable

package
v1.2.2
This package is not in the latest version of its module.

Published: Dec 5, 2024 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	ErrorWithoutRetry = errors.New("error without retry")
)
var KafkaDefaultVersion = sarama.V3_4_0_0

Functions

func NormalizeMainTopicName

func NormalizeMainTopicName(e Event) string

NormalizeMainTopicName returns the main topic name of an event when consuming.

Types

type ErrorBatchHandler

type ErrorBatchHandler struct {
	Indexes []int
}

ErrorBatchHandler is returned when some messages in a batch fail. An empty Indexes means that all messages passed.

func (ErrorBatchHandler) Error

func (e ErrorBatchHandler) Error() string

type Event

type Event interface {
	// GetTopicName should return the main topic name
	GetTopicName() string
	// GetPartitionValue  should return value to
	GetPartitionValue() string
}

Event is the abstraction implemented by all event structs.
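
As an illustration, a minimal type satisfying Event could look like the sketch below. The interface is redeclared locally so the example compiles on its own. OrderCreated, its field, and the topic name "order_created" are hypothetical, and using the order ID as the partition value is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// Event mirrors the documented interface so this sketch is self-contained.
type Event interface {
	GetTopicName() string
	GetPartitionValue() string
}

// OrderCreated is a hypothetical event type; it is not part of the package.
type OrderCreated struct {
	OrderID string
}

// GetTopicName returns the main topic name for this event.
func (OrderCreated) GetTopicName() string { return "order_created" }

// GetPartitionValue returns the order ID. Treating it as the partition value
// is an assumption made for this example.
func (e OrderCreated) GetPartitionValue() string { return e.OrderID }

func main() {
	var evt Event = OrderCreated{OrderID: "42"}
	fmt.Println(evt.GetTopicName(), evt.GetPartitionValue())
}
```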

type Header

type Header struct {
	Key   []byte
	Value []byte
}

Header is a key/value pair contained in the headers of a message.

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(msg *sarama.ConsumerMessage, marshaler marshaler.Marshaler) *Message

NewMessage creates a new message.

func (*Message) GetData

func (m *Message) GetData() []byte

GetData returns the data of the message.

func (*Message) GetHeaderByKey

func (m *Message) GetHeaderByKey(key []byte) []byte

func (*Message) GetHeaders

func (m *Message) GetHeaders() []*Header

GetHeaders returns the headers of the message.

func (*Message) GetRaw

func (m *Message) GetRaw() sarama.ConsumerMessage

func (*Message) GetSinceTime

func (m *Message) GetSinceTime() time.Duration

GetSinceTime returns the time remaining before the message is retried.

func (*Message) GetTopicName

func (m *Message) GetTopicName() string

GetTopicName returns the topic of the message.

func (*Message) SetHeaderByKey

func (m *Message) SetHeaderByKey(key []byte, val []byte)

func (*Message) Unmarshal

func (m *Message) Unmarshal(evtType reflect.Type) (Event, error)

Unmarshal ...

type PublisherKafkaConfig

type PublisherKafkaConfig struct {
	// BrokerAddrs list of broker addresses
	BrokerAddrs []string `json:"broker_addrs" mapstructure:"broker_addrs" yaml:"broker_addrs"`
	// KafkaConfig is kafka config
	KafkaConfig *sarama.Config
}

PublisherKafkaConfig is the configuration of the Kafka producer.

type Topic

type Topic struct {
	Name string

	Pending time.Duration

	// Next is the topic that messages should be pushed to next
	Next *Topic
}

Topic describes a topic: its name, its pending duration, and the next topic to push to.

func NewTopic

func NewTopic(name string, options ...TopicOption) *Topic

NewTopic initializes a new topic.

type TopicOption

type TopicOption func(topic *Topic)

TopicOption ...

func WithPending

func WithPending(pending time.Duration) TopicOption

WithPending sets the pending duration of a topic; it is used for retry jobs.
