Documentation ¶
Index ¶
- func Hash(input string) string
- func StringComputer[T any](inField, outField string, fn func(ctx *Context, in string) (T, error)) func(ctx *Context) error
- func StringHasher(field string) func(model coal.Model) string
- func Test(store *coal.Store, operation *Operation, fn func(env Env))
- type Check
- type Computation
- type Context
- type Env
- type Operation
- type ProcessJob
- type Reactor
- type Registry
- type ScanJob
- type Status
Functions ¶
func Hash ¶
Hash is a helper function that returns the MD5 hash of the input if present and an empty string otherwise.
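The behaviour described above can be approximated with the standard library. The following is a minimal sketch, not the package's implementation: `hashInput` is a hypothetical name, and hex encoding of the digest is an assumption, since the documentation only states that an MD5 hash is returned.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// hashInput is a hypothetical stand-in for Hash. It returns the MD5 digest
// of the input, or an empty string when the input is empty.
// Assumption: the digest is hex encoded.
func hashInput(input string) string {
	if input == "" {
		return ""
	}
	sum := md5.Sum([]byte(input))
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashInput("hello")) // 5d41402abc4b2a76b9719d911017c592
	fmt.Println(hashInput("") == "") // true
}
```

Returning an empty string for an empty input matches the convention used by Computation.Hasher, where an empty hash signals an absent input.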
func StringComputer ¶
func StringComputer[T any](inField, outField string, fn func(ctx *Context, in string) (T, error)) func(ctx *Context) error
StringComputer constructs a compute function for the provided string input and generic output field. If the input string is empty, the output field will be set to the zero value of the generic type.
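The empty-input rule can be illustrated in isolation. This sketch does not use the package's types: `computeFromString` and `wordCount` are hypothetical helpers that only model how a generic output falls back to its zero value when the input string is empty.

```go
package main

import (
	"fmt"
	"strings"
)

// computeFromString is a hypothetical helper that models the rule above:
// an empty input skips fn and yields the zero value of T.
func computeFromString[T any](in string, fn func(in string) (T, error)) (T, error) {
	if in == "" {
		var zero T
		return zero, nil
	}
	return fn(in)
}

// wordCount is an illustrative compute function.
func wordCount(in string) (int, error) {
	return len(strings.Fields(in)), nil
}

func main() {
	n, _ := computeFromString("a b c", wordCount)
	fmt.Println(n) // 3

	n, _ = computeFromString("", wordCount)
	fmt.Println(n) // 0
}
```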
func StringHasher ¶
StringHasher constructs a hasher function for the provided string field.
Types ¶
type Check ¶
type Check struct {
	// The field or tag name.
	Name string

	// The model to check.
	Model coal.Model

	// The check interval.
	Interval time.Duration

	// The check jitter as a factor between 0 and 1.
	Jitter float64

	// The check handler.
	Handler func(ctx *Context) error
}
Check defines a periodic model check.
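The documentation does not say how the jitter factor is applied to the interval. One plausible reading, sketched below purely as an assumption, is that each check is delayed by the interval plus a random share of up to `interval * jitter`. The helper `nextCheckDelay` is hypothetical and takes the random value as a parameter so the arithmetic is easy to follow.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextCheckDelay is a hypothetical helper. Assumption: the jitter extends
// the interval by a random share of up to interval*jitter, where r is a
// random value in [0, 1).
func nextCheckDelay(interval time.Duration, jitter, r float64) time.Duration {
	return interval + time.Duration(float64(interval)*jitter*r)
}

func main() {
	// With a one hour interval and a jitter of 0.1, the delay falls
	// between 60 and 66 minutes.
	d := nextCheckDelay(time.Hour, 0.1, rand.Float64())
	fmt.Println(d >= time.Hour && d < time.Hour+6*time.Minute) // true
}
```

Spreading checks this way avoids many models becoming due at the same instant, which is the usual motivation for a jitter setting.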
type Computation ¶
type Computation struct {
	// The status field name.
	Name string

	// The model.
	Model coal.Model

	// The interval at which the value is checked for changes.
	RehashInterval time.Duration

	// The interval at which the value is recomputed regardless of whether
	// the input is the same.
	RecomputeInterval time.Duration

	// Hasher returns a hash of the input that is used to determine whether the
	// computation is needed. An absent input is indicated by an empty string.
	Hasher func(model coal.Model) string

	// The computation handler.
	Computer func(ctx *Context) error

	// The release handler is called to release an invalidated value
	// synchronously. Otherwise, a computation is scheduled to release the
	// value asynchronously.
	Releaser func(ctx *Context) error

	// Whether outdated values should be kept until the new value is computed.
	// Otherwise, values are released synchronously.
	KeepOutdated bool
}
Computation defines a computation.
type Context ¶
type Context struct {
	// The parent context.
	context.Context

	// The operated model.
	Model coal.Model

	// The final update document.
	Update bson.M

	// Whether the operation is executed synchronously.
	Sync bool

	// A flag that may be set by the handler to indicate that the operation has
	// not yet been fully processed and the handler should be called again
	// sometime later. If a synchronous operation is deferred, it will always be
	// retried asynchronously.
	Defer bool

	// The executed operation.
	Operation *Operation

	// The executed check.
	Check *Check

	// The executed computation.
	Computation *Computation

	// The function used to report progress during a computation.
	Progress func(factor float64) error

	// The reactor, store and queue.
	Reactor *Reactor
	Store   *coal.Store
	Queue   *axe.Queue

	// The async context.
	AsyncContext *axe.Context
}
Context holds context information for a reactor operation.
type Env ¶
Env is a testing environment.
type Operation ¶
type Operation struct {
	// A unique name.
	Name string

	// The model.
	Model coal.Model

	// The query used to find potential models to process.
	Query func() bson.M

	// The filter function that decides whether a model should be processed.
	Filter func(model coal.Model) bool

	// The function called to process a model.
	Processor func(ctx *Context) error

	// The operation is executed synchronously during the modifier callback and
	// when checked directly.
	Sync bool

	// The maximum number of models loaded during a single scan.
	//
	// Default: 100.
	ScanBatch int

	// The time after which an asynchronous operation fails (lifetime) and is
	// retried (timeout).
	//
	// Default: 5m, 10m.
	ProcessLifetime time.Duration
	ProcessTimeout  time.Duration

	// The maximum delay up to which a process may be deferred. Beyond this
	// limit, the process is aborted and may be picked up by the next scan
	// depending on the configured query.
	//
	// Default: 1m.
	MaxDeferDelay time.Duration

	// The tag name used to track the number of outstanding operations.
	//
	// Default: "torch/Reactor/<Name>".
	TagName string

	// The tag expiry time.
	//
	// Default: 24h.
	TagExpiry time.Duration
}
Operation defines a reactor operation.
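The defaults listed in the field comments can be summarized in code. This is a sketch only: `operationSettings` and `withDefaults` are hypothetical names that mirror the tunable fields of Operation, and treating a zero value as "unset" is an assumption about how the defaults are applied.

```go
package main

import (
	"fmt"
	"time"
)

// operationSettings is a hypothetical mirror of the tunable Operation fields.
type operationSettings struct {
	Name            string
	ScanBatch       int
	ProcessLifetime time.Duration
	ProcessTimeout  time.Duration
	MaxDeferDelay   time.Duration
	TagName         string
	TagExpiry       time.Duration
}

// withDefaults fills unset (zero) fields with the documented defaults.
func withDefaults(s operationSettings) operationSettings {
	if s.ScanBatch == 0 {
		s.ScanBatch = 100
	}
	if s.ProcessLifetime == 0 {
		s.ProcessLifetime = 5 * time.Minute
	}
	if s.ProcessTimeout == 0 {
		s.ProcessTimeout = 10 * time.Minute
	}
	if s.MaxDeferDelay == 0 {
		s.MaxDeferDelay = time.Minute
	}
	if s.TagName == "" {
		s.TagName = "torch/Reactor/" + s.Name
	}
	if s.TagExpiry == 0 {
		s.TagExpiry = 24 * time.Hour
	}
	return s
}

func main() {
	s := withDefaults(operationSettings{Name: "thumbnail", ScanBatch: 10})
	fmt.Println(s.ScanBatch, s.ProcessLifetime, s.ProcessTimeout, s.TagName)
}
```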
func CheckField ¶
CheckField will return an operation that runs the provided check for the specified model and timestamp field. The timestamp field is automatically updated with the latest check time. It may be nilled or zeroed to force the check to run again.
func CheckTag ¶
CheckTag will return an operation that runs the provided check function for the specified model and timestamp tag. The timestamp tag is automatically updated with the latest check time. It may be nilled or zeroed to force the check to run again.
func Compute ¶
func Compute(comp Computation) *Operation
Compute will return an operation that automatically runs the provided asynchronous computation. During a check/modifier call, the hash of the input is taken to determine whether the value needs to be computed. During a scan, the computation is only invoked when the status is missing, invalid or outdated. To force a computation in both cases, the status can be flagged as invalid.
If no releaser is configured, the computer is also invoked asynchronously to compute the value for a zero input (zero hash). If a releaser is configured, it is instead invoked synchronously to release (clear) the current value. Optionally, the outdated value can be kept until the new value is computed.
type ProcessJob ¶
type ProcessJob struct {
	axe.Base `json:"-" axe:"torch/process"`

	Operation string  `json:"operation"`
	Model     coal.ID `json:"model"`
}
ProcessJob defines a job that processes a single operation.
func NewProcessJob ¶
func NewProcessJob(operation string, model coal.ID) *ProcessJob
NewProcessJob creates and returns a new ProcessJob with a label.
func (*ProcessJob) Validate ¶
func (j *ProcessJob) Validate() error
Validate implements the axe.Job interface.
type Reactor ¶
type Reactor struct {
// contains filtered or unexported fields
}
Reactor organizes the execution of operations based on model events (via modifier callback), direct checks or database scans (via periodic jobs). The reactor will ensure that only one operation of the same type per model is executed at the same time. Outstanding operations are tracked using a tag on the model and are guaranteed to be executed eventually until the tag expires.
func NewReactor ¶
NewReactor creates and returns a new reactor.
func (*Reactor) Check ¶
Check will check the provided model and, if processing is necessary, enqueue a job or, if Operation.Sync is enabled, perform the operation directly.
Note: As the method may mutate the model, the caller must arrange for the model to be persisted.
func (*Reactor) Modifier ¶
Modifier returns a callback that will run Check on created and updated models.
func (*Reactor) ProcessTask ¶
ProcessTask will return the process task.
type Registry ¶
Registry is a collection of known operations.
func NewRegistry ¶
NewRegistry will return an operation registry indexed by name.
type ScanJob ¶
ScanJob defines a job that scans for due operations.
func NewScanJob ¶
NewScanJob creates and returns a new ScanJob with a label.