Documentation ¶
Index ¶
- Variables
- func CreateTopicIfDoesNotExist(brokerAddr, topic string, numPartitions int32, ...) error
- func ParseKafkaURL(brokerURL string) ([]string, *sarama.Config)
- type Consumer
- type KafkaConsumer
- func (kc *KafkaConsumer) Close()
- func (kc *KafkaConsumer) Ready() <-chan struct{}
- func (kc *KafkaConsumer) Start() error
- func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription
- func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
- func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string
- type KafkaProducer
- func (kp *KafkaProducer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, ...) error
- func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)
- func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)
- func (kp *KafkaProducer) SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error
- type KafkaResumptionMessage
- type Producer
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
Types ¶
type Consumer ¶
type Consumer interface {
	// Start sets up communication with the broker and begins processing
	// messages. If you want to ensure receipt of 100% of messages, you should
	// call Start() only after setting up subscriptions with Subscribe()
	Start() error
	// Subscribe enables subscribing to either ordered chain updates or unordered
	// pending batches. Calling Subscribe on a chan *ChainUpdate will return a
	// subscription for ordered chain updates. Calling subscribe on a
	// *PendingBatch will return a subscription for unordered pending batches.
	Subscribe(ch interface{}) types.Subscription
	// SubscribeReorg subscribes to information about large chain reorgs.
	SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
	// Close shuts down the transport layer, which in turn will cause
	// subscriptions to stop producing messages.
	Close()
	Ready() <-chan struct{}
	WhyNotReady(types.Hash) string
}
Consumer can be used to receive messages over a transport layer.
func NewKafkaConsumer ¶
func NewKafkaConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)
NewKafkaConsumer provides a transports.Consumer that pulls messages from a Kafka broker
func NewNullConsumer ¶
func NewNullConsumer() Consumer
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close()
func (*KafkaConsumer) Ready ¶
func (kc *KafkaConsumer) Ready() <-chan struct{}
func (*KafkaConsumer) Start ¶
func (kc *KafkaConsumer) Start() error
func (*KafkaConsumer) Subscribe ¶
func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription
func (*KafkaConsumer) SubscribeReorg ¶
func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
func (*KafkaConsumer) WhyNotReady ¶
func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func (*KafkaProducer) LatestBlockFromFeed ¶
func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)
LatestBlockFromFeed scans the feed the producer is configured for and finds the latest block number. This should be used once on startup, and is intended to allow producers to sync to a particular block before they begin emitting messages. Producers should start emitting when they reach this number, to avoid skipped blocks (which will halt consumers). Producer applications should provide some kind of override, resuming at a block specified by an operator in case messages are needed to start on the correct side of a reorg while the feed has messages from a longer but invalid chain.
func (*KafkaProducer) Reorg ¶
func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)
type KafkaResumptionMessage ¶
type KafkaResumptionMessage struct {
// contains filtered or unexported fields
}
func (*KafkaResumptionMessage) Key ¶
func (m *KafkaResumptionMessage) Key() []byte
func (*KafkaResumptionMessage) Offset ¶
func (m *KafkaResumptionMessage) Offset() int64
func (*KafkaResumptionMessage) Source ¶
func (m *KafkaResumptionMessage) Source() string
func (*KafkaResumptionMessage) Time ¶
func (m *KafkaResumptionMessage) Time() time.Time
func (*KafkaResumptionMessage) Value ¶
func (m *KafkaResumptionMessage) Value() []byte
type Producer ¶
type Producer interface {
	LatestBlockFromFeed() (int64, error)
	// AddBlock will send information about a block over the transport layer.
	AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error
	// SendBatch will send information about batches over the transport layer.
	// Batches should correspond to batches indicated in a previous AddBlock call
	SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error
	// Reorg will send information about large chain reorgs over the transport
	// layer. The "done" function returned by the Reorg() method should be called
	// after all blocks and batches for a given reorg have been sent to the
	// producer.
	Reorg(number int64, hash types.Hash) (func(), error)
}
Producer can be used to send block metadata over a messaging transport.
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool