Documentation ¶
Index ¶
- Variables
- func CreateTopicIfDoesNotExist(brokerAddr, topic string, numPartitions int32, ...) error
- func ParseKafkaURL(brokerURL string) ([]string, *sarama.Config)
- func ResumptionForTimestamp(brokerParams []BrokerParams, timestamp int64) ([]byte, error)
- type BrokerParams
- type Consumer
- func NewKafkaConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, ...) (Consumer, error)
- func NewNullConsumer() Consumer
- func ResolveConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, ...) (Consumer, error)
- func ResolveMuxConsumer(brokerParams []BrokerParams, resumption []byte, lastNumber int64, ...) (Consumer, error)
- type KafkaConsumer
- func (kc *KafkaConsumer) Close()
- func (kc *KafkaConsumer) ProducerCount(d time.Duration) uint
- func (kc *KafkaConsumer) Ready() <-chan struct{}
- func (kc *KafkaConsumer) Start() error
- func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription
- func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
- func (kc *KafkaConsumer) Waiter() waiter.Waiter
- func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string
- type KafkaProducer
- func (kp *KafkaProducer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, ...) error
- func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)
- func (kp *KafkaProducer) ProducerCount(d time.Duration) uint
- func (kp *KafkaProducer) PurgeReplayCache()
- func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)
- func (kp *KafkaProducer) SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error
- func (kp *KafkaProducer) SetHealth(b bool)
- type KafkaResumptionMessage
- type Producer
- func NewKafkaProducer(brokerURL, defaultTopic string, schema map[string]string) (Producer, error)
- func NewWebsocketProducer(wsurl string, resumer StreamsResumption) (Producer, error)
- func ResolveMuxProducer(brokerParams []ProducerBrokerParams, resumer StreamsResumption) (Producer, error)
- func ResolveProducer(brokerURL, defaultTopic string, schema map[string]string) (Producer, error)
- func ResolveProducerWithResumer(brokerURL, defaultTopic string, schema map[string]string, ...) (Producer, error)
- type ProducerBrokerParams
- type ProducerCounter
- type StreamsResumption
- type TransportBatch
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func ResumptionForTimestamp ¶ added in v0.0.16
func ResumptionForTimestamp(brokerParams []BrokerParams, timestamp int64) ([]byte, error)
Types ¶
type BrokerParams ¶ added in v0.0.13
type Consumer ¶
type Consumer interface { ProducerCounter // Start sets up communication with the broker and begins processing // messages. If you want to ensure receipt of 100% of messages, you should // call Start() only after setting up subscriptions with Subscribe() Start() error // Subscribe enables subscribing to either ordered chain updates or unordered // pending batches. Calling Subscribe on a chan *ChainUpdate will return a // subscription for ordered chain updates. Calling Subscribe on a // *PendingBatch will return a subscription for unordered pending batches. Subscribe(ch interface{}) types.Subscription // SubscribeReorg subscribes to information about large chain reorgs. SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription // Waiter returns a waiter.Waiter that can be used to wait for blocks by hash or number Waiter() waiter.Waiter // Close shuts down the transport layer, which in turn will cause // subscriptions to stop producing messages. Close() Ready() <-chan struct{} WhyNotReady(types.Hash) string }
Consumer can be used to receive messages over a transport layer.
func NewKafkaConsumer ¶
func NewKafkaConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)
NewKafkaConsumer provides a transports.Consumer that pulls messages from a Kafka broker
func NewNullConsumer ¶
func NewNullConsumer() Consumer
func ResolveConsumer ¶ added in v0.0.13
func ResolveConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)
brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash
func ResolveMuxConsumer ¶ added in v0.0.13
func ResolveMuxConsumer(brokerParams []BrokerParams, resumption []byte, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)
ResolveMuxConsumer takes a list of broker configurations
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close()
func (*KafkaConsumer) ProducerCount ¶ added in v1.3.0
func (kc *KafkaConsumer) ProducerCount(d time.Duration) uint
func (*KafkaConsumer) Ready ¶
func (kc *KafkaConsumer) Ready() <-chan struct{}
func (*KafkaConsumer) Start ¶
func (kc *KafkaConsumer) Start() error
func (*KafkaConsumer) Subscribe ¶
func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription
func (*KafkaConsumer) SubscribeReorg ¶
func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
func (*KafkaConsumer) Waiter ¶ added in v1.5.0
func (kc *KafkaConsumer) Waiter() waiter.Waiter
func (*KafkaConsumer) WhyNotReady ¶
func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func (*KafkaProducer) LatestBlockFromFeed ¶
func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)
LatestBlockFromFeed scans the feed the producer is configured for and finds the latest block number. This should be used once on startup, and is intended to allow producers to sync to a particular block before they begin emitting messages. Producers should start emitting when they reach this number, to avoid skipped blocks (which will halt consumers). Producer applications should provide some kind of override, resuming at a block specified by an operator in case messages are needed to start on the correct side of a reorg while the feed has messages from a longer but invalid chain.
func (*KafkaProducer) ProducerCount ¶ added in v1.3.0
func (kp *KafkaProducer) ProducerCount(d time.Duration) uint
func (*KafkaProducer) PurgeReplayCache ¶ added in v1.4.0
func (kp *KafkaProducer) PurgeReplayCache()
func (*KafkaProducer) Reorg ¶
func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)
func (*KafkaProducer) SetHealth ¶ added in v1.3.0
func (kp *KafkaProducer) SetHealth(b bool)
type KafkaResumptionMessage ¶
type KafkaResumptionMessage struct {
// contains filtered or unexported fields
}
func (*KafkaResumptionMessage) Key ¶
func (m *KafkaResumptionMessage) Key() []byte
func (*KafkaResumptionMessage) Offset ¶
func (m *KafkaResumptionMessage) Offset() int64
func (*KafkaResumptionMessage) Source ¶
func (m *KafkaResumptionMessage) Source() string
func (*KafkaResumptionMessage) Time ¶
func (m *KafkaResumptionMessage) Time() time.Time
func (*KafkaResumptionMessage) Value ¶
func (m *KafkaResumptionMessage) Value() []byte
type Producer ¶
type Producer interface { ProducerCounter LatestBlockFromFeed() (int64, error) // AddBlock will send information about a block over the transport layer. AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error // SendBatch will send information about batches over the transport layer. // Batches should correspond to batches indicated in a previous AddBlock call SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error // Reorg will send information about large chain reorgs over the transport // layer. The "done" function returned by the Reorg() method should be called // after all blocks and batches for a given reorg have been sent to the // producer. Reorg(number int64, hash types.Hash) (func(), error) // SetHealth allows producers to mark that they are in an unhealthy state and not currently producing SetHealth(bool) // Producers track which blocks they have seen to avoid replaying them. If applications // intend to replay blocks, they should call this function first. PurgeReplayCache() }
Producer can be used to send block metadata over a messaging transport.
func NewKafkaProducer ¶
func NewWebsocketProducer ¶ added in v0.0.38
func NewWebsocketProducer(wsurl string, resumer StreamsResumption) (Producer, error)
func ResolveMuxProducer ¶ added in v0.0.38
func ResolveMuxProducer(brokerParams []ProducerBrokerParams, resumer StreamsResumption) (Producer, error)
func ResolveProducer ¶ added in v0.0.13
func ResolveProducerWithResumer ¶ added in v0.0.38
type ProducerBrokerParams ¶ added in v0.0.38
type ProducerCounter ¶ added in v1.3.0
type StreamsResumption ¶ added in v0.0.38
type StreamsResumption interface { // BlocksFrom produces PendingBatches. This stream does not deal with // subbatches, so the PendingBatches must include all values. BlocksFrom // should watch for context.Done() and stop producing blocks if the context // finishes before BlocksFrom(ctx context.Context, block uint64, hash types.Hash) (chan *delivery.PendingBatch, error) GetBlock(ctx context.Context, block uint64) *delivery.PendingBatch }
type TransportBatch ¶ added in v0.0.38
type TransportBatch struct { Number hexutil.Uint64 `json:"number"` Weight *hexutil.Big `json:"weight"` Hash types.Hash `json:"hash"` ParentHash types.Hash `json:"parent"` Values map[string]hexutil.Bytes `json:"values"` Deletes []string `json:"deletes"` Batches map[string]types.Hash `json:"batches"` }
func (*TransportBatch) ToPendingBatch ¶ added in v0.0.38
func (tb *TransportBatch) ToPendingBatch() *delivery.PendingBatch
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool