kafkacommon

package
v0.0.0-...-c33b5b2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

nolint:revive

FIXME: golangci-lint nolint:revive

FIXME: golangci-lint nolint:errcheck,revive

Package kafkacommon contains all common Kafka functions.

FIXME: golangci-lint nolint:revive

FIXME: golangci-lint nolint:revive

FIXME: golangci-lint nolint:revive

Index

Constants

View Source
// Kafka record key constants.
const (
	// RecordKeyCreateCommit is the Kafka record key for creating a commit.
	RecordKeyCreateCommit string = "create_commit"
	// RecordKeyCreateImage is the Kafka record key for creating an image
	// (a wrapper for commit and installer).
	RecordKeyCreateImage string = "create_image"
	// RecordKeyCreateImageUpdate is the Kafka record key for creating an update.
	RecordKeyCreateImageUpdate string = "create_image_update"
	// RecordKeyCreateInstaller is the Kafka record key for creating an installer.
	RecordKeyCreateInstaller string = "create_installer"
	// RecordKeyCreateKickstart is the Kafka record key for creating and
	// injecting a kickstart file.
	RecordKeyCreateKickstart string = "create_kickstart"
)
View Source
// Kafka topic name constants. Per the comments below, the playbook
// dispatcher and inventory topics are external; the rest are
// platform.edge.fleetmgmt topics.
const (
	// TopicFleetmgmtImageBuild is the topic name for image builds.
	TopicFleetmgmtImageBuild string = "platform.edge.fleetmgmt.image-build"
	// TopicFleetmgmtImageISOBuild is the topic name for image ISO builds.
	TopicFleetmgmtImageISOBuild string = "platform.edge.fleetmgmt.image-iso-build"

	// TopicPlaybookDispatcherRuns is the external topic for playbook dispatcher results.
	TopicPlaybookDispatcherRuns string = "platform.playbook-dispatcher.runs"
	// TopicInventoryEvents is the external topic for hosted inventory events.
	TopicInventoryEvents string = "platform.inventory.events"
	// TopicFleetmgmtUpdateRepoRequested is the topic name for the update repo requested event.
	TopicFleetmgmtUpdateRepoRequested string = "platform.edge.fleetmgmt.update-repo-requested"
	// TopicFleetmgmtUpdateWriteTemplateRequested is the topic name for the write template playbook event.
	TopicFleetmgmtUpdateWriteTemplateRequested string = "platform.edge.fleetmgmt.update-write-template-requested"
)

Variables

This section is empty.

Functions

func CreateEdgeEvent

func CreateEdgeEvent(orgID string, source string, reqID string,
	eventType string, subject string, payload interface{}) models.CRCCloudEvent

CreateEdgeEvent creates an event with standard CRC fields and edge payload

Types

type Consumer

// Consumer is an interface for a Kafka consumer; it matches the Confluent
// consumer type definition, so the concrete Confluent consumer (or a mock)
// can be used wherever a Consumer is expected.
//
// Only the method set is declared here; the semantics of each method are
// those of the underlying client.
// NOTE(review): confirm the method set against the confluent-kafka-go
// version this package imports whenever that dependency is upgraded.
type Consumer interface {
	Assign(partitions []kafka.TopicPartition) (err error)
	Assignment() (partitions []kafka.TopicPartition, err error)
	AssignmentLost() bool
	Close() (err error)
	Commit() ([]kafka.TopicPartition, error)
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	Events() chan kafka.Event
	GetConsumerGroupMetadata() (*kafka.ConsumerGroupMetadata, error)
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	GetRebalanceProtocol() string
	GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
	IncrementalAssign(partitions []kafka.TopicPartition) (err error)
	IncrementalUnassign(partitions []kafka.TopicPartition) (err error)
	OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	Pause(partitions []kafka.TopicPartition) (err error)
	Poll(timeoutMs int) (event kafka.Event)
	Position(partitions []kafka.TopicPartition) (offsets []kafka.TopicPartition, err error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	Resume(partitions []kafka.TopicPartition) (err error)
	Seek(partition kafka.TopicPartition, timeoutMs int) error
	SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
	StoreMessage(m *kafka.Message) (storedOffsets []kafka.TopicPartition, err error)
	StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
	String() string
	Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	Subscription() (topics []string, err error)
	Unsubscribe() (err error)
	Unassign() (err error)
}

Consumer is an interface for a Kafka consumer; it matches the Confluent consumer type definition.

type ConsumerService

// ConsumerService is the consumer service for edge.
type ConsumerService struct {
	// Topic is the topic service this consumer service holds.
	Topic TopicServiceInterface
	// KafkaConfigMap is the config map service this consumer service holds.
	KafkaConfigMap KafkaConfigMapServiceInterface
	// contains filtered or unexported fields
}

ConsumerService is the consumer service for edge

func (*ConsumerService) GetConsumer

func (s *ConsumerService) GetConsumer(groupID string) (Consumer, error)

GetConsumer returns a kafka Consumer

type ConsumerServiceInterface

// ConsumerServiceInterface is the interface that defines the consumer service.
type ConsumerServiceInterface interface {
	// GetConsumer returns a Kafka Consumer or an error.
	// NOTE(review): groupID is presumably the Kafka consumer group ID — confirm
	// against the implementation.
	GetConsumer(groupID string) (Consumer, error)
}

ConsumerServiceInterface is the interface that defines the consumer service

func NewConsumerService

func NewConsumerService(ctx context.Context, log log.FieldLogger) ConsumerServiceInterface

NewConsumerService returns a new consumer service as a ConsumerServiceInterface.

type KafkaConfigMapService

// KafkaConfigMapService is the config map service. It declares no fields.
type KafkaConfigMapService struct {
}

KafkaConfigMapService is the config map service

func (*KafkaConfigMapService) GetKafkaConsumerConfigMap

func (k *KafkaConfigMapService) GetKafkaConsumerConfigMap(consumerGroup string) kafka.ConfigMap

GetKafkaConsumerConfigMap returns the Kafka consumer config map for the given consumer group, with the correct Kafka auth based on the environment and given config

func (*KafkaConfigMapService) GetKafkaProducerConfigMap

func (k *KafkaConfigMapService) GetKafkaProducerConfigMap() kafka.ConfigMap

GetKafkaProducerConfigMap returns the Kafka producer config map, with the correct Kafka auth based on the environment and given config

type KafkaConfigMapServiceInterface

// KafkaConfigMapServiceInterface is the interface that defines the config map service.
type KafkaConfigMapServiceInterface interface {
	// GetKafkaProducerConfigMap returns the kafka.ConfigMap to use for a producer.
	GetKafkaProducerConfigMap() kafka.ConfigMap
	// GetKafkaConsumerConfigMap returns the kafka.ConfigMap to use for a
	// consumer in the given consumerGroup.
	GetKafkaConsumerConfigMap(consumerGroup string) kafka.ConfigMap
}

KafkaConfigMapServiceInterface is the interface that defines the config map service

func NewKafkaConfigMapService

func NewKafkaConfigMapService() KafkaConfigMapServiceInterface

NewKafkaConfigMapService returns a new config map service as a KafkaConfigMapServiceInterface.

type Producer

// Producer is an interface for a Kafka producer; it matches the Confluent
// producer type definition, so the concrete Confluent producer (or a mock)
// can be used wherever a Producer is expected.
//
// Only the method set is declared here; the semantics of each method are
// those of the underlying client.
// NOTE(review): confirm the method set against the confluent-kafka-go
// version this package imports whenever that dependency is upgraded.
type Producer interface {
	Close()
	Events() chan kafka.Event
	Flush(timeoutMs int) int
	GetFatalError() error
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	Len() int
	OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	ProduceChannel() chan *kafka.Message
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
	String() string
	TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
}

Producer is an interface for a Kafka producer; it matches the Confluent producer type definition.

type ProducerService

// ProducerService is the producer service for edge.
type ProducerService struct {
	// Topic is the topic service this producer service holds.
	Topic TopicServiceInterface
	// KafkaConfigMap is the config map service this producer service holds.
	KafkaConfigMap KafkaConfigMapServiceInterface
}

ProducerService is the producer service for edge

func (*ProducerService) GetProducerInstance

func (p *ProducerService) GetProducerInstance() Producer

GetProducerInstance returns a kafka producer instance

func (*ProducerService) ProduceEvent

func (p *ProducerService) ProduceEvent(requestedTopic, recordKey string, event models.CRCCloudEvent) error

ProduceEvent is a helper for the kafka producer

func (*ProducerService) UnsetProducer

func (p *ProducerService) UnsetProducer()

UnsetProducer sets the producer singleton to nil

type ProducerServiceInterface

// ProducerServiceInterface is an interface that defines the producer service.
type ProducerServiceInterface interface {
	// GetProducerInstance returns a Kafka producer instance.
	GetProducerInstance() Producer
	// ProduceEvent is a helper for the Kafka producer. It takes the requested
	// topic, a record key, and the event, and returns an error.
	// NOTE(review): recordKey is presumably one of the RecordKey* constants —
	// confirm against callers.
	ProduceEvent(requestedTopic, recordKey string, event models.CRCCloudEvent) error
}

ProducerServiceInterface is an interface that defines the producer service

func NewProducerService

func NewProducerService() ProducerServiceInterface

NewProducerService returns a new producer service as a ProducerServiceInterface.

type TopicNotFoundError

// TopicNotFoundError is a field-less error type; by its name it presumably
// reports that a requested topic could not be found — TODO confirm against
// its Error method and GetTopic.
type TopicNotFoundError struct{}

TopicNotFoundError indicates the requested topic was not found

func (*TopicNotFoundError) Error

func (e *TopicNotFoundError) Error() string

type TopicService

// TopicService is the topic service struct. It declares no fields.
type TopicService struct {
}

TopicService is the topic service struct

func (*TopicService) GetTopic

func (t *TopicService) GetTopic(requested string) (string, error)

GetTopic takes the requested kafka topic and returns the topic actually created

type TopicServiceInterface

// TopicServiceInterface is the interface for the topic service.
type TopicServiceInterface interface {
	// GetTopic takes the requested Kafka topic and returns the topic actually
	// created, or an error.
	GetTopic(requested string) (string, error)
}

TopicServiceInterface is the interface for the topic service

func NewTopicService

func NewTopicService() TopicServiceInterface

NewTopicService returns a new topic service as a TopicServiceInterface.

Directories

Path Synopsis
Package mock_kafkacommon is a generated GoMock package.
Package mock_kafkacommon is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL