Documentation ¶
Index ¶
- type Checkpoint
- type Checkpoints
- type KafkaHandler
- type KafkaSensor
- func (s *KafkaSensor) Action(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
- func (s *KafkaSensor) Close() error
- func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpression string, ...) (eventbuscommon.TriggerConnection, error)
- func (s *KafkaSensor) Disconnect()
- func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
- func (s *KafkaSensor) Initialize() error
- func (s *KafkaSensor) IsClosed() bool
- func (s *KafkaSensor) Listen(ctx context.Context)
- func (s *KafkaSensor) Reset() error
- func (s *KafkaSensor) Trigger(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
- type KafkaTransaction
- type KafkaTriggerConnection
- func (c *KafkaTriggerConnection) Action(events []*cloudevents.Event) func()
- func (c *KafkaTriggerConnection) Close() error
- func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bool)
- func (c *KafkaTriggerConnection) Filter(depName string, event *cloudevents.Event) bool
- func (c *KafkaTriggerConnection) IsClosed() bool
- func (c *KafkaTriggerConnection) Name() string
- func (c *KafkaTriggerConnection) Offset(partition int32, offset int64) int64
- func (c *KafkaTriggerConnection) OneAndDone() bool
- func (c *KafkaTriggerConnection) Ready() bool
- func (c *KafkaTriggerConnection) Reset()
- func (c *KafkaTriggerConnection) String() string
- func (c *KafkaTriggerConnection) Subscribe(ctx context.Context, closeCh <-chan struct{}, ...) error
- func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error)
- func (c *KafkaTriggerConnection) Update(event *cloudevents.Event, partition int32, offset int64, timestamp time.Time) ([]*cloudevents.Event, error)
- type KafkaTriggerHandler
- type Parameters
- type Topics
- type TriggerWithDepName
- type Triggers
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint struct { Logger *zap.SugaredLogger Init bool Offsets map[string]int64 }
func (*Checkpoint) Metadata ¶
func (c *Checkpoint) Metadata() string
func (*Checkpoint) Set ¶
func (c *Checkpoint) Set(key string, offset int64)
type Checkpoints ¶
type Checkpoints map[string]map[int32]*Checkpoint
type KafkaHandler ¶
type KafkaHandler struct { *sync.Mutex Logger *zap.SugaredLogger // kafka details GroupName string Producer sarama.AsyncProducer OffsetManager sarama.OffsetManager TriggerTopic string // handler functions // one function for each consumed topic, return messages, an // offset and an optional function that will in a transaction Handlers map[string]func(*sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func()) // cleanup function // used to clear state when consumer group is rebalanced Reset func() error // contains filtered or unexported fields }
func (*KafkaHandler) Cleanup ¶
func (h *KafkaHandler) Cleanup(session sarama.ConsumerGroupSession) error
func (*KafkaHandler) Close ¶
func (h *KafkaHandler) Close() error
func (*KafkaHandler) ConsumeClaim ¶
func (h *KafkaHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*KafkaHandler) Setup ¶
func (h *KafkaHandler) Setup(session sarama.ConsumerGroupSession) error
type KafkaSensor ¶
func NewKafkaSensor ¶
func NewKafkaSensor(kafkaConfig *eventbusv1alpha1.KafkaBus, sensor *sensorv1alpha1.Sensor, hostname string, logger *zap.SugaredLogger) *KafkaSensor
func (*KafkaSensor) Action ¶
func (s *KafkaSensor) Action(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
func (*KafkaSensor) Close ¶
func (s *KafkaSensor) Close() error
func (*KafkaSensor) Connect ¶
func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpression string, dependencies []eventbuscommon.Dependency, atLeastOnce bool) (eventbuscommon.TriggerConnection, error)
func (*KafkaSensor) Disconnect ¶
func (s *KafkaSensor) Disconnect()
func (*KafkaSensor) Event ¶
func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
func (*KafkaSensor) Initialize ¶
func (s *KafkaSensor) Initialize() error
func (*KafkaSensor) IsClosed ¶
func (s *KafkaSensor) IsClosed() bool
func (*KafkaSensor) Listen ¶
func (s *KafkaSensor) Listen(ctx context.Context)
func (*KafkaSensor) Reset ¶ added in v1.8.1
func (s *KafkaSensor) Reset() error
func (*KafkaSensor) Trigger ¶
func (s *KafkaSensor) Trigger(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func())
type KafkaTransaction ¶
type KafkaTransaction struct { Logger *zap.SugaredLogger // kafka details Producer sarama.AsyncProducer GroupName string Topic string Partition int32 // used to reset the offset and metadata if transaction fails ResetOffset int64 ResetMetadata string }
func (*KafkaTransaction) Commit ¶
func (t *KafkaTransaction) Commit(session sarama.ConsumerGroupSession, messages []*sarama.ProducerMessage, offset int64, metadata string) error
type KafkaTriggerConnection ¶
type KafkaTriggerConnection struct { *base.KafkaConnection KafkaTriggerHandler // contains filtered or unexported fields }
func (*KafkaTriggerConnection) Action ¶
func (c *KafkaTriggerConnection) Action(events []*cloudevents.Event) func()
func (*KafkaTriggerConnection) Close ¶
func (c *KafkaTriggerConnection) Close() error
func (*KafkaTriggerConnection) DependsOn ¶
func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bool)
func (*KafkaTriggerConnection) Filter ¶
func (c *KafkaTriggerConnection) Filter(depName string, event *cloudevents.Event) bool
func (*KafkaTriggerConnection) IsClosed ¶
func (c *KafkaTriggerConnection) IsClosed() bool
func (*KafkaTriggerConnection) Name ¶
func (c *KafkaTriggerConnection) Name() string
func (*KafkaTriggerConnection) Offset ¶
func (c *KafkaTriggerConnection) Offset(partition int32, offset int64) int64
func (*KafkaTriggerConnection) OneAndDone ¶
func (c *KafkaTriggerConnection) OneAndDone() bool
func (*KafkaTriggerConnection) Ready ¶
func (c *KafkaTriggerConnection) Ready() bool
func (*KafkaTriggerConnection) Reset ¶ added in v1.8.1
func (c *KafkaTriggerConnection) Reset()
func (*KafkaTriggerConnection) String ¶
func (c *KafkaTriggerConnection) String() string
func (*KafkaTriggerConnection) Subscribe ¶
func (c *KafkaTriggerConnection) Subscribe( ctx context.Context, closeCh <-chan struct{}, resetConditionsCh <-chan struct{}, lastResetTime time.Time, transform func(depName string, event cloudevents.Event) (*cloudevents.Event, error), filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event), topic *string) error
func (*KafkaTriggerConnection) Transform ¶
func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error)
func (*KafkaTriggerConnection) Update ¶
func (c *KafkaTriggerConnection) Update(event *cloudevents.Event, partition int32, offset int64, timestamp time.Time) ([]*cloudevents.Event, error)
type KafkaTriggerHandler ¶
type KafkaTriggerHandler interface { common.TriggerConnection Name() string Ready() bool Reset() OneAndDone() bool DependsOn(*cloudevents.Event) (string, bool) Transform(string, *cloudevents.Event) (*cloudevents.Event, error) Filter(string, *cloudevents.Event) bool Update(event *cloudevents.Event, partition int32, offset int64, timestamp time.Time) ([]*cloudevents.Event, error) Offset(int32, int64) int64 Action([]*cloudevents.Event) func() }
type Parameters ¶
func (Parameters) Get ¶
func (p Parameters) Get(name string) (interface{}, error)
type TriggerWithDepName ¶
type TriggerWithDepName struct { KafkaTriggerHandler // contains filtered or unexported fields }
type Triggers ¶
type Triggers map[string]KafkaTriggerHandler
func (Triggers) List ¶
func (t Triggers) List(event *cloudevents.Event) []*TriggerWithDepName
Click to show internal directories.
Click to hide internal directories.