package transports

v0.0.8
Published: Oct 28, 2021 License: MIT Imports: 19 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
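SHA256 and SHA512 are scram.HashGeneratorFcn values for SASL/SCRAM authentication; they select the hash an XDGSCRAMClient uses (see the sketch under XDGSCRAMClient below).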

Functions

func CreateTopicIfDoesNotExist

func CreateTopicIfDoesNotExist(brokerAddr, topic string, numPartitions int32, configEntries map[string]*string) error
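A minimal usage sketch, assuming a reachable broker; the broker address format, topic name, and retention value below are placeholders:

retention := "604800000" // retention.ms: seven days, in milliseconds (hypothetical)
err := transports.CreateTopicIfDoesNotExist(
	"localhost:9092", // broker address (exact accepted format is an assumption)
	"example-topic",  // hypothetical topic name
	6,                // number of partitions
	map[string]*string{"retention.ms": &retention},
)
if err != nil {
	log.Fatalf("failed to ensure topic exists: %v", err)
}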

func ParseKafkaURL

func ParseKafkaURL(brokerURL string) ([]string, *sarama.Config)
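A hedged sketch of using the returned broker list and sarama configuration; the kafka:// scheme is an assumption about the accepted URL format:

brokers, config := transports.ParseKafkaURL("kafka://localhost:9092")
client, err := sarama.NewClient(brokers, config)
if err != nil {
	log.Fatalf("failed to connect to Kafka: %v", err)
}
defer client.Close()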

Types

type Consumer

type Consumer interface {
	// Start sets up communication with the broker and begins processing
	// messages. If you want to ensure receipt of 100% of messages, you should
	// call Start() only after setting up subscriptions with Subscribe().
	Start() error
	// Subscribe enables subscribing to either ordered chain updates or unordered
	// pending batches. Calling Subscribe on a chan *ChainUpdate will return a
	// subscription for ordered chain updates. Calling Subscribe on a
	// chan *PendingBatch will return a subscription for unordered pending batches.
	Subscribe(ch interface{}) types.Subscription
	// SubscribeReorg subscribes to information about large chain reorgs.
	SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
	// Close shuts down the transport layer, which in turn will cause
	// subscriptions to stop producing messages.
	Close()
	// Ready returns a channel that will be closed once the consumer is
	// ready to serve messages.
	Ready() <-chan struct{}
	// WhyNotReady returns a human-readable explanation of why the consumer
	// is not yet ready for the given hash.
	WhyNotReady(types.Hash) string
}

Consumer can be used to receive messages over a transport layer.
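Per the interface documentation, subscriptions should be registered before Start() is called. A minimal sketch, assuming ChainUpdate is the update type defined elsewhere in this module and that types.Subscription exposes Unsubscribe():

updates := make(chan *ChainUpdate, 128) // ChainUpdate: assumed from elsewhere in this module
sub := consumer.Subscribe(updates)
defer sub.Unsubscribe()

reorgs := make(chan map[int64]types.Hash)
reorgSub := consumer.SubscribeReorg(reorgs)
defer reorgSub.Unsubscribe()

// Start only after all subscriptions are in place, so no messages are missed.
if err := consumer.Start(); err != nil {
	log.Fatalf("consumer failed to start: %v", err)
}
<-consumer.Ready() // block until the consumer reports readiness

for {
	select {
	case update := <-updates:
		_ = update // process ordered chain updates
	case reorg := <-reorgs:
		_ = reorg // handle large reorg notifications
	}
}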

func NewKafkaConsumer

func NewKafkaConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)

NewKafkaConsumer provides a transports.Consumer that pulls messages from a Kafka broker.
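An annotated call sketch; every concrete value below is hypothetical, and nil is passed where a fresh, unfiltered consumer is intended (an assumption about the accepted zero values):

consumer, err := transports.NewKafkaConsumer(
	"kafka://localhost:9092",  // broker URL
	"example-topic",           // default topic
	[]string{"example-topic"}, // topics to consume
	nil,                       // resumption token (nil: start without resuming)
	128,                       // rollback (hypothetical value)
	lastNumber,                // last block number already processed
	lastHash,                  // hash of that block
	lastWeight,                // cumulative weight of that block
	1000,                      // reorgThreshold (hypothetical value)
	nil,                       // trackedPrefixes (nil as a placeholder)
	nil,                       // whitelist (nil as a placeholder)
)
if err != nil {
	log.Fatal(err)
}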

func NewNullConsumer

func NewNullConsumer() Consumer

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Close

func (kc *KafkaConsumer) Close()

func (*KafkaConsumer) Ready

func (kc *KafkaConsumer) Ready() <-chan struct{}

func (*KafkaConsumer) Start

func (kc *KafkaConsumer) Start() error

func (*KafkaConsumer) Subscribe

func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription

func (*KafkaConsumer) SubscribeReorg

func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription

func (*KafkaConsumer) WhyNotReady

func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func (*KafkaProducer) AddBlock

func (kp *KafkaProducer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error

func (*KafkaProducer) LatestBlockFromFeed

func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)

LatestBlockFromFeed scans the feed the producer is configured for and finds the latest block number. It should be used once on startup, and is intended to let producers sync to a particular block before they begin emitting messages. Producers should start emitting when they reach this number, to avoid skipped blocks (which would halt consumers). Producer applications should also provide some kind of override, resuming at a block specified by an operator, for cases where messages need to start on the correct side of a reorg while the feed carries messages from a longer but invalid chain.
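A sketch of the startup pattern described above; localBlocks and operatorResume are hypothetical stand-ins for the application's block source and operator override:

latest, err := producer.LatestBlockFromFeed()
if err != nil {
	log.Fatal(err)
}
if operatorResume >= 0 {
	latest = operatorResume // operator-specified override for reorg recovery
}
for block := range localBlocks {
	if block.Number < latest {
		continue // sync quietly until we reach the feed's head
	}
	// from this number onward, emit via AddBlock to avoid skipped blocks
}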

func (*KafkaProducer) Reorg

func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)

func (*KafkaProducer) SendBatch

func (kp *KafkaProducer) SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error

type KafkaResumptionMessage

type KafkaResumptionMessage struct {
	// contains filtered or unexported fields
}

func (*KafkaResumptionMessage) Key

func (m *KafkaResumptionMessage) Key() []byte

func (*KafkaResumptionMessage) Offset

func (m *KafkaResumptionMessage) Offset() int64

func (*KafkaResumptionMessage) Source

func (m *KafkaResumptionMessage) Source() string

func (*KafkaResumptionMessage) Time

func (m *KafkaResumptionMessage) Time() time.Time

func (*KafkaResumptionMessage) Value

func (m *KafkaResumptionMessage) Value() []byte

type Producer

type Producer interface {
	LatestBlockFromFeed() (int64, error)
	// AddBlock will send information about a block over the transport layer.
	AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error
	// SendBatch will send information about batches over the transport layer.
	// Batches should correspond to batches indicated in a previous AddBlock call.
	SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error
	// Reorg will send information about large chain reorgs over the transport
	// layer. The "done" function returned by the Reorg() method should be called
	// after all blocks and batches for a given reorg have been sent to the
	// producer.
	Reorg(number int64, hash types.Hash) (func(), error)
}

Producer can be used to send block metadata over a messaging transport.

func NewKafkaProducer

func NewKafkaProducer(brokerURL, defaultTopic string, schema map[string]string) (Producer, error)
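A sketch of the producer lifecycle: announce a block, deliver its batches, and use the done() callback when replaying a reorg. All names and values here are hypothetical:

producer, err := transports.NewKafkaProducer("kafka://localhost:9092", "example-topic", schema)
if err != nil {
	log.Fatal(err)
}

// Announce a block; the batches map tells consumers which batch hashes
// to expect via subsequent SendBatch calls.
err = producer.AddBlock(
	number, hash, parentHash, weight,
	map[string][]byte{"headers/latest": headerBytes}, // updates (key hypothetical)
	map[string]struct{}{},                            // deletes
	map[string]types.Hash{"state/": batchID},         // batches (prefix hypothetical)
)
if err != nil {
	log.Fatal(err)
}
if err := producer.SendBatch(batchID, nil, map[string][]byte{"state/0x00": stateBytes}); err != nil {
	log.Fatal(err)
}

// For a large reorg: announce it, re-send the affected blocks and
// batches, then invoke done() once everything has been delivered.
done, err := producer.Reorg(commonAncestorNumber, commonAncestorHash)
if err != nil {
	log.Fatal(err)
}
// ... AddBlock / SendBatch for the replacement chain ...
done()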

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)
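XDGSCRAMClient satisfies sarama's SCRAMClient interface by wrapping the xdg-go/scram library. A sketch of wiring it into a sarama configuration for SASL/SCRAM-SHA-512; the credentials are placeholders, and SHA256 with sarama.SASLTypeSCRAMSHA256 would select SCRAM-SHA-256 instead:

config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.User = "user" // placeholder credentials
config.Net.SASL.Password = "password"
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
	return &transports.XDGSCRAMClient{HashGeneratorFcn: transports.SHA512}
}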
