Documentation ¶
Overview ¶
Package bloombits implements bloom filtering on batches of data.
Index ¶
- type Generator
- type Matcher
- type MatcherSession
- func (s *MatcherSession) AllocateRetrieval() (uint, bool)
- func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64
- func (s *MatcherSession) Close()
- func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte)
- func (s *MatcherSession) Error() error
- func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval)
- func (s *MatcherSession) PendingSections(bit uint) int
- type Retrieval
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Generator ¶
type Generator struct {
// contains filtered or unexported fields
}
Generator takes a number of bloom filters and generates the rotated bloom bits to be used for batched filtering.
func NewGenerator ¶
NewGenerator creates a rotated bloom generator that can iteratively fill a batched bloom filter's bits.
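As a rough illustration of what "rotated" bloom bits means, the sketch below transposes a batch of per-block bloom filters into one bit vector per bloom bit index. It does not use this package: the `rotate` function, the 16-bit filter size, and the most-significant-bit-first ordering are assumptions made to keep the example small, not the Generator's actual layout.

```go
package main

import "fmt"

// bloomBits is the number of bits per bloom filter in this sketch. The value
// is an assumption chosen for illustration, small enough to print.
const bloomBits = 16

// rotate transposes a batch of bloom filters. The input holds one filter per
// block (bloomBits/8 bytes each). The output holds one bit vector per bloom
// bit index: bit j of vector i is set if filter j has bit i set.
func rotate(blooms [][]byte) [][]byte {
	out := make([][]byte, bloomBits)
	for i := range out {
		out[i] = make([]byte, (len(blooms)+7)/8)
	}
	for j, bloom := range blooms {
		for i := 0; i < bloomBits; i++ {
			// Most-significant-bit-first ordering is an assumption of this sketch.
			if bloom[i/8]&(1<<(7-uint(i%8))) != 0 {
				out[i][j/8] |= 1 << (7 - uint(j%8))
			}
		}
	}
	return out
}

func main() {
	// Three hypothetical 16-bit filters, one per block.
	blooms := [][]byte{
		{0x80, 0x00}, // block 0 sets bloom bit 0
		{0x80, 0x01}, // block 1 sets bloom bits 0 and 15
		{0x00, 0x01}, // block 2 sets bloom bit 15
	}
	rotated := rotate(blooms)
	fmt.Printf("bit 0:  %08b\n", rotated[0])
	fmt.Printf("bit 15: %08b\n", rotated[15])
}
```

After rotation, checking whether any block in the batch may contain a given bloom bit means reading a single vector instead of touching every filter, which is what makes batched filtering cheap.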
type Matcher ¶
type Matcher struct {
// contains filtered or unexported fields
}
Matcher is a pipelined system of schedulers and logic matchers which perform binary AND/OR operations on the bit-streams, creating a stream of potential blocks to inspect for data content.
Example ¶
var start uint64 = 0
var end uint64 = 20000
var sectionSize uint64 = 4096
var maxReqCount = 16
var requested uint32

// We want to search the specified account
account := common.HexToAddress("0x12345")
filters := [][][]byte{{account.Bytes()}}

// Set up the pipeline
matcher := NewMatcher(sectionSize, filters)
quit := make(chan struct{})
matches := make(chan uint64, 16)
session, err := matcher.Start(context.Background(), start, end-1, matches)
if err != nil {
	fmt.Println(err.Error())
}
requests := make(chan chan *Retrieval)
go session.Multiplex(maxReqCount, 0, requests)

// Serve the Multiplexer
go func() {
	for {
		// Wait for a service request or a shutdown
		select {
		case <-quit:
			return
		case request := <-requests:
			task := <-request
			task.Bitsets = make([][]byte, len(task.Sections))
			for i, section := range task.Sections {
				task.Bitsets[i] = makeBitset([]common.Address{account}, task.Bit, section)
				atomic.AddUint32(&requested, 1)
			}
			request <- task
		}
	}
}()

// Retrieve the results from channel matches
for i := start; i < end; i++ {
	if i%4096 == 0 {
		match, ok := <-matches
		if !ok {
			fmt.Printf("%d Channel closed unexpected\n", i)
		}
		if match != i {
			fmt.Printf("Expected %d, Got %d\n", i, match)
		}
		fmt.Printf("Found match %d\n", i)
	}
}

// Check and clean up. The channels should be closed when data are fed in from start to end
_, ok := <-matches
if ok {
	fmt.Printf("Channel not closed as expected")
}
session.Close()
close(quit)
Output:
Found match 0
Found match 4096
Found match 8192
Found match 12288
Found match 16384
func NewMatcher ¶
NewMatcher creates a new pipeline for retrieving bloom bit streams and doing address and topic filtering on them. Setting a filter component to `nil` is allowed and will result in that filter rule being skipped (OR 0x11...1). In the current code at eth/filters/filter.go:90, filters contains a single filter holding a list of addresses, and sectionSize == 4096.
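The way the nested filter slices combine can be sketched on plain bit vectors. The code below is a simplified illustration under stated assumptions, not the Matcher's implementation: the helpers `andBytes`, `orBytes`, and `combine` are hypothetical, each alternative is assumed to be already resolved to a bit vector of candidate blocks, and the mapping from filter bytes to bloom bit indices is left out.

```go
package main

import "fmt"

// andBytes returns the bitwise AND of two equally sized bit vectors.
func andBytes(a, b []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = a[i] & b[i]
	}
	return out
}

// orBytes returns the bitwise OR of two equally sized bit vectors.
func orBytes(a, b []byte) []byte {
	out := make([]byte, len(a))
	for i := range a {
		out[i] = a[i] | b[i]
	}
	return out
}

// combine evaluates filters over bit vectors of the given size in bytes.
// Each filter is a list of alternatives, and each alternative is the bit
// vector of blocks that may contain it. Alternatives are ORed together and
// filters are ANDed together. A nil or empty filter is skipped in this
// sketch, which is equivalent to ANDing with all ones.
func combine(size int, filters [][][]byte) []byte {
	result := make([]byte, size)
	for i := range result {
		result[i] = 0xff
	}
	for _, filter := range filters {
		if len(filter) == 0 {
			continue
		}
		union := make([]byte, size)
		for _, alternative := range filter {
			union = orBytes(union, alternative)
		}
		result = andBytes(result, union)
	}
	return result
}

func main() {
	filters := [][][]byte{
		{{0xC0}, {0x20}}, // two alternatives: blocks 0-1, or block 2
		nil,              // skipped rule
		{{0x60}},         // one alternative: blocks 1-2
	}
	fmt.Printf("%08b\n", combine(1, filters))
}
```

Here the first filter accepts blocks 0 through 2, the nil filter accepts everything, and the last filter narrows the candidates to blocks 1 and 2.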
func (*Matcher) Start ¶
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error)
Start starts the matching process and returns a stream of bloom matches in a given range of blocks. If there are no more matches in the range, the result channel is closed. In the current code, results is a channel of size 64. ctx define
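To make the relationship between sections, bitsets, and block numbers concrete, here is a minimal sketch of turning one section's final bitset into matched block numbers within an inclusive range. The function `matchesInSection` is hypothetical, and the most-significant-bit-first ordering is an assumption of this sketch rather than a documented guarantee.

```go
package main

import "fmt"

// matchesInSection converts the set bits of one section's bitset into block
// numbers, clamped to the inclusive range [begin, end]. sectionSize must be
// greater than zero and the bitset must hold sectionSize bits.
func matchesInSection(section, sectionSize, begin, end uint64, bitset []byte) []uint64 {
	var blocks []uint64
	first := section * sectionSize
	last := first + sectionSize - 1
	lo, hi := first, last
	if begin > lo {
		lo = begin
	}
	if end < hi {
		hi = end
	}
	for n := lo; n <= hi; n++ {
		offset := n - first
		// Most-significant-bit-first ordering is an assumption of this sketch.
		if bitset[offset/8]&(1<<(7-offset%8)) != 0 {
			blocks = append(blocks, n)
		}
	}
	return blocks
}

func main() {
	const sectionSize, begin, end = 16, 4, 27

	// The sections touched by a range are begin/sectionSize through end/sectionSize.
	fmt.Println("sections:", begin/sectionSize, "to", end/sectionSize)

	fmt.Println(matchesInSection(0, sectionSize, begin, end, []byte{0xFF, 0x01}))
	fmt.Println(matchesInSection(1, sectionSize, begin, end, []byte{0x80, 0x10}))
}
```

By the same arithmetic, a range of blocks 0 through 19999 with a section size of 4096 spans sections 0 through 4.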
type MatcherSession ¶
type MatcherSession struct {
// contains filtered or unexported fields
}
MatcherSession is returned by a started matcher to be used as a terminator for the actively running matching operation.
func (*MatcherSession) AllocateRetrieval ¶
func (s *MatcherSession) AllocateRetrieval() (uint, bool)
AllocateRetrieval assigns a bloom bit index to a client process that can either immediately request and fetch the section contents assigned to this bit or wait a little while for more sections to be requested.
func (*MatcherSession) AllocateSections ¶
func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64
AllocateSections assigns all or part of an already allocated bit-task queue to the requesting process.
func (*MatcherSession) Close ¶
func (s *MatcherSession) Close()
Close stops the matching process and waits for all subprocesses to terminate before returning. The timeout may be used for graceful shutdown, allowing the currently running retrievals to complete before this time.
func (*MatcherSession) DeliverSections ¶
func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte)
DeliverSections delivers a batch of section bit-vectors for a specific bloom bit index to be injected into the processing pipeline.
func (*MatcherSession) Error ¶
func (s *MatcherSession) Error() error
Error returns any failure encountered during the matching session.
func (*MatcherSession) Multiplex ¶
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval)
Multiplex polls the matcher session for retrieval tasks and multiplexes them into the requested retrieval queue to be serviced together with other sessions.
This method will block for the lifetime of the session. Even after the session terminates, any in-flight request still needs to be responded to. Empty responses are fine in that case.
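The `chan chan` handshake implied by the mux parameter can be sketched without the package itself. In the code below, `retrieval` is a local stand-in for Retrieval, `serve` mirrors the server loop from the Matcher example above, and `roundTrip` is a guess at what the session side of one exchange might look like. The `fetch` callback is hypothetical.

```go
package main

import "fmt"

// retrieval is a local stand-in for the package's Retrieval type, reduced to
// the fields this sketch needs.
type retrieval struct {
	Bit      uint
	Sections []uint64
	Bitsets  [][]byte
}

// serve answers retrieval requests until quit is closed. The fetch callback
// is hypothetical; a real server would load the bit vector from storage.
func serve(requests chan chan *retrieval, quit chan struct{}, fetch func(bit uint, section uint64) []byte) {
	for {
		select {
		case <-quit:
			return
		case request := <-requests:
			task := <-request // receive the task details
			task.Bitsets = make([][]byte, len(task.Sections))
			for i, section := range task.Sections {
				task.Bitsets[i] = fetch(task.Bit, section)
			}
			request <- task // hand the filled task back on the same channel
		}
	}
}

// roundTrip plays the requesting side of the exchange for a single task:
// announce a private channel, send the task on it, then wait for the reply.
func roundTrip(requests chan chan *retrieval, task *retrieval) *retrieval {
	request := make(chan *retrieval)
	requests <- request
	request <- task
	return <-request
}

func main() {
	requests := make(chan chan *retrieval)
	quit := make(chan struct{})
	go serve(requests, quit, func(bit uint, section uint64) []byte {
		return []byte{byte(bit), byte(section)} // placeholder data
	})

	result := roundTrip(requests, &retrieval{Bit: 3, Sections: []uint64{0, 1}})
	fmt.Println(result.Bitsets)
	close(quit)
}
```

Because the per-request channel is unbuffered, the requester's send completes only once the server has received the task, so the requester cannot read back its own message. This is also why every in-flight request must be answered: the requester stays blocked on the reply until the server sends something, even an empty task.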
func (*MatcherSession) PendingSections ¶
func (s *MatcherSession) PendingSections(bit uint) int
PendingSections returns the number of pending section retrievals belonging to the given bloom bit index.
type Retrieval ¶
type Retrieval struct {
	Bit      uint
	Sections []uint64
	Bitsets  [][]byte
	Context  context.Context
	Error    error
}
Retrieval represents a request for retrieval task assignments for a given bit with the given number of fetch elements, or a response for such a request. It can also have the actual results set to be used as a delivery data struct.
The context and error fields are used by the light client to terminate matching early if an error is encountered on some path of the pipeline.