Documentation ¶
Overview ¶
nolint:revive
FIXME: golangci-lint nolint:revive
FIXME: golangci-lint nolint:errcheck,revive
Package kafkacommon contains all common Kafka functions.
FIXME: golangci-lint nolint:revive
FIXME: golangci-lint nolint:revive
FIXME: golangci-lint nolint:revive
Index ¶
- Constants
- func CreateEdgeEvent(orgID string, source string, reqID string, eventType string, subject string, ...) models.CRCCloudEvent
- type Consumer
- type ConsumerService
- type ConsumerServiceInterface
- type KafkaConfigMapService
- type KafkaConfigMapServiceInterface
- type Producer
- type ProducerService
- type ProducerServiceInterface
- type TopicNotFoundError
- type TopicService
- type TopicServiceInterface
Constants ¶
const ( // RecordKeyCreateCommit kafka record key const for creating a commit RecordKeyCreateCommit string = "create_commit" // RecordKeyCreateImage kafka record key const for creating an image (wrapper for commit and installer) RecordKeyCreateImage string = "create_image" // RecordKeyCreateImageUpdate kafka record key const for creating an update RecordKeyCreateImageUpdate string = "create_image_update" // RecordKeyCreateInstaller kafka record key const for creating an installer RecordKeyCreateInstaller string = "create_installer" // RecordKeyCreateKickstart kafka record key const for creating and injecting a kickstart file RecordKeyCreateKickstart string = "create_kickstart" )
const ( // TopicFleetmgmtImageBuild topic name TopicFleetmgmtImageBuild string = "platform.edge.fleetmgmt.image-build" // TopicFleetmgmtImageISOBuild topic name TopicFleetmgmtImageISOBuild string = "platform.edge.fleetmgmt.image-iso-build" // TopicPlaybookDispatcherRuns external topic for playbook dispatcher results TopicPlaybookDispatcherRuns string = "platform.playbook-dispatcher.runs" // TopicInventoryEvents external topic for hosted inventory events TopicInventoryEvents string = "platform.inventory.events" // TopicFleetmgmtUpdateRepoRequested topic name for update repo requested event TopicFleetmgmtUpdateRepoRequested string = "platform.edge.fleetmgmt.update-repo-requested" // TopicFleetmgmtUpdateWriteTemplateRequested topic name for write template playbook event TopicFleetmgmtUpdateWriteTemplateRequested string = "platform.edge.fleetmgmt.update-write-template-requested" )
Variables ¶
This section is empty.
Functions ¶
func CreateEdgeEvent ¶
func CreateEdgeEvent(orgID string, source string, reqID string, eventType string, subject string, payload interface{}) models.CRCCloudEvent
CreateEdgeEvent creates an event with standard CRC fields and edge payload
Types ¶
type Consumer ¶
type Consumer interface { Assign(partitions []kafka.TopicPartition) (err error) Assignment() (partitions []kafka.TopicPartition, err error) AssignmentLost() bool Close() (err error) Commit() ([]kafka.TopicPartition, error) CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) Events() chan kafka.Event GetConsumerGroupMetadata() (*kafka.ConsumerGroupMetadata, error) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) GetRebalanceProtocol() string GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) IncrementalAssign(partitions []kafka.TopicPartition) (err error) IncrementalUnassign(partitions []kafka.TopicPartition) (err error) OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) Pause(partitions []kafka.TopicPartition) (err error) Poll(timeoutMs int) (event kafka.Event) Position(partitions []kafka.TopicPartition) (offsets []kafka.TopicPartition, err error) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) ReadMessage(timeout time.Duration) (*kafka.Message, error) Resume(partitions []kafka.TopicPartition) (err error) Seek(partition kafka.TopicPartition, timeoutMs int) error SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error StoreMessage(m *kafka.Message) (storedOffsets []kafka.TopicPartition, err error) StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error) String() string Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) Subscription() (topics []string, err error) Unsubscribe() (err error) Unassign() (err error) }
Consumer is an interface for a kafka consumer; it matches the confluent consumer type definition.
type ConsumerService ¶
type ConsumerService struct { Topic TopicServiceInterface KafkaConfigMap KafkaConfigMapServiceInterface // contains filtered or unexported fields }
ConsumerService is the consumer service for edge
func (*ConsumerService) GetConsumer ¶
func (s *ConsumerService) GetConsumer(groupID string) (Consumer, error)
GetConsumer returns a kafka Consumer
type ConsumerServiceInterface ¶
ConsumerServiceInterface is the interface that defines the consumer service
func NewConsumerService ¶
func NewConsumerService(ctx context.Context, log log.FieldLogger) ConsumerServiceInterface
NewConsumerService returns a new consumer service.
type KafkaConfigMapService ¶
type KafkaConfigMapService struct { }
KafkaConfigMapService is the config map service
func (*KafkaConfigMapService) GetKafkaConsumerConfigMap ¶
func (k *KafkaConfigMapService) GetKafkaConsumerConfigMap(consumerGroup string) kafka.ConfigMap
GetKafkaConsumerConfigMap returns the kafka ConfigMap for the given consumer group, with the correct kafka auth based on the environment and given config.
func (*KafkaConfigMapService) GetKafkaProducerConfigMap ¶
func (k *KafkaConfigMapService) GetKafkaProducerConfigMap() kafka.ConfigMap
GetKafkaProducerConfigMap returns the kafka ConfigMap for a producer, with the correct kafka auth based on the environment and given config.
type KafkaConfigMapServiceInterface ¶
type KafkaConfigMapServiceInterface interface { GetKafkaProducerConfigMap() kafka.ConfigMap GetKafkaConsumerConfigMap(consumerGroup string) kafka.ConfigMap }
KafkaConfigMapServiceInterface is the interface that defines the config map service
func NewKafkaConfigMapService ¶
func NewKafkaConfigMapService() KafkaConfigMapServiceInterface
NewKafkaConfigMapService returns a new config map service.
type Producer ¶
type Producer interface { Close() Events() chan kafka.Event Flush(timeoutMs int) int GetFatalError() error GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error) Len() int OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error ProduceChannel() chan *kafka.Message QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error SetOAuthBearerTokenFailure(errstr string) error String() string TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode }
Producer is an interface for a kafka producer; it matches the confluent producer type definition.
type ProducerService ¶
type ProducerService struct { Topic TopicServiceInterface KafkaConfigMap KafkaConfigMapServiceInterface }
ProducerService is the producer service for edge
func (*ProducerService) GetProducerInstance ¶
func (p *ProducerService) GetProducerInstance() Producer
GetProducerInstance returns a kafka producer instance
func (*ProducerService) ProduceEvent ¶
func (p *ProducerService) ProduceEvent(requestedTopic, recordKey string, event models.CRCCloudEvent) error
ProduceEvent is a helper for the kafka producer
func (*ProducerService) UnsetProducer ¶
func (p *ProducerService) UnsetProducer()
UnsetProducer sets the producer singleton to nil
type ProducerServiceInterface ¶
type ProducerServiceInterface interface { GetProducerInstance() Producer ProduceEvent(requestedTopic, recordKey string, event models.CRCCloudEvent) error }
ProducerServiceInterface is an interface that defines the producer service
func NewProducerService ¶
func NewProducerService() ProducerServiceInterface
NewProducerService returns a new producer service.
type TopicNotFoundError ¶
type TopicNotFoundError struct{}
TopicNotFoundError indicates the requested topic was not found.
func (*TopicNotFoundError) Error ¶
func (e *TopicNotFoundError) Error() string
type TopicServiceInterface ¶
TopicServiceInterface is the interface that defines the topic service.
func NewTopicService ¶
func NewTopicService() TopicServiceInterface
NewTopicService returns a new topic service.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| mock_kafkacommon | Package mock_kafkacommon is a generated GoMock package. |