Documentation ¶
Index ¶
Constants ¶
const (
	// NoHeadBlockWaitTime is the amount of time (1 second)
	// we wait before checking again when no blocks have
	// been synced yet.
	NoHeadBlockWaitTime = 1 * time.Second

	// NoJobsWaitTime is the amount of time (10 seconds)
	// we wait before checking again when no jobs are
	// available to process.
	NoJobsWaitTime = 10 * time.Second
)
Variables ¶
var (
	// ErrJobsUnretrievable is returned when querying
	// for jobs fails.
	ErrJobsUnretrievable = errors.New("unable to retrieve jobs")

	// ErrBroadcastsUnretrievable is returned when querying
	// for broadcasts fails.
	ErrBroadcastsUnretrievable = errors.New("unable to retrieve broadcasts")

	// ErrNoAvailableJobs is returned when it is not possible
	// to process any jobs. If this is returned, you should wait
	// and retry.
	ErrNoAvailableJobs = errors.New("no jobs available")

	// ErrReturnFundsComplete is returned when it is not possible
	// to process any more ReturnFunds workflows or when no
	// ReturnFunds workflow was provided.
	ErrReturnFundsComplete = errors.New("return funds complete")

	// ErrJobMissing is returned when the coordinator is invoked with
	// a broadcast complete call but the affected job does
	// not exist.
	ErrJobMissing = errors.New("job missing")

	// ErrDuplicateWorkflows is returned when 2 Workflows with the same name
	// are provided as an input to NewCoordinator.
	ErrDuplicateWorkflows = errors.New("duplicate workflows")

	// ErrIncorrectConcurrency is returned when CreateAccount or RequestFunds
	// has a concurrency greater than 1.
	ErrIncorrectConcurrency = errors.New("incorrect concurrency")

	// ErrInvalidConcurrency is returned when the concurrency of a Workflow
	// is <= 0.
	ErrInvalidConcurrency = errors.New("invalid concurrency")

	// ErrStalled is returned when the caller does not define
	// a CreateAccount and/or RequestFunds workflow and we run out
	// of available options (i.e. we can't do anything).
	ErrStalled = errors.New("processing stalled")

	// ErrNoWorkflows is returned when no workflows are provided
	// during initialization.
	ErrNoWorkflows = errors.New("no workflows")
)
Functions ¶
This section is empty.
Types ¶
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator facilitates the creation and processing of jobs.
func New ¶
func New( storage JobStorage, helper Helper, handler Handler, parser *parser.Parser, inputWorkflows []*job.Workflow, ) (*Coordinator, error)
New parses a slice of input Workflows and creates a new *Coordinator.
func (*Coordinator) BroadcastComplete ¶
func (c *Coordinator) BroadcastComplete( ctx context.Context, dbTx database.Transaction, jobIdentifier string, transaction *types.Transaction, ) error
BroadcastComplete is called by the broadcast coordinator when a transaction broadcast has completed. If the transaction is nil, then the transaction did not succeed.
func (*Coordinator) Process ¶
func (c *Coordinator) Process( ctx context.Context, ) error
Process creates and executes jobs until failure.
func (*Coordinator) ReturnFunds ¶
func (c *Coordinator) ReturnFunds( ctx context.Context, ) error
ReturnFunds attempts to execute the ReturnFunds workflow until it is no longer satisfiable. This is typically called on shutdown to return funds to a faucet.
type Handler ¶
// Handler receives notifications from the coordinator.
type Handler interface {
	// TransactionCreated is invoked when a transaction is created.
	// NOTE(review): the string argument is presumably the identifier
	// of the job that produced the transaction — confirm against the
	// implementation.
	TransactionCreated(
		context.Context,
		string,
		*types.TransactionIdentifier,
	) error
}
Handler is an interface called by the coordinator whenever a transaction is created (TransactionCreated is the only method it currently declares).
type Helper ¶
// Helper is the set of operations the coordinator relies on to
// process jobs: key and account lookups, balance and coin queries,
// broadcast management, construction endpoint calls, signing, and
// transactional blob storage.
type Helper interface {
	// HeadBlockExists returns a boolean indicating if a block
	// has been synced by BlockStorage.
	HeadBlockExists(context.Context) bool

	// DatabaseTransaction returns a new database.Transaction.
	// This is used to update jobs and enqueue them for broadcast atomically.
	DatabaseTransaction(context.Context) database.Transaction

	// StoreKey is called to persist a
	// *types.AccountIdentifier + *keys.KeyPair.
	StoreKey(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*keys.KeyPair,
	) error

	// GetKey is called to get the *keys.KeyPair
	// associated with an account.
	GetKey(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
	) (*keys.KeyPair, error)

	// AllAccounts returns a slice of all known *types.AccountIdentifier.
	AllAccounts(
		context.Context,
		database.Transaction,
	) ([]*types.AccountIdentifier, error)

	// LockedAccounts returns a slice of all *types.AccountIdentifier
	// currently sending or receiving funds.
	LockedAccounts(
		context.Context,
		database.Transaction,
	) ([]*types.AccountIdentifier, error)

	// Balance returns the balance of the provided account
	// in the provided currency.
	Balance(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*types.Currency,
	) (*types.Amount, error)

	// Coins returns all *types.Coin owned by the provided account
	// for the provided currency.
	Coins(
		context.Context,
		database.Transaction,
		*types.AccountIdentifier,
		*types.Currency,
	) ([]*types.Coin, error)

	// BroadcastAll broadcasts all transactions considered ready for
	// broadcast (unbroadcasted or stale).
	BroadcastAll(context.Context) error

	// Broadcast enqueues a particular intent for broadcast.
	// NOTE(review): the meaning of the unnamed string and int64
	// arguments is not visible here — confirm against the implementation.
	Broadcast(
		context.Context,
		database.Transaction,
		string,
		*types.NetworkIdentifier,
		[]*types.Operation,
		*types.TransactionIdentifier,
		string,
		int64,
	) error

	// Derive returns a new *types.AccountIdentifier for a provided public key.
	Derive(
		context.Context,
		*types.NetworkIdentifier,
		*types.PublicKey,
		map[string]interface{},
	) (*types.AccountIdentifier, map[string]interface{}, error)

	// Preprocess calls the /construction/preprocess endpoint
	// on an offline node.
	Preprocess(
		context.Context,
		*types.NetworkIdentifier,
		[]*types.Operation,
		map[string]interface{},
	) (map[string]interface{}, []*types.AccountIdentifier, error)

	// Metadata calls the /construction/metadata endpoint
	// using the online node.
	Metadata(
		context.Context,
		*types.NetworkIdentifier,
		map[string]interface{},
		[]*types.PublicKey,
	) (map[string]interface{}, []*types.Amount, error)

	// Payloads calls the /construction/payloads endpoint
	// using the offline node.
	Payloads(
		context.Context,
		*types.NetworkIdentifier,
		[]*types.Operation,
		map[string]interface{},
		[]*types.PublicKey,
	) (string, []*types.SigningPayload, error)

	// Parse calls the /construction/parse endpoint
	// using the offline node.
	Parse(
		context.Context,
		*types.NetworkIdentifier,
		bool,
		string,
	) ([]*types.Operation, []*types.AccountIdentifier, map[string]interface{}, error)

	// Combine calls the /construction/combine endpoint
	// using the offline node.
	Combine(
		context.Context,
		*types.NetworkIdentifier,
		string,
		[]*types.Signature,
	) (string, error)

	// Hash calls the /construction/hash endpoint
	// using the offline node.
	Hash(
		context.Context,
		*types.NetworkIdentifier,
		string,
	) (*types.TransactionIdentifier, error)

	// Sign returns signatures for the provided
	// payloads.
	Sign(
		context.Context,
		[]*types.SigningPayload,
	) ([]*types.Signature, error)

	// SetBlob transactionally persists
	// a key and value.
	SetBlob(
		ctx context.Context,
		dbTx database.Transaction,
		key string,
		value []byte,
	) error

	// GetBlob transactionally retrieves
	// the value stored for a key.
	// NOTE(review): the bool result presumably reports whether
	// the key exists — confirm against the implementation.
	GetBlob(
		ctx context.Context,
		dbTx database.Transaction,
		key string,
	) (bool, []byte, error)
}
Helper is used by the coordinator to process Jobs. It is a superset of functions required by the constructor/worker.Helper.
type JobStorage ¶
// JobStorage is the transactional persistence layer for jobs
// used by the coordinator.
type JobStorage interface {
	// Ready returns the jobs that are ready to be processed.
	Ready(
		context.Context,
		database.Transaction,
	) ([]*job.Job, error)

	// Broadcasting returns all jobs that are broadcasting.
	Broadcasting(
		context.Context,
		database.Transaction,
	) ([]*job.Job, error)

	// Processing returns the jobs that are currently processing
	// for a particular workflow (identified by the string argument).
	Processing(
		context.Context,
		database.Transaction,
		string,
	) ([]*job.Job, error)

	// Update stores an updated *job.Job in storage
	// and returns its UUID (which won't exist
	// on first update).
	Update(context.Context, database.Transaction, *job.Job) (string, error)

	// Get fetches a *job.Job by identifier. It returns an error
	// if the identifier doesn't exist.
	Get(context.Context, database.Transaction, string) (*job.Job, error)
}
JobStorage allows for the persistent and transactional storage of Jobs.