msgsender

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2023 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Kafka topic names for on-chain/off-chain data (Ender - on-chain data/ Vulcan - off-chain data)
	ON_CHAIN_KAFKA_TOPIC  = "to-ender"
	OFF_CHAIN_KAFKA_TOPIC = "to-vulcan"
)

Variables

View Source
var ErrKafkaAlreadyClosed = errors.New("IndexerMessageSenderKafka is already closed")
View Source
var TransactionHashHeaderKey = []byte("TransactionHash")

Functions

This section is empty.

Types

type IndexerMessageSender

type IndexerMessageSender interface {
	Enabled() bool // whether the IndexerMessageSender will send messages to the Indexer
	SendOnchainData(message Message)
	SendOffchainData(message Message)
	Close() error
}

IndexerMessageSender is an interface that exposes methods to send messages to the on-chain/off-chain data archival services in the Indexer. The `Enabled` function is used to determine if any additional computations needed to generate Indexer-specific data should be run in various modules.

type IndexerMessageSenderInMemoryCollector

type IndexerMessageSenderInMemoryCollector struct {
	// contains filtered or unexported fields
}

func NewIndexerMessageSenderInMemoryCollector

func NewIndexerMessageSenderInMemoryCollector() *IndexerMessageSenderInMemoryCollector

func (*IndexerMessageSenderInMemoryCollector) Clear

func (*IndexerMessageSenderInMemoryCollector) Close

func (*IndexerMessageSenderInMemoryCollector) Enabled

func (*IndexerMessageSenderInMemoryCollector) GetOffchainMessages

func (i *IndexerMessageSenderInMemoryCollector) GetOffchainMessages() []Message

func (*IndexerMessageSenderInMemoryCollector) GetOnchainMessages

func (i *IndexerMessageSenderInMemoryCollector) GetOnchainMessages() []Message

func (*IndexerMessageSenderInMemoryCollector) SendOffchainData

func (i *IndexerMessageSenderInMemoryCollector) SendOffchainData(message Message)

func (*IndexerMessageSenderInMemoryCollector) SendOnchainData

func (i *IndexerMessageSenderInMemoryCollector) SendOnchainData(message Message)

type IndexerMessageSenderKafka

type IndexerMessageSenderKafka struct {
	// contains filtered or unexported fields
}

Implementation of the IndexerMessageSender interface that sends data to Kafka. Will be used when the V4 application is connected to an Indexer. NOTE: This struct is go-routine safe. Messages are sent by writing to a single-channel, and a mutex and boolean variable is used to ensure `Close` only closes the underlying Kafka producer once.

func NewIndexerMessageSenderKafka

func NewIndexerMessageSenderKafka(
	indexerFlags indexer.IndexerFlags,
	config *sarama.Config,
	logger log.Logger,
) (*IndexerMessageSenderKafka, error)

func NewIndexerMessageSenderKafkaWithProducer

func NewIndexerMessageSenderKafkaWithProducer(
	producer sarama.AsyncProducer,
	logger log.Logger,
) *IndexerMessageSenderKafka

func (*IndexerMessageSenderKafka) Close

func (msgSender *IndexerMessageSenderKafka) Close() error

Close closes the underlying `AsyncProducer` and waits for all errors/success messages to be processed before returning.

func (*IndexerMessageSenderKafka) Enabled

func (msgSender *IndexerMessageSenderKafka) Enabled() bool

func (*IndexerMessageSenderKafka) SendOffchainData

func (msgSender *IndexerMessageSenderKafka) SendOffchainData(message Message)

SendOffchainData sends a key/value pair of byte slices to the off-chain data kafka topic. This method is go-routine safe.

func (*IndexerMessageSenderKafka) SendOnchainData

func (msgSender *IndexerMessageSenderKafka) SendOnchainData(message Message)

SendOnchainData sends a key/value pair of byte slices to the on-chain data kafka topic. This method is go-routine safe.

type IndexerMessageSenderNoop

type IndexerMessageSenderNoop struct {
	// contains filtered or unexported fields
}

No-op implementation of the IndexerMessageSender interface. Will be used in tests or when the V4 application is not connected to an Indexer.

func NewIndexerMessageSenderNoop

func NewIndexerMessageSenderNoop() *IndexerMessageSenderNoop

func NewIndexerMessageSenderNoopEnabled

func NewIndexerMessageSenderNoopEnabled() *IndexerMessageSenderNoop

func (*IndexerMessageSenderNoop) Close

func (msgSender *IndexerMessageSenderNoop) Close() error

func (*IndexerMessageSenderNoop) Enabled

func (msgSender *IndexerMessageSenderNoop) Enabled() bool

func (*IndexerMessageSenderNoop) SendOffchainData

func (msgSender *IndexerMessageSenderNoop) SendOffchainData(message Message)

func (*IndexerMessageSenderNoop) SendOnchainData

func (msgSender *IndexerMessageSenderNoop) SendOnchainData(message Message)

type Message

type Message struct {
	Key     []byte
	Value   []byte
	Headers []sarama.RecordHeader
}

Message is a key/value pair of byte slices that can be sent via the send functions in the IndexerMessageSender.

func (Message) AddHeader

func (msg Message) AddHeader(header MessageHeader) Message

AddHeader adds a `RecordHeader` to a `Message`. If there are already existing headers in the `Message`, the new header will be appended to the slice of existing headers.

type MessageHeader

type MessageHeader struct {
	Key   []byte
	Value []byte
}

Message header is a key/value pair of byte slices that can be added to a Message to be sent along with the main key/value pair of the Message. This is converted to a `sarama.RecordHeader` in the `Message`.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL