processors

package
v0.0.0-...-55f7a6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeOfferId

func EncodeOfferId(id uint64, typ OfferIDType) int64

EncodeOfferId creates synthetic offer ids to be used by trade resources

This is required because digitalbits-core does not allocate offer ids for immediately filled offers, while clients expect them for aggregated views.

The encoded value is of type int64 for sql compatibility. The 2nd bit is used to differentiate between digitalbits-core offer ids and operation ids, which are toids.

Due to the 2nd bit being used, the largest possible toid is: 0011111111111111111111111111111100000000000000000001000000000001 \ ledger /\ transaction /\ op /

= 1073741823
  with avg. 5 sec close time will reach in ~170 years

func ParticipantsForTransaction

func ParticipantsForTransaction(
	sequence uint32,
	transaction ingest.LedgerTransaction,
) ([]xdr.AccountId, error)

func PoolIDToString

func PoolIDToString(id xdr.PoolId) string

PoolIDToString encodes a liquidity pool id xdr value to its string form

func StreamChanges

func StreamChanges(
	ctx context.Context,
	changeProcessor ChangeProcessor,
	reader ingest.ChangeReader,
) error

func StreamLedgerTransactions

func StreamLedgerTransactions(
	ctx context.Context,
	txFilterer LedgerTransactionFilterer,
	filteredTxProcessor LedgerTransactionProcessor,
	txProcessor LedgerTransactionProcessor,
	reader *ingest.LedgerTransactionReader,
) error

Types

type AccountDataProcessor

type AccountDataProcessor struct {
	// contains filtered or unexported fields
}

func NewAccountDataProcessor

func NewAccountDataProcessor(dataQ history.QData) *AccountDataProcessor

func (*AccountDataProcessor) Commit

func (p *AccountDataProcessor) Commit(ctx context.Context) error

func (*AccountDataProcessor) ProcessChange

func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type AccountsProcessor

type AccountsProcessor struct {
	// contains filtered or unexported fields
}

func NewAccountsProcessor

func NewAccountsProcessor(accountsQ history.QAccounts) *AccountsProcessor

func (*AccountsProcessor) Commit

func (p *AccountsProcessor) Commit(ctx context.Context) error

func (*AccountsProcessor) ProcessChange

func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type AssetStatSet

type AssetStatSet map[assetStatKey]*assetStatValue

AssetStatSet represents a collection of asset stats

func (AssetStatSet) AddClaimableBalance

func (s AssetStatSet) AddClaimableBalance(change ingest.Change) error

AddClaimableBalance updates the set to account for how a given claimable balance has changed. change must be a xdr.LedgerEntryTypeClaimableBalance type.

func (AssetStatSet) AddLiquidityPool

func (s AssetStatSet) AddLiquidityPool(change ingest.Change) error

AddLiquidityPool updates the set to account for how a given liquidity pool has changed. change must be a xdr.LedgerEntryTypeLiqidityPool type.

func (AssetStatSet) AddTrustline

func (s AssetStatSet) AddTrustline(change ingest.Change) error

AddTrustline updates the set to account for how a given trustline has changed. change must be a xdr.LedgerEntryTypeTrustLine type.

func (AssetStatSet) All

func (s AssetStatSet) All() []history.ExpAssetStat

All returns a list of all `history.ExpAssetStat` contained within the set

func (AssetStatSet) Remove

func (s AssetStatSet) Remove(assetType xdr.AssetType, assetCode string, assetIssuer string) (history.ExpAssetStat, bool)

Remove deletes an asset stat from the set

type AssetStatsProcessor

type AssetStatsProcessor struct {
	// contains filtered or unexported fields
}

func NewAssetStatsProcessor

func NewAssetStatsProcessor(
	assetStatsQ history.QAssetStats,
	useLedgerEntryCache bool,
) *AssetStatsProcessor

NewAssetStatsProcessor constructs a new AssetStatsProcessor instance. If useLedgerEntryCache is false we don't use ledger cache and we just add trust lines to assetStatSet, then we insert all the stats in one insert query. This is done to make history buckets processing faster (batch inserting).

func (*AssetStatsProcessor) Commit

func (p *AssetStatsProcessor) Commit(ctx context.Context) error

func (*AssetStatsProcessor) ProcessChange

func (p *AssetStatsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type ChangeProcessor

type ChangeProcessor interface {
	ProcessChange(ctx context.Context, change ingest.Change) error
}

type ClaimableBalancesChangeProcessor

type ClaimableBalancesChangeProcessor struct {
	// contains filtered or unexported fields
}

func (*ClaimableBalancesChangeProcessor) Commit

func (*ClaimableBalancesChangeProcessor) ProcessChange

func (p *ClaimableBalancesChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type ClaimableBalancesTransactionProcessor

type ClaimableBalancesTransactionProcessor struct {
	// contains filtered or unexported fields
}

func (*ClaimableBalancesTransactionProcessor) Commit

func (*ClaimableBalancesTransactionProcessor) ProcessTransaction

func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error

type EffectProcessor

type EffectProcessor struct {
	// contains filtered or unexported fields
}

EffectProcessor process effects

func NewEffectProcessor

func NewEffectProcessor(effectsQ history.QEffects, sequence uint32) *EffectProcessor

func (*EffectProcessor) Commit

func (p *EffectProcessor) Commit(ctx context.Context) (err error)

func (*EffectProcessor) ProcessTransaction

func (p *EffectProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error)

type LedgerTransactionFilterer

type LedgerTransactionFilterer interface {
	FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error)
}

type LedgerTransactionProcessor

type LedgerTransactionProcessor interface {
	ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error
}

type LedgersProcessor

type LedgersProcessor struct {
	// contains filtered or unexported fields
}

func NewLedgerProcessor

func NewLedgerProcessor(
	ledgerQ history.QLedgers,
	ledger xdr.LedgerHeaderHistoryEntry,
	ingestVersion int,
) *LedgersProcessor

func (*LedgersProcessor) Commit

func (p *LedgersProcessor) Commit(ctx context.Context) error

func (*LedgersProcessor) ProcessTransaction

func (p *LedgersProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error)

type LiquidityPoolsChangeProcessor

type LiquidityPoolsChangeProcessor struct {
	// contains filtered or unexported fields
}

func NewLiquidityPoolsChangeProcessor

func NewLiquidityPoolsChangeProcessor(Q history.QLiquidityPools, sequence uint32) *LiquidityPoolsChangeProcessor

func (*LiquidityPoolsChangeProcessor) Commit

func (*LiquidityPoolsChangeProcessor) ProcessChange

func (p *LiquidityPoolsChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type LiquidityPoolsTransactionProcessor

type LiquidityPoolsTransactionProcessor struct {
	// contains filtered or unexported fields
}

func (*LiquidityPoolsTransactionProcessor) Commit

func (*LiquidityPoolsTransactionProcessor) ProcessTransaction

func (p *LiquidityPoolsTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error

type MockChangeProcessor

type MockChangeProcessor struct {
	mock.Mock
}

func (*MockChangeProcessor) ProcessChange

func (m *MockChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type OfferIDType

type OfferIDType uint64
const (
	CoreOfferIDType OfferIDType = 0
	TOIDType        OfferIDType = 1
)

func DecodeOfferID

func DecodeOfferID(encodedId int64) (uint64, OfferIDType)

DecodeOfferID performs the reverse operation of EncodeOfferID

type OffersProcessor

type OffersProcessor struct {
	// contains filtered or unexported fields
}

func NewOffersProcessor

func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcessor

func (*OffersProcessor) Commit

func (p *OffersProcessor) Commit(ctx context.Context) error

func (*OffersProcessor) ProcessChange

func (p *OffersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type OperationProcessor

type OperationProcessor struct {
	// contains filtered or unexported fields
}

OperationProcessor operations processor

func NewOperationProcessor

func NewOperationProcessor(operationsQ history.QOperations, sequence uint32) *OperationProcessor

func (*OperationProcessor) Commit

func (p *OperationProcessor) Commit(ctx context.Context) error

func (*OperationProcessor) ProcessTransaction

func (p *OperationProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error

ProcessTransaction process the given transaction

type ParticipantsProcessor

type ParticipantsProcessor struct {
	// contains filtered or unexported fields
}

ParticipantsProcessor is a processor which ingests various participants from different sources (transactions, operations, etc)

func NewParticipantsProcessor

func NewParticipantsProcessor(participantsQ history.QParticipants, sequence uint32) *ParticipantsProcessor

func (*ParticipantsProcessor) Commit

func (p *ParticipantsProcessor) Commit(ctx context.Context) (err error)

func (*ParticipantsProcessor) ProcessTransaction

func (p *ParticipantsProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error)

type SignersProcessor

type SignersProcessor struct {
	// contains filtered or unexported fields
}

func NewSignersProcessor

func NewSignersProcessor(
	signersQ history.QSigners, useLedgerEntryCache bool,
) *SignersProcessor

func (*SignersProcessor) Commit

func (p *SignersProcessor) Commit(ctx context.Context) error

func (*SignersProcessor) ProcessChange

func (p *SignersProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

type StatsLedgerTransactionProcessor

type StatsLedgerTransactionProcessor struct {
	// contains filtered or unexported fields
}

StatsLedgerTransactionProcessor is a state processors that counts number of changes types and entry types.

func (*StatsLedgerTransactionProcessor) GetResults

func (*StatsLedgerTransactionProcessor) ProcessTransaction

func (p *StatsLedgerTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error

type StatsLedgerTransactionProcessorResults

type StatsLedgerTransactionProcessorResults struct {
	Transactions           int64
	TransactionsSuccessful int64
	TransactionsFailed     int64
	TransactionsFiltered   int64

	Operations             int64
	OperationsInSuccessful int64
	OperationsInFailed     int64

	OperationsCreateAccount                 int64
	OperationsPayment                       int64
	OperationsPathPaymentStrictReceive      int64
	OperationsManageSellOffer               int64
	OperationsCreatePassiveSellOffer        int64
	OperationsSetOptions                    int64
	OperationsChangeTrust                   int64
	OperationsAllowTrust                    int64
	OperationsAccountMerge                  int64
	OperationsInflation                     int64
	OperationsManageData                    int64
	OperationsBumpSequence                  int64
	OperationsManageBuyOffer                int64
	OperationsPathPaymentStrictSend         int64
	OperationsCreateClaimableBalance        int64
	OperationsClaimClaimableBalance         int64
	OperationsBeginSponsoringFutureReserves int64
	OperationsEndSponsoringFutureReserves   int64
	OperationsRevokeSponsorship             int64
	OperationsClawback                      int64
	OperationsClawbackClaimableBalance      int64
	OperationsSetTrustLineFlags             int64
	OperationsLiquidityPoolDeposit          int64
	OperationsLiquidityPoolWithdraw         int64
}

StatsLedgerTransactionProcessorResults contains results after running StatsLedgerTransactionProcessor.

func (*StatsLedgerTransactionProcessorResults) Map

func (stats *StatsLedgerTransactionProcessorResults) Map() map[string]interface{}

type TradeProcessor

type TradeProcessor struct {
	// contains filtered or unexported fields
}

TradeProcessor operations processor

func NewTradeProcessor

func NewTradeProcessor(tradesQ history.QTrades, ledger xdr.LedgerHeaderHistoryEntry) *TradeProcessor

func (*TradeProcessor) Commit

func (p *TradeProcessor) Commit(ctx context.Context) error

func (*TradeProcessor) GetStats

func (p *TradeProcessor) GetStats() TradeStats

func (*TradeProcessor) ProcessTransaction

func (p *TradeProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error)

ProcessTransaction process the given transaction

type TradeStats

type TradeStats struct {
	// contains filtered or unexported fields
}

func (*TradeStats) Map

func (stats *TradeStats) Map() map[string]interface{}

type TransactionProcessor

type TransactionProcessor struct {
	// contains filtered or unexported fields
}

func NewTransactionFilteredTmpProcessor

func NewTransactionFilteredTmpProcessor(transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor

func NewTransactionProcessor

func NewTransactionProcessor(transactionsQ history.QTransactions, sequence uint32) *TransactionProcessor

func (*TransactionProcessor) Commit

func (p *TransactionProcessor) Commit(ctx context.Context) error

func (*TransactionProcessor) ProcessTransaction

func (p *TransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error

type TrustLinesProcessor

type TrustLinesProcessor struct {
	// contains filtered or unexported fields
}

func NewTrustLinesProcessor

func NewTrustLinesProcessor(trustLinesQ history.QTrustLines) *TrustLinesProcessor

func (*TrustLinesProcessor) Commit

func (p *TrustLinesProcessor) Commit(ctx context.Context) error

func (*TrustLinesProcessor) ProcessChange

func (p *TrustLinesProcessor) ProcessChange(ctx context.Context, change ingest.Change) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL