kafka

package
v1.9.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	Logger  *zap.SugaredLogger
	Init    bool
	Offsets map[string]int64
}

func (*Checkpoint) Metadata

func (c *Checkpoint) Metadata() string

func (*Checkpoint) Set

func (c *Checkpoint) Set(key string, offset int64)

func (*Checkpoint) Skip

func (c *Checkpoint) Skip(key string, offset int64) bool

type Checkpoints

type Checkpoints map[string]map[int32]*Checkpoint

type KafkaHandler

type KafkaHandler struct {
	*sync.Mutex
	Logger *zap.SugaredLogger

	// kafka details
	GroupName     string
	Producer      sarama.AsyncProducer
	OffsetManager sarama.OffsetManager
	TriggerTopic  string

	// handler functions
	// one function for each consumed topic, returning messages, an
	// offset and an optional function that will be run in a transaction
	Handlers map[string]func(*sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())

	// cleanup function
	// used to clear state when consumer group is rebalanced
	Reset func() error
	// contains filtered or unexported fields
}

func (*KafkaHandler) Cleanup

func (h *KafkaHandler) Cleanup(session sarama.ConsumerGroupSession) error

func (*KafkaHandler) Close

func (h *KafkaHandler) Close() error

func (*KafkaHandler) ConsumeClaim

func (h *KafkaHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*KafkaHandler) Setup

func (h *KafkaHandler) Setup(session sarama.ConsumerGroupSession) error

type KafkaSensor

type KafkaSensor struct {
	*base.Kafka
	*sync.Mutex
	// contains filtered or unexported fields
}

func NewKafkaSensor

func NewKafkaSensor(kafkaConfig *eventbusv1alpha1.KafkaBus, sensor *sensorv1alpha1.Sensor, hostname string, logger *zap.SugaredLogger) *KafkaSensor

func (*KafkaSensor) Action

func (s *KafkaSensor) Action(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())

func (*KafkaSensor) Close

func (s *KafkaSensor) Close() error

func (*KafkaSensor) Connect

func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpression string, dependencies []eventbuscommon.Dependency, atLeastOnce bool) (eventbuscommon.TriggerConnection, error)

func (*KafkaSensor) Disconnect

func (s *KafkaSensor) Disconnect()

func (*KafkaSensor) Event

func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())

func (*KafkaSensor) Initialize

func (s *KafkaSensor) Initialize() error

func (*KafkaSensor) IsClosed

func (s *KafkaSensor) IsClosed() bool

func (*KafkaSensor) Listen

func (s *KafkaSensor) Listen(ctx context.Context)

func (*KafkaSensor) Reset added in v1.8.1

func (s *KafkaSensor) Reset() error

func (*KafkaSensor) Trigger

func (s *KafkaSensor) Trigger(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())

type KafkaTransaction

type KafkaTransaction struct {
	Logger *zap.SugaredLogger

	// kafka details
	Producer  sarama.AsyncProducer
	GroupName string
	Topic     string
	Partition int32

	// used to reset the offset and metadata if transaction fails
	ResetOffset   int64
	ResetMetadata string
}

func (*KafkaTransaction) Commit

func (t *KafkaTransaction) Commit(session sarama.ConsumerGroupSession, messages []*sarama.ProducerMessage, offset int64, metadata string) error

type KafkaTriggerConnection

type KafkaTriggerConnection struct {
	*base.KafkaConnection
	KafkaTriggerHandler
	// contains filtered or unexported fields
}

func (*KafkaTriggerConnection) Action

func (c *KafkaTriggerConnection) Action(events []*cloudevents.Event) func()

func (*KafkaTriggerConnection) Close

func (c *KafkaTriggerConnection) Close() error

func (*KafkaTriggerConnection) DependsOn

func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bool)

func (*KafkaTriggerConnection) Filter

func (c *KafkaTriggerConnection) Filter(depName string, event *cloudevents.Event) bool

func (*KafkaTriggerConnection) IsClosed

func (c *KafkaTriggerConnection) IsClosed() bool

func (*KafkaTriggerConnection) Name

func (c *KafkaTriggerConnection) Name() string

func (*KafkaTriggerConnection) Offset

func (c *KafkaTriggerConnection) Offset(partition int32, offset int64) int64

func (*KafkaTriggerConnection) OneAndDone

func (c *KafkaTriggerConnection) OneAndDone() bool

func (*KafkaTriggerConnection) Ready

func (c *KafkaTriggerConnection) Ready() bool

func (*KafkaTriggerConnection) Reset added in v1.8.1

func (c *KafkaTriggerConnection) Reset()

func (*KafkaTriggerConnection) String

func (c *KafkaTriggerConnection) String() string

func (*KafkaTriggerConnection) Subscribe

func (c *KafkaTriggerConnection) Subscribe(
	ctx context.Context,
	closeCh <-chan struct{},
	resetConditionsCh <-chan struct{},
	lastResetTime time.Time,
	transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error),
	filter func(string, cloudevents.Event) bool,
	action func(map[string]cloudevents.Event),
	topic *string) error

func (*KafkaTriggerConnection) Transform

func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error)

func (*KafkaTriggerConnection) Update

func (c *KafkaTriggerConnection) Update(event *cloudevents.Event, partition int32, offset int64, timestamp time.Time) ([]*cloudevents.Event, error)

type KafkaTriggerHandler

type KafkaTriggerHandler interface {
	common.TriggerConnection
	Name() string
	Ready() bool
	Reset()
	OneAndDone() bool
	DependsOn(*cloudevents.Event) (string, bool)
	Transform(string, *cloudevents.Event) (*cloudevents.Event, error)
	Filter(string, *cloudevents.Event) bool
	Update(event *cloudevents.Event, partition int32, offset int64, timestamp time.Time) ([]*cloudevents.Event, error)
	Offset(int32, int64) int64
	Action([]*cloudevents.Event) func()
}

type Parameters

type Parameters map[string]bool

func (Parameters) Get

func (p Parameters) Get(name string) (interface{}, error)

type Topics

type Topics struct {
	// contains filtered or unexported fields
}

func (*Topics) List

func (t *Topics) List() []string

type TriggerWithDepName

type TriggerWithDepName struct {
	KafkaTriggerHandler
	// contains filtered or unexported fields
}

type Triggers

type Triggers map[string]KafkaTriggerHandler

func (Triggers) List

func (t Triggers) List(event *cloudevents.Event) []*TriggerWithDepName

func (Triggers) Ready

func (t Triggers) Ready() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL