kafka

package
v0.0.0-...-ffe42a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultUpperBound     int64   = 80 * (1024 * 1024)
	DefaultResetThreshold float64 = 0.8
	DefaultLogFrequency           = 30 * time.Second
)
View Source
var (
	DefaultSendRetryDelay = 5 * time.Second
)

Functions

func GetCircuitBreaker

func GetCircuitBreaker() *circuitBreaker

func InitCircuitBreaker

func InitCircuitBreaker(conf *CircuitBreakerConf) error

func KafkaCommonCobraInit

func KafkaCommonCobraInit(cmd *cobra.Command, kconf *KafkaCommonConf)

KafkaCommonCobraInit commandline common parameter init for Kafka

func KafkaValidateConf

func KafkaValidateConf(kconf *KafkaCommonConf) (err error)

KafkaValidateConf validates supplied configuration

Types

type CircuitBreaker

type CircuitBreaker interface {
	Update(topic string, partition int32, hwm, offset, size int64)
	Check(topic string) error
}

type CircuitBreakerConf

type CircuitBreakerConf struct {
	Enabled         bool    `json:"enabled,omitempty"`
	UpperBound      int64   `json:"upperBound,omitempty"`
	ResetThreshold  float64 `json:"resetFraction,omitempty"`
	LogFrequencySec int     `json:"logFrequencySec,omitempty"`
	// contains filtered or unexported fields
}

CircuitBreakerConf defines the YAML config structure for the circuit breaker

type KafkaBridge

type KafkaBridge struct {
	// contains filtered or unexported fields
}

KafkaBridge receives messages from Kafka and dispatches them to go-ethereum over JSON/RPC

func NewKafkaBridge

func NewKafkaBridge(printYAML *bool) *KafkaBridge

NewKafkaBridge creates a new KafkaBridge

func (*KafkaBridge) CobraInit

func (k *KafkaBridge) CobraInit() (cmd *cobra.Command)

CobraInit retruns a cobra command to configure this KafkaBridge

func (*KafkaBridge) Conf

func (k *KafkaBridge) Conf() *KafkaBridgeConf

Conf gets the config for this bridge

func (*KafkaBridge) ConsumerMessagesLoop

func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ConsumerMessagesLoop - goroutine to process messages

func (*KafkaBridge) ProducerErrorLoop

func (k *KafkaBridge) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ProducerErrorLoop - goroutine to process producer errors

func (*KafkaBridge) ProducerSuccessLoop

func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)

ProducerSuccessLoop - goroutine to process producer successes

func (*KafkaBridge) SetConf

func (k *KafkaBridge) SetConf(conf *KafkaBridgeConf)

SetConf sets the config for this bridge

func (*KafkaBridge) Start

func (k *KafkaBridge) Start(receiptStore receipts.ReceiptStorePersistence) (err error)

Start kicks off the bridge

func (*KafkaBridge) ValidateConf

func (k *KafkaBridge) ValidateConf() (err error)

ValidateConf validates the configuration

type KafkaBridgeConf

type KafkaBridgeConf struct {
	CircuitBreaker CircuitBreakerConf `json:"circuitBreaker,omitempty"`
	Kafka          KafkaCommonConf    `json:"kafka"`
	MaxInFlight    int                `json:"maxInFlight"`
	tx.TxnProcessorConf
	eth.RPCConf
}

KafkaBridgeConf defines the YAML config structure for a Kafka bridge instance

type KafkaClient

type KafkaClient interface {
	NewProducer(KafkaCommon) (KafkaProducer, error)
	NewConsumer(KafkaCommon) (KafkaConsumer, error)
	Brokers() []*sarama.Broker
}

KafkaClient is the kafka client

type KafkaCommon

type KafkaCommon interface {
	ValidateConf() error
	CobraInit(cmd *cobra.Command)
	Start() error
	Conf() *KafkaCommonConf
	Producer() KafkaProducer
}

KafkaCommon is the base interface for bridges that interact with Kafka

func NewKafkaCommon

func NewKafkaCommon(kf KafkaFactory, conf *KafkaCommonConf, kafkaGoRoutines KafkaGoRoutines) (k KafkaCommon)

NewKafkaCommon constructs a new KafkaCommon instance

type KafkaCommonConf

type KafkaCommonConf struct {
	Brokers          []string `json:"brokers"`
	ClientID         string   `json:"clientID"`
	ConsumerGroup    string   `json:"consumerGroup"`
	TopicIn          string   `json:"topicIn"`
	TopicOut         string   `json:"topicOut"`
	SendRetryDelayMS int      `json:"sendRetryDelayMS"`
	ProducerFlush    struct {
		Frequency int `json:"frequency"`
		Messages  int `json:"messages"`
		Bytes     int `json:"bytes"`
	} `json:"producerFlush"`
	SASL struct {
		Username string
		Password string
	} `json:"sasl"`
	TLS utils.TLSConfig `json:"tls"`
	// contains filtered or unexported fields
}

KafkaCommonConf - Common configuration for Kafka

type KafkaConsumer

type KafkaConsumer interface {
	Close() error
	Messages() <-chan *sarama.ConsumerMessage
	Errors() <-chan error
	MarkOffset(*sarama.ConsumerMessage, string)
}

KafkaConsumer provides the interface passed from KafkaCommon to consume messages

type KafkaFactory

type KafkaFactory interface {
	NewClient(KafkaCommon, *sarama.Config) (KafkaClient, error)
}

KafkaFactory builds new clients

type KafkaGoRoutines

type KafkaGoRoutines interface {
	ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
	ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
}

KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon

type KafkaProducer

type KafkaProducer interface {
	AsyncClose()
	Input(topic string) (chan<- *sarama.ProducerMessage, error)
	Successes() <-chan *sarama.ProducerMessage
	Errors() <-chan *sarama.ProducerError
}

KafkaProducer provides the interface passed from KafkaCommon to produce messages (subset of sarama)

type MockKafkaConsumer

type MockKafkaConsumer struct {
	MockMessages       chan *sarama.ConsumerMessage
	MockErrors         chan error
	OffsetsByPartition map[int32]int64
}

MockKafkaConsumer - mock

func (*MockKafkaConsumer) Close

func (c *MockKafkaConsumer) Close() error

Close - mock

func (*MockKafkaConsumer) Errors

func (c *MockKafkaConsumer) Errors() <-chan error

Errors - mock

func (*MockKafkaConsumer) MarkOffset

func (c *MockKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset - mock

func (*MockKafkaConsumer) Messages

func (c *MockKafkaConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages - mock

type MockKafkaFactory

type MockKafkaFactory struct {
	ClientConf         *sarama.Config
	ErrorOnNewClient   error
	ErrorOnNewProducer error
	ErrorOnNewConsumer error
	Producer           *MockKafkaProducer
	Consumer           *MockKafkaConsumer
}

MockKafkaFactory - mock

func NewErrorMockKafkaFactory

func NewErrorMockKafkaFactory(errorOnNewClient error, errorOnNewConsumer error, errorOnNewProducer error) *MockKafkaFactory

NewErrorMockKafkaFactory - mock

func NewMockKafkaFactory

func NewMockKafkaFactory() *MockKafkaFactory

NewMockKafkaFactory - mock

func (*MockKafkaFactory) Brokers

func (f *MockKafkaFactory) Brokers() []*sarama.Broker

Brokers - mock

func (*MockKafkaFactory) NewClient

func (f *MockKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (KafkaClient, error)

NewClient - mock

func (*MockKafkaFactory) NewConsumer

func (f *MockKafkaFactory) NewConsumer(k KafkaCommon) (KafkaConsumer, error)

NewConsumer - mock

func (*MockKafkaFactory) NewProducer

func (f *MockKafkaFactory) NewProducer(k KafkaCommon) (KafkaProducer, error)

NewProducer - mock

type MockKafkaProducer

type MockKafkaProducer struct {
	MockInput      chan *sarama.ProducerMessage
	MockSuccesses  chan *sarama.ProducerMessage
	MockErrors     chan *sarama.ProducerError
	FirstSendError error
	Closed         bool
	CloseSync      sync.Mutex
}

MockKafkaProducer - mock

func (*MockKafkaProducer) AsyncClose

func (p *MockKafkaProducer) AsyncClose()

AsyncClose - mock

func (*MockKafkaProducer) Errors

func (p *MockKafkaProducer) Errors() <-chan *sarama.ProducerError

Errors - mock

func (*MockKafkaProducer) Input

func (p *MockKafkaProducer) Input(topic string) (chan<- *sarama.ProducerMessage, error)

Input - mock

func (*MockKafkaProducer) Successes

func (p *MockKafkaProducer) Successes() <-chan *sarama.ProducerMessage

Successes - mock

type SaramaKafkaFactory

type SaramaKafkaFactory struct{}

SaramaKafkaFactory - uses sarama

func (*SaramaKafkaFactory) NewClient

func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (c KafkaClient, err error)

NewClient - returns a new client

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL