Documentation ¶
Index ¶
- func NewBundleMetadata(partition int32, offset kafka.Offset) *bundleMetadata
- func NewCommitter(committerInterval time.Duration, topic string, client *kafka.Consumer, ...) *committer
- type Consumer
- type GenericConsumer
- type KafkaConsumer
- func (c *KafkaConsumer) BundleRegister(registration *registration.BundleRegistration)
- func (c *KafkaConsumer) Commit(msg *kafka.Message) error
- func (c *KafkaConsumer) Consumer() *kafka.Consumer
- func (c *KafkaConsumer) CustomBundleRegister(msgID string, customBundleRegistration *registration.CustomBundleRegistration)
- func (c *KafkaConsumer) GetGenericBundleChan() chan *bundle.GenericBundle
- func (c *KafkaConsumer) GetMessageChan() chan *kafka.Message
- func (c *KafkaConsumer) SetCommitter(committer *committer)
- func (c *KafkaConsumer) SetConflationManager(conflationMgr *conflator.ConflationManager)
- func (c *KafkaConsumer) SetLeafHubName(leafHubName string)
- func (c *KafkaConsumer) SetStatistics(statistics *statistics.Statistics)
- func (c *KafkaConsumer) Start(ctx context.Context) error
- func (c *KafkaConsumer) Subscribe(topic string) error
- func (c *KafkaConsumer) SyncCustomBundle(customBundleRegistration *registration.CustomBundleRegistration, ...) error
- type KafkaConsumerConfig
- type SaramaConsumer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewBundleMetadata ¶
NewBundleMetadata returns a new instance of bundleMetadata.
Types ¶
type Consumer ¶
type Consumer interface { // Start starts the transport. Start(ctx context.Context) error // CustomBundleRegister registers a bundle ID to a CustomBundleRegistration. None-registered bundles are assumed to be // of type GenericBundle, and are handled by the GenericBundleSyncer. CustomBundleRegister(msgID string, customBundleRegistration *registration.CustomBundleRegistration) // BundleRegister function registers a msgID to the bundle updates channel. BundleRegister(registration *registration.BundleRegistration) // provide the generic bundle for message producer GetGenericBundleChan() chan *bundle.GenericBundle }
Consumer is an interface for the transport layer.
type GenericConsumer ¶
type GenericConsumer struct {
// contains filtered or unexported fields
}
func NewGenericConsumer ¶
func NewGenericConsumer(transportConfig *transport.TransportConfig) (*GenericConsumer, error)
func (*GenericConsumer) MessageChan ¶
func (c *GenericConsumer) MessageChan() chan *transport.Message
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
KafkaConsumer abstracts hub-of-hubs/pkg/kafka kafka-consumer's generic usage.
func NewKafkaConsumer ¶
func NewKafkaConsumer(kafkaConfig *transport.KafkaConfig, log logr.Logger, ) (*KafkaConsumer, error)
NewKafkaConsumer creates a new instance of KafkaConsumer.
func (*KafkaConsumer) BundleRegister ¶
func (c *KafkaConsumer) BundleRegister(registration *registration.BundleRegistration)
BundleRegister registers a msgID to the bundle updates channel.
func (*KafkaConsumer) Commit ¶
func (c *KafkaConsumer) Commit(msg *kafka.Message) error
Commit commits a kafka message.
func (*KafkaConsumer) Consumer ¶
func (c *KafkaConsumer) Consumer() *kafka.Consumer
Consumer returns the wrapped Confluent KafkaConsumer.
func (*KafkaConsumer) CustomBundleRegister ¶
func (c *KafkaConsumer) CustomBundleRegister(msgID string, customBundleRegistration *registration.CustomBundleRegistration, )
CustomBundleRegister registers a bundle ID to a CustomBundleRegistration.
func (*KafkaConsumer) GetGenericBundleChan ¶
func (c *KafkaConsumer) GetGenericBundleChan() chan *bundle.GenericBundle
func (*KafkaConsumer) GetMessageChan ¶
func (c *KafkaConsumer) GetMessageChan() chan *kafka.Message
func (*KafkaConsumer) SetCommitter ¶
func (c *KafkaConsumer) SetCommitter(committer *committer)
func (*KafkaConsumer) SetConflationManager ¶
func (c *KafkaConsumer) SetConflationManager(conflationMgr *conflator.ConflationManager)
func (*KafkaConsumer) SetLeafHubName ¶
func (c *KafkaConsumer) SetLeafHubName(leafHubName string)
func (*KafkaConsumer) SetStatistics ¶
func (c *KafkaConsumer) SetStatistics(statistics *statistics.Statistics)
func (*KafkaConsumer) Start ¶
func (c *KafkaConsumer) Start(ctx context.Context) error
Start starts the consumer.
func (*KafkaConsumer) Subscribe ¶
func (c *KafkaConsumer) Subscribe(topic string) error
Subscribe subscribes consumer to the given topic.
func (*KafkaConsumer) SyncCustomBundle ¶
func (c *KafkaConsumer) SyncCustomBundle(customBundleRegistration *registration.CustomBundleRegistration, payload []byte, ) error
SyncCustomBundle writes a custom bundle to its respective syncer channel.
type KafkaConsumerConfig ¶
type SaramaConsumer ¶
type SaramaConsumer interface { Start(ctx context.Context) error MessageChan() chan *sarama.ConsumerMessage MarkOffset(topic string, partition int32, offset int64) }
func NewSaramaConsumer ¶
func NewSaramaConsumer(ctx context.Context, kafkaConfig *transport.KafkaConfig) (SaramaConsumer, error)
Click to show internal directories.
Click to hide internal directories.