subscriber

package
v0.1.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2024 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetQueryHash added in v0.0.2

func GetQueryHash(chainId *big.Int, query ethereum.FilterQuery) common.Hash

func GetQueryKey

func GetQueryKey(chainId *big.Int, query ethereum.FilterQuery) string

Types

type ChainSubscriber

type ChainSubscriber struct {
	// contains filtered or unexported fields
}

ChainSubscriber implements the Subscriber interface.

func NewChainSubscriber

func NewChainSubscriber(c *ethclient.Client, storage SubscriberStorage) (*ChainSubscriber, error)

NewChainSubscriber creates a ChainSubscriber from the given ethclient.Client and SubscriberStorage.

func (*ChainSubscriber) Close added in v0.0.7

func (s *ChainSubscriber) Close()

func (*ChainSubscriber) FilterLogs

func (cs *ChainSubscriber) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)

func (*ChainSubscriber) FilterLogsWithChannel

func (cs *ChainSubscriber) FilterLogsWithChannel(ctx context.Context, q ethereum.FilterQuery, logsChan chan<- types.Log, watch bool, closeOnExit bool) (err error)

TODO: 3. cache all of finalized historical data, e.g., blockByHash, txByHash

func (*ChainSubscriber) GetBlockConfirmationsOnSubscription added in v0.0.2

func (cs *ChainSubscriber) GetBlockConfirmationsOnSubscription() uint64

func (*ChainSubscriber) GetQueryHandler added in v0.0.9

func (cs *ChainSubscriber) GetQueryHandler() QueryHandler

func (*ChainSubscriber) SetBlockConfirmationsOnSubscription added in v0.0.2

func (cs *ChainSubscriber) SetBlockConfirmationsOnSubscription(confirmations uint64)

func (*ChainSubscriber) SetBlocksPerScan added in v0.0.6

func (s *ChainSubscriber) SetBlocksPerScan(blocksPerScan uint64)

func (*ChainSubscriber) SetBuffer added in v0.0.10

func (cs *ChainSubscriber) SetBuffer(buffer int)

func (*ChainSubscriber) SetFetchMissingHeaders added in v0.1.11

func (cs *ChainSubscriber) SetFetchMissingHeaders(enable bool)

func (*ChainSubscriber) SetMaxBlocksPerScan added in v0.1.4

func (s *ChainSubscriber) SetMaxBlocksPerScan(maxBlocksPerScan uint64)

func (*ChainSubscriber) SetQueryHandler added in v0.0.7

func (cs *ChainSubscriber) SetQueryHandler(handler QueryHandler)

func (*ChainSubscriber) SetRetryInterval added in v0.0.6

func (s *ChainSubscriber) SetRetryInterval(retryInterval time.Duration)

func (*ChainSubscriber) SubmitQuery added in v0.0.7

func (cs *ChainSubscriber) SubmitQuery(query ethereum.FilterQuery) error

func (*ChainSubscriber) SubscribeFilterLogs

func (cs *ChainSubscriber) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error)

func (*ChainSubscriber) SubscribeNewHead

func (cs *ChainSubscriber) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (sub ethereum.Subscription, err error)

SubscribeNewHead subscribes to new block headers, which are delivered on ch.

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

func NewMemoryStorage

func NewMemoryStorage(chainId *big.Int) *MemoryStorage

func (*MemoryStorage) FilterLogs added in v0.0.11

func (s *MemoryStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)

func (*MemoryStorage) IsFilterLogsSupported added in v0.0.11

func (s *MemoryStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool

func (*MemoryStorage) LatestBlockForQuery

func (s *MemoryStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)

func (*MemoryStorage) LatestLogForQuery

func (s *MemoryStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)

func (*MemoryStorage) SaveFilterLogs added in v0.0.11

func (s *MemoryStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)

func (*MemoryStorage) SaveLatestBlockForQuery

func (s *MemoryStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error

func (*MemoryStorage) SaveLatestLogForQuery

func (s *MemoryStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, l types.Log) error

type Query added in v0.0.8

type Query struct {
	ChainID *big.Int
	ethereum.FilterQuery
}

func NewQuery added in v0.0.8

func NewQuery(chainId *big.Int, q ethereum.FilterQuery) Query

func (Query) Hash added in v0.0.8

func (q Query) Hash() common.Hash

type QueryHandler added in v0.0.7

type QueryHandler interface {
	// The handler itself must update the query states; otherwise
	// logs may be replayed.
	SubscriberStorage
	// The subscriber calls HandleQuery back once incoming logs are ready.
	// If log.Address is address(0), the call is only for updating the block number.
	HandleQuery(ctx context.Context, query Query, log types.Log) error
}

Used only when a handler is set and query.ToBlock == nil.

type QueryStateReader added in v0.0.7

type QueryStateReader interface {
	LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)
	LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)

	// Read a previously saved query result to save network I/O
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
	// Reports whether the client can use the storage's `FilterLogs` instead of ethclient.FilterLogs
	IsFilterLogsSupported(q ethereum.FilterQuery) bool
}

type QueryStateWriter added in v0.0.7

type QueryStateWriter interface {
	// Must be called after all logs for the block have been handled.
	SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error
	// Must be called after each log has been handled.
	SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error

	// Save the query result to reduce network I/O
	SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)
}

type RedisStorage

type RedisStorage struct {
	// contains filtered or unexported fields
}

func NewRedisStorage

func NewRedisStorage(chainId *big.Int, pool redis.Pool) *RedisStorage

func (*RedisStorage) FilterLogs added in v0.0.11

func (s *RedisStorage) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)

func (*RedisStorage) IsFilterLogsSupported added in v0.0.11

func (s *RedisStorage) IsFilterLogsSupported(q ethereum.FilterQuery) bool

func (*RedisStorage) LatestBlockForQuery

func (s *RedisStorage) LatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery) (uint64, error)

func (*RedisStorage) LatestLogForQuery

func (s *RedisStorage) LatestLogForQuery(ctx context.Context, query ethereum.FilterQuery) (types.Log, error)

func (*RedisStorage) QueryLock

func (s *RedisStorage) QueryLock(q ethereum.FilterQuery) sync.Locker

func (*RedisStorage) SaveFilterLogs added in v0.0.11

func (s *RedisStorage) SaveFilterLogs(q ethereum.FilterQuery, logs []types.Log) (err error)

func (*RedisStorage) SaveLatestBlockForQuery

func (s *RedisStorage) SaveLatestBlockForQuery(ctx context.Context, query ethereum.FilterQuery, blockNum uint64) error

func (*RedisStorage) SaveLatestLogForQuery

func (s *RedisStorage) SaveLatestLogForQuery(ctx context.Context, query ethereum.FilterQuery, log types.Log) error

type Subscriber

type Subscriber interface {
	Close()
	GetQueryHandler() QueryHandler
	GetBlockConfirmationsOnSubscription() uint64

	SetBuffer(buffer int)
	SetBlockConfirmationsOnSubscription(confirmations uint64)
	SetQueryHandler(handler QueryHandler) // use QueryHandler instead of SubscriberStorage if handler set
	SetFetchMissingHeaders(enable bool)

	// Provided so that a handler can submit a query.
	SubmitQuery(query ethereum.FilterQuery) error
	SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
	SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) (logs []types.Log, err error)
}

Subscriber defines the set of methods for chain subscription.

type SubscriberStorage

type SubscriberStorage interface {
	QueryStateReader
	QueryStateWriter
}

Used only by `SubscribeFilterLogs` when query.ToBlock == nil.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL