Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewKafkaSource ¶
func NewKafkaSource(ctx context.Context, vertexInstance *dfv1.VertexInstance, handler *ConsumerHandler, opts ...Option) (sourcer.SourceReader, error)
NewKafkaSource returns a kafkaSource reader backed by a Kafka consumer group.
Types ¶
type ConsumerHandler ¶ added in v1.3.0
type ConsumerHandler struct {
// contains filtered or unexported fields
}
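ConsumerHandler is the consumer group handler passed to NewKafkaSource. Its Setup, Cleanup, and ConsumeClaim methods match the sarama.ConsumerGroupHandler interface.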
func NewConsumerHandler ¶ added in v1.3.0
func NewConsumerHandler(readChanSize int) *ConsumerHandler
NewConsumerHandler creates a new handler and initializes the channel used for passing messages.
func (*ConsumerHandler) Cleanup ¶ added in v1.3.0
func (consumer *ConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*ConsumerHandler) ConsumeClaim ¶ added in v1.3.0
func (consumer *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerHandler) Setup ¶ added in v1.3.0
func (consumer *ConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
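The consumer loop that ConsumeClaim is expected to run can be sketched without a Kafka broker. The example below is a minimal illustration, not the package's implementation: `message`, `handler`, `newHandler`, `consumeClaim`, and `run` are hypothetical stand-ins (a plain channel plays the role of the claim's Messages() channel, and a context plays the role of the session's context), since the real ConsumerHandler fields are unexported.

```go
package main

import (
	"context"
	"fmt"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Offset int64
	Value  string
}

// handler is a hypothetical stand-in for ConsumerHandler.
type handler struct {
	// messages is the buffered channel that hands messages to the reader.
	messages chan message
}

// newHandler mirrors the idea of NewConsumerHandler(readChanSize):
// it allocates the channel used for passing messages.
func newHandler(readChanSize int) *handler {
	return &handler{messages: make(chan message, readChanSize)}
}

// consumeClaim forwards every message from the claim to the handler's
// channel until the claim is closed or the session context is cancelled.
func (h *handler) consumeClaim(ctx context.Context, claim <-chan message) error {
	for {
		select {
		case msg, ok := <-claim:
			if !ok {
				return nil // claim closed, for example on a rebalance
			}
			select {
			case h.messages <- msg:
			case <-ctx.Done():
				return nil // session ended while waiting for the reader
			}
		case <-ctx.Done():
			return nil // session ended
		}
	}
}

// run simulates one session over a fixed set of values and returns
// the values that reached the handler's channel, in order.
func run(values []string) []string {
	h := newHandler(len(values))
	claim := make(chan message, len(values))
	for i, v := range values {
		claim <- message{Offset: int64(i), Value: v}
	}
	close(claim)

	if err := h.consumeClaim(context.Background(), claim); err != nil {
		return nil
	}
	close(h.messages)

	var out []string
	for m := range h.messages {
		out = append(out, m.Value)
	}
	return out
}

func main() {
	fmt.Println(run([]string{"a", "b", "c"})) // prints [a b c]
}
```

Checking the context in both select statements keeps the loop from blocking on a full channel after the session has ended, which is what allows a Cleanup step to run once every loop has exited.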
type Option ¶
type Option func(*kafkaSource) error
func WithBufferSize ¶
WithBufferSize is used to set the size of the message channel.
func WithGroupName ¶
WithGroupName is used to set the consumer group name.
func WithReadTimeOut ¶
WithReadTimeOut is used to set the read timeout for the from buffer.
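Option has the shape of a functional option: a function that mutates the source and may return an error. The sketch below shows that pattern in isolation. Everything in it is hypothetical: `source` stands in for the unexported kafkaSource, the lowercase `with...` helpers only mirror the names listed above (their real signatures are not shown on this page), and the default values and the positive-size check are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// source is a hypothetical stand-in for the unexported kafkaSource type.
type source struct {
	groupName   string
	bufferSize  int
	readTimeout time.Duration
}

// option mirrors the shape of Option: func(*kafkaSource) error.
type option func(*source) error

// withGroupName sets the consumer group name.
func withGroupName(name string) option {
	return func(s *source) error {
		s.groupName = name
		return nil
	}
}

// withBufferSize sets the message channel size.
// Rejecting non-positive sizes is an assumption of this sketch.
func withBufferSize(n int) option {
	return func(s *source) error {
		if n <= 0 {
			return errors.New("buffer size must be positive")
		}
		s.bufferSize = n
		return nil
	}
}

// withReadTimeout sets the read timeout.
func withReadTimeout(d time.Duration) option {
	return func(s *source) error {
		s.readTimeout = d
		return nil
	}
}

// newSource starts from assumed defaults, then applies each option in
// order, stopping at the first error.
func newSource(opts ...option) (*source, error) {
	s := &source{groupName: "default", bufferSize: 100, readTimeout: time.Second}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newSource(withGroupName("orders"), withBufferSize(256))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.groupName, s.bufferSize, s.readTimeout) // prints orders 256 1s
}
```

Because each option returns an error, a constructor written this way can reject an invalid setting before any connection is attempted, and callers only pass the options they want to override.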