subscribers

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package subscribers provides a comprehensive solution for managing subscriptions to Ethereum blockchain events. It supports both real-time monitoring of head block updates and detailed analysis of historical blockchain data through archive blocks. The package facilitates executing custom logic, known as hooks, in response to these events, enabling users to perform real-time actions or process historical data efficiently. It offers a robust framework for the registration, unregistration, and concurrent-safe notification of subscribers, along with foundational types and interfaces for categorization and state management of subscribers within a system. This dual focus on both live and archival blockchain data, combined with the package's flexible subscriber management infrastructure, makes it a versatile tool for applications requiring detailed blockchain event handling and processing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccountHookFn

type AccountHookFn func(block *events.Account) error

type Accounts

type Accounts struct {
	// contains filtered or unexported fields
}

Accounts ...

func NewAccounts

func NewAccounts(ctx context.Context, pool *clients.ClientPool, sm *state.State, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]AccountHookFn) (*Accounts, error)

func (*Accounts) Start

func (b *Accounts) Start() error

Start begins listening for unpacking requests on the NATS subject specified in the options.

func (*Accounts) Status

func (b *Accounts) Status() Status

Status returns the current status of the unpacker subscription.

func (*Accounts) Stop

func (b *Accounts) Stop() error

Stop terminates the unpacker subscription and performs any necessary cleanup.

type ArchiveBlock

type ArchiveBlock struct {
	// contains filtered or unexported fields
}

ArchiveBlock represents a subscription to a range of Ethereum archive blocks. It facilitates the execution of predefined hooks on each block within the specified range.

func NewArchiveBlock

func NewArchiveBlock(ctx context.Context, pool *clients.ClientPool, sm *state.State, opts *options.Subscriber, hooks map[HookType][]BlockHookFn) (*ArchiveBlock, error)

NewArchiveBlock initializes a new subscription for Ethereum archive blocks using the provided context, client pool, subscription options, and hooks. It returns an initialized ArchiveBlock object or an error if initialization fails.

func (*ArchiveBlock) Start

func (b *ArchiveBlock) Start() error

Start begins the subscription process for the specified range of archive blocks. It iterates over each block in the range, retrieves it, and executes the registered hooks. Logs are generated to indicate the process status and any errors encountered.

func (*ArchiveBlock) Status

func (b *ArchiveBlock) Status() Status

Status returns the current status of the archive block subscription.

func (*ArchiveBlock) Stop

func (b *ArchiveBlock) Stop() error

Stop terminates the archive block subscription and performs any necessary cleanup.

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block represents a subscription to blockchain head blocks, encapsulating the logic for starting, managing, and executing hooks on new blocks.

func NewHeadBlock

func NewHeadBlock(ctx context.Context, pool *clients.ClientPool, sm *state.State, opts *options.Subscriber, hooks map[HookType][]BlockHookFn) (*Block, error)

NewHeadBlock initializes a new head block subscription using the provided context, client pool, subscription options, and hooks. It returns an initialized Block object or an error if initialization fails.

func (*Block) Start

func (b *Block) Start() error

Start begins the subscription to new head blocks. It connects to an Ethereum client, subscribes to new head blocks, and executes registered hooks on each new block. Logs information about subscription status and errors encountered during block retrieval or hook execution.

func (*Block) Status

func (b *Block) Status() Status

Status returns the current status of the block subscription.

func (*Block) Stop

func (b *Block) Stop() error

Stop terminates the block subscription and performs any necessary cleanup.

type BlockHookFn

type BlockHookFn func(block *types.Block) (*types.Block, error)

BlockHookFn defines the function signature for hooks that process blocks. Implementations of this function can modify the block and must return the modified block or an error if the processing fails.

type HookType

type HookType string

HookType defines the stage at which a hook should be executed relative to the block processing. It can be either before (PreHook) or after (PostHook) the block is processed.

const (
	PreHook  HookType = "pre"  // PreHook indicates the hook will run before block processing.
	PostHook HookType = "post" // PostHook indicates the hook will run after block processing.
)

PreHook and PostHook constants represent the hook types for pre-processing and post-processing of blocks, respectively.

func (HookType) String

func (h HookType) String() string

String returns the string representation of the HookType.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates subscribers. It supports adding, removing, and notifying subscribers. Operations on Manager are safe for concurrent use by multiple goroutines.

func NewManager

func NewManager(ctx context.Context) (*Manager, error)

NewManager initializes a new Manager with the provided context. The context is used to control the lifecycle of subscribers.

func (*Manager) Close

func (m *Manager) Close() error

Close stops all active subscribers. It ignores subscribers that are not active.

func (*Manager) Exists

func (m *Manager) Exists(name SubscriberType) bool

Exists checks if a subscriber of the given name is already registered. Returns true if the subscriber exists; otherwise, false.

func (*Manager) Get

func (m *Manager) Get(name SubscriberType) (Subscriber, error)

Get retrieves a subscriber by name. Returns an error if the subscriber does not exist.

func (*Manager) List

func (m *Manager) List() map[SubscriberType]Subscriber

List returns a map of all registered subscribers.

func (*Manager) Register

func (m *Manager) Register(name SubscriberType, sub Subscriber) error

Register adds a new subscriber under the given name. It returns an error if a subscriber with the same name already exists.

func (*Manager) Subscribe

func (m *Manager) Subscribe(names ...SubscriberType) error

Subscribe starts the specified subscribers or all if none are specified. It returns an error if any subscriber fails to start.

func (*Manager) UnRegister

func (m *Manager) UnRegister(name SubscriberType) error

UnRegister removes a subscriber identified by name. It returns an error if no subscriber by that name exists.

type Sources added in v1.0.0

type Sources struct {
	// contains filtered or unexported fields
}

Sources ...

func NewSources added in v1.0.0

func NewSources(ctx context.Context, pool *clients.ClientPool, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]SourcesHookFn) (*Sources, error)

func (*Sources) Start added in v1.0.0

func (b *Sources) Start() error

Start begins listening for unpacking requests on the NATS subject specified in the options.

func (*Sources) Status added in v1.0.0

func (b *Sources) Status() Status

Status returns the current status of the unpacker subscription.

func (*Sources) Stop added in v1.0.0

func (b *Sources) Stop() error

Stop terminates the unpacker subscription and performs any necessary cleanup.

type SourcesHookFn added in v1.0.0

type SourcesHookFn func(block *events.UnpackSources) error

type Status

type Status int16

Status is an enumerated type (int16) representing the operational state of a subscriber. It indicates whether a subscriber is currently active, inactive, or in any other defined state.

const (
	// StatusActive indicates that the subscriber is currently active and operational.
	// It is engaged in its subscription duties, such as listening for and processing events.
	StatusActive Status = iota

	// StatusNotActive indicates that the subscriber is not currently active. It may be
	// in this state either because it has been stopped or has not yet been started.
	StatusNotActive
)

type Subscriber

type Subscriber interface {
	// Start initiates the subscription process. It should establish any necessary
	// connections or routines needed for the subscription to function. Start should
	// return an error if the subscription fails to initiate properly.
	Start() error

	// Stop terminates the subscription. It should cleanly close any connections
	// and halt any routines associated with the subscription. Stop should ensure
	// that all resources are released properly. It should return an error if the
	// subscription fails to stop cleanly.
	Stop() error

	// Status returns the current status of the subscription. The status indicates
	// whether the subscription is active, inactive, or in any other state as defined
	// by the Status type. This method allows for querying the operational state of
	// the subscription at any time.
	Status() Status
}

Subscriber is an interface that defines the basic lifecycle and status operations for a subscription. Implementations of this interface can be used to start and stop subscriptions and to query their current status.

type SubscriberType

type SubscriberType string

SubscriberType is a string type that defines various categories or identifiers for subscribers. It is used to distinguish between different types of subscriptions within the system, enabling type-specific handling and management of subscribers.

var AccountsSubscriber SubscriberType = "accounts"

AccountsSubscriber defines the subscriber type for new accounts (addresses).

var ArchiveBlockSubscriber SubscriberType = "archive"

ArchiveBlockSubscriber defines the subscriber type for archive blocks.

var HeadBlockSubscriber SubscriberType = "head"

HeadBlockSubscriber defines the subscriber type for new head blocks.

var SourcesSubscriber SubscriberType = "sources"

SourcesSubscriber defines the subscriber type for new source synchronizations events.

var UnpackerSubscriber SubscriberType = "unpacker"

UnpackerSubscriber defines the subscriber type for new head blocks.

func (SubscriberType) String

func (t SubscriberType) String() string

String returns the string representation of the SubscriberType.

type TxHookFn

type TxHookFn func(block *types.Transaction) (*types.Transaction, error)

type Unpacker

type Unpacker struct {
	// contains filtered or unexported fields
}

Unpacker ...

func NewUnpacker

func NewUnpacker(ctx context.Context, pool *clients.ClientPool, sm *state.State, ncConn *nats.Conn, opts *options.Subscriber, hooks map[HookType][]UnpackerHookFn) (*Unpacker, error)

func (*Unpacker) Start

func (b *Unpacker) Start() error

Start begins listening for unpacking requests on the NATS subject specified in the options.

func (*Unpacker) Status

func (b *Unpacker) Status() Status

Status returns the current status of the unpacker subscription.

func (*Unpacker) Stop

func (b *Unpacker) Stop() error

Stop terminates the unpacker subscription and performs any necessary cleanup.

type UnpackerHookFn

type UnpackerHookFn func(block *events.Unpack) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL