coordinator

package
v0.7.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NoHeadBlockWaitTime is the amount of
	// time we wait when no blocks have been
	// synced.
	NoHeadBlockWaitTime = 1 * time.Second

	// NoJobsWaitTime is the amount of time
	// we wait when no jobs are available
	// to process.
	NoJobsWaitTime = 10 * time.Second
)

Variables

View Source
var (
	// ErrJobsUnretrievable is returned when an error
	// is returned when querying for jobs.
	ErrJobsUnretrievable = errors.New("unable to retrieve jobs")

	// ErrBroadcastsUnretrievable is returned when an error
	// is returned when querying for broadcasts.
	ErrBroadcastsUnretrievable = errors.New("unable to retrieve broadcasts")

	// ErrNoAvailableJobs is returned when it is not possible
	// to process any jobs. If this is returned, you should wait
	// and retry.
	ErrNoAvailableJobs = errors.New("no jobs available")

	// ErrReturnFundsComplete is returned when it is not possible
	// to process any more ReturnFundsWorkflows or when there is no provided
	// ReturnsFundsWorkflow.
	ErrReturnFundsComplete = errors.New("return funds complete")

	// ErrJobMissing is returned when the coordinator is invoked with
	// a broadcast complete call but the job that is affected does
	// not exist.
	ErrJobMissing = errors.New("job missing")

	// ErrDuplicateWorkflows is returned when 2 Workflows with the same name
	// are provided as an input to NewCoordinator.
	ErrDuplicateWorkflows = errors.New("duplicate workflows")

	// ErrIncorrectConcurrency is returned when CreateAccount or RequestFunds
	// have a concurrency greater than 1.
	ErrIncorrectConcurrency = errors.New("incorrect concurrency")

	// ErrInvalidConcurrency is returned when the concurrency of a Workflow
	// is <= 0.
	ErrInvalidConcurrency = errors.New("invalid concurrency")

	// ErrStalled is returned when the caller does not define
	// a CreateAccount and/or RequestFunds workflow and we run out
	// of available options (i.e. we can't do anything).
	ErrStalled = errors.New("processing stalled")

	// ErrNoWorkflows is returned when no workflows are provided
	// during initialization.
	ErrNoWorkflows = errors.New("no workflows")
)

Functions

This section is empty.

Types

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator faciliates the creation and processing of jobs.

func New

func New(
	storage JobStorage,
	helper Helper,
	handler Handler,
	parser *parser.Parser,
	inputWorkflows []*job.Workflow,
) (*Coordinator, error)

New parses a slice of input Workflows and creates a new *Coordinator.

func (*Coordinator) BroadcastComplete

func (c *Coordinator) BroadcastComplete(
	ctx context.Context,
	dbTx database.Transaction,
	jobIdentifier string,
	transaction *types.Transaction,
) error

BroadcastComplete is called by the broadcast coordinator when a transaction broadcast has completed. If the transaction is nil, then the transaction did not succeed.

func (*Coordinator) Process

func (c *Coordinator) Process(
	ctx context.Context,
) error

Process creates and executes jobs until failure.

func (*Coordinator) ReturnFunds

func (c *Coordinator) ReturnFunds(
	ctx context.Context,
) error

ReturnFunds attempts to execute the ReturnFunds workflow until it is no longer satisfiable. This is typically called on shutdown to return funds to a faucet.

type Handler

type Handler interface {
	TransactionCreated(
		context.Context,
		string,
		*types.TransactionIdentifier,
	) error
}

Handler is an interface called by the coordinator whenever an address is created or a transaction is created.

type Helper

type Helper interface {
	// HeadBlockExists returns a boolean indicating if a block
	// has been synced by BlockStorage.
	HeadBlockExists(context.Context) bool

	// DatabaseTransaction returns a new database.Transaction.
	// This is used to update jobs and enque them for broadcast atomically.
	DatabaseTransaction(context.Context) database.Transaction

	// StoreKey is called to persist a
	// *types.AccountIdentifier + KeyPair.
	StoreKey(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*keys.KeyPair,
	) error

	// GetKey is called to get the *types.KeyPair
	// associated with an address.
	GetKey(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
	) (*keys.KeyPair, error)

	// AllAccounts returns a slice of all known *types.AccountIdentifier.
	AllAccounts(
		context.Context,
		database.Transaction,
	) ([]*types.AccountIdentifier, error)

	// LockedAccounts is a slice of all *types.AccountIdentifier currently sending or receiving
	// funds.
	LockedAccounts(
		context.Context,
		database.Transaction,
	) ([]*types.AccountIdentifier, error)

	// Balance returns the balance
	// for a provided address.
	Balance(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*types.Currency,
	) (*types.Amount, error)

	// Coins returns all *types.Coin owned by an address.
	Coins(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*types.Currency,
	) ([]*types.Coin, error)

	// BroadcastAll broadcasts all transactions considered ready for
	// broadcast (unbroadcasted or stale).
	BroadcastAll(context.Context) error

	// Broadcast enqueues a particular intent for broadcast.
	Broadcast(
		context.Context,
		database.Transaction,
		string,
		*types.NetworkIdentifier,
		[]*types.Operation,
		*types.TransactionIdentifier,
		string,
		int64,
	) error

	// Derive returns a new *types.AccountIdentifier for a provided publicKey.
	Derive(
		context.Context,
		*types.NetworkIdentifier,
		*types.PublicKey,
		map[string]interface{},
	) (*types.AccountIdentifier, map[string]interface{}, error)

	// Preprocess calls the /construction/preprocess endpoint
	// on an offline node.
	Preprocess(
		context.Context,
		*types.NetworkIdentifier,
		[]*types.Operation,
		map[string]interface{},
	) (map[string]interface{}, []*types.AccountIdentifier, error)

	// Metadata calls the /construction/metadata endpoint
	// using the online node.
	Metadata(
		context.Context,
		*types.NetworkIdentifier,
		map[string]interface{},
		[]*types.PublicKey,
	) (map[string]interface{}, []*types.Amount, error)

	// Payloads calls the /construction/payloads endpoint
	// using the offline node.
	Payloads(
		context.Context,
		*types.NetworkIdentifier,
		[]*types.Operation,
		map[string]interface{},
		[]*types.PublicKey,
	) (string, []*types.SigningPayload, error)

	// Parse calls the /construction/parse endpoint
	// using the offline node.
	Parse(
		context.Context,
		*types.NetworkIdentifier,
		bool,
		string,
	) ([]*types.Operation, []*types.AccountIdentifier, map[string]interface{}, error)

	// Combine calls the /construction/combine endpoint
	// using the offline node.
	Combine(
		context.Context,
		*types.NetworkIdentifier,
		string,
		[]*types.Signature,
	) (string, error)

	// Hash calls the /construction/hash endpoint
	// using the offline node.
	Hash(
		context.Context,
		*types.NetworkIdentifier,
		string,
	) (*types.TransactionIdentifier, error)

	// Sign returns signatures for the provided
	// payloads.
	Sign(
		context.Context,
		[]*types.SigningPayload,
	) ([]*types.Signature, error)

	// SetBlob transactionally persists
	// a key and value.
	SetBlob(
		ctx context.Context,
		dbTx database.Transaction,
		key string,
		value []byte,
	) error

	// GetBlob transactionally retrieves
	// a key and value.
	GetBlob(
		ctx context.Context,
		dbTx database.Transaction,
		key string,
	) (bool, []byte, error)
}

Helper is used by the coordinator to process Jobs. It is a superset of functions required by the constructor/worker.Helper.

type JobStorage

type JobStorage interface {
	// Ready returns the jobs that are ready to be processed.
	Ready(
		context.Context,
		database.Transaction,
	) ([]*job.Job, error)

	// Broadcasting returns all jobs that are broadcasting.
	Broadcasting(
		context.Context,
		database.Transaction,
	) ([]*job.Job, error)

	// Processing returns the number of jobs processing
	// for a particular workflow.
	Processing(
		context.Context,
		database.Transaction,
		string,
	) ([]*job.Job, error)

	// Update stores an updated *Job in storage
	// and returns its UUID (which won't exist
	// on first update).
	Update(context.Context, database.Transaction, *job.Job) (string, error)

	// Get fetches a *Job by Identifier. It returns an error
	// if the identifier doesn't exist.
	Get(context.Context, database.Transaction, string) (*job.Job, error)
}

JobStorage allows for the persistent and transactional storage of Jobs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL