ingest

package
v0.0.0-...-2e41b90 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2022 License: Apache-2.0, Apache-2.0 Imports: 15 Imported by: 0

README

Ingestion Library

The ingest package provides primitives for building custom ingestion engines.

Very often, developers need features that are outside of Horizon's scope. While it provides APIs for building the most common applications, it's not possible to add all possible features. That's why this package was created.

Architecture

From a high level, the ingestion library is broken down into a few modular components:

                  [ Processors ]
                        |
                       / \
                      /   \
                     /     \
              [Change]      [Transaction]
                 |               |
             |---+---|           |
       Checkpoint Ledger      Ledger
         Change   Change    Transaction
         Reader   Reader      Reader

                [ Ledger Backend ]
                        |
                     one of...
                        |
          --------|-----+------|----------|
         |        |            |          |
      Captive  Database      Remote      etc.
       Core                 Captive
                             Core 

This is described in a little more detail in doc.go, its accompanying examples, the documentation within this package, and the rest of this tutorial.

Hello, World!

As is tradition, we'll start with a simplistic example that ingests a single ledger from the network. We're immediately faced with a decision, though: What's the backend? We'll use a Captive Stellar-Core backend in this example because it requires (little-to-)no setup, but there are couple of alternatives available. You could also use:

  • a database (via NewDatabaseBackend()), which would ingest ledgers stored in a Stellar-Core database, or

  • a remote Captive Core instance (via NewRemoteCaptive()), which works much like Captive Core, but points to an instance that isn't (necessarily) running locally.

With that in mind, here's a minimalist example of the ingestion library:

package main

import (
	"context"
	"fmt"

	backends "github.com/TosinShada/monorepo/ingest/ledgerbackend"
)

func main() {
	ctx := context.Background()
	backend, err := backends.NewCaptive(config)
	panicIf(err)
	defer backend.Close()

	// Prepare a single ledger to be ingested,
	err = backend.PrepareRange(ctx, backends.BoundedRange(123456, 123456))
	panicIf(err)

	// then retrieve it:
	ledger, err := backend.GetLedger(ctx, 123456)
	panicIf(err)

	// Now `ledger` is a raw `xdr.LedgerCloseMeta` object containing the
	// transactions contained within this ledger.
	fmt.Printf("\nHello, Sequence %d.\n", ledger.LedgerSequence())
}

(The panicIf function is defined in the footnotes; it's used here for error-checking brevity.)

Notice that the mysterious config variable above isn't defined. This will be environment-specific and users should consult both the Captive Core documentation and the config docs directly for more details if they want to use this backend in production. For now, though, we'll have some hardcoded values for the SDF testnet:

networkPassphrase := "Test SDF Network ; September 2015"
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreToml(
	ledgerbackend.CaptiveCoreTomlParams{
		NetworkPassphrase:  networkPassphrase,
		HistoryArchiveURLs: []string{
			"https://history.stellar.org/prd/core-testnet/core_testnet_001",
		},
	})
panicIf(err)

config := ledgerbackend.CaptiveCoreConfig{
	// Change these based on your environment:
	BinaryPath:         "/usr/bin/stellar-core",
	NetworkPassphrase:  networkPassphrase,
	HistoryArchiveURLs: archiveURLs,
	Toml:               captiveCoreToml,
}

(Again, see the format of the stub file, etc. in the linked docs.)

Running this should dump a ton of logs while Captive Core boots up, downloads a history archive, and ultimately pops up the ledger sequence number we ingested:

$ go run ./example.go
INFO[...] default: Config from /tmp/captive-stellar-core365405852/stellar-core.conf  pid=20574
INFO[...] default: RUN_STANDALONE enabled in configuration file - node will not function properly with most networks  pid=20574
INFO[...] default: Generated QUORUM_SET: {              pid=20574
INFO[...] "t" : 2,                                      pid=20574
INFO[...] "v" : [ "sdf_testnet_2", "sdf_testnet_3", "sdf_testnet_1" ]  pid=20574
INFO[...] }                                             pid=20574
INFO[...] default: Assigning calculated value of 1 to FAILURE_SAFETY  pid=20574
INFO[...] Database: Connecting to: sqlite3://:memory:   pid=20574
INFO[...] SCP: LocalNode::LocalNode@GCVAA qSet: 59d361  pid=20574
INFO[...] default: *                                    pid=20574
INFO[...] default: * The database has been initialized  pid=20574
INFO[...] default: *                                    pid=20574
INFO[...] Database: Applying DB schema upgrade to version 13  pid=20574
INFO[...] Database: Adding column 'ledgerext' to table 'accounts'  pid=20574
...
INFO[...] Ledger: Established genesis ledger, closing   pid=20574
INFO[...] Ledger: Root account seed: SDHOAMBNLGCE2MV5ZKIVZAQD3VCLGP53P3OBSBI6UN5L5XZI5TKHFQL4  pid=20574
INFO[...] default: *                                    pid=20574
INFO[...] default: * The next launch will catchup from the network afresh.  pid=20574
INFO[...] default: *                                    pid=20574
INFO[...] default: Application destructing              pid=20574
INFO[...] default: Application destroyed                pid=20574
...
INFO[...] History: Starting catchup with configuration: pid=20574
INFO[...] lastClosedLedger: 1                           pid=20574
INFO[...] toLedger: 123457                              pid=20574
INFO[...] count: 2                                      pid=20574
INFO[...] History: Catching up to ledger 123457: Downloading state file history/00/01/e2/history-0001e27f.json for ledger 123519  pid=20574
...
INFO[...] History: Catching up to ledger 123457: downloading and verifying buckets: 16/17 (94%)  pid=20574
INFO[...] History: Verifying bucket d4db982884941c0b82422996e26ae0778b4a85385ef657ffacee9b11adf72882  pid=20574
INFO[...] History: Catching up to ledger 123457: Succeeded: download-verify-buckets : 17/17 children completed  pid=20574
INFO[...] History: Applying buckets                     pid=20574
INFO[...] History: Catching up to ledger 123457: Applying buckets 0%. Currently on level 9  pid=20574
...
INFO[...] Bucket: Bucket-apply: 158366 entries in 17.12MB/17.12MB in 17/17 files (100%)  pid=20574
INFO[...] History: Catching up to ledger 123457: Applying buckets 100%. Currently on level 0  pid=20574
INFO[...] History: ApplyBuckets : done, restarting merges  pid=20574
INFO[...] History: Catching up to ledger 123457: Succeeded: download-verify-apply-buckets  pid=20574
INFO[...] History: Downloading, unzipping and applying transactions for checkpoint 123519  pid=20574
INFO[...] History: Catching up to ledger 123457: Download & apply checkpoints: num checkpoints left to apply:1 (0% done)  pid=20574

Hello, Ledger #123456.

There's obviously much, much more we can do with the ingestion library. Let's work through some more comprehensive examples.

Example: Ledger Statistics

In this section, we'll demonstrate how to combine a backend with a reader to actually learn something meaningful about the Stellar network. Again, we'll use a specific backend here (Captive Core, again), but the processing can be done with any of them.

More specifically, we're going to analyze the ledgers and track some statistics about the success/failure of transactions and their relative operations using LedgerTransactionReader. While this is technically doable by manipulating the Horizon API and some fancy JSON parsing, it serves as a useful yet concise demonstration of the ingestion library's features.

Preamble

Let's get the boilerplate out of the way first. Again, we presume config is some sensible Captive Core configuration.

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/sirupsen/logrus"
	"github.com/TosinShada/monorepo/ingest"
	backends "github.com/TosinShada/monorepo/ingest/ledgerbackend"
	"github.com/TosinShada/monorepo/support/log"
)

func statistics() {
	ctx := context.Background()
	// Only log errors from the backend to keep output cleaner.
	lg := log.New()
	lg.SetLevel(logrus.ErrorLevel)
	config.Log = lg

	backend, err := backends.NewCaptive(config)
	panicIf(err)
	defer backend.Close()

	// ...

Reading Transactions

Now, let's identify a range of ledgers we wish to process. For simplicity, let's work on the first 10,000 ledgers on the network.

	// Prepare a range to be ingested:
	var startingSeq uint32 = 2 // can't start with genesis ledger
	var ledgersToRead uint32 = 10000

	fmt.Printf("Preparing range (%d ledgers)...\n", ledgersToRead)
	ledgerRange := backends.BoundedRange(startingSeq, startingSeq+ledgersToRead)
	err = backend.PrepareRange(ctx, ledgerRange)
	panicIf(err)

This part will take a bit of time as Captive Core (or whatever backend) processes these ledgers and prepares them for ingestion.

Now, we'll actually use a LedgerTransactionReader object to use the backend and read the transactions ledger by ledger. It takes the backend, the network passphrase, and the ledger you'd like to process as parameters, giving you back an object that returns raw transaction objects row by row.

	// These are the statistics that we're tracking.
	var successfulTransactions, failedTransactions int
	var operationsInSuccessful, operationsInFailed int

	for seq := startingSeq; seq <= startingSeq+ledgersToRead; seq++ {
		fmt.Printf("Processed ledger %d...\r", seq)

		txReader, err := ingest.NewLedgerTransactionReader(
			ctx, backend, config.NetworkPassphrase, seq,
		)
		panicIf(err)
		defer txReader.Close()

Each ledger likely has many transactions, so we nest in another loop to process them all:

		// Read each transaction within the ledger, extract its operations, and
		// accumulate the statistics we're interested in.
		for {
			tx, err := txReader.Read()
			if err == io.EOF {
				break
			}
			panicIf(err)

			envelope := tx.Envelope
			operationCount := len(envelope.Operations())
			if tx.Result.Successful() {
				successfulTransactions++
				operationsInSuccessful += operationCount
			} else {
				failedTransactions++
				operationsInFailed += operationCount
			}
		}
	} // outer loop

And that's it! We can print the statistics out of interest:

	fmt.Println("\nDone. Results:")
	fmt.Printf("  - total transactions: %d\n", successfulTransactions+failedTransactions)
	fmt.Printf("  - succeeded / failed: %d / %d\n", successfulTransactions, failedTransactions)
	fmt.Printf("  - total operations:   %d\n", operationsInSuccessful+operationsInFailed)
	fmt.Printf("  - succeeded / failed: %d / %d\n", operationsInSuccessful, operationsInFailed)
} // end of main

As of this writing, the stats are as follows:

Results:
  - total transactions: 24159
  - succeeded / failed: 16037 / 8122
  - total operations:   33845
  - succeeded / failed: 25387 / 8458

The full, runnable example is available here.

Example: Feature Popularity

In this example, we'll leverage the CheckpointChangeReader to determine the popularity of a feature introduced in Protocol 15: claimable balances. Specifically, we'll be investigating how many claimable balances were created in an arbitrary ledger range.

Let's begin. As before, there's a bit of boilerplate necessary. There's only a single additional import necessary relative to the previous Preamble. Since we're working with checkpoint ledgers, history archives come into play:

import "github.com/TosinShada/monorepo/historyarchive"

This time, we don't need a LedgerBackend instance whatsoever. The ledger changes we want to process will be fed into the reader through a different means. In our example, the history archives have the droids ledgers that we are looking for.

History Archive Connections

First thing's first: we need to establish a connection to a history archive.

	// Open a history archive using our existing configuration details.
	historyArchive, err := historyarchive.Connect(
		config.HistoryArchiveURLs[0],	// assumes a CaptiveCoreConfig
		historyarchive.ConnectOptions{
			NetworkPassphrase: config.NetworkPassphrase,
			S3Region:          "us-west-1",
			UnsignedRequests:  false,
		},
	)
	panicIf(err)

Tracking Changes

Each history archive contains the current cumulative state of the entire network.

Now we can use the history archive to actually read in all of the changes that have accumulated in the entire network by a particular checkpoint.

	// First, we need to establish a safe fallback in case of any problems
	// during the history archive download+processing, so we'll set a 30-second
	// timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	reader, err := ingest.NewCheckpointChangeReader(ctx, historyArchive, 123455)
	panicIf(err)

In our examples, we refer to the testnet, whose archives are much smaller. When using the pubnet, a 30 minute timeout may be more appropriate (depending on system specs): Horizon takes around 15-20 minutes to process pubnet history archives.

By default, checkpoints occur every 64 ledgers (see historyarchive.ConnectOptions for changing this). More specifically, given ledger n, if n+1 mod 64 == 0, then n is a checkpoint ledger. Alternatively, this is when n*64 - 1 for n = 1, 2, 3, ... and so on. This is true above for n == 123455.

Since history archives store global cumulative state, our ChangeReader will report every entry as being "new", reading out a list of all ledger entries. We can then process them and establish how many claimable balances have been created in the testnet's lifetime:

	entries, newCBs := 0, 0
	for {
		entry, err := reader.Read()
		if err == io.EOF {
			break
		}
		panicIf(err)

		entries++

		switch entry.Type {
		case xdr.LedgerEntryTypeClaimableBalance:
			newCBs++
		// these are included for completeness of the demonstration
		case xdr.LedgerEntryTypeAccount:
		case xdr.LedgerEntryTypeData:
		case xdr.LedgerEntryTypeTrustline:
		case xdr.LedgerEntryTypeOffer:
		default:
			panic(fmt.Errorf("Unknown type: %+v", entry.Type))
		}
	}

	fmt.Printf("%d/%d entries were claimable balances\n", newCBs, entries)
} // end of main()

Snippets

This section outlines a brief collection of common things you may want to do with the library. We assume a very generic backend variable where necessary that is one of the aforementioned LedgerBackend instances to avoid boilerplate.

Controlling LedgerBackend log verbosity

Certain backends (like Captive Core) can be very noisy; they will log to standard output by default at the "Info" level.

You can suppress many logs by changing the level to only print warnings and errors:

package main

import (
  ingest "github.com/TosinShada/monorepo/ingest/ledgerbackend"
  "github.com/TosinShada/monorepo/support/log"
  "github.com/sirupsen/logrus"
)

func main() {
  lg := log.New()
  lg.SetLevel(logrus.WarnLevel)
  config.Log = lg // assume config is otherwise predefined

  backend, err := ingest.NewCaptive(config) // (or other backend)
  // ...
}

Or even disable output entirely by redirecting to ioutil.Discard:

lg.Entry.Logger.Out = ioutil.Discard
Footnotes
  1. The minimalist error handler (if panicking counts as "handling" an error) panicIf used throughout this tutorial is defined simply as:
func panicIf(err error) {
	if err != nil {
		panic(err)
	}
}

Please don't use it in production code; it's provided here for completeness, convenience, and brevity of examples.

  1. Since the Stellar testnet undergoes periodic resets, the example outputs from various sections (especially regarding network statistics) will not always be accurate.

  2. It's worth noting that even though the second example could also be done by using the LedgerTransactionReader and inspecting the individual operations, that'd be bit redundant as far as examples go.

Documentation

Overview

Package ingest provides primitives for building custom ingestion engines.

Very often developers need features that are outside of Horizon's scope. While it provides APIs for building the most common apps, it's not possible to add all possible features. This is why this package was created.

Ledger Backend

Ledger backends are sources of information about Stellar network ledgers. This can be, for example: a Stellar-Core database, (possibly-remote) Captive Stellar-Core instances, or History Archives. Please consult the "ledgerbackend" package docs for more information about each backend.

Warning: Ledger backends provide low-level xdr.LedgerCloseMeta that should not

be used directly unless the developer really understands this data
structure. Read on to understand how to use ledger backend in higher
level objects.

Readers

Readers are objects that wrap ledger backend and provide higher level, developer friendly APIs for reading ledger data.

Currently there are three types of readers:

  • CheckpointChangeReader reads ledger entries from history buckets for a given checkpoint ledger. Allow building state (all accounts, trust lines etc.) at any checkpoint ledger.
  • LedgerTransactionReader reads transactions for a given ledger sequence.
  • LedgerChangeReader reads all changes to ledger entries created as a result of transactions (fees and meta) and protocol upgrades in a given ledger.

Warning: Readers stream BOTH successful and failed transactions; check transactions status in your application if required.

Tutorial

Refer to the examples below for simple use cases, or check out the README (and its corresponding tutorial/ subfolder) in the repository for a Getting Started guide: https://github.com/TosinShada/monorepo/blob/master/ingest/README.md

Example (Changes)

Example_changes demonstrates how to stream ledger entry changes for a specific ledger using captive stellar-core. Please note that transaction meta IS available when using this backend.

ctx := context.Background()
archiveURL := "http://history.stellar.org/prd/core-live/core_live_001"
networkPassphrase := network.PublicNetworkPassphrase

captiveCoreToml, err := ledgerbackend.NewCaptiveCoreToml(ledgerbackend.CaptiveCoreTomlParams{
	NetworkPassphrase:  networkPassphrase,
	HistoryArchiveURLs: []string{archiveURL},
})
if err != nil {
	panic(err)
}

// Requires Stellar-Core 13.2.0+
backend, err := ledgerbackend.NewCaptive(
	ledgerbackend.CaptiveCoreConfig{
		BinaryPath:         "/bin/stellar-core",
		NetworkPassphrase:  networkPassphrase,
		HistoryArchiveURLs: []string{archiveURL},
		Toml:               captiveCoreToml,
	},
)
if err != nil {
	panic(err)
}

sequence := uint32(3)

err = backend.PrepareRange(ctx, ledgerbackend.SingleLedgerRange(sequence))
if err != nil {
	panic(err)
}

changeReader, err := NewLedgerChangeReader(ctx, backend, networkPassphrase, sequence)
if err != nil {
	panic(err)
}

for {
	change, err := changeReader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	var action string
	switch {
	case change.Pre == nil && change.Post != nil:
		action = "created"
	case change.Pre != nil && change.Post != nil:
		action = "updated"
	case change.Pre != nil && change.Post == nil:
		action = "removed"
	}

	switch change.Type {
	case xdr.LedgerEntryTypeAccount:
		var accountEntry xdr.AccountEntry
		if change.Pre != nil {
			accountEntry = change.Pre.Data.MustAccount()
		} else {
			accountEntry = change.Post.Data.MustAccount()
		}
		fmt.Println("account", accountEntry.AccountId.Address(), action)
	case xdr.LedgerEntryTypeData:
		fmt.Println("data", action)
	case xdr.LedgerEntryTypeTrustline:
		fmt.Println("trustline", action)
	case xdr.LedgerEntryTypeOffer:
		fmt.Println("offer", action)
	default:
		panic("Unknown type")
	}
}
Output:

Example (Ledgerentrieshistoryarchive)

Example_ledgerentrieshistoryarchive demonstrates how to stream all ledger entries live at specific checkpoint ledger from history archives.

archiveURL := "http://history.stellar.org/prd/core-live/core_live_001"

archive, err := historyarchive.Connect(
	archiveURL,
	historyarchive.ConnectOptions{Context: context.TODO()},
)
if err != nil {
	panic(err)
}

// Ledger must be a checkpoint ledger: (100031+1) mod 64 == 0.
reader, err := NewCheckpointChangeReader(context.TODO(), archive, 100031)
if err != nil {
	panic(err)
}

var accounts, data, trustlines, offers int
for {
	entry, err := reader.Read()
	if err == io.EOF {
		break
	}
	if err != nil {
		panic(err)
	}

	switch entry.Type {
	case xdr.LedgerEntryTypeAccount:
		accounts++
	case xdr.LedgerEntryTypeData:
		data++
	case xdr.LedgerEntryTypeTrustline:
		trustlines++
	case xdr.LedgerEntryTypeOffer:
		offers++
	default:
		panic("Unknown type")
	}
}

fmt.Println("accounts", accounts)
fmt.Println("data", data)
fmt.Println("trustlines", trustlines)
fmt.Println("offers", offers)
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrNotFound = errors.New("ledger not found")

ErrNotFound is returned when the requested ledger is not found

Functions

This section is empty.

Types

type Change

type Change struct {
	Type xdr.LedgerEntryType
	Pre  *xdr.LedgerEntry
	Post *xdr.LedgerEntry
}

Change is a developer friendly representation of LedgerEntryChanges. It also provides some helper functions to quickly check if a given change has occurred in an entry.

If an entry is created: Pre is nil and Post is not nil. If an entry is updated: Pre is not nil and Post is not nil. If an entry is removed: Pre is not nil and Post is nil.

func GenesisChange

func GenesisChange(networkPassPhrase string) Change

GenesisChange returns the Change occurring at the genesis ledger (ledgerseq = 1)..

func GetChangesFromLedgerEntryChanges

func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change

GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. Each `update` and `removed` is preceded with `state` and `create` changes are alone, without `state`. The transformation we're doing is to move each change (state/update, state/removed or create) to an array of pre/post pairs. Then: - for create, pre is null and post is a new entry, - for update, pre is previous state and post is the current state, - for removed, pre is previous state and post is null.

stellar-core source: https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582

func (*Change) AccountChangedExceptSigners

func (c *Change) AccountChangedExceptSigners() (bool, error)

AccountChangedExceptSigners returns true if account has changed WITHOUT checking the signers (except master key weight!). In other words, if the only change is connected to signers, this function will return false.

func (*Change) AccountSignersChanged

func (c *Change) AccountSignersChanged() bool

AccountSignersChanged returns true if account signers have changed. Notice: this will return true on master key changes too!

func (*Change) GetLiquidityPoolType

func (c *Change) GetLiquidityPoolType() (xdr.LiquidityPoolType, error)

GetLiquidityPoolType returns the liquidity pool type.

func (*Change) LedgerEntryChangeType

func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType

LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.

type ChangeCompactor

type ChangeCompactor struct {
	// contains filtered or unexported fields
}

ChangeCompactor is a cache of ledger entry changes that squashes all changes within a single ledger. By doing this, it decreases number of DB queries sent to a DB to update the current state of the ledger. It has integrity checks built in so ex. removing an account that was previously removed returns an error. In such case verify.StateError is returned.

It applies changes to the cache using the following algorithm:

  1. If the change is CREATED it checks if any change connected to given entry is already in the cache. If not, it adds CREATED change. Otherwise, if existing change is: a. CREATED it returns error because we can't add an entry that already exists. b. UPDATED it returns error because we can't add an entry that already exists. c. REMOVED it means that due to previous transitions we want to remove this from a DB what means that it already exists in a DB so we need to update the type of change to UPDATED.
  2. If the change is UPDATE it checks if any change connected to given entry is already in the cache. If not, it adds UPDATE change. Otherwise, if existing change is: a. CREATED it means that due to previous transitions we want to create this in a DB what means that it doesn't exist in a DB so we need to update the entry but stay with CREATED type. b. UPDATED we simply update it with the new value. c. REMOVED it means that at this point in the ledger the entry is removed so updating it returns an error.
  3. If the change is REMOVE it checks if any change connected to given entry is already in the cache. If not, it adds REMOVE change. Otherwise, if existing change is: a. CREATED it means that due to previous transitions we want to create this in a DB what means that it doesn't exist in a DB. If it was created and removed in the same ledger it's a noop so we remove entry from the cache. b. UPDATED we simply update it to be a REMOVE change because the UPDATE change means the entry exists in a DB. c. REMOVED it returns error because we can't remove an entry that was already removed.

func NewChangeCompactor

func NewChangeCompactor() *ChangeCompactor

NewChangeCompactor returns a new ChangeCompactor.

func (*ChangeCompactor) AddChange

func (c *ChangeCompactor) AddChange(change Change) error

AddChange adds a change to ChangeCompactor. All changes are stored in memory. To get the final, squashed changes call GetChanges.

Please note that the current ledger capacity in pubnet (max 1000 ops/ledger) makes ChangeCompactor safe to use in terms of memory usage. If the cache takes too much memory, you apply changes returned by GetChanges and create a new ChangeCompactor object to continue ingestion.

func (*ChangeCompactor) GetChanges

func (c *ChangeCompactor) GetChanges() []Change

GetChanges returns a slice of Changes in the cache. The order of changes is random but each change is connected to a separate entry.

func (*ChangeCompactor) Size

func (c *ChangeCompactor) Size() int

Size returns number of ledger entries in the cache.

type ChangeReader

type ChangeReader interface {
	// Read should return the next `Change` in the leader. If there are no more
	// changes left it should return an `io.EOF` error.
	Read() (Change, error)
	// Close should be called when reading is finished. This is especially
	// helpful when there are still some changes available so reader can stop
	// streaming them.
	Close() error
}

ChangeReader provides convenient, streaming access to a sequence of Changes.

type CheckpointChangeReader

type CheckpointChangeReader struct {
	// contains filtered or unexported fields
}

CheckpointChangeReader is a ChangeReader which returns Changes from a history archive snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar network at a particular checkpoint ledger sequence.

func NewCheckpointChangeReader

func NewCheckpointChangeReader(
	ctx context.Context,
	archive historyarchive.ArchiveInterface,
	sequence uint32,
) (*CheckpointChangeReader, error)

NewCheckpointChangeReader constructs a new CheckpointChangeReader instance.

The ledger sequence must be a checkpoint ledger. By default (see `historyarchive.ConnectOptions.CheckpointFrequency` for configuring this), its next sequence number would have to be a multiple of 64, e.g. sequence=100031 is a checkpoint ledger, since: (100031+1) mod 64 == 0

func (*CheckpointChangeReader) Close

func (r *CheckpointChangeReader) Close() error

Close should be called when reading is finished.

func (*CheckpointChangeReader) Progress

func (r *CheckpointChangeReader) Progress() float64

Progress returns progress reading all buckets in percents.

func (*CheckpointChangeReader) Read

func (r *CheckpointChangeReader) Read() (Change, error)

Read returns a new ledger entry change on each call, returning io.EOF when the stream ends.

type LedgerChangeReader

type LedgerChangeReader struct {
	*LedgerTransactionReader
	// contains filtered or unexported fields
}

LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core for a single ledger

func NewLedgerChangeReader

func NewLedgerChangeReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerChangeReader, error)

NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. Note that the returned LedgerChangeReader is not thread safe and should not be shared by multiple goroutines.

func NewLedgerChangeReaderFromLedgerCloseMeta

func NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase string, ledger xdr.LedgerCloseMeta) (*LedgerChangeReader, error)

NewLedgerChangeReaderFromLedgerCloseMeta constructs a new LedgerChangeReader instance bound to the given ledger. Note that the returned LedgerChangeReader is not thread safe and should not be shared by multiple goroutines.

func (*LedgerChangeReader) Close

func (r *LedgerChangeReader) Close() error

Close should be called when reading is finished.

func (*LedgerChangeReader) Read

func (r *LedgerChangeReader) Read() (Change, error)

Read returns the next change in the stream. If there are no changes remaining io.EOF is returned as an error.

type LedgerTransaction

type LedgerTransaction struct {
	Index    uint32
	Envelope xdr.TransactionEnvelope
	Result   xdr.TransactionResultPair
	// FeeChanges and UnsafeMeta are low level values, do not use them directly unless
	// you know what you are doing.
	// Use LedgerTransaction.GetChanges() for higher level access to ledger
	// entry changes.
	FeeChanges xdr.LedgerEntryChanges
	UnsafeMeta xdr.TransactionMeta
}

LedgerTransaction represents the data for a single transaction within a ledger.

func (*LedgerTransaction) GetChanges

func (t *LedgerTransaction) GetChanges() ([]Change, error)

GetChanges returns a developer friendly representation of LedgerEntryChanges. It contains transaction changes and operation changes in that order. If the transaction failed with TxInternalError, operations and txChangesAfter are omitted. It doesn't support legacy TransactionMeta.V=0.

func (*LedgerTransaction) GetFeeChanges

func (t *LedgerTransaction) GetFeeChanges() []Change

GetFeeChanges returns a developer friendly representation of LedgerEntryChanges connected to fees.

func (*LedgerTransaction) GetOperation

func (t *LedgerTransaction) GetOperation(index uint32) (xdr.Operation, bool)

GetOperation returns an operation by index.

func (*LedgerTransaction) GetOperationChanges

func (t *LedgerTransaction) GetOperationChanges(operationIndex uint32) ([]Change, error)

GetOperationChanges returns a developer friendly representation of LedgerEntryChanges. It contains only operation changes.

type LedgerTransactionReader

type LedgerTransactionReader struct {
	// contains filtered or unexported fields
}

LedgerTransactionReader reads transactions for a given ledger sequence from a backend. Use NewTransactionReader to create a new instance.

func NewLedgerTransactionReader

func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error)

NewLedgerTransactionReader creates a new TransactionReader instance. Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.

func NewLedgerTransactionReaderFromLedgerCloseMeta

func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error)

NewLedgerTransactionReaderFromXdr creates a new TransactionReader instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.

func (*LedgerTransactionReader) Close

func (reader *LedgerTransactionReader) Close() error

Close should be called when reading is finished. This is especially helpful when there are still some transactions available so reader can stop streaming them.

func (*LedgerTransactionReader) GetHeader

GetHeader returns the XDR Header data associated with the stored ledger.

func (*LedgerTransactionReader) GetSequence

func (reader *LedgerTransactionReader) GetSequence() uint32

GetSequence returns the sequence number of the ledger data stored by this object.

func (*LedgerTransactionReader) Read

Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there are no more transactions to return, an EOF error is returned.

func (*LedgerTransactionReader) Rewind

func (reader *LedgerTransactionReader) Rewind()

Rewind resets the reader back to the first transaction in the ledger

type MockChangeReader

type MockChangeReader struct {
	mock.Mock
}

func (*MockChangeReader) Close

func (m *MockChangeReader) Close() error

func (*MockChangeReader) Read

func (m *MockChangeReader) Read() (Change, error)

type StateError

type StateError struct {
	// contains filtered or unexported fields
}

StateError is a fatal error indicating that the Change stream produced a result which violates fundamental invariants (e.g. an account transferred more XLM than the account held in its balance).

func NewStateError

func NewStateError(err error) StateError

NewStateError creates a new StateError.

type StatsChangeProcessor

type StatsChangeProcessor struct {
	// contains filtered or unexported fields
}

StatsChangeProcessor is a state processors that counts number of changes types and entry types.

func (*StatsChangeProcessor) GetResults

func (*StatsChangeProcessor) ProcessChange

func (p *StatsChangeProcessor) ProcessChange(ctx context.Context, change Change) error

type StatsChangeProcessorResults

type StatsChangeProcessorResults struct {
	AccountsCreated int64
	AccountsUpdated int64
	AccountsRemoved int64

	ClaimableBalancesCreated int64
	ClaimableBalancesUpdated int64
	ClaimableBalancesRemoved int64

	DataCreated int64
	DataUpdated int64
	DataRemoved int64

	OffersCreated int64
	OffersUpdated int64
	OffersRemoved int64

	TrustLinesCreated int64
	TrustLinesUpdated int64
	TrustLinesRemoved int64

	LiquidityPoolsCreated int64
	LiquidityPoolsUpdated int64
	LiquidityPoolsRemoved int64
}

StatsChangeProcessorResults contains results after running StatsChangeProcessor.

func (*StatsChangeProcessorResults) Map

func (stats *StatsChangeProcessorResults) Map() map[string]interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL