Documentation ¶
Overview ¶
Package ingest provides primitives for building custom ingestion engines.
Developers often need features that are outside Horizon's scope. While Horizon provides APIs for building the most common apps, it cannot cover every possible feature, which is why this package was created.
Ledger Backend ¶
Ledger backends are sources of information about Stellar network ledgers. A backend can be, for example, a Stellar-Core database, a (possibly remote) Captive Stellar-Core instance, or History Archives. Please consult the "ledgerbackend" package docs for more information about each backend.
Warning: Ledger backends provide low-level xdr.LedgerCloseMeta that should not be used directly unless the developer really understands this data structure. Read on to understand how to use a ledger backend in higher-level objects.
Readers ¶
Readers are objects that wrap a ledger backend and provide higher-level, developer-friendly APIs for reading ledger data.
Currently there are three types of readers:
- CheckpointChangeReader reads ledger entries from history buckets for a given checkpoint ledger. It allows building state (all accounts, trust lines, etc.) at any checkpoint ledger.
- LedgerTransactionReader reads transactions for a given ledger sequence.
- LedgerChangeReader reads all changes to ledger entries created as a result of transactions (fees and meta) and protocol upgrades in a given ledger.
Warning: Readers stream BOTH successful and failed transactions; check the transaction status in your application if required.
Tutorial ¶
Refer to the examples below for simple use cases, or check out the README (and its corresponding tutorial/ subfolder) in the repository for a Getting Started guide: https://github.com/TosinShada/monorepo/blob/master/ingest/README.md
Example (Changes) ¶
Example_changes demonstrates how to stream ledger entry changes for a specific ledger using captive stellar-core. Please note that transaction meta IS available when using this backend.
ctx := context.Background()
archiveURL := "http://history.stellar.org/prd/core-live/core_live_001"
networkPassphrase := network.PublicNetworkPassphrase

captiveCoreToml, err := ledgerbackend.NewCaptiveCoreToml(ledgerbackend.CaptiveCoreTomlParams{
	NetworkPassphrase:  networkPassphrase,
	HistoryArchiveURLs: []string{archiveURL},
})
if err != nil {
	panic(err)
}

// Requires Stellar-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
	ledgerbackend.CaptiveCoreConfig{
		BinaryPath:         "/bin/stellar-core",
		NetworkPassphrase:  networkPassphrase,
		HistoryArchiveURLs: []string{archiveURL},
		Toml:               captiveCoreToml,
	},
)
if err != nil {
	panic(err)
}

sequence := uint32(3)

err = backend.PrepareRange(ctx, ledgerbackend.SingleLedgerRange(sequence))
if err != nil {
	panic(err)
}

changeReader, err := NewLedgerChangeReader(ctx, backend, networkPassphrase, sequence)
if err != nil {
	panic(err)
}

for {
	change, err := changeReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	var action string
	switch {
	case change.Pre == nil && change.Post != nil:
		action = "created"
	case change.Pre != nil && change.Post != nil:
		action = "updated"
	case change.Pre != nil && change.Post == nil:
		action = "removed"
	}

	switch change.Type {
	case xdr.LedgerEntryTypeAccount:
		var accountEntry xdr.AccountEntry
		if change.Pre != nil {
			accountEntry = change.Pre.Data.MustAccount()
		} else {
			accountEntry = change.Post.Data.MustAccount()
		}
		fmt.Println("account", accountEntry.AccountId.Address(), action)
	case xdr.LedgerEntryTypeData:
		fmt.Println("data", action)
	case xdr.LedgerEntryTypeTrustline:
		fmt.Println("trustline", action)
	case xdr.LedgerEntryTypeOffer:
		fmt.Println("offer", action)
	default:
		panic("Unknown type")
	}
}
Example (Ledgerentrieshistoryarchive) ¶
Example_ledgerentrieshistoryarchive demonstrates how to stream all ledger entries live at a specific checkpoint ledger from history archives.
archiveURL := "http://history.stellar.org/prd/core-live/core_live_001"

archive, err := historyarchive.Connect(
	archiveURL,
	historyarchive.ConnectOptions{Context: context.TODO()},
)
if err != nil {
	panic(err)
}

// Ledger must be a checkpoint ledger: (100031+1) mod 64 == 0.
reader, err := NewCheckpointChangeReader(context.TODO(), archive, 100031)
if err != nil {
	panic(err)
}

var accounts, data, trustlines, offers int
for {
	entry, err := reader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	switch entry.Type {
	case xdr.LedgerEntryTypeAccount:
		accounts++
	case xdr.LedgerEntryTypeData:
		data++
	case xdr.LedgerEntryTypeTrustline:
		trustlines++
	case xdr.LedgerEntryTypeOffer:
		offers++
	default:
		panic("Unknown type")
	}
}

fmt.Println("accounts", accounts)
fmt.Println("data", data)
fmt.Println("trustlines", trustlines)
fmt.Println("offers", offers)
Index ¶
- Variables
- type Change
- type ChangeCompactor
- type ChangeReader
- type CheckpointChangeReader
- type LedgerChangeReader
- type LedgerTransaction
- type LedgerTransactionReader
- func (reader *LedgerTransactionReader) Close() error
- func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry
- func (reader *LedgerTransactionReader) GetSequence() uint32
- func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error)
- func (reader *LedgerTransactionReader) Rewind()
- type MockChangeReader
- type StateError
- type StatsChangeProcessor
- type StatsChangeProcessorResults
Constants ¶
This section is empty.
Variables ¶
var ErrNotFound = errors.New("ledger not found")
ErrNotFound is returned when the requested ledger is not found.
Functions ¶
This section is empty.
Types ¶
type Change ¶
type Change struct {
	Type xdr.LedgerEntryType
	Pre  *xdr.LedgerEntry
	Post *xdr.LedgerEntry
}
Change is a developer friendly representation of LedgerEntryChanges. It also provides some helper functions to quickly check if a given change has occurred in an entry.
- If an entry is created: Pre is nil and Post is not nil.
- If an entry is updated: Pre is not nil and Post is not nil.
- If an entry is removed: Pre is not nil and Post is nil.
func GenesisChange ¶
GenesisChange returns the Change occurring at the genesis ledger (ledgerseq = 1).
func GetChangesFromLedgerEntryChanges ¶
func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change
GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. Each `update` and `removed` change is preceded by a `state` change, while `create` changes stand alone, without `state`. The transformation moves each change (state/update, state/removed or create) into an array of pre/post pairs. Then:
- for create, pre is null and post is a new entry,
- for update, pre is the previous state and post is the current state,
- for removed, pre is the previous state and post is null.
stellar-core source: https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582
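The pairing described above can be sketched in a self-contained way. This is only an illustration, not the package's implementation: `kind`, `rawChange`, `pair`, and `toPairs` are hypothetical stand-ins for the XDR types and for GetChangesFromLedgerEntryChanges, and entries are reduced to plain strings.

```go
package main

import (
	"errors"
	"fmt"
)

// kind is a hypothetical stand-in for xdr.LedgerEntryChangeType.
type kind int

const (
	state kind = iota
	created
	updated
	removed
)

// rawChange is a hypothetical, simplified element of xdr.LedgerEntryChanges.
// Value is left empty for removals.
type rawChange struct {
	Kind  kind
	Value string
}

// pair is a simplified stand-in for Change; a nil pointer means "no entry".
type pair struct {
	Pre  *string
	Post *string
}

// toPairs folds state/updated pairs, state/removed pairs, and lone created
// changes into pre/post pairs.
func toPairs(raw []rawChange) ([]pair, error) {
	var out []pair
	for i := 0; i < len(raw); i++ {
		c := raw[i]
		switch c.Kind {
		case created:
			v := c.Value
			out = append(out, pair{Post: &v})
		case state:
			if i+1 >= len(raw) {
				return nil, errors.New("state without a following change")
			}
			prev := c.Value
			next := raw[i+1]
			i++ // consume the change that follows the state entry
			switch next.Kind {
			case updated:
				v := next.Value
				out = append(out, pair{Pre: &prev, Post: &v})
			case removed:
				out = append(out, pair{Pre: &prev})
			default:
				return nil, errors.New("state must be followed by updated or removed")
			}
		default:
			return nil, errors.New("updated or removed without a preceding state")
		}
	}
	return out, nil
}

func main() {
	raw := []rawChange{
		{Kind: created, Value: "offer-1"},
		{Kind: state, Value: "balance=10"},
		{Kind: updated, Value: "balance=25"},
		{Kind: state, Value: "trustline-1"},
		{Kind: removed},
	}
	pairs, err := toPairs(raw)
	if err != nil {
		panic(err)
	}
	for _, p := range pairs {
		fmt.Println("has pre:", p.Pre != nil, "has post:", p.Post != nil)
	}
}
```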
func (*Change) AccountChangedExceptSigners ¶
AccountChangedExceptSigners returns true if account has changed WITHOUT checking the signers (except master key weight!). In other words, if the only change is connected to signers, this function will return false.
func (*Change) AccountSignersChanged ¶
AccountSignersChanged returns true if account signers have changed. Note that this also returns true on master key changes.
func (*Change) GetLiquidityPoolType ¶
func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error)
GetLiquidityPoolType returns the liquidity pool type.
func (*Change) LedgerEntryChangeType ¶
func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType
LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
type ChangeCompactor ¶
type ChangeCompactor struct {
// contains filtered or unexported fields
}
ChangeCompactor is a cache of ledger entry changes that squashes all changes within a single ledger. By doing this, it decreases the number of queries sent to a DB to update the current state of the ledger. It has integrity checks built in so that, for example, removing an account that was previously removed returns an error. In such a case verify.StateError is returned.
It applies changes to the cache using the following algorithm:
- If the change is CREATED, it checks whether any change connected to the given entry is already in the cache. If not, it adds the CREATED change. Otherwise, if the existing change is:
  a. CREATED: it returns an error because we can't add an entry that already exists.
  b. UPDATED: it returns an error because we can't add an entry that already exists.
  c. REMOVED: previous transitions want to remove the entry from the DB, which means it already exists in the DB, so the type of change is updated to UPDATED.
- If the change is UPDATED, it checks whether any change connected to the given entry is already in the cache. If not, it adds the UPDATED change. Otherwise, if the existing change is:
  a. CREATED: previous transitions want to create the entry in the DB, which means it doesn't exist in the DB yet, so the entry is updated but the type stays CREATED.
  b. UPDATED: it is simply updated with the new value.
  c. REMOVED: at this point in the ledger the entry is removed, so updating it returns an error.
- If the change is REMOVED, it checks whether any change connected to the given entry is already in the cache. If not, it adds the REMOVED change. Otherwise, if the existing change is:
  a. CREATED: previous transitions want to create the entry in the DB, which means it doesn't exist in the DB. Creating and removing an entry in the same ledger is a no-op, so the entry is removed from the cache.
  b. UPDATED: it is simply changed to a REMOVED change because the UPDATED change means the entry exists in the DB.
  c. REMOVED: it returns an error because we can't remove an entry that was already removed.
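The transitions listed above can be modelled as a small state machine. The sketch below is a simplified illustration under stated assumptions, not the package's code: `compactor`, `cached`, `changeType`, and `add` are hypothetical names, entries are keyed by a plain string, and errors are ordinary `errors.New` values rather than a StateError.

```go
package main

import (
	"errors"
	"fmt"
)

// changeType is a hypothetical stand-in for the CREATED/UPDATED/REMOVED kinds.
type changeType int

const (
	created changeType = iota
	updated
	removed
)

// cached is what this sketch stores per entry key.
type cached struct {
	Type  changeType
	Value string
}

// compactor is a hypothetical, simplified model of the cache described above.
type compactor struct {
	cache map[string]cached
}

func newCompactor() *compactor {
	return &compactor{cache: map[string]cached{}}
}

// add applies one change for key, squashing it with any cached change.
func (c *compactor) add(key string, t changeType, value string) error {
	existing, ok := c.cache[key]
	if !ok {
		// Nothing cached for this entry yet: store the change as is.
		c.cache[key] = cached{Type: t, Value: value}
		return nil
	}
	switch t {
	case created:
		if existing.Type != removed {
			return errors.New("can't create an entry that already exists")
		}
		// Removed then created: the entry exists in the DB, so update it.
		c.cache[key] = cached{Type: updated, Value: value}
	case updated:
		if existing.Type == removed {
			return errors.New("can't update an entry that was removed")
		}
		// Keep the cached type (CREATED stays CREATED), replace the value.
		c.cache[key] = cached{Type: existing.Type, Value: value}
	case removed:
		switch existing.Type {
		case created:
			// Created and removed within one ledger: a no-op.
			delete(c.cache, key)
		case updated:
			c.cache[key] = cached{Type: removed}
		case removed:
			return errors.New("can't remove an entry that was already removed")
		}
	}
	return nil
}

func main() {
	c := newCompactor()
	_ = c.add("account-A", created, "balance=10")
	_ = c.add("account-A", updated, "balance=25")
	_ = c.add("offer-1", created, "price=2")
	_ = c.add("offer-1", removed, "")
	fmt.Println("entries cached:", len(c.cache))
	fmt.Println("account-A still CREATED:", c.cache["account-A"].Type == created)
}
```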
func NewChangeCompactor ¶
func NewChangeCompactor() *ChangeCompactor
NewChangeCompactor returns a new ChangeCompactor.
func (*ChangeCompactor) AddChange ¶
func (c *ChangeCompactor) AddChange(change Change) error
AddChange adds a change to ChangeCompactor. All changes are stored in memory. To get the final, squashed changes call GetChanges.
Please note that the current ledger capacity in pubnet (max 1000 ops/ledger) makes ChangeCompactor safe to use in terms of memory usage. If the cache takes too much memory, you can apply the changes returned by GetChanges and create a new ChangeCompactor object to continue ingestion.
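The flush-and-replace pattern mentioned above might look roughly like the following. This is a sketch under assumptions: `compactor` is a hypothetical stand-in that only mimics the AddChange, GetChanges, and Size methods (it keeps the last change per key and does no integrity checks), and `ingest`, `maxEntries`, and `apply` are illustrative names. Changes that end up in different flushes are no longer squashed together.

```go
package main

import "fmt"

// change is a hypothetical stand-in for Change, identified by an entry key.
type change struct {
	Key   string
	Value string
}

// compactor is a hypothetical stand-in with the same three operations as
// ChangeCompactor; it simply keeps the last change seen per key.
type compactor struct {
	byKey map[string]change
}

func newCompactor() *compactor {
	return &compactor{byKey: map[string]change{}}
}

func (c *compactor) AddChange(ch change) error {
	c.byKey[ch.Key] = ch
	return nil
}

func (c *compactor) Size() int { return len(c.byKey) }

func (c *compactor) GetChanges() []change {
	out := make([]change, 0, len(c.byKey))
	for _, ch := range c.byKey {
		out = append(out, ch)
	}
	return out
}

// ingest feeds changes through a compactor and flushes it through apply
// whenever the cache reaches maxEntries, then continues with a fresh one.
func ingest(changes []change, maxEntries int, apply func([]change) error) error {
	c := newCompactor()
	flush := func() error {
		if c.Size() == 0 {
			return nil
		}
		if err := apply(c.GetChanges()); err != nil {
			return err
		}
		c = newCompactor() // start over with an empty cache
		return nil
	}
	for _, ch := range changes {
		if err := c.AddChange(ch); err != nil {
			return err
		}
		if c.Size() >= maxEntries {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush() // apply whatever is left at the end
}

func main() {
	changes := []change{{"a", "1"}, {"a", "2"}, {"b", "1"}, {"c", "1"}}
	err := ingest(changes, 2, func(batch []change) error {
		fmt.Println("applying batch of", len(batch))
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```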
func (*ChangeCompactor) GetChanges ¶
func (c *ChangeCompactor) GetChanges() []Change
GetChanges returns a slice of Changes in the cache. The order of changes is random but each change is connected to a separate entry.
func (*ChangeCompactor) Size ¶
func (c *ChangeCompactor) Size() int
Size returns number of ledger entries in the cache.
type ChangeReader ¶
type ChangeReader interface {
	// Read should return the next `Change` in the ledger. If there are no more
	// changes left it should return an `io.EOF` error.
	Read() (Change, error)
	// Close should be called when reading is finished. This is especially
	// helpful when there are still some changes available so reader can stop
	// streaming them.
	Close() error
}
ChangeReader provides convenient, streaming access to a sequence of Changes.
type CheckpointChangeReader ¶
type CheckpointChangeReader struct {
// contains filtered or unexported fields
}
CheckpointChangeReader is a ChangeReader which returns Changes from a history archive snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar network at a particular checkpoint ledger sequence.
func NewCheckpointChangeReader ¶
func NewCheckpointChangeReader(
	ctx context.Context,
	archive historyarchive.ArchiveInterface,
	sequence uint32,
) (*CheckpointChangeReader, error)
NewCheckpointChangeReader constructs a new CheckpointChangeReader instance.
The ledger sequence must be a checkpoint ledger. By default (see `historyarchive.ConnectOptions.CheckpointFrequency` for configuring this), its next sequence number would have to be a multiple of 64, e.g. sequence=100031 is a checkpoint ledger, since: (100031+1) mod 64 == 0
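The checkpoint rule above is plain modular arithmetic. As a minimal sketch, assuming the default frequency of 64, the helpers below check a sequence and find the first checkpoint at or after it; `isCheckpoint` and `nextCheckpoint` are hypothetical names for illustration, not functions of this package, and the sketch ignores overflow near the top of the uint32 range.

```go
package main

import "fmt"

// defaultCheckpointFrequency is the default frequency mentioned above.
const defaultCheckpointFrequency uint32 = 64

// isCheckpoint reports whether sequence is a checkpoint ledger, i.e. whether
// the next sequence number is a multiple of frequency.
func isCheckpoint(sequence, frequency uint32) bool {
	return (sequence+1)%frequency == 0
}

// nextCheckpoint returns the smallest checkpoint ledger >= sequence.
func nextCheckpoint(sequence, frequency uint32) uint32 {
	// Round sequence+1 up to a multiple of frequency, then step back by one.
	return ((sequence+frequency)/frequency)*frequency - 1
}

func main() {
	for _, seq := range []uint32{100000, 100031, 100032} {
		fmt.Println(
			seq,
			"checkpoint:", isCheckpoint(seq, defaultCheckpointFrequency),
			"next:", nextCheckpoint(seq, defaultCheckpointFrequency),
		)
	}
}
```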
func (*CheckpointChangeReader) Close ¶
func (r *CheckpointChangeReader) Close() error
Close should be called when reading is finished.
func (*CheckpointChangeReader) Progress ¶
func (r *CheckpointChangeReader) Progress() float64
Progress returns the progress of reading all buckets, as a percentage.
func (*CheckpointChangeReader) Read ¶
func (r *CheckpointChangeReader) Read() (Change, error)
Read returns a new ledger entry change on each call, returning io.EOF when the stream ends.
type LedgerChangeReader ¶
type LedgerChangeReader struct {
	*LedgerTransactionReader
	// contains filtered or unexported fields
}
LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core for a single ledger.
func NewLedgerChangeReader ¶
func NewLedgerChangeReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerChangeReader, error)
NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. Note that the returned LedgerChangeReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerChangeReaderFromLedgerCloseMeta ¶
func NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase string, ledger xdr.LedgerCloseMeta) (*LedgerChangeReader, error)
NewLedgerChangeReaderFromLedgerCloseMeta constructs a new LedgerChangeReader instance bound to the given ledger. Note that the returned LedgerChangeReader is not thread safe and should not be shared by multiple goroutines.
func (*LedgerChangeReader) Close ¶
func (r *LedgerChangeReader) Close() error
Close should be called when reading is finished.
func (*LedgerChangeReader) Read ¶
func (r *LedgerChangeReader) Read() (Change, error)
Read returns the next change in the stream. If there are no changes remaining io.EOF is returned as an error.
type LedgerTransaction ¶
type LedgerTransaction struct {
	Index    uint32
	Envelope xdr.TransactionEnvelope
	Result   xdr.TransactionResultPair
	// FeeChanges and UnsafeMeta are low level values, do not use them directly unless
	// you know what you are doing.
	// Use LedgerTransaction.GetChanges() for higher level access to ledger
	// entry changes.
	FeeChanges xdr.LedgerEntryChanges
	UnsafeMeta xdr.TransactionMeta
}
LedgerTransaction represents the data for a single transaction within a ledger.
func (*LedgerTransaction) GetChanges ¶
func (t *LedgerTransaction) GetChanges() ([]Change, error)
GetChanges returns a developer friendly representation of LedgerEntryChanges. It contains transaction changes and operation changes in that order. If the transaction failed with TxInternalError, operations and txChangesAfter are omitted. It doesn't support legacy TransactionMeta.V=0.
func (*LedgerTransaction) GetFeeChanges ¶
func (t *LedgerTransaction) GetFeeChanges() []Change
GetFeeChanges returns a developer friendly representation of LedgerEntryChanges connected to fees.
func (*LedgerTransaction) GetOperation ¶
func (t *LedgerTransaction) GetOperation(index uint32) (xdr.Operation, bool)
GetOperation returns an operation by index.
func (*LedgerTransaction) GetOperationChanges ¶
func (t *LedgerTransaction) GetOperationChanges(operationIndex uint32) ([]Change, error)
GetOperationChanges returns a developer friendly representation of LedgerEntryChanges. It contains only operation changes.
type LedgerTransactionReader ¶
type LedgerTransactionReader struct {
// contains filtered or unexported fields
}
LedgerTransactionReader reads transactions for a given ledger sequence from a backend. Use NewLedgerTransactionReader to create a new instance.
func NewLedgerTransactionReader ¶
func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error)
NewLedgerTransactionReader creates a new LedgerTransactionReader instance. Note that LedgerTransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta ¶
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error)
NewLedgerTransactionReaderFromLedgerCloseMeta creates a new LedgerTransactionReader instance from xdr.LedgerCloseMeta. Note that LedgerTransactionReader is not thread safe and should not be shared by multiple goroutines.
func (*LedgerTransactionReader) Close ¶
func (reader *LedgerTransactionReader) Close() error
Close should be called when reading is finished. This is especially helpful when there are still some transactions available so reader can stop streaming them.
func (*LedgerTransactionReader) GetHeader ¶
func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry
GetHeader returns the XDR Header data associated with the stored ledger.
func (*LedgerTransactionReader) GetSequence ¶
func (reader *LedgerTransactionReader) GetSequence() uint32
GetSequence returns the sequence number of the ledger data stored by this object.
func (*LedgerTransactionReader) Read ¶
func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error)
Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there are no more transactions to return, io.EOF is returned.
func (*LedgerTransactionReader) Rewind ¶
func (reader *LedgerTransactionReader) Rewind()
Rewind resets the reader back to the first transaction in the ledger.
type MockChangeReader ¶
func (*MockChangeReader) Close ¶
func (m *MockChangeReader) Close() error
func (*MockChangeReader) Read ¶
func (m *MockChangeReader) Read() (Change, error)
type StateError ¶
type StateError struct {
// contains filtered or unexported fields
}
StateError is a fatal error indicating that the Change stream produced a result which violates fundamental invariants (e.g. an account transferred more XLM than the account held in its balance).
func NewStateError ¶
func NewStateError(err error) StateError
NewStateError creates a new StateError.
type StatsChangeProcessor ¶
type StatsChangeProcessor struct {
// contains filtered or unexported fields
}
StatsChangeProcessor is a state processor that counts the number of change types and entry types.
func (*StatsChangeProcessor) GetResults ¶
func (p *StatsChangeProcessor) GetResults() StatsChangeProcessorResults
func (*StatsChangeProcessor) ProcessChange ¶
func (p *StatsChangeProcessor) ProcessChange(ctx context.Context, change Change) error
type StatsChangeProcessorResults ¶
type StatsChangeProcessorResults struct {
	AccountsCreated int64
	AccountsUpdated int64
	AccountsRemoved int64

	ClaimableBalancesCreated int64
	ClaimableBalancesUpdated int64
	ClaimableBalancesRemoved int64

	DataCreated int64
	DataUpdated int64
	DataRemoved int64

	OffersCreated int64
	OffersUpdated int64
	OffersRemoved int64

	TrustLinesCreated int64
	TrustLinesUpdated int64
	TrustLinesRemoved int64

	LiquidityPoolsCreated int64
	LiquidityPoolsUpdated int64
	LiquidityPoolsRemoved int64
}
StatsChangeProcessorResults contains results after running StatsChangeProcessor.
func (*StatsChangeProcessorResults) Map ¶
func (stats *StatsChangeProcessorResults) Map() map[string]interface{}