Documentation ¶
Index ¶
- Variables
- func GetCircuitBreaker() *circuitBreaker
- func InitCircuitBreaker(conf *CircuitBreakerConf) error
- func KafkaCommonCobraInit(cmd *cobra.Command, kconf *KafkaCommonConf)
- func KafkaValidateConf(kconf *KafkaCommonConf) (err error)
- type CircuitBreaker
- type CircuitBreakerConf
- type KafkaBridge
- func (k *KafkaBridge) CobraInit() (cmd *cobra.Command)
- func (k *KafkaBridge) Conf() *KafkaBridgeConf
- func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
- func (k *KafkaBridge) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
- func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
- func (k *KafkaBridge) SetConf(conf *KafkaBridgeConf)
- func (k *KafkaBridge) Start(receiptStore receipts.ReceiptStorePersistence) (err error)
- func (k *KafkaBridge) ValidateConf() (err error)
- type KafkaBridgeConf
- type KafkaClient
- type KafkaCommon
- type KafkaCommonConf
- type KafkaConsumer
- type KafkaFactory
- type KafkaGoRoutines
- type KafkaProducer
- type MockKafkaConsumer
- type MockKafkaFactory
- func (f *MockKafkaFactory) Brokers() []*sarama.Broker
- func (f *MockKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (KafkaClient, error)
- func (f *MockKafkaFactory) NewConsumer(k KafkaCommon) (KafkaConsumer, error)
- func (f *MockKafkaFactory) NewProducer(k KafkaCommon) (KafkaProducer, error)
- type MockKafkaProducer
- type SaramaKafkaFactory
Constants ¶
This section is empty.
Variables ¶
var ( DefaultUpperBound int64 = 80 * (1024 * 1024) DefaultResetThreshold float64 = 0.8 DefaultLogFrequency = 30 * time.Second )
var (
DefaultSendRetryDelay = 5 * time.Second
)
Functions ¶
func GetCircuitBreaker ¶
func GetCircuitBreaker() *circuitBreaker
func InitCircuitBreaker ¶
func InitCircuitBreaker(conf *CircuitBreakerConf) error
func KafkaCommonCobraInit ¶
func KafkaCommonCobraInit(cmd *cobra.Command, kconf *KafkaCommonConf)
KafkaCommonCobraInit performs the common command-line parameter initialization for Kafka
func KafkaValidateConf ¶
func KafkaValidateConf(kconf *KafkaCommonConf) (err error)
KafkaValidateConf validates supplied configuration
Types ¶
type CircuitBreaker ¶
type CircuitBreakerConf ¶
type CircuitBreakerConf struct { Enabled bool `json:"enabled,omitempty"` UpperBound int64 `json:"upperBound,omitempty"` ResetThreshold float64 `json:"resetFraction,omitempty"` LogFrequencySec int `json:"logFrequencySec,omitempty"` // contains filtered or unexported fields }
CircuitBreakerConf defines the YAML config structure for the circuit breaker
type KafkaBridge ¶
type KafkaBridge struct {
// contains filtered or unexported fields
}
KafkaBridge receives messages from Kafka and dispatches them to go-ethereum over JSON-RPC
func NewKafkaBridge ¶
func NewKafkaBridge(printYAML *bool) *KafkaBridge
NewKafkaBridge creates a new KafkaBridge
func (*KafkaBridge) CobraInit ¶
func (k *KafkaBridge) CobraInit() (cmd *cobra.Command)
CobraInit returns a cobra command to configure this KafkaBridge
func (*KafkaBridge) Conf ¶
func (k *KafkaBridge) Conf() *KafkaBridgeConf
Conf gets the config for this bridge
func (*KafkaBridge) ConsumerMessagesLoop ¶
func (k *KafkaBridge) ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
ConsumerMessagesLoop - goroutine to process messages
func (*KafkaBridge) ProducerErrorLoop ¶
func (k *KafkaBridge) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
ProducerErrorLoop - goroutine to process producer errors
func (*KafkaBridge) ProducerSuccessLoop ¶
func (k *KafkaBridge) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup)
ProducerSuccessLoop - goroutine to process producer successes
func (*KafkaBridge) SetConf ¶
func (k *KafkaBridge) SetConf(conf *KafkaBridgeConf)
SetConf sets the config for this bridge
func (*KafkaBridge) Start ¶
func (k *KafkaBridge) Start(receiptStore receipts.ReceiptStorePersistence) (err error)
Start kicks off the bridge
func (*KafkaBridge) ValidateConf ¶
func (k *KafkaBridge) ValidateConf() (err error)
ValidateConf validates the configuration
type KafkaBridgeConf ¶
type KafkaBridgeConf struct { CircuitBreaker CircuitBreakerConf `json:"circuitBreaker,omitempty"` Kafka KafkaCommonConf `json:"kafka"` MaxInFlight int `json:"maxInFlight"` tx.TxnProcessorConf eth.RPCConf }
KafkaBridgeConf defines the YAML config structure for a Kafka bridge instance
type KafkaClient ¶
type KafkaClient interface { NewProducer(KafkaCommon) (KafkaProducer, error) NewConsumer(KafkaCommon) (KafkaConsumer, error) Brokers() []*sarama.Broker }
KafkaClient is the Kafka client
type KafkaCommon ¶
type KafkaCommon interface { ValidateConf() error CobraInit(cmd *cobra.Command) Start() error Conf() *KafkaCommonConf Producer() KafkaProducer }
KafkaCommon is the base interface for bridges that interact with Kafka
func NewKafkaCommon ¶
func NewKafkaCommon(kf KafkaFactory, conf *KafkaCommonConf, kafkaGoRoutines KafkaGoRoutines) (k KafkaCommon)
NewKafkaCommon constructs a new KafkaCommon instance
type KafkaCommonConf ¶
type KafkaCommonConf struct { Brokers []string `json:"brokers"` ClientID string `json:"clientID"` ConsumerGroup string `json:"consumerGroup"` TopicIn string `json:"topicIn"` TopicOut string `json:"topicOut"` SendRetryDelayMS int `json:"sendRetryDelayMS"` ProducerFlush struct { Frequency int `json:"frequency"` Messages int `json:"messages"` Bytes int `json:"bytes"` } `json:"producerFlush"` SASL struct { Username string Password string } `json:"sasl"` TLS utils.TLSConfig `json:"tls"` // contains filtered or unexported fields }
KafkaCommonConf - Common configuration for Kafka
type KafkaConsumer ¶
type KafkaConsumer interface { Close() error Messages() <-chan *sarama.ConsumerMessage Errors() <-chan error MarkOffset(*sarama.ConsumerMessage, string) }
KafkaConsumer provides the interface passed from KafkaCommon to consume messages
type KafkaFactory ¶
type KafkaFactory interface {
NewClient(KafkaCommon, *sarama.Config) (KafkaClient, error)
}
KafkaFactory builds new clients
type KafkaGoRoutines ¶
type KafkaGoRoutines interface { ConsumerMessagesLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) ProducerErrorLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) ProducerSuccessLoop(consumer KafkaConsumer, producer KafkaProducer, wg *sync.WaitGroup) }
KafkaGoRoutines defines goroutines for processing Kafka messages from KafkaCommon
type KafkaProducer ¶
type KafkaProducer interface { AsyncClose() Input(topic string) (chan<- *sarama.ProducerMessage, error) Successes() <-chan *sarama.ProducerMessage Errors() <-chan *sarama.ProducerError }
KafkaProducer provides the interface passed from KafkaCommon to produce messages (subset of sarama)
type MockKafkaConsumer ¶
type MockKafkaConsumer struct { MockMessages chan *sarama.ConsumerMessage MockErrors chan error OffsetsByPartition map[int32]int64 }
MockKafkaConsumer - mock
func (*MockKafkaConsumer) MarkOffset ¶
func (c *MockKafkaConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
MarkOffset - mock
func (*MockKafkaConsumer) Messages ¶
func (c *MockKafkaConsumer) Messages() <-chan *sarama.ConsumerMessage
Messages - mock
type MockKafkaFactory ¶
type MockKafkaFactory struct { ClientConf *sarama.Config ErrorOnNewClient error ErrorOnNewProducer error ErrorOnNewConsumer error Producer *MockKafkaProducer Consumer *MockKafkaConsumer }
MockKafkaFactory - mock
func NewErrorMockKafkaFactory ¶
func NewErrorMockKafkaFactory(errorOnNewClient error, errorOnNewConsumer error, errorOnNewProducer error) *MockKafkaFactory
NewErrorMockKafkaFactory - mock
func (*MockKafkaFactory) Brokers ¶
func (f *MockKafkaFactory) Brokers() []*sarama.Broker
Brokers - mock
func (*MockKafkaFactory) NewClient ¶
func (f *MockKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (KafkaClient, error)
NewClient - mock
func (*MockKafkaFactory) NewConsumer ¶
func (f *MockKafkaFactory) NewConsumer(k KafkaCommon) (KafkaConsumer, error)
NewConsumer - mock
func (*MockKafkaFactory) NewProducer ¶
func (f *MockKafkaFactory) NewProducer(k KafkaCommon) (KafkaProducer, error)
NewProducer - mock
type MockKafkaProducer ¶
type MockKafkaProducer struct { MockInput chan *sarama.ProducerMessage MockSuccesses chan *sarama.ProducerMessage MockErrors chan *sarama.ProducerError FirstSendError error Closed bool CloseSync sync.Mutex }
MockKafkaProducer - mock
func (*MockKafkaProducer) Errors ¶
func (p *MockKafkaProducer) Errors() <-chan *sarama.ProducerError
Errors - mock
func (*MockKafkaProducer) Input ¶
func (p *MockKafkaProducer) Input(topic string) (chan<- *sarama.ProducerMessage, error)
Input - mock
func (*MockKafkaProducer) Successes ¶
func (p *MockKafkaProducer) Successes() <-chan *sarama.ProducerMessage
Successes - mock
type SaramaKafkaFactory ¶
type SaramaKafkaFactory struct{}
SaramaKafkaFactory - uses sarama
func (*SaramaKafkaFactory) NewClient ¶
func (f *SaramaKafkaFactory) NewClient(k KafkaCommon, clientConf *sarama.Config) (c KafkaClient, err error)
NewClient - returns a new client