Documentation ¶
Overview ¶
Package subscribers provides a comprehensive solution for managing subscriptions to Ethereum blockchain events. It supports both real-time monitoring of head block updates and detailed analysis of historical blockchain data through archive blocks. The package facilitates executing custom logic, known as hooks, in response to these events, enabling users to perform real-time actions or process historical data efficiently. It offers a robust framework for the registration, unregistration, and concurrent-safe notification of subscribers, along with foundational types and interfaces for categorization and state management of subscribers within a system. This dual focus on both live and archival blockchain data, combined with the package's flexible subscriber management infrastructure, makes it a versatile tool for applications requiring detailed blockchain event handling and processing.
Index ¶
- type AccountHookFn
- type Accounts
- type ArchiveBlock
- type Block
- type BlockHookFn
- type HookType
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) Exists(name SubscriberType) bool
- func (m *Manager) Get(name SubscriberType) (Subscriber, error)
- func (m *Manager) List() map[SubscriberType]Subscriber
- func (m *Manager) Register(name SubscriberType, sub Subscriber) error
- func (m *Manager) Subscribe(names ...SubscriberType) error
- func (m *Manager) UnRegister(name SubscriberType) error
- type Sources
- type SourcesHookFn
- type Status
- type Subscriber
- type SubscriberType
- type TxHookFn
- type Unpacker
- type UnpackerHookFn
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccountHookFn ¶
type Accounts ¶
type Accounts struct {
// contains filtered or unexported fields
}
Accounts ...
func NewAccounts ¶
func NewAccounts(ctx context.Context, pool *clients.ClientPool, sm *state.State, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]AccountHookFn) (*Accounts, error)
func (*Accounts) Start ¶
Start begins listening for unpacking requests on the NATS subject specified in the options.
type ArchiveBlock ¶
type ArchiveBlock struct {
// contains filtered or unexported fields
}
ArchiveBlock represents a subscription to a range of Ethereum archive blocks. It facilitates the execution of predefined hooks on each block within the specified range.
func NewArchiveBlock ¶
func NewArchiveBlock(ctx context.Context, pool *clients.ClientPool, sm *state.State, opts *options.Subscriber, hooks map[HookType][]BlockHookFn) (*ArchiveBlock, error)
NewArchiveBlock initializes a new subscription for Ethereum archive blocks using the provided context, client pool, subscription options, and hooks. It returns an initialized ArchiveBlock object or an error if initialization fails.
func (*ArchiveBlock) Start ¶
func (b *ArchiveBlock) Start() error
Start begins the subscription process for the specified range of archive blocks. It iterates over each block in the range, retrieves it, and executes the registered hooks. Logs are generated to indicate the process status and any errors encountered.
func (*ArchiveBlock) Status ¶
func (b *ArchiveBlock) Status() Status
Status returns the current status of the archive block subscription.
func (*ArchiveBlock) Stop ¶
func (b *ArchiveBlock) Stop() error
Stop terminates the archive block subscription and performs any necessary cleanup.
type Block ¶
type Block struct {
// contains filtered or unexported fields
}
Block represents a subscription to blockchain head blocks, encapsulating the logic for starting, managing, and executing hooks on new blocks.
func NewHeadBlock ¶
func NewHeadBlock(ctx context.Context, pool *clients.ClientPool, sm *state.State, opts *options.Subscriber, hooks map[HookType][]BlockHookFn) (*Block, error)
NewHeadBlock initializes a new head block subscription using the provided context, client pool, subscription options, and hooks. It returns an initialized Block object or an error if initialization fails.
func (*Block) Start ¶
Start begins the subscription to new head blocks. It connects to an Ethereum client, subscribes to new head blocks, and executes registered hooks on each new block. Logs information about subscription status and errors encountered during block retrieval or hook execution.
type BlockHookFn ¶
BlockHookFn defines the function signature for hooks that process blocks. Implementations of this function can modify the block and must return the modified block or an error if the processing fails.
type HookType ¶
type HookType string
HookType defines the stage at which a hook should be executed relative to the block processing. It can be either before (PreHook) or after (PostHook) the block is processed.
const ( PreHook HookType = "pre" // PreHook indicates the hook will run before block processing. PostHook HookType = "post" // PostHook indicates the hook will run after block processing. )
PreHook and PostHook constants represent the hook types for pre-processing and post-processing of blocks, respectively.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates subscribers. It supports adding, removing, and notifying subscribers. Operations on Manager are safe for concurrent use by multiple goroutines.
func NewManager ¶
NewManager initializes a new Manager with the provided context. The context is used to control the lifecycle of subscribers.
func (*Manager) Close ¶
Close stops all active subscribers. It ignores subscribers that are not active.
func (*Manager) Exists ¶
func (m *Manager) Exists(name SubscriberType) bool
Exists checks if a subscriber of the given name is already registered. Returns true if the subscriber exists; otherwise, false.
func (*Manager) Get ¶
func (m *Manager) Get(name SubscriberType) (Subscriber, error)
Get retrieves a subscriber by name. Returns an error if the subscriber does not exist.
func (*Manager) List ¶
func (m *Manager) List() map[SubscriberType]Subscriber
List returns a map of all registered subscribers.
func (*Manager) Register ¶
func (m *Manager) Register(name SubscriberType, sub Subscriber) error
Register adds a new subscriber under the given name. It returns an error if a subscriber with the same name already exists.
func (*Manager) Subscribe ¶
func (m *Manager) Subscribe(names ...SubscriberType) error
Subscribe starts the specified subscribers or all if none are specified. It returns an error if any subscriber fails to start.
func (*Manager) UnRegister ¶
func (m *Manager) UnRegister(name SubscriberType) error
UnRegister removes a subscriber identified by name. It returns an error if no subscriber by that name exists.
type Sources ¶ added in v1.0.0
type Sources struct {
// contains filtered or unexported fields
}
Sources ...
func NewSources ¶ added in v1.0.0
func NewSources(ctx context.Context, pool *clients.ClientPool, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]SourcesHookFn) (*Sources, error)
func (*Sources) Start ¶ added in v1.0.0
Start begins listening for unpacking requests on the NATS subject specified in the options.
type SourcesHookFn ¶ added in v1.0.0
type SourcesHookFn func(block *events.UnpackSources) error
type Status ¶
type Status int16
Status is an enumerated type (int16) representing the operational state of a subscriber. It indicates whether a subscriber is currently active, inactive, or in any other defined state.
const ( // StatusActive indicates that the subscriber is currently active and operational. // It is engaged in its subscription duties, such as listening for and processing events. StatusActive Status = iota // StatusNotActive indicates that the subscriber is not currently active. It may be // in this state either because it has been stopped or has not yet been started. StatusNotActive )
type Subscriber ¶
type Subscriber interface { // Start initiates the subscription process. It should establish any necessary // connections or routines needed for the subscription to function. Start should // return an error if the subscription fails to initiate properly. Start() error // Stop terminates the subscription. It should cleanly close any connections // and halt any routines associated with the subscription. Stop should ensure // that all resources are released properly. It should return an error if the // subscription fails to stop cleanly. Stop() error // Status returns the current status of the subscription. The status indicates // whether the subscription is active, inactive, or in any other state as defined // by the Status type. This method allows for querying the operational state of // the subscription at any time. Status() Status }
Subscriber is an interface that defines the basic lifecycle and status operations for a subscription. Implementations of this interface can be used to start and stop subscriptions and to query their current status.
type SubscriberType ¶
type SubscriberType string
SubscriberType is a string type that defines various categories or identifiers for subscribers. It is used to distinguish between different types of subscriptions within the system, enabling type-specific handling and management of subscribers.
var AccountsSubscriber SubscriberType = "accounts"
AccountsSubscriber defines the subscriber type for new accounts (addresses).
var ArchiveBlockSubscriber SubscriberType = "archive"
ArchiveBlockSubscriber defines the subscriber type for archive blocks.
var HeadBlockSubscriber SubscriberType = "head"
HeadBlockSubscriber defines the subscriber type for new head blocks.
var SourcesSubscriber SubscriberType = "sources"
SourcesSubscriber defines the subscriber type for new source synchronizations events.
var UnpackerSubscriber SubscriberType = "unpacker"
UnpackerSubscriber defines the subscriber type for new head blocks.
func (SubscriberType) String ¶
func (t SubscriberType) String() string
String returns the string representation of the SubscriberType.
type TxHookFn ¶
type TxHookFn func(block *types.Transaction) (*types.Transaction, error)
type Unpacker ¶
type Unpacker struct {
// contains filtered or unexported fields
}
Unpacker ...
func NewUnpacker ¶
func NewUnpacker(ctx context.Context, pool *clients.ClientPool, sm *state.State, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]UnpackerHookFn) (*Unpacker, error)
func (*Unpacker) Start ¶
Start begins listening for unpacking requests on the NATS subject specified in the options.