torch

package
v0.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Hash

func Hash(input string) string

Hash is a helper function that returns the MD5 hash of the input if present and an empty string otherwise.

func StringComputer

func StringComputer[T any](inField, outField string, fn func(ctx *Context, in string) (T, error)) func(ctx *Context) error

StringComputer constructs a compute function for the provided string input and generic output field. If the input string is empty, the output field will be set to the zero value of the generic type.

func StringHasher

func StringHasher(field string) func(model coal.Model) string

StringHasher constructs a hasher function for the provided string field.

func Test

func Test(store *coal.Store, operation *Operation, fn func(env Env))

Test will create and yield a testing environment for the specified operation.

Types

type Check

type Check struct {
	// The field or tag name.
	Name string

	// The model to check.
	Model coal.Model

	// The check interval.
	Interval time.Duration

	// The check jitter as a factor between 0 and 1.
	Jitter float64

	// The check handler.
	Handler func(ctx *Context) error
}

Check defines a periodic model check.

func (*Check) Deadline

func (c *Check) Deadline() time.Time

Deadline will return a deadline for a query or filter that has the configured jitter already applied.

type Computation

type Computation struct {
	// The status field name.
	Name string

	// The model.
	Model coal.Model

	// The interval at which the value is checked for changes.
	RehashInterval time.Duration

	// The interval a which the value is recomputed regardless if the input
	// is the same.
	RecomputeInterval time.Duration

	// Hasher returns a hash of the input that is used to determine whether the
	// computation needed. An absent input is indicated by an empty string.
	Hasher func(model coal.Model) string

	// The computation handler.
	Computer func(ctx *Context) error

	// The release handler is called to release an invalidated value
	// synchronously. Otherwise, a computation is scheduled to release the
	// value asynchronously.
	Releaser func(ctx *Context) error

	// Whether outdated values should be kept until the new value is computed.
	// Otherwise, values are released synchronously.
	KeepOutdated bool
}

Computation defines a computation.

type Context

type Context struct {
	// The parent context.
	context.Context

	// The operated model.
	Model coal.Model

	// The final update document.
	Update bson.M

	// Whether the operation is executed synchronously.
	Sync bool

	// A flag that may be set by the handler to indicate that the operation has
	// not yet been fully processed and the handler should be called again
	// sometime later. If a synchronous operation is deferred, it will always be
	// retried asynchronously.
	Defer bool

	// The executed operation.
	Operation *Operation

	// The executed check.
	Check *Check

	// The executed computation.
	Computation *Computation

	// The function used to report progress during a computation.
	Progress func(factor float64) error

	// The reactor, store and queue.
	Reactor *Reactor
	Store   *coal.Store
	Queue   *axe.Queue

	// The async context.
	AsyncContext *axe.Context
}

Context holds context information for a reactor operation.

func (*Context) Change

func (c *Context) Change(op, key string, val interface{})

Change will record a change to the update document.

type Env

type Env struct {
	*roast.Tester
	Queue     *axe.Queue
	Reactor   *Reactor
	Operation *Operation
}

Env is a testing environment.

func (*Env) Process

func (e *Env) Process(model coal.Model) error

Process will enqueue and await a process job for the tested operation and specified model.

func (*Env) Scan

func (e *Env) Scan() (int, error)

Scan will queue and await a scan for the tested operation. It will return the number of queued and executed process jobs.

type Operation

type Operation struct {
	// A unique name.
	Name string

	// The model.
	Model coal.Model

	// The query used to find potential models to process.
	Query func() bson.M

	// The filter function that decides whether a model should be processed.
	Filter func(model coal.Model) bool

	// The function called to process a model.
	Processor func(ctx *Context) error

	// The operation is executed synchronously during the modifier callback and
	// when checked directly.
	Sync bool

	// The maximum number of models loaded during a single scan.
	//
	// Default: 100.
	ScanBatch int

	// The time after which an asynchronous operation fails (lifetime) and is
	// retried (timeout).
	//
	// Default: 5m, 10m.
	ProcessLifetime time.Duration
	ProcessTimeout  time.Duration

	// The maximum delay up to which a process may be deferred. Beyond this
	// limit, the process is aborted and may be picked up by the next scan
	// depending on the configured query.
	//
	// Default: 1m.
	MaxDeferDelay time.Duration

	// The tag name used to track the number of outstanding operations.
	//
	// Default: "torch/Reactor/<Name>".
	TagName string

	// The tag expiry time.
	//
	// Default: 24h.
	TagExpiry time.Duration
}

Operation defines a reactor operation.

func CheckField

func CheckField(check Check) *Operation

CheckField will return an operation that runs the provided check for the specified model and timestamp field. The timestamp field is automatically updated with the latest check time. It may be nilled or zeroed to force the check to run again.

func CheckTag

func CheckTag(check Check) *Operation

CheckTag will return an operation that runs the provided check function for the specified model and timestamp tag. The timestamp tag is automatically updated with the latest check time. It may be nilled or zeroed to force the check to run again.

func Compute

func Compute(comp Computation) *Operation

Compute will return an operation that automatically runs the provided asynchronous computation. During a check/modifier call, the hash of the input is taken to determine if the values needs to be computed. During a scan the computation is only invoked when the status is missing, invalid or outdated. To force a computation in both cases, the status can be flagged as invalid.

If no releaser is configured, the computer is also invoked asynchronously to compute the value for a zero input (zero hash). If a releaser is configured, it is invoked instead synchronously to release (clear) the current value. Optionally, the outdated value can be kept until the new value is computed.

func (*Operation) Validate

func (o *Operation) Validate() error

Validate will validate the operation.

type ProcessJob

type ProcessJob struct {
	axe.Base  `json:"-" axe:"torch/process"`
	Operation string  `json:"operation"`
	Model     coal.ID `json:"model"`
}

ProcessJob defines a job that processes a single operation.

func NewProcessJob

func NewProcessJob(operation string, model coal.ID) *ProcessJob

NewProcessJob creates and returns a new ProcessJob with a label.

func (*ProcessJob) Validate

func (j *ProcessJob) Validate() error

Validate implements the axe.Job interface.

type Reactor

type Reactor struct {
	// contains filtered or unexported fields
}

Reactor organizes the execution of operations based on model events (via modifier callback), direct checks or database scans (via periodic jobs). The reactor will ensure that only one operation of the same type per model is executed at the same. Outstanding operations are tracked using a tag on the model and are guaranteed to be executed eventually until the tag expires.

func NewReactor

func NewReactor(store *coal.Store, queue *axe.Queue, operations ...*Operation) *Reactor

NewReactor creates and returns a new reactor.

func (*Reactor) Check

func (r *Reactor) Check(ctx context.Context, model coal.Model) error

Check will check the provided model and enqueue a job if processing is necessary or if Operation.Sync is enabled perform the operation directly.

Note: As the method may mutate the model, the caller must arrange for the model to be persisted.

func (*Reactor) Modifier

func (r *Reactor) Modifier() *fire.Callback

Modifier returns a callback that will run Check on created and updated models.

func (*Reactor) ProcessTask

func (r *Reactor) ProcessTask() *axe.Task

ProcessTask will return the process task.

func (*Reactor) ScanTask

func (r *Reactor) ScanTask() *axe.Task

ScanTask will return the scan task.

type Registry

type Registry struct {
	*stick.Registry[*Operation]
}

Registry is a collection of known operations.

func NewRegistry

func NewRegistry(operations ...*Operation) *Registry

NewRegistry will return an operation registry indexed by name.

type ScanJob

type ScanJob struct {
	axe.Base  `json:"-" axe:"torch/scan"`
	Operation string `json:"operation"`
}

ScanJob defines a job that scans for due operations.

func NewScanJob

func NewScanJob(operation string) *ScanJob

NewScanJob creates and returns a new ScanJob with a label.

func (*ScanJob) Validate

func (j *ScanJob) Validate() error

Validate implements the axe.Job interface.

type Status

type Status struct {
	Progress float64   `json:"progress"`
	Updated  time.Time `json:"updated"`
	Hash     string    `json:"hash"`
	Valid    bool      `json:"valid"`
}

Status defines the status of a computation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL