Documentation
¶
Index ¶
- func InitializeConsumer()
- func InitializePublisher()
- func NewDLQReader(hosts, topic, groupID string) *kafka.Reader
- func NewReader(hosts, topic, groupID string) *kafka.Reader
- func NewWriter(hosts, topic string) *kafka.Writer
- type KafkaConfig
- func (config *KafkaConfig) CreateTopic(topic string, num ...int)
- func (config *KafkaConfig) DelTopic(topic ...string)
- func (config *KafkaConfig) GetTopic() []string
- func (config *KafkaConfig) ReadMessages(topic, groupid string)
- func (config *KafkaConfig) WriteMessages(topic string, value ...string)
- func (config *KafkaConfig) WriteMessagesKeyValue(topic string, value map[string]string)
- func (config *KafkaConfig) WriteMessagesKeyValueList(topic string, value []WriteData) error
- type WorkerCircuitBreaker
- type WriteData
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitializeConsumer ¶
func InitializeConsumer()
func InitializePublisher ¶
func InitializePublisher()
func NewDLQReader ¶ added in v0.0.43
func NewDLQReader(hosts, topic, groupID string) *kafka.Reader
NewDLQReader - 建立 DLQ 用的 kafka.Reader。hosts - Kafka host 列表, 可多組, 用逗號分開, e.g. 100.0.0.1:9092,100.0.0.2:9092
Types ¶
type KafkaConfig ¶
type KafkaConfig struct { Network string `yaml:"network"` Address []string `yaml:"adress"` NumPartition int `yaml:"numPartition"` ReplicationFactor int `yaml:"replicationFactor"` Conn *kafka.Conn }
KafkaConfig - Represents a Configuration
func (*KafkaConfig) CreateTopic ¶
func (config *KafkaConfig) CreateTopic(topic string, num ...int)
CreateTopic - 建立 topic。參數: 1. topic 2. NumPartition 3. ReplicationFactor
func (*KafkaConfig) DelTopic ¶
func (config *KafkaConfig) DelTopic(topic ...string)
DelTopic - 刪除列表中的 Topic
func (*KafkaConfig) ReadMessages ¶
func (config *KafkaConfig) ReadMessages(topic, groupid string)
ReadMessages - 接收Topic的訊息
func (*KafkaConfig) WriteMessages ¶
func (config *KafkaConfig) WriteMessages(topic string, value ...string)
WriteMessages - 發送訊息到Topic
func (*KafkaConfig) WriteMessagesKeyValue ¶
func (config *KafkaConfig) WriteMessagesKeyValue(topic string, value map[string]string)
WriteMessagesKeyValue - 發送訊息到Topic
func (*KafkaConfig) WriteMessagesKeyValueList ¶ added in v0.0.6
func (config *KafkaConfig) WriteMessagesKeyValueList(topic string, value []WriteData) error
WriteMessagesKeyValueList - 發送訊息到Topic
type WorkerCircuitBreaker ¶ added in v0.0.48
type WorkerCircuitBreaker struct {
// contains filtered or unexported fields
}
func NewWorkerCircuitBreaker ¶ added in v0.0.48
func NewWorkerCircuitBreaker(bucketVolume, consumeTokenPerRequest, maxDataSize int) *WorkerCircuitBreaker
NewWorkerCircuitBreaker - bucketVolume: 令牌桶容量; consumeTokenPerRequest: 每次消耗令牌量; maxDataSize: 滿足此條件表示要消耗令牌。e.g. bucketVolume = 10, consumeTokenPerRequest = 3, maxDataSize = 100 表示每秒產生 1 個 token, 但每次消耗 3 個 token; 當資料筆數大於等於 100 時, 消耗 3 個 token。當桶子內的 token 不夠時, 會等待下一個 token 產生。用這方式降低資料庫寫入壓力。
func (*WorkerCircuitBreaker) Check ¶ added in v0.0.48
func (r *WorkerCircuitBreaker) Check(ctx context.Context, dataCount int)
func (*WorkerCircuitBreaker) CheckContinuouslyWriting ¶ added in v0.0.49
func (r *WorkerCircuitBreaker) CheckContinuouslyWriting(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.