appdata

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0 Imports: 4 Imported by: 5

README

App Data

The appdata package defines the basic types for streaming blockchain event and state data to external listeners, with a specific focus on supporting logical decoding and indexing of state.

A blockchain data source should accept a Listener instance and invoke the provided callbacks in the correct order. A downstream listener should provide a Listener instance and perform operations based on the data passed to its callbacks.

Listener Callback Order

Listener callbacks should be called in this order

sequenceDiagram
    actor Source
    actor Target    
    Source ->> Target: Initialize
    Source -->> Target: InitializeModuleSchema
    loop Block
        Source ->> Target: StartBlock
        Source ->> Target: OnBlockHeader
        Source -->> Target: OnTx
        Source -->> Target: OnEvent
        Source -->> Target: OnKVPair
        Source -->> Target: OnObjectUpdate
        Source ->> Target: Commit
    end

Initialize must be called before any other method and should only be invoked once. InitializeModuleSchema should be called at most once for every module with logical data.

Sources will generally only call InitializeModuleSchema and OnObjectUpdate if they have native logical decoding capabilities. Usually, the indexer framework will provide this functionality based on OnKVPair data and schema.HasModuleCodec implementations.

StartBlock and OnBlockHeader should be called only once at the beginning of a block, and Commit should be called only once at the end of a block. The OnTx, OnEvent, OnKVPair and OnObjectUpdate must be called after OnBlockHeader, may be called multiple times within a block and indexers should not assume that the order is logical unless InitializationData.HasEventAlignedWrites is true.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActorKVPairUpdate added in v0.2.0

type ActorKVPairUpdate = struct {
	// Actor is the byte representation of the module or account that is updating the key-value pair.
	Actor []byte

	// StateChanges are key-value pair updates.
	StateChanges []schema.KVPairUpdate
}

ActorKVPairUpdate represents a key-value pair update for a specific module or account.

type AsyncListenerOptions added in v0.2.0

type AsyncListenerOptions struct {
	// Context is the context whose Done() channel listeners use will use to listen for completion to close their
	// goroutine. If it is nil, then context.Background() will be used and goroutines may be leaked.
	Context context.Context

	// BufferSize is the buffer size of the channels to use. It defaults to 0.
	BufferSize int

	// DoneWaitGroup is an optional wait-group that listener goroutines will notify via Add(1) when they are started
	// and Done() after they are canceled and completed.
	DoneWaitGroup *sync.WaitGroup
}

AsyncListenerOptions are options for async listeners and listener mux's.

type BatchablePacket added in v0.2.0

type BatchablePacket interface {
	Packet
	// contains filtered or unexported methods
}

BatchablePacket is the interface that packet types which can be batched implement. All types that implement Packet except CommitData also implement BatchablePacket. CommitData should not be batched because it forces synchronization of asynchronous listeners.

type BlockStage added in v0.3.0

type BlockStage int32

BlockStage represents the stage of block processing for an event.

const (
	// UnknownBlockStage indicates that we do not know the block stage.
	UnknownBlockStage BlockStage = iota

	// PreBlockStage indicates that the event is associated with the pre-block stage.
	PreBlockStage

	// BeginBlockStage indicates that the event is associated with the begin-block stage.
	BeginBlockStage

	// TxProcessingStage indicates that the event is associated with the transaction processing stage.
	TxProcessingStage

	// EndBlockStage indicates that the event is associated with the end-block stage.
	EndBlockStage
)

type CommitData

type CommitData struct{}

CommitData represents commit data. It is empty for now, but fields could be added later.

type Event added in v0.2.0

type Event struct {
	// BlockStage represents the stage of the block at which this event is associated.
	// If the block stage is unknown, it should be set to UnknownBlockStage.
	BlockStage BlockStage

	// BlockNumber is the block number to which this event is associated.
	BlockNumber uint64

	// TxIndex is the 1-based index of the transaction in the block to which this event is associated.
	// If TxIndex is zero, it means that we do not know the transaction index.
	// Otherwise, the index should start with 1.
	TxIndex int32

	// MsgIndex is the 1-based index of the message in the transaction to which this event is associated.
	// If MsgIndex is zero, it means that we do not know the message index.
	// Otherwise, the index should start with 1.
	MsgIndex int32

	// EventIndex is the 1-based index of the event in the message to which this event is associated.
	// If EventIndex is zero, it means that we do not know the event index.
	// Otherwise, the index should start with 1.
	EventIndex int32

	// Type is the type of the event.
	Type string

	// Data lazily returns the JSON representation of the event.
	Data ToJSON

	// Attributes lazily returns the key-value attribute representation of the event.
	Attributes ToEventAttributes
}

Event represents the data for a single event.

type EventAttribute added in v0.2.0

type EventAttribute = struct {
	Key, Value string
}

type EventData

type EventData struct {
	// Events are the events that are received.
	Events []Event
}

EventData represents event data that is passed to a listener when events are received.

type KVPairData

type KVPairData struct {
	Updates []ActorKVPairUpdate
}

KVPairData represents a batch of key-value pair data that is passed to a listener.

type Listener

type Listener struct {
	// InitializeModuleData should be called whenever the blockchain process starts OR whenever
	// logical decoding of a module is initiated. An indexer listening to this event
	// should ensure that they have performed whatever initialization steps (such as database
	// migrations) required to receive OnObjectUpdate events for the given module. If the
	// indexer's schema is incompatible with the module's on-chain schema, the listener should return
	// an error. Module names must conform to the NameFormat regular expression.
	InitializeModuleData func(ModuleInitializationData) error

	// StartBlock is called at the beginning of processing a block.
	StartBlock func(StartBlockData) error

	// OnTx is called when a transaction is received.
	OnTx func(TxData) error

	// OnEvent is called when an event is received.
	OnEvent func(EventData) error

	// OnKVPair is called when a key-value has been written to the store for a given module.
	// Module names must conform to the NameFormat regular expression.
	OnKVPair func(updates KVPairData) error

	// OnObjectUpdate is called whenever an object is updated in a module's state. This is only called
	// when logical data is available. It should be assumed that the same data in raw form
	// is also passed to OnKVPair. Module names must conform to the NameFormat regular expression.
	OnObjectUpdate func(ObjectUpdateData) error

	// Commit is called when state is committed, usually at the end of a block. Any
	// indexers should commit their data when this is called and return an error if
	// they are unable to commit. Data sources MUST call Commit when data is committed,
	// otherwise it should be assumed that indexers have not persisted their state.
	// Commit is designed to support async processing so that implementations may return
	// a completion callback to wait for commit to complete. Callers should first check
	// if err is nil and then if it is, check if completionCallback is nil and if not
	// call it and check for an error. Commit should be designed to be non-blocking if
	// possible, but calling completionCallback should be blocking.
	// When listener processing is pushed into background go routines using AsyncListener
	// or AsyncListenerMux, the Commit completion callback will synchronize the processing of
	// all listeners. Producers that do not want to block on Commit in a given block
	// can delay calling the completion callback until the start of the next block to
	// give listeners time to complete their processing.
	Commit func(CommitData) (completionCallback func() error, err error)
	// contains filtered or unexported fields
}

Listener is an interface that defines methods for listening to both raw and logical blockchain data. It is valid for any of the methods to be nil, in which case the listener will not be called for that event. Listeners should understand the guarantees that are provided by the source they are listening to and understand which methods will or will not be called. For instance, most blockchains will not do logical decoding of data out of the box, so the InitializeModuleData and OnObjectUpdate methods will not be called. These methods will only be called when listening logical decoding is setup.

func AsyncListener added in v0.2.0

func AsyncListener(opts AsyncListenerOptions, listener Listener) Listener

AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously in a separate go routine. The listener that is returned will return nil for all methods including Commit and an error or nil will only be returned when the callback returned by Commit is called. Thus Commit() can be used as a synchronization and error checking mechanism. The go routine that is being used for listening will exit when context.Done() returns and no more events will be received by the listener. bufferSize is the size of the buffer for the channel that is used to send events to the listener.

func AsyncListenerMux added in v0.2.0

func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener

AsyncListenerMux is a convenience function that calls AsyncListener for each listener with the provided options and combines them using ListenerMux.

func ListenerMux added in v0.2.0

func ListenerMux(listeners ...Listener) Listener

ListenerMux returns a listener that forwards received events to all the provided listeners in order. A callback is only registered if a non-nil callback is present in at least one of the listeners.

func PacketForwarder added in v0.2.0

func PacketForwarder(f func(Packet) error) Listener

PacketForwarder creates a listener which listens to all callbacks and forwards all packets to the provided function. This is intended to be used primarily for tests and debugging.

func (Listener) SendPacket

func (l Listener) SendPacket(p Packet) error

SendPacket sends a packet to a listener invoking the appropriate callback for this packet if one is registered.

type ModuleInitializationData

type ModuleInitializationData struct {
	// ModuleName is the name of the module.
	ModuleName string

	// Schema is the schema of the module.
	Schema schema.ModuleSchema
}

ModuleInitializationData represents data for related to module initialization, in particular the module's schema.

type ObjectUpdateData

type ObjectUpdateData struct {
	// ModuleName is the name of the module that the update corresponds to.
	ModuleName string

	// Updates are the object updates.
	Updates []schema.StateObjectUpdate
}

ObjectUpdateData represents object update data that is passed to a listener.

type Packet

type Packet interface {
	// contains filtered or unexported methods
}

Packet is the interface that all listener data structures implement so that this data can be "packetized" and processed in a stream, possibly asynchronously. Valid implementations are ModuleInitializationData, StartBlockData, TxData, EventData, KVPairData, ObjectUpdateData, and CommitData.

type PacketBatch added in v0.2.0

type PacketBatch []BatchablePacket

PacketBatch is a batch of packets that can be sent to a listener. If listener processing is asynchronous, the batch of packets will be sent all at once in a single operation which can be more efficient than sending each packet individually.

type StartBlockData

type StartBlockData struct {
	// Height is the height of the block.
	Height uint64

	// Bytes is the raw byte representation of the block header. It may be nil if the source does not provide it.
	HeaderBytes ToBytes

	// JSON is the JSON representation of the block header. It should generally be a JSON object.
	// It may be nil if the source does not provide it.
	HeaderJSON ToJSON
}

StartBlockData represents the data that is passed to a listener when a block is started.

type ToBytes

type ToBytes = func() ([]byte, error)

ToBytes is a function that lazily returns the raw byte representation of data.

type ToEventAttributes added in v0.2.0

type ToEventAttributes = func() ([]EventAttribute, error)

ToEventAttributes is a function that lazily returns the key-value attribute representation of an event.

type ToJSON

type ToJSON = func() (json.RawMessage, error)

ToJSON is a function that lazily returns the JSON representation of data.

type TxData

type TxData struct {
	// BlockNumber is the block number to which this event is associated.
	BlockNumber uint64

	// TxIndex is the index of the transaction in the block.
	TxIndex int32

	// Bytes is the raw byte representation of the transaction.
	Bytes ToBytes

	// JSON is the JSON representation of the transaction. It should generally be a JSON object.
	JSON ToJSON
}

TxData represents the raw transaction data that is passed to a listener.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL