sqlstore

package
v0.67.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2023 License: MIT Imports: 41 Imported by: 3

Documentation

Index

Constants

View Source
const (
	ASC  Sorting = "ASC"
	DESC Sorting = "DESC"

	EQ Compare = "="
	NE Compare = "!="
	GT Compare = ">"
	LT Compare = "<"
	GE Compare = ">="
	LE Compare = "<="
)
View Source
const (
	SQLMigrationsDir = "migrations"
	InfiniteInterval = "forever"
)

Variables

View Source
var (
	ErrBlockWaitTimedout = errors.New("Timed out waiting for TimeUpdate event")
	BlockWaitTimeout     = 5 * time.Second
)
View Source
var (
	ErrLedgerEntryFilterForParty = errors.New("filtering ledger entries should be limited to a single party")

	ErrLedgerEntryExportForParty = errors.New("exporting ledger entries should be limited to a single party")
	ErrLedgerEntryExportForAsset = errors.New("exporting ledger entries should be limited to a single asset")
)
View Source
var EmbedMigrations embed.FS
View Source
var ErrBadID = errors.New("bad id (must be hex string)")
View Source
var ErrInvalidDateRange = errors.New("invalid date range, end date must be after start date")

Functions

func ApplyDataRetentionPolicies

func ApplyDataRetentionPolicies(config Config) error

func CreateConnectionPool added in v0.67.0

func CreateConnectionPool(conf ConnectionConfig) (*pgxpool.Pool, error)

func CreateVegaSchema added in v0.57.0

func CreateVegaSchema(log *logging.Logger, connConfig ConnectionConfig) error

func CursorPredicate

func CursorPredicate(args []interface{}, cursor interface{}, ordering TableOrdering) (string, []interface{}, error)

CursorPredicate generates an SQL predicate which excludes all rows before the supplied cursor, with regards to the supplied table ordering. The values used for comparison are added to the args list and bind variables used in the query fragment.

For example, with if you had a query with columns sorted foo ASCENDING, bar DESCENDING and a cursor with {foo=1, bar=2}, it would yield a string predicate like this:

(foo > $1) OR (foo = $1 AND bar <= $2)

And 'args' would have 1 and 2 appended to it.

Notes:

  • The predicate *includes* the value at the cursor
  • Only fields that are present in both the cursor and the ordering are considered
  • The union of those fields must have enough information to uniquely identify a row
  • The table ordering must be sufficient to ensure that a row identified by a cursor cannot change position in relation to the other rows

func GetAllTableNames added in v0.64.0

func GetAllTableNames(ctx context.Context, conn *pgxpool.Pool) ([]string, error)

func GetLastBlockUsingConnection added in v0.60.0

func GetLastBlockUsingConnection(ctx context.Context, connection Connection) (*entities.Block, error)

func GetOldestHistoryBlockUsingConnection added in v0.60.0

func GetOldestHistoryBlockUsingConnection(ctx context.Context, connection Connection) (entities.Block, error)

func HasVegaSchema added in v0.64.0

func HasVegaSchema(ctx context.Context, connPool *pgxpool.Pool) (bool, error)

func MigrateToLatestSchema

func MigrateToLatestSchema(log *logging.Logger, config Config) error

func MigrateToSchemaVersion added in v0.63.0

func MigrateToSchemaVersion(log *logging.Logger, config Config, version int64, fs fs.FS) error

func PaginateQuery

func PaginateQuery[T any, PT parserPtr[T]](
	query string,
	args []interface{},
	ordering TableOrdering,
	pagination entities.CursorPagination,
) (string, []interface{}, error)

PaginateQuery takes a query string & bind arg list and returns the same with additional SQL to

  • exclude rows before the cursor (or after it if the cursor is a backwards looking one)
  • limit the number of rows to the pagination limit +1 (no cursor) or +2 (cursor) [for purposes of later figuring out whether there are next or previous pages]
  • order the query according to the TableOrdering supplied the order is reversed if pagination request is backwards

For example with cursor to a row where foo=42, and a pagination saying get the next 3 then: PaginateQuery[MyCursor]("SELECT foo FROM my_table", args, ordering, pagination)

Would append `42` to the arg list and return SELECT foo FROM my_table WHERE foo>=$1 ORDER BY foo ASC LIMIT 5

See CursorPredicate() for more details about how the cursor filtering is done.

func RecreateVegaDatabase added in v0.57.0

func RecreateVegaDatabase(ctx context.Context, log *logging.Logger, connConfig ConnectionConfig) error

func RevertToSchemaVersionZero added in v0.64.0

func RevertToSchemaVersionZero(log *logging.Logger, config ConnectionConfig, fs fs.FS) error

func StartEmbeddedPostgres

func StartEmbeddedPostgres(log *logging.Logger, config Config, runtimeDir string, postgresLog EmbeddedPostgresLog) (*embeddedpostgres.EmbeddedPostgres, error)

func StructValueForColumn

func StructValueForColumn(obj any, colName string) (interface{}, error)

StructValueForColumn replicates some of the unexported functionality from Scanny. You pass a struct (or pointer to a struct), and a column name. It converts the struct field names into database column names in a similar way to scanny and if one matches colName, that field value is returned. For example

type Foo struct {
	Thingy        int `db:"wotsit"`
	SomethingElse int
}

val, err := StructValueForColumn(foo, "wotsit")             -> 1
val, err := StructValueForColumn(&foo, "something_else")    -> 2

NB - not all functionality of scanny is supported (but could be added if needed)

  • we don't support embedded structs
  • assumes the 'dbTag' is the default 'db'

func WipeDatabaseAndMigrateSchemaToLatestVersion added in v0.65.0

func WipeDatabaseAndMigrateSchemaToLatestVersion(log *logging.Logger, config ConnectionConfig, fs fs.FS) error

func WipeDatabaseAndMigrateSchemaToVersion added in v0.65.0

func WipeDatabaseAndMigrateSchemaToVersion(log *logging.Logger, config ConnectionConfig, version int64, fs fs.FS) error

Types

type AccountSource

type AccountSource interface {
	Query(filter entities.AccountFilter) ([]entities.Account, error)
}

type Accounts

type Accounts struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewAccounts

func NewAccounts(connectionSource *ConnectionSource) *Accounts

func (*Accounts) Add

func (as *Accounts) Add(ctx context.Context, a *entities.Account) error

Add inserts a row and updates supplied struct with autogenerated ID.

func (*Accounts) GetAll

func (as *Accounts) GetAll(ctx context.Context) ([]entities.Account, error)

func (*Accounts) GetByID

func (as *Accounts) GetByID(ctx context.Context, accountID entities.AccountID) (entities.Account, error)

func (*Accounts) Obtain

func (as *Accounts) Obtain(ctx context.Context, a *entities.Account) error

Obtain will either fetch or create an account in the database. If an account with matching party/asset/market/type does not exist in the database, create one. If an account already exists, fetch that one. In either case, update the entities.Account object passed with an ID from the database.

func (*Accounts) Query

func (as *Accounts) Query(ctx context.Context, filter entities.AccountFilter) ([]entities.Account, error)

func (*Accounts) QueryBalances

func (*Accounts) QueryBalancesV1

func (as *Accounts) QueryBalancesV1(ctx context.Context, filter entities.AccountFilter, pagination entities.OffsetPagination) ([]entities.AccountBalance, error)

type Assets

type Assets struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewAssets

func NewAssets(connectionSource *ConnectionSource) *Assets

func (*Assets) Add

func (as *Assets) Add(ctx context.Context, a entities.Asset) error

func (*Assets) GetAll

func (as *Assets) GetAll(ctx context.Context) ([]entities.Asset, error)

func (*Assets) GetAllWithCursorPagination

func (as *Assets) GetAllWithCursorPagination(ctx context.Context, pagination entities.CursorPagination) (
	[]entities.Asset, entities.PageInfo, error,
)

func (*Assets) GetByID

func (as *Assets) GetByID(ctx context.Context, id string) (entities.Asset, error)

type Balances

type Balances struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewBalances

func NewBalances(connectionSource *ConnectionSource) *Balances

func (*Balances) Add

Add inserts a row to the balance table. If there's already a balance for this (account, block time) update it to match with the one supplied.

func (*Balances) Flush

func (bs *Balances) Flush(ctx context.Context) ([]entities.AccountBalance, error)

func (*Balances) Query

type Blocks

type Blocks struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewBlocks

func NewBlocks(connectionSource *ConnectionSource) *Blocks

func (*Blocks) Add

func (bs *Blocks) Add(ctx context.Context, b entities.Block) error

func (*Blocks) GetAll

func (bs *Blocks) GetAll(ctx context.Context) ([]entities.Block, error)

func (*Blocks) GetAtHeight

func (bs *Blocks) GetAtHeight(ctx context.Context, height int64) (entities.Block, error)

func (*Blocks) GetLastBlock

func (bs *Blocks) GetLastBlock(ctx context.Context) (entities.Block, error)

GetLastBlock return the last block or ErrNoLastBlock if no block is found.

func (*Blocks) GetOldestHistoryBlock added in v0.57.0

func (bs *Blocks) GetOldestHistoryBlock(ctx context.Context) (entities.Block, error)

type Candles

type Candles struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewCandles

func NewCandles(ctx context.Context, connectionSource *ConnectionSource, config candlesv2.CandleStoreConfig) *Candles

func (*Candles) CandleExists

func (cs *Candles) CandleExists(ctx context.Context, candleID string) (bool, error)

func (*Candles) GetCandleDataForTimeSpan

func (cs *Candles) GetCandleDataForTimeSpan(ctx context.Context, candleID string, from *time.Time, to *time.Time,
	p entities.CursorPagination) ([]entities.Candle, entities.PageInfo, error,
)

GetCandleDataForTimeSpan gets the candles for a given interval, from and to are optional.

func (*Candles) GetCandleIDForIntervalAndMarket added in v0.55.0

func (cs *Candles) GetCandleIDForIntervalAndMarket(ctx context.Context, interval string, market string) (bool, string, error)

func (*Candles) GetCandlesForMarket

func (cs *Candles) GetCandlesForMarket(ctx context.Context, market string) (map[string]string, error)

GetCandlesForMarket returns a map of existing intervals to candle ids for the given market.

type Chain

type Chain struct {
	*ConnectionSource
}

func NewChain

func NewChain(connectionSource *ConnectionSource) *Chain

func (*Chain) Get

func (c *Chain) Get(ctx context.Context) (entities.Chain, error)

func (*Chain) Set

func (c *Chain) Set(ctx context.Context, chain entities.Chain) error

type Checkpoints

type Checkpoints struct {
	*ConnectionSource
}

func NewCheckpoints

func NewCheckpoints(connectionSource *ConnectionSource) *Checkpoints

func (*Checkpoints) Add

func (*Checkpoints) GetAll

type ColumnOrdering

type ColumnOrdering struct {
	// Name of the column in the database table to match to the struct field
	Name string
	// Sorting is the sorting order to use for the column
	Sorting Sorting
}

func NewColumnOrdering

func NewColumnOrdering(name string, sorting Sorting) ColumnOrdering

type Compare

type Compare = string

type Config

type Config struct {
	ConnectionConfig                                   ConnectionConfig      `group:"ConnectionConfig" namespace:"ConnectionConfig"`
	WipeOnStartup                                      encoding.Bool         `long:"wipe-on-startup"`
	Level                                              encoding.LogLevel     `long:"log-level"`
	UseEmbedded                                        encoding.Bool         `long:"use-embedded" description:"Use an embedded version of Postgresql for the SQL data store"`
	FanOutBufferSize                                   int                   `long:"fan-out-buffer-size" description:"buffer size used by the fan out event source"`
	RetentionPolicies                                  []RetentionPolicy     `group:"RetentionPolicies" namespace:"RetentionPolicies"`
	ConnectionRetryConfig                              ConnectionRetryConfig `group:"ConnectionRetryConfig" namespace:"ConnectionRetryConfig"`
	LogRotationConfig                                  LogRotationConfig     `group:"LogRotationConfig" namespace:"LogRotationConfig"`
	DisableMinRetentionPolicyCheckForUseInSysTestsOnly encoding.Bool         `` /* 154-byte string literal not displayed */
}

func NewDefaultConfig

func NewDefaultConfig() Config

type Connection

type Connection interface {
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error)
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}

type ConnectionConfig

type ConnectionConfig struct {
	Host                  string            `long:"host"`
	Port                  int               `long:"port"`
	Username              string            `long:"username"`
	Password              string            `long:"password"`
	Database              string            `long:"database"`
	SocketDir             string            `long:"socket-dir" description:"location of postgres UNIX socket directory (used if host is empty string)"`
	MaxConnLifetime       encoding.Duration `long:"max-conn-lifetime"`
	MaxConnLifetimeJitter encoding.Duration `long:"max-conn-lifetime-jitter"`
	MaxConnPoolSize       int               `long:"max-conn-pool-size"`
	MinConnPoolSize       int32             `long:"min-conn-pool-size"`
}

func (ConnectionConfig) GetConnectionString

func (conf ConnectionConfig) GetConnectionString() string

func (ConnectionConfig) GetConnectionStringForPostgresDatabase added in v0.57.0

func (conf ConnectionConfig) GetConnectionStringForPostgresDatabase() string

func (ConnectionConfig) GetPoolConfig

func (conf ConnectionConfig) GetPoolConfig() (*pgxpool.Config, error)

type ConnectionRetryConfig added in v0.59.0

type ConnectionRetryConfig struct {
	MaxRetries      uint64        `long:"max-retries" description:"the maximum number of times to retry connecting to the database"`
	InitialInterval time.Duration `long:"initial-interval" description:"the initial interval to wait before retrying"`
	MaxInterval     time.Duration `long:"max-interval" description:"the maximum interval to wait before retrying"`
	MaxElapsedTime  time.Duration `long:"max-elapsed-time" description:"the maximum elapsed time to wait before giving up"`
}

type ConnectionSource

type ConnectionSource struct {
	Connection Connection
	// contains filtered or unexported fields
}

func NewTransactionalConnectionSource

func NewTransactionalConnectionSource(log *logging.Logger, connConfig ConnectionConfig) (*ConnectionSource, error)

func (*ConnectionSource) AfterCommit added in v0.55.0

func (s *ConnectionSource) AfterCommit(ctx context.Context, f func())

func (*ConnectionSource) Close added in v0.60.0

func (s *ConnectionSource) Close()

func (*ConnectionSource) Commit

func (s *ConnectionSource) Commit(ctx context.Context) error

func (*ConnectionSource) Rollback added in v0.64.0

func (s *ConnectionSource) Rollback(ctx context.Context) error

func (*ConnectionSource) WithConnection

func (s *ConnectionSource) WithConnection(ctx context.Context) (context.Context, error)

func (*ConnectionSource) WithTransaction

func (s *ConnectionSource) WithTransaction(ctx context.Context) (context.Context, error)

type CoreSnapshotData added in v0.65.0

type CoreSnapshotData struct {
	*ConnectionSource
}

func NewCoreSnapshotData added in v0.65.0

func NewCoreSnapshotData(connectionSource *ConnectionSource) *CoreSnapshotData

func (*CoreSnapshotData) Add added in v0.65.0

func (*CoreSnapshotData) List added in v0.65.0

type CursorQueryParameter

type CursorQueryParameter struct {
	ColumnName string
	Sort       Sorting
	Cmp        Compare
	Value      any
}

func NewCursorQueryParameter

func NewCursorQueryParameter(columnName string, sort Sorting, cmp Compare, value any) CursorQueryParameter

func (CursorQueryParameter) OrderBy

func (c CursorQueryParameter) OrderBy() string

func (CursorQueryParameter) Where

func (c CursorQueryParameter) Where(args ...interface{}) (string, []interface{})

type CursorQueryParameters

type CursorQueryParameters []CursorQueryParameter

func (CursorQueryParameters) OrderBy

func (c CursorQueryParameters) OrderBy() string

func (CursorQueryParameters) Where

func (c CursorQueryParameters) Where(args ...interface{}) (string, []interface{})

type DatanodeBlockSpan added in v0.64.0

type DatanodeBlockSpan struct {
	FromHeight int64
	ToHeight   int64
	HasData    bool
}

func GetDatanodeBlockSpan added in v0.64.0

func GetDatanodeBlockSpan(ctx context.Context, connPool *pgxpool.Pool) (DatanodeBlockSpan, error)

type Delegations

type Delegations struct {
	*ConnectionSource
}

func NewDelegations

func NewDelegations(connectionSource *ConnectionSource) *Delegations

func (*Delegations) Add

func (*Delegations) Get

func (ds *Delegations) Get(ctx context.Context,
	partyIDHex *string,
	nodeIDHex *string,
	epochID *int64,
	pagination entities.Pagination,
) ([]entities.Delegation, entities.PageInfo, error)

func (*Delegations) GetAll

func (ds *Delegations) GetAll(ctx context.Context) ([]entities.Delegation, error)

type Deposits

type Deposits struct {
	*ConnectionSource
}

func NewDeposits

func NewDeposits(connectionSource *ConnectionSource) *Deposits

func (*Deposits) GetByID

func (d *Deposits) GetByID(ctx context.Context, depositID string) (entities.Deposit, error)

func (*Deposits) GetByParty

func (d *Deposits) GetByParty(ctx context.Context, party string, openOnly bool, pagination entities.Pagination, dateRange entities.DateRange) (
	[]entities.Deposit, entities.PageInfo, error,
)

func (*Deposits) Upsert

func (d *Deposits) Upsert(ctx context.Context, deposit *entities.Deposit) error

type ERC20MultiSigSignerEvent

type ERC20MultiSigSignerEvent struct {
	*ConnectionSource
}

func NewERC20MultiSigSignerEvent

func NewERC20MultiSigSignerEvent(connectionSource *ConnectionSource) *ERC20MultiSigSignerEvent

func (*ERC20MultiSigSignerEvent) Add

func (*ERC20MultiSigSignerEvent) GetAddedEvents

func (m *ERC20MultiSigSignerEvent) GetAddedEvents(ctx context.Context, validatorID string, submitter string, epochID *int64, pagination entities.CursorPagination) (
	[]entities.ERC20MultiSigSignerEvent, entities.PageInfo, error,
)

func (*ERC20MultiSigSignerEvent) GetRemovedEvents

func (m *ERC20MultiSigSignerEvent) GetRemovedEvents(ctx context.Context, validatorID string, submitter string, epochID *int64, pagination entities.CursorPagination) ([]entities.ERC20MultiSigSignerEvent, entities.PageInfo, error)

type EmbeddedPostgresLog added in v0.60.0

type EmbeddedPostgresLog interface {
	io.Writer
}

type Epochs

type Epochs struct {
	*ConnectionSource
}

func NewEpochs

func NewEpochs(connectionSource *ConnectionSource) *Epochs

func (*Epochs) Add

func (es *Epochs) Add(ctx context.Context, r entities.Epoch) error

func (*Epochs) Get

func (es *Epochs) Get(ctx context.Context, ID int64) (entities.Epoch, error)

func (*Epochs) GetAll

func (es *Epochs) GetAll(ctx context.Context) ([]entities.Epoch, error)

func (*Epochs) GetCurrent

func (es *Epochs) GetCurrent(ctx context.Context) (entities.Epoch, error)

type EthereumKeyRotations

type EthereumKeyRotations struct {
	*ConnectionSource
}

func NewEthereumKeyRotations

func NewEthereumKeyRotations(connectionSource *ConnectionSource) *EthereumKeyRotations

func (*EthereumKeyRotations) Add

func (*EthereumKeyRotations) List

type KeyRotations

type KeyRotations struct {
	*ConnectionSource
}

func NewKeyRotations

func NewKeyRotations(connectionSource *ConnectionSource) *KeyRotations

func (*KeyRotations) GetAllPubKeyRotations

func (store *KeyRotations) GetAllPubKeyRotations(ctx context.Context, pagination entities.CursorPagination) ([]entities.KeyRotation, entities.PageInfo, error)

func (*KeyRotations) GetPubKeyRotationsPerNode

func (store *KeyRotations) GetPubKeyRotationsPerNode(ctx context.Context, nodeID string, pagination entities.CursorPagination) ([]entities.KeyRotation, entities.PageInfo, error)

func (*KeyRotations) Upsert

func (store *KeyRotations) Upsert(ctx context.Context, kr *entities.KeyRotation) error

type Ledger

type Ledger struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewLedger

func NewLedger(connectionSource *ConnectionSource) *Ledger

func (*Ledger) Add

func (ls *Ledger) Add(le entities.LedgerEntry) error

func (*Ledger) Export added in v0.67.0

func (ls *Ledger) Export(
	ctx context.Context,
	partyID, assetID string,
	dateRange entities.DateRange,
	pagination entities.CursorPagination,
) ([]byte, entities.PageInfo, error)

func (*Ledger) Flush

func (ls *Ledger) Flush(ctx context.Context) ([]entities.LedgerEntry, error)

func (*Ledger) GetAll

func (ls *Ledger) GetAll(ctx context.Context) ([]entities.LedgerEntry, error)

func (*Ledger) GetByLedgerEntryTime added in v0.57.0

func (ls *Ledger) GetByLedgerEntryTime(ctx context.Context, ledgerEntryTime time.Time) (entities.LedgerEntry, error)

func (*Ledger) Query added in v0.58.0

This query requests and sums number of the ledger entries of a given subset of accounts, specified via the 'filter' argument. It returns a timeseries (implemented as a list of AggregateLedgerEntry structs), with a row for every time the summed ledger entries of the set of specified accounts changes. Listed queries should be limited to a single party from each side only. If no or more than one parties are provided for sending and receiving accounts - the query returns error.

Entries can be queried by:

  • listing ledger entries with filtering on the sending account (market_id, asset_id, account_type)
  • listing ledger entries with filtering on the receiving account (market_id, asset_id, account_type)
  • listing ledger entries with filtering on the sending AND receiving account
  • listing ledger entries with filtering on the transfer type (on top of above filters or as a standalone option)

type LiquidityProvision

type LiquidityProvision struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewLiquidityProvision

func NewLiquidityProvision(connectionSource *ConnectionSource, log *logging.Logger) *LiquidityProvision

func (*LiquidityProvision) Flush

func (lp *LiquidityProvision) Flush(ctx context.Context) error

func (*LiquidityProvision) Get

func (*LiquidityProvision) ObserveLiquidityProvisions added in v0.55.0

func (lp *LiquidityProvision) ObserveLiquidityProvisions(ctx context.Context, retries int,
	market *string, party *string,
) (<-chan []entities.LiquidityProvision, uint64)

func (*LiquidityProvision) Upsert

func (lp *LiquidityProvision) Upsert(ctx context.Context, liquidityProvision entities.LiquidityProvision) error

type ListBatcher

type ListBatcher[T simpleEntity] struct {
	// contains filtered or unexported fields
}

func NewListBatcher

func NewListBatcher[T simpleEntity](tableName string, columnNames []string) ListBatcher[T]

func (*ListBatcher[T]) Add

func (b *ListBatcher[T]) Add(entity T)

func (*ListBatcher[T]) Flush

func (b *ListBatcher[T]) Flush(ctx context.Context, connection Connection) ([]T, error)

type LogRotationConfig added in v0.62.0

type LogRotationConfig struct {
	MaxSize int `long:"max-size" description:"the maximum size of the log file in bytes"`
	MaxAge  int `long:"max-age" description:"the maximum number of days to keep the log file"`
}

type MapBatcher

type MapBatcher[K entityKey, V entity[K]] struct {
	// contains filtered or unexported fields
}

func NewMapBatcher

func NewMapBatcher[K entityKey, V entity[K]](tableName string, columnNames []string) MapBatcher[K, V]

func (*MapBatcher[K, V]) Add

func (b *MapBatcher[K, V]) Add(e V)

func (*MapBatcher[K, V]) Flush

func (b *MapBatcher[K, V]) Flush(ctx context.Context, connection Connection) ([]V, error)

type MarginLevels

type MarginLevels struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewMarginLevels

func NewMarginLevels(connectionSource *ConnectionSource) *MarginLevels

func (*MarginLevels) Add

func (ml *MarginLevels) Add(marginLevel entities.MarginLevels) error

func (*MarginLevels) Flush

func (ml *MarginLevels) Flush(ctx context.Context) ([]entities.MarginLevels, error)

func (*MarginLevels) GetMarginLevelsByID

func (ml *MarginLevels) GetMarginLevelsByID(ctx context.Context, partyID, marketID string, pagination entities.OffsetPagination) ([]entities.MarginLevels, error)

func (*MarginLevels) GetMarginLevelsByIDWithCursorPagination

func (ml *MarginLevels) GetMarginLevelsByIDWithCursorPagination(ctx context.Context, partyID, marketID string, pagination entities.CursorPagination) ([]entities.MarginLevels, entities.PageInfo, error)

type MarketData

type MarketData struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewMarketData

func NewMarketData(connectionSource *ConnectionSource) *MarketData

func (*MarketData) Add

func (md *MarketData) Add(data *entities.MarketData) error

func (*MarketData) Flush

func (md *MarketData) Flush(ctx context.Context) ([]*entities.MarketData, error)

func (*MarketData) GetBetweenDatesByID

func (md *MarketData) GetBetweenDatesByID(ctx context.Context, marketID string, start, end time.Time, pagination entities.Pagination) ([]entities.MarketData, entities.PageInfo, error)

func (*MarketData) GetFromDateByID

func (md *MarketData) GetFromDateByID(ctx context.Context, marketID string, start time.Time, pagination entities.Pagination) ([]entities.MarketData, entities.PageInfo, error)

func (*MarketData) GetMarketDataByID

func (md *MarketData) GetMarketDataByID(ctx context.Context, marketID string) (entities.MarketData, error)

func (*MarketData) GetMarketsData

func (md *MarketData) GetMarketsData(ctx context.Context) ([]entities.MarketData, error)

func (*MarketData) GetToDateByID

func (md *MarketData) GetToDateByID(ctx context.Context, marketID string, end time.Time, pagination entities.Pagination) ([]entities.MarketData, entities.PageInfo, error)

type Markets

type Markets struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewMarkets

func NewMarkets(connectionSource *ConnectionSource) *Markets

func (*Markets) GetAll

func (m *Markets) GetAll(ctx context.Context, pagination entities.OffsetPagination) ([]entities.Market, error)

func (*Markets) GetAllPaged

func (m *Markets) GetAllPaged(ctx context.Context, marketID string, pagination entities.CursorPagination) ([]entities.Market, entities.PageInfo, error)

func (*Markets) GetByID

func (m *Markets) GetByID(ctx context.Context, marketID string) (entities.Market, error)

func (*Markets) Upsert

func (m *Markets) Upsert(ctx context.Context, market *entities.Market) error

type NetworkLimits

type NetworkLimits struct {
	*ConnectionSource
}

func NewNetworkLimits

func NewNetworkLimits(connectionSource *ConnectionSource) *NetworkLimits

func (*NetworkLimits) Add

Add inserts a row into the network limits table. If a row with the same vega time exists, that row is updated instead. (i.e. there are multiple updates of the limits in one block, does occur).

func (*NetworkLimits) GetLatest

func (nl *NetworkLimits) GetLatest(ctx context.Context) (entities.NetworkLimits, error)

GetLatest returns the most recent network limits.

type NetworkParameters

type NetworkParameters struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewNetworkParameters

func NewNetworkParameters(connectionSource *ConnectionSource) *NetworkParameters

func (*NetworkParameters) Add

func (*NetworkParameters) GetAll

func (*NetworkParameters) GetByKey added in v0.57.0

type Node

type Node struct {
	*ConnectionSource
}

func NewNode

func NewNode(connectionSource *ConnectionSource) *Node

func (*Node) AddNodeAnnouncedEvent added in v0.55.0

func (store *Node) AddNodeAnnouncedEvent(ctx context.Context, nodeID string, vegatime time.Time, aux *entities.ValidatorUpdateAux) error

AddNodeAnnouncedEvent store data about which epoch a particular node was added or removed from the roster of validators.

func (*Node) GetNodeByID

func (store *Node) GetNodeByID(ctx context.Context, nodeID string, epochSeq uint64) (entities.Node, error)

func (*Node) GetNodeData

func (store *Node) GetNodeData(ctx context.Context, epochSeq uint64) (entities.NodeData, error)

func (*Node) GetNodes

func (store *Node) GetNodes(ctx context.Context, epochSeq uint64, pagination entities.CursorPagination) ([]entities.Node, entities.PageInfo, error)

func (*Node) UpdateEthereumAddress

func (store *Node) UpdateEthereumAddress(ctx context.Context, kr entities.EthereumKeyRotation) error

func (*Node) UpdatePublicKey

func (store *Node) UpdatePublicKey(ctx context.Context, kr *entities.KeyRotation) error

func (*Node) UpsertNode

func (store *Node) UpsertNode(ctx context.Context, node *entities.Node) error

func (*Node) UpsertRanking

func (store *Node) UpsertRanking(ctx context.Context, rs *entities.RankingScore, aux *entities.RankingScoreAux) error

func (*Node) UpsertScore

func (store *Node) UpsertScore(ctx context.Context, rs *entities.RewardScore, aux *entities.RewardScoreAux) error

type Notary

type Notary struct {
	*ConnectionSource
}

func NewNotary

func NewNotary(connectionSource *ConnectionSource) *Notary

func (*Notary) Add

func (n *Notary) Add(ctx context.Context, ns *entities.NodeSignature) error

func (*Notary) GetByResourceID

func (n *Notary) GetByResourceID(ctx context.Context, id string, pagination entities.CursorPagination) ([]entities.NodeSignature, entities.PageInfo, error)

type OracleData

type OracleData struct {
	*ConnectionSource
}

func NewOracleData

func NewOracleData(connectionSource *ConnectionSource) *OracleData

func (*OracleData) Add

func (od *OracleData) Add(ctx context.Context, oracleData *entities.OracleData) error

func (*OracleData) GetOracleDataBySpecID

func (od *OracleData) GetOracleDataBySpecID(ctx context.Context, id string, pagination entities.Pagination) ([]entities.OracleData, entities.PageInfo, error)

func (*OracleData) ListOracleData

func (od *OracleData) ListOracleData(ctx context.Context, pagination entities.Pagination) ([]entities.OracleData, entities.PageInfo, error)

type OracleSpec

type OracleSpec struct {
	*ConnectionSource
}

func NewOracleSpec

func NewOracleSpec(connectionSource *ConnectionSource) *OracleSpec

func (*OracleSpec) GetSpecByID

func (os *OracleSpec) GetSpecByID(ctx context.Context, specID string) (*entities.OracleSpec, error)

func (*OracleSpec) GetSpecs

func (os *OracleSpec) GetSpecs(ctx context.Context, pagination entities.OffsetPagination) ([]entities.DataSourceSpec, error)

func (*OracleSpec) GetSpecsWithCursorPagination

func (os *OracleSpec) GetSpecsWithCursorPagination(ctx context.Context, specID string, pagination entities.CursorPagination) (
	[]entities.OracleSpec, entities.PageInfo, error,
)

func (*OracleSpec) Upsert

func (os *OracleSpec) Upsert(ctx context.Context, spec *entities.OracleSpec) error

type Orders

type Orders struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewOrders

func NewOrders(connectionSource *ConnectionSource, _ *logging.Logger) *Orders

func (*Orders) Add

func (os *Orders) Add(o entities.Order) error

Add inserts an order update row into the database if an row for this (block time, order id, version) does not already exist; otherwise update the existing row with information supplied. Currently we only store the last update to an order per block, so the order history is not complete if multiple updates happen in one block.

func (*Orders) Flush

func (os *Orders) Flush(ctx context.Context) ([]entities.Order, error)

func (*Orders) GetAll

func (os *Orders) GetAll(ctx context.Context) ([]entities.Order, error)

GetAll returns all updates to all orders (including changes to orders that don't increment the version number).

func (*Orders) GetAllVersionsByOrderID

func (os *Orders) GetAllVersionsByOrderID(ctx context.Context, id string, p entities.OffsetPagination) ([]entities.Order, error)

GetAllVersionsByOrderID the last update to all versions (e.g. manual changes that lead to incrementing the version field) of a given order id.

func (*Orders) GetByMarket

func (os *Orders) GetByMarket(ctx context.Context, marketIDStr string, p entities.OffsetPagination) ([]entities.Order, error)

GetByMarket returns the last update of the all the orders in a particular market.

func (*Orders) GetByParty

func (os *Orders) GetByParty(ctx context.Context, partyIDStr string, p entities.OffsetPagination) ([]entities.Order, error)

GetByParty returns the last update of the all the orders in a particular party.

func (*Orders) GetByReference

func (os *Orders) GetByReference(ctx context.Context, reference string, p entities.OffsetPagination) ([]entities.Order, error)

GetByReference returns the last update of orders with the specified user-suppled reference.

func (*Orders) GetByReferencePaged

func (os *Orders) GetByReferencePaged(ctx context.Context, reference string, p entities.CursorPagination) ([]entities.Order, entities.PageInfo, error)

GetByReference returns the last update of orders with the specified user-suppled reference.

func (*Orders) GetLiveOrders

func (os *Orders) GetLiveOrders(ctx context.Context) ([]entities.Order, error)

GetLiveOrders fetches all currently live orders so the market depth data can be rebuilt from the orders data in the database.

func (*Orders) GetOrder

func (os *Orders) GetOrder(ctx context.Context, orderIDStr string, version *int32) (entities.Order, error)

GetByOrderId returns the last update of the order with the given ID.

func (*Orders) ListOrderVersions

func (os *Orders) ListOrderVersions(ctx context.Context, orderIDStr string, p entities.CursorPagination) ([]entities.Order, entities.PageInfo, error)

func (*Orders) ListOrders

func (os *Orders) ListOrders(ctx context.Context, party *string, market *string, reference *string, liveOnly bool, p entities.CursorPagination,
	dateRange entities.DateRange, orderFilter entities.OrderFilter,
) ([]entities.Order, entities.PageInfo, error)

type Parties

type Parties struct {
	*ConnectionSource
}

func NewParties

func NewParties(connectionSource *ConnectionSource) *Parties

func (*Parties) Add

func (ps *Parties) Add(ctx context.Context, p entities.Party) error

func (*Parties) GetAll

func (ps *Parties) GetAll(ctx context.Context) ([]entities.Party, error)

func (*Parties) GetAllPaged

func (ps *Parties) GetAllPaged(ctx context.Context, partyID string, pagination entities.CursorPagination) ([]entities.Party, entities.PageInfo, error)

func (*Parties) GetByID

func (ps *Parties) GetByID(ctx context.Context, id string) (entities.Party, error)

func (*Parties) Initialise

func (ps *Parties) Initialise(ctx context.Context)

Initialise adds the built-in 'network' party which is never explicitly sent on the event bus, but nonetheless is necessary.

type Positions

type Positions struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewPositions

func NewPositions(connectionSource *ConnectionSource) *Positions

func (*Positions) Add

func (ps *Positions) Add(ctx context.Context, p entities.Position) error

func (*Positions) Flush

func (ps *Positions) Flush(ctx context.Context) ([]entities.Position, error)

func (*Positions) GetAll

func (ps *Positions) GetAll(ctx context.Context) ([]entities.Position, error)

func (*Positions) GetByMarket

func (ps *Positions) GetByMarket(ctx context.Context, marketID string) ([]entities.Position, error)

func (*Positions) GetByMarketAndParty

func (ps *Positions) GetByMarketAndParty(ctx context.Context,
	marketIDRaw string,
	partyIDRaw string,
) (entities.Position, error)

func (*Positions) GetByParty

func (ps *Positions) GetByParty(ctx context.Context, partyID string) ([]entities.Position, error)

func (*Positions) GetByPartyConnection

func (ps *Positions) GetByPartyConnection(ctx context.Context, partyIDRaw string, marketIDRaw string, pagination entities.CursorPagination) ([]entities.Position, entities.PageInfo, error)

type Proposals

type Proposals struct {
	*ConnectionSource
}

func NewProposals

func NewProposals(connectionSource *ConnectionSource) *Proposals

func (*Proposals) Add

func (ps *Proposals) Add(ctx context.Context, p entities.Proposal) error

func (*Proposals) Get

func (ps *Proposals) Get(ctx context.Context,
	inState *entities.ProposalState,
	partyIDStr *string,
	proposalType *entities.ProposalType,
	pagination entities.CursorPagination,
) ([]entities.Proposal, entities.PageInfo, error)

func (*Proposals) GetByID

func (ps *Proposals) GetByID(ctx context.Context, id string) (entities.Proposal, error)

func (*Proposals) GetByReference

func (ps *Proposals) GetByReference(ctx context.Context, ref string) (entities.Proposal, error)

type ProtocolUpgradeProposals added in v0.61.0

type ProtocolUpgradeProposals struct {
	*ConnectionSource
}

func NewProtocolUpgradeProposals added in v0.61.0

func NewProtocolUpgradeProposals(connectionSource *ConnectionSource) *ProtocolUpgradeProposals

func (*ProtocolUpgradeProposals) Add added in v0.61.0

func (*ProtocolUpgradeProposals) List added in v0.61.0

type RetentionPolicy

type RetentionPolicy struct {
	HypertableOrCaggName string `` /* 134-byte string literal not displayed */
	DataRetentionPeriod  string `` /* 144-byte string literal not displayed */
}

type Rewards

type Rewards struct {
	*ConnectionSource
}

func NewRewards

func NewRewards(connectionSource *ConnectionSource) *Rewards

func (*Rewards) Add

func (rs *Rewards) Add(ctx context.Context, r entities.Reward) error

func (*Rewards) GetAll

func (rs *Rewards) GetAll(ctx context.Context) ([]entities.Reward, error)

func (*Rewards) GetByCursor

func (rs *Rewards) GetByCursor(ctx context.Context,
	partyIDHex *string,
	assetIDHex *string,
	pagination entities.CursorPagination,
) ([]entities.Reward, entities.PageInfo, error)

func (*Rewards) GetByOffset

func (rs *Rewards) GetByOffset(ctx context.Context,
	partyIDHex *string,
	assetIDHex *string,
	pagination *entities.OffsetPagination,
) ([]entities.Reward, error)

func (*Rewards) GetEpochSummaries added in v0.65.0

func (rs *Rewards) GetEpochSummaries(ctx context.Context,
	fromEpoch *uint64,
	toEpoch *uint64,
	pagination entities.CursorPagination,
) ([]entities.EpochRewardSummary, entities.PageInfo, error)

GetEpochSummaries returns paged epoch reward summary aggregated by asset, market, and reward type for a given range of epochs.

func (*Rewards) GetSummaries

func (rs *Rewards) GetSummaries(ctx context.Context,
	partyIDHex *string, assetIDHex *string,
) ([]entities.RewardSummary, error)

type RiskFactors

type RiskFactors struct {
	*ConnectionSource
}

func NewRiskFactors

func NewRiskFactors(connectionSource *ConnectionSource) *RiskFactors

func (*RiskFactors) GetMarketRiskFactors

func (rf *RiskFactors) GetMarketRiskFactors(ctx context.Context, marketID string) (entities.RiskFactor, error)

func (*RiskFactors) Upsert

func (rf *RiskFactors) Upsert(ctx context.Context, factor *entities.RiskFactor) error

type Sorting

type Sorting = string

type StakeLinking

type StakeLinking struct {
	*ConnectionSource
}

func NewStakeLinking

func NewStakeLinking(connectionSource *ConnectionSource) *StakeLinking

func (*StakeLinking) GetStake

func (*StakeLinking) Upsert

func (s *StakeLinking) Upsert(ctx context.Context, stake *entities.StakeLinking) error

type TableOrdering

type TableOrdering []ColumnOrdering

func (*TableOrdering) OrderByClause

func (t *TableOrdering) OrderByClause() string

func (*TableOrdering) Reversed

func (t *TableOrdering) Reversed() TableOrdering

type Trades

type Trades struct {
	*ConnectionSource
	// contains filtered or unexported fields
}

func NewTrades

func NewTrades(connectionSource *ConnectionSource) *Trades

func (*Trades) Add

func (ts *Trades) Add(t *entities.Trade) error

func (*Trades) Flush

func (ts *Trades) Flush(ctx context.Context) ([]*entities.Trade, error)

func (*Trades) GetByMarket

func (ts *Trades) GetByMarket(ctx context.Context, market string, p entities.OffsetPagination) ([]entities.Trade, error)

func (*Trades) GetByOrderID

func (ts *Trades) GetByOrderID(ctx context.Context, order string, market *string, pagination entities.OffsetPagination) ([]entities.Trade, error)

func (*Trades) GetByParty

func (ts *Trades) GetByParty(ctx context.Context, party string, market *string, pagination entities.OffsetPagination) ([]entities.Trade, error)

func (*Trades) List

func (ts *Trades) List(ctx context.Context,
	marketID entities.MarketID,
	partyID entities.PartyID,
	orderID entities.OrderID,
	pagination entities.CursorPagination,
	dateRange entities.DateRange,
) ([]entities.Trade, entities.PageInfo, error)

type Transfers

type Transfers struct {
	*ConnectionSource
}

func NewTransfers

func NewTransfers(connectionSource *ConnectionSource) *Transfers

func (*Transfers) GetAll

func (*Transfers) GetTransfersFromAccount

func (t *Transfers) GetTransfersFromAccount(ctx context.Context, accountID entities.AccountID, pagination entities.CursorPagination) ([]entities.Transfer,
	entities.PageInfo, error,
)

func (*Transfers) GetTransfersFromParty

func (t *Transfers) GetTransfersFromParty(ctx context.Context, partyID entities.PartyID, pagination entities.CursorPagination) ([]entities.Transfer,
	entities.PageInfo, error,
)

func (*Transfers) GetTransfersToAccount

func (t *Transfers) GetTransfersToAccount(ctx context.Context, accountID entities.AccountID, pagination entities.CursorPagination) ([]entities.Transfer,
	entities.PageInfo, error,
)

func (*Transfers) GetTransfersToOrFromParty

func (t *Transfers) GetTransfersToOrFromParty(ctx context.Context, partyID entities.PartyID, pagination entities.CursorPagination) ([]entities.Transfer,
	entities.PageInfo, error,
)

func (*Transfers) GetTransfersToParty

func (t *Transfers) GetTransfersToParty(ctx context.Context, partyID entities.PartyID, pagination entities.CursorPagination) ([]entities.Transfer, entities.PageInfo,
	error,
)

func (*Transfers) Upsert

func (t *Transfers) Upsert(ctx context.Context, transfer *entities.Transfer) error

type Votes

type Votes struct {
	*ConnectionSource
}

func NewVotes

func NewVotes(connectionSource *ConnectionSource) *Votes

func (*Votes) Add

func (vs *Votes) Add(ctx context.Context, v entities.Vote) error

func (*Votes) Get

func (vs *Votes) Get(ctx context.Context,
	proposalIDStr *string,
	partyIDStr *string,
	value *entities.VoteValue,
) ([]entities.Vote, error)

func (*Votes) GetByParty

func (vs *Votes) GetByParty(ctx context.Context, partyIDStr string) ([]entities.Vote, error)

func (*Votes) GetByPartyConnection

func (vs *Votes) GetByPartyConnection(ctx context.Context, partyIDStr string, pagination entities.CursorPagination) ([]entities.Vote, entities.PageInfo, error)

func (*Votes) GetConnection added in v0.63.0

func (vs *Votes) GetConnection(
	ctx context.Context,
	proposalIDStr, partyIDStr *string,
	pagination entities.CursorPagination,
) ([]entities.Vote, entities.PageInfo, error)

func (*Votes) GetNoVotesForProposal

func (vs *Votes) GetNoVotesForProposal(ctx context.Context, proposalIDStr string) ([]entities.Vote, error)

func (*Votes) GetYesVotesForProposal

func (vs *Votes) GetYesVotesForProposal(ctx context.Context, proposalIDStr string) ([]entities.Vote, error)

type Withdrawals

type Withdrawals struct {
	*ConnectionSource
}

func NewWithdrawals

func NewWithdrawals(connectionSource *ConnectionSource) *Withdrawals

func (*Withdrawals) GetByID

func (w *Withdrawals) GetByID(ctx context.Context, withdrawalID string) (entities.Withdrawal, error)

func (*Withdrawals) GetByParty

func (w *Withdrawals) GetByParty(ctx context.Context, partyID string, openOnly bool, pagination entities.Pagination, dateRange entities.DateRange) (
	[]entities.Withdrawal, entities.PageInfo, error,
)

func (*Withdrawals) Upsert

func (w *Withdrawals) Upsert(ctx context.Context, withdrawal *entities.Withdrawal) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL