Documentation ¶
Index ¶
- type KafkaSource
- func (r *KafkaSource) Ack(_ context.Context, offsets []isb.Offset) []error
- func (r *KafkaSource) Close() error
- func (r *KafkaSource) ForceStop()
- func (r *KafkaSource) GetName() string
- func (r *KafkaSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
- func (r *KafkaSource) Start() <-chan struct{}
- func (r *KafkaSource) Stop()
- type Option
Types ¶
type KafkaSource ¶
type KafkaSource struct {
// contains filtered or unexported fields
}
func NewKafkaSource ¶
func NewKafkaSource(vertex *dfv1.Vertex, writers []isb.BufferWriter, opts ...Option) (*KafkaSource, error)
NewKafkaSource returns a KafkaSource reader based on a Kafka consumer group.
func (*KafkaSource) Close ¶
func (r *KafkaSource) Close() error
func (*KafkaSource) ForceStop ¶
func (r *KafkaSource) ForceStop()
func (*KafkaSource) GetName ¶
func (r *KafkaSource) GetName() string
func (*KafkaSource) Read ¶
func (r *KafkaSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error)
Read reads a chunk of messages and returns at the first occurrence of an error. A non-nil error does not mean that the returned slice is empty: the caller should process every element in the slice even if the error is set. Read will not mark a message in the buffer as "READ" if the read for that index fails. A message may have been read and the container forcefully terminated before the message was processed. To provide at-least-once semantics for reading, all unacknowledged messages have to be reprocessed on restart.
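The caller-side contract described for Read can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: `message`, `reader`, `flakyReader`, and `consume` are hypothetical stand-ins (the real method takes a `context.Context` and returns `[]*isb.ReadMessage`). It only shows that a partial batch is processed before the error is handled, and that the failed offset is picked up again by the next read.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for *isb.ReadMessage.
type message struct {
	Offset  int64
	Payload string
}

// reader is a hypothetical interface that mirrors only the shape of Read.
type reader interface {
	Read(count int64) ([]message, error)
}

// flakyReader simulates a source that fails once at a chosen offset.
type flakyReader struct {
	data   []string
	failAt int64 // offset at which the simulated error occurs; -1 disables it
	next   int64 // next offset to hand out
}

// Read returns up to count messages and stops at the first error,
// returning whatever was read before the failure.
func (f *flakyReader) Read(count int64) ([]message, error) {
	var out []message
	for i := int64(0); i < count && f.next < int64(len(f.data)); i++ {
		if f.next == f.failAt {
			f.failAt = -1 // fail only once; the offset is not advanced
			return out, errors.New("simulated read error")
		}
		out = append(out, message{Offset: f.next, Payload: f.data[f.next]})
		f.next++
	}
	return out, nil
}

// consume processes every returned message first, then reports the error.
func consume(r reader, count int64) (processed []int64, err error) {
	msgs, err := r.Read(count)
	for _, m := range msgs {
		processed = append(processed, m.Offset)
	}
	return processed, err
}

func main() {
	r := &flakyReader{data: []string{"a", "b", "c", "d"}, failAt: 2}

	done, err := consume(r, 4)
	fmt.Println(done, err) // partial batch plus the error

	done, err = consume(r, 4)
	fmt.Println(done, err) // the failed offset is read on the next call
}
```

In this sketch the first call yields offsets 0 and 1 together with an error, and the second call resumes at offset 2, which was never handed out.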
func (*KafkaSource) Start ¶
func (r *KafkaSource) Start() <-chan struct{}
func (*KafkaSource) Stop ¶
func (r *KafkaSource) Stop()
type Option ¶
type Option func(*KafkaSource) error
func WithBufferSize ¶
WithBufferSize sets the size of the message channel.
func WithGroupName ¶
WithGroupName sets the group name.
func WithLogger ¶
func WithLogger(l *zap.SugaredLogger) Option
WithLogger sets the logger used by the KafkaSource.
func WithReadTimeOut ¶
WithReadTimeOut sets the read timeout for reading from the buffer.