Documentation ¶
Index ¶
- Constants
- Variables
- type IndexerMessageSender
- type IndexerMessageSenderInMemoryCollector
- func (i *IndexerMessageSenderInMemoryCollector) Clear()
- func (i *IndexerMessageSenderInMemoryCollector) Close() error
- func (i *IndexerMessageSenderInMemoryCollector) Enabled() bool
- func (i *IndexerMessageSenderInMemoryCollector) GetOffchainMessages() []Message
- func (i *IndexerMessageSenderInMemoryCollector) GetOnchainMessages() []Message
- func (i *IndexerMessageSenderInMemoryCollector) SendOffchainData(message Message)
- func (i *IndexerMessageSenderInMemoryCollector) SendOnchainData(message Message)
- type IndexerMessageSenderKafka
- type IndexerMessageSenderNoop
- type Message
- type MessageHeader
Constants ¶
const ( // Kafka topic names for on-chain/off-chain data (Ender - on-chain data/ Vulcan - off-chain data) ON_CHAIN_KAFKA_TOPIC = "to-ender" OFF_CHAIN_KAFKA_TOPIC = "to-vulcan" )
Variables ¶
var ErrKafkaAlreadyClosed = errors.New("IndexerMessageSenderKafka is already closed")
var TransactionHashHeaderKey = []byte("TransactionHash")
Functions ¶
This section is empty.
Types ¶
type IndexerMessageSender ¶
type IndexerMessageSender interface { Enabled() bool // whether the IndexerMessageSender will send messages to the Indexer SendOnchainData(message Message) SendOffchainData(message Message) Close() error }
IndexerMessageSender is an interface that exposes methods to send messages to the on-chain/off-chain data archival services in the Indexer. The `Enabled` function is used to determine if any additional computations needed to generate Indexer-specific data should be run in various modules.
type IndexerMessageSenderInMemoryCollector ¶
type IndexerMessageSenderInMemoryCollector struct {
// contains filtered or unexported fields
}
func NewIndexerMessageSenderInMemoryCollector ¶
func NewIndexerMessageSenderInMemoryCollector() *IndexerMessageSenderInMemoryCollector
func (*IndexerMessageSenderInMemoryCollector) Clear ¶
func (i *IndexerMessageSenderInMemoryCollector) Clear()
func (*IndexerMessageSenderInMemoryCollector) Close ¶
func (i *IndexerMessageSenderInMemoryCollector) Close() error
func (*IndexerMessageSenderInMemoryCollector) Enabled ¶
func (i *IndexerMessageSenderInMemoryCollector) Enabled() bool
func (*IndexerMessageSenderInMemoryCollector) GetOffchainMessages ¶
func (i *IndexerMessageSenderInMemoryCollector) GetOffchainMessages() []Message
func (*IndexerMessageSenderInMemoryCollector) GetOnchainMessages ¶
func (i *IndexerMessageSenderInMemoryCollector) GetOnchainMessages() []Message
func (*IndexerMessageSenderInMemoryCollector) SendOffchainData ¶
func (i *IndexerMessageSenderInMemoryCollector) SendOffchainData(message Message)
func (*IndexerMessageSenderInMemoryCollector) SendOnchainData ¶
func (i *IndexerMessageSenderInMemoryCollector) SendOnchainData(message Message)
type IndexerMessageSenderKafka ¶
type IndexerMessageSenderKafka struct {
// contains filtered or unexported fields
}
Implementation of the IndexerMessageSender interface that sends data to Kafka. Will be used when the V4 application is connected to an Indexer. NOTE: This struct is go-routine safe. Messages are sent by writing to a single-channel, and a mutex and boolean variable is used to ensure `Close` only closes the underlying Kafka producer once.
func NewIndexerMessageSenderKafka ¶
func NewIndexerMessageSenderKafka( indexerFlags indexer.IndexerFlags, config *sarama.Config, logger log.Logger, ) (*IndexerMessageSenderKafka, error)
func NewIndexerMessageSenderKafkaWithProducer ¶
func NewIndexerMessageSenderKafkaWithProducer( producer sarama.AsyncProducer, logger log.Logger, ) *IndexerMessageSenderKafka
func (*IndexerMessageSenderKafka) Close ¶
func (msgSender *IndexerMessageSenderKafka) Close() error
Close closes the underlying `AsyncProducer` and waits for all errors/success messages to be processed before returning.
func (*IndexerMessageSenderKafka) Enabled ¶
func (msgSender *IndexerMessageSenderKafka) Enabled() bool
func (*IndexerMessageSenderKafka) SendOffchainData ¶
func (msgSender *IndexerMessageSenderKafka) SendOffchainData(message Message)
SendOffchainData sends a key/value pair of byte slices to the off-chain data kafka topic. This method is go-routine safe.
func (*IndexerMessageSenderKafka) SendOnchainData ¶
func (msgSender *IndexerMessageSenderKafka) SendOnchainData(message Message)
SendOnchainData sends a key/value pair of byte slices to the on-chain data kafka topic. This method is go-routine safe.
type IndexerMessageSenderNoop ¶
type IndexerMessageSenderNoop struct {
// contains filtered or unexported fields
}
No-op implementation of the IndexerMessageSender interface. Will be used in tests or when the V4 application is not connected to an Indexer.
func NewIndexerMessageSenderNoop ¶
func NewIndexerMessageSenderNoop() *IndexerMessageSenderNoop
func NewIndexerMessageSenderNoopEnabled ¶
func NewIndexerMessageSenderNoopEnabled() *IndexerMessageSenderNoop
func (*IndexerMessageSenderNoop) Close ¶
func (msgSender *IndexerMessageSenderNoop) Close() error
func (*IndexerMessageSenderNoop) Enabled ¶
func (msgSender *IndexerMessageSenderNoop) Enabled() bool
func (*IndexerMessageSenderNoop) SendOffchainData ¶
func (msgSender *IndexerMessageSenderNoop) SendOffchainData(message Message)
func (*IndexerMessageSenderNoop) SendOnchainData ¶
func (msgSender *IndexerMessageSenderNoop) SendOnchainData(message Message)
type Message ¶
type Message struct { Key []byte Value []byte Headers []sarama.RecordHeader }
Message is a key/value pair of byte slices that can be sent via the send functions in the IndexerMessageSender.
func (Message) AddHeader ¶
func (msg Message) AddHeader(header MessageHeader) Message
AddHeader adds a `RecordHeader` to a `Message`. If there are already existing headers in the `Message`, the new header will be appended to the slice of existing headers.
type MessageHeader ¶
Message header is a key/value pair of byte slices that can be added to a Message to be sent along with the main key/value pair of the Message. This is converted to a `sarama.RecordHeader` in the `Message`.