Documentation ¶
Index ¶
- Constants
- type Config
- type Source
- func (k *Source) Category() api.Category
- func (k *Source) Commit(events []api.Event)
- func (k *Source) Config() interface{}
- func (k *Source) Init(context api.Context) error
- func (k *Source) ProductLoop(productFunc api.ProductFunc)
- func (k *Source) Start() error
- func (k *Source) Stop()
- func (k *Source) String() string
- func (k *Source) Type() api.Type
Constants ¶
View Source
// Type is the string constant "kafka".
// NOTE(review): presumably the name under which this source is registered and
// selected in pipeline configuration — confirm against the registration code.
const Type = "kafka"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
// Config holds the settings of the kafka source. Each field is mapped to a YAML
// key by its `yaml` tag; the `default` and `validate` tags declare the fallback
// value and the validation rule for the field.
// NOTE(review): the tags are presumably interpreted by the project's config
// loader and validator — confirm how and when they are applied.
type Config struct {
	// Brokers is read from the "brokers" key and is marked required.
	// Presumably the list of Kafka broker addresses — confirm the expected format.
	Brokers []string `yaml:"brokers,omitempty" validate:"required"`
	// Topic is read from the "topic" key and is marked required.
	Topic string `yaml:"topic,omitempty" validate:"required"`
	// GroupId is read from the "groupId" key; declared default is "loggie".
	GroupId string `yaml:"groupId,omitempty" default:"loggie"`
	// QueueCapacity is read from the "queueCapacity" key; declared default is 100.
	QueueCapacity int `yaml:"queueCapacity" default:"100"`
	// MinAcceptedBytes is read from the "minAcceptedBytes" key; declared default is 1.
	MinAcceptedBytes int `yaml:"minAcceptedBytes" default:"1"`
	// MaxAcceptedBytes is read from the "maxAcceptedBytes" key; declared default is 1024000.
	MaxAcceptedBytes int `yaml:"maxAcceptedBytes" default:"1024000"`
	// ReadMaxAttempts is read from the "readMaxAttempts" key; declared default is 3.
	ReadMaxAttempts int `yaml:"readMaxAttempts" default:"3"`
	// MaxReadWait has a declared default of 10s.
	// NOTE(review): the YAML key is "maxPollWait", not "maxReadWait"; users must
	// configure it under that key. Confirm the mismatch is intentional.
	MaxReadWait time.Duration `yaml:"maxPollWait" default:"10s"`
	// ReadBackoffMin is read from the "readBackoffMin" key; declared default is 100ms.
	ReadBackoffMin time.Duration `yaml:"readBackoffMin" default:"100ms"`
	// ReadBackoffMax is read from the "readBackoffMax" key; declared default is 1s.
	ReadBackoffMax time.Duration `yaml:"readBackoffMax" default:"1s"`
	// EnableAutoCommit is read from the "enableAutoCommit" key. It declares no
	// default tag, so an unset value is the bool zero value (false).
	EnableAutoCommit bool `yaml:"enableAutoCommit"`
	// AutoCommitInterval is read from the "autoCommitInterval" key; declared default is 1s.
	AutoCommitInterval time.Duration `yaml:"autoCommitInterval" default:"1s"`
	// AutoOffsetReset is read from the "autoOffsetReset" key; declared default is
	// "latest", and validation restricts it to "earliest" or "latest".
	AutoOffsetReset string `yaml:"autoOffsetReset" default:"latest" validate:"oneof=earliest latest"`
	// SASL is read from the upper-case "SASL" key, unlike the camelCase keys above.
	// NOTE(review): the package qualifier is spelled "kakfasink", which looks like
	// a transposition of "kafkasink"; it is declared outside this block, so it is
	// left unchanged here.
	SASL kakfasink.SASL `yaml:"SASL,omitempty"`
}
type Source ¶
// Source is the exported type whose methods this page lists: Category, Commit,
// Config, Init, ProductLoop, Start, Stop, String and Type. All of them use a
// pointer receiver. Its fields are unexported, so its internal state is not
// visible here.
type Source struct {
// contains filtered or unexported fields
}
func (*Source) ProductLoop ¶
func (k *Source) ProductLoop(productFunc api.ProductFunc)
Click to show internal directories.
Click to hide internal directories.