Documentation ¶
Index ¶
Constants ¶
View Source
const (
	DefaultTopicIn           string        = "queue.in"
	DefaultTopicOut          string        = "queue.out"
	DefaultQueueSize         int           = 10000
	DefaultCommitRate        time.Duration = time.Minute
	DefaultRebalanceStrategy string        = "roundrobin"
)
View Source
const (
	EnvNameKafkaBrokers      string = "KAFKA_BROKERS"
	EnvNameKafkaClientId     string = "KAFKA_CLIENT_ID"
	EnvNameKafkaGroupId      string = "KAFKA_GROUP_ID"
	EnvNameKafkaEnableLog    string = "KAFKA_ENABLE_LOG"
	EnvNameKafkaQueueSize    string = "KAFKA_QUEUE_SIZE"
	EnvNameKafkaTopicIn      string = "KAFKA_TOPIC_IN"
	EnvNameKafkaTopicOut     string = "KAFKA_TOPIC_OUT"
	EnvNameCommitRate        string = "KAFKA_COMMIT_RATE"
	EnvNameRebalanceStrategy string = "KAFKA_REBALANCE_STRATEGY"
)
View Source
const (
	UnsupportedTypef              string = "unsupported type: %T"
	NoBrokersConfigured           string = "no brokers configured"
	NoClientIdConfigured          string = "no client id configured"
	NoGroupIdConfigured           string = "no group id configured"
	CommitRateLessThanOrEqualZero string = "commit rate less than or equal to zero"
)
Variables ¶
View Source
var (
	ErrNoBrokersConfigured           = errors.New(NoBrokersConfigured)
	ErrNoClientIdConfigured          = errors.New(NoClientIdConfigured)
	ErrNoGroupIdConfigured           = errors.New(NoGroupIdConfigured)
	ErrCommitRateLessThanOrEqualZero = errors.New(CommitRateLessThanOrEqualZero)
)
View Source
var DefaultBrokers = []string{"localhost:9092"}
Functions ¶
Types ¶
type Configuration ¶
type Configuration struct {
	Brokers           []string      `json:"brokers"`
	ClientId          string        `json:"client_id"`
	GroupId           string        `json:"group_id"`
	EnableLog         bool          `json:"enable_log"`
	QueueSize         int           `json:"queue_size"`
	TopicIn           string        `json:"topic_in"`
	TopicOut          string        `json:"topic_out"`
	CommitRate        time.Duration `json:"commit_rate"`
	RebalanceStrategy string        `json:"rebalance_strategy"`
}
func (*Configuration) Default ¶
func (c *Configuration) Default()
func (*Configuration) FromEnv ¶
func (c *Configuration) FromEnv(envs map[string]string)
func (*Configuration) Validate ¶
func (c *Configuration) Validate() error
type ErrorHandlerFx ¶
type ErrorHandlerFx func(error)
type Owner ¶
type Owner interface {
	Initialize(config *Configuration) (err error)
	Shutdown()
}
type Wrapper ¶
func WrapperConvertMultiple ¶
func WrapperConvertMultiple(items []interface{}) []*Wrapper
func WrapperConvertSingle ¶
func WrapperConvertSingle(item interface{}) *Wrapper
func (*Wrapper) FromWrapper ¶
func (*Wrapper) MarshalBinary ¶
func (*Wrapper) UnmarshalBinary ¶
Click to show internal directories.
Click to hide internal directories.