Documentation ¶
Overview ¶
Package ratus contains data models and a client library for Go applications.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) DeletePromise(ctx context.Context, id string) (*Deleted, error)
- func (c *Client) DeletePromises(ctx context.Context, topic string) (*Deleted, error)
- func (c *Client) DeleteTask(ctx context.Context, id string) (*Deleted, error)
- func (c *Client) DeleteTasks(ctx context.Context, topic string) (*Deleted, error)
- func (c *Client) DeleteTopic(ctx context.Context, topic string) (*Deleted, error)
- func (c *Client) DeleteTopics(ctx context.Context) (*Deleted, error)
- func (c *Client) GetLiveness(ctx context.Context) error
- func (c *Client) GetPromise(ctx context.Context, id string) (*Promise, error)
- func (c *Client) GetReadiness(ctx context.Context) error
- func (c *Client) GetTask(ctx context.Context, id string) (*Task, error)
- func (c *Client) GetTopic(ctx context.Context, topic string) (*Topic, error)
- func (c *Client) InsertPromise(ctx context.Context, p *Promise) (*Task, error)
- func (c *Client) InsertTask(ctx context.Context, t *Task) (*Updated, error)
- func (c *Client) InsertTasks(ctx context.Context, ts []*Task) (*Updated, error)
- func (c *Client) ListPromises(ctx context.Context, topic string, limit, offset int) ([]*Promise, error)
- func (c *Client) ListTasks(ctx context.Context, topic string, limit, offset int) ([]*Task, error)
- func (c *Client) ListTopics(ctx context.Context, limit, offset int) ([]*Topic, error)
- func (c *Client) PatchTask(ctx context.Context, id string, m *Commit) (*Task, error)
- func (c *Client) Poll(ctx context.Context, topic string, p *Promise) (*Context, error)
- func (c *Client) PostPromises(ctx context.Context, topic string, p *Promise) (*Task, error)
- func (c *Client) Request(ctx context.Context, method, endpoint string, body, result any) error
- func (c *Client) Subscribe(ctx context.Context, o *SubscribeOptions, f SubscribeHandler) error
- func (c *Client) UpsertPromise(ctx context.Context, p *Promise) (*Task, error)
- func (c *Client) UpsertTask(ctx context.Context, t *Task) (*Updated, error)
- func (c *Client) UpsertTasks(ctx context.Context, ts []*Task) (*Updated, error)
- type ClientOptions
- type Commit
- type Context
- func (ctx *Context) Abstain() *Context
- func (ctx *Context) Archive() *Context
- func (ctx *Context) Commit() error
- func (ctx *Context) Force() *Context
- func (ctx *Context) Reschedule(t time.Time) *Context
- func (ctx *Context) Reset() *Context
- func (ctx *Context) Retry(duration string) *Context
- func (ctx *Context) SetDefer(duration string) *Context
- func (ctx *Context) SetNonce(nonce string) *Context
- func (ctx *Context) SetPayload(v any) *Context
- func (ctx *Context) SetScheduled(t time.Time) *Context
- func (ctx *Context) SetState(s TaskState) *Context
- func (ctx *Context) SetTopic(topic string) *Context
- type Deleted
- type Error
- type Promise
- type Promises
- type SubscribeHandler
- type SubscribeOptions
- type Task
- type TaskState
- type Tasks
- type Topic
- type Topics
- type Updated
Constants ¶
const DefaultConcurrencyDelay = 1 * time.Second
DefaultConcurrencyDelay is the default value of SubscribeOptions's ConcurrencyDelay.
const DefaultDrainInterval = 5 * time.Second
DefaultDrainInterval is the default value of SubscribeOptions's DrainInterval.
const DefaultErrorInterval = 30 * time.Second
DefaultErrorInterval is the default value of SubscribeOptions's ErrorInterval.
const DefaultLimit = 10
DefaultLimit is the default number of resources to return in pagination.
const DefaultTimeout = "10m"
DefaultTimeout is the default timeout duration for task execution.
const NonceLength = 16
NonceLength is the length of the randomly generated nonce strings.
const StatusClientClosedRequest = 499
StatusClientClosedRequest is the code for client closed request errors.
Variables ¶
var (
	// ErrBadRequest is returned when the request is malformed.
	ErrBadRequest = errors.New("bad request")
	// ErrNotFound is returned when the requested resource is not found.
	ErrNotFound = errors.New("not found")
	// ErrConflict is returned when the resource conflicts with existing ones.
	ErrConflict = errors.New("conflict")
	// ErrClientClosedRequest is returned when the client closed the request.
	ErrClientClosedRequest = errors.New("client closed request")
	// ErrInternalServerError is returned when the server encountered a
	// situation it does not know how to handle.
	ErrInternalServerError = errors.New("internal server error")
	ErrServiceUnavailable = errors.New("service unavailable")
)
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is an HTTP client that talks to Ratus.
func NewClient ¶
func NewClient(o *ClientOptions) (*Client, error)
NewClient creates a new Ratus client instance.
func (*Client) DeletePromise ¶
DeletePromise deletes a promise by the unique ID of its target task.
func (*Client) DeletePromises ¶
DeletePromises deletes all promises in a topic.
func (*Client) DeleteTask ¶
DeleteTask deletes a task by its unique ID.
func (*Client) DeleteTasks ¶
DeleteTasks deletes all tasks in a topic.
func (*Client) DeleteTopic ¶
DeleteTopic deletes a topic and its tasks.
func (*Client) DeleteTopics ¶
DeleteTopics deletes all topics and tasks.
func (*Client) GetLiveness ¶
GetLiveness checks the liveness of the instance.
func (*Client) GetPromise ¶
GetPromise gets a promise by the unique ID of its target task.
func (*Client) GetReadiness ¶
GetReadiness checks the readiness of the instance.
func (*Client) InsertPromise ¶
InsertPromise makes a promise to claim and execute a task if it is in pending state.
func (*Client) InsertTask ¶
InsertTask inserts a new task.
func (*Client) InsertTasks ¶
InsertTasks inserts a batch of tasks while ignoring existing ones.
func (*Client) ListPromises ¶
func (c *Client) ListPromises(ctx context.Context, topic string, limit, offset int) ([]*Promise, error)
ListPromises lists all promises in a topic.
func (*Client) ListTopics ¶
ListTopics lists all topics.
func (*Client) PatchTask ¶
PatchTask applies a set of updates to a task and returns the updated task.
func (*Client) Poll ¶
Poll claims and returns the next available task in a topic. An error wrapping ErrNotFound is returned if the topic is empty, or if no task in the topic has reached its scheduled time of execution.
func (*Client) PostPromises ¶
PostPromises makes a promise to claim and execute the next available task in a topic.
func (*Client) Request ¶
Request calls an API endpoint and stores the response body in the value pointed to by result. Error messages from Ratus will be translated into errors and returned.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, o *SubscribeOptions, f SubscribeHandler) error
Subscribe to a topic and attach a handler function to listen for new tasks and errors. Calling Subscribe will block the calling goroutine indefinitely unless the context times out or gets canceled.
func (*Client) UpsertPromise ¶
UpsertPromise makes a promise to claim and execute a task regardless of its current state.
func (*Client) UpsertTask ¶
UpsertTask inserts or updates a task.
type ClientOptions ¶
type ClientOptions struct {
	// Origin of the Ratus instance or load balancer to connect to.
	// An origin is a combination of a scheme, hostname, and port.
	// Reference: https://web.dev/same-site-same-origin/#origin
	Origin string
	// Common header key-value pairs for every outgoing request.
	Headers map[string]string
	// Timeout specifies a time limit for requests made by this client.
	// This is not related to the timeout for task execution.
	// A Timeout of zero means no timeout.
	Timeout time.Duration
}
ClientOptions contains options to configure a Ratus client.
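As an illustration of what counts as an origin, the sketch below checks a string with net/url before it would be placed in the Origin field. validateOrigin is a hypothetical helper and the address is an example value; the client itself may be more lenient or stricter than this check.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateOrigin is a hypothetical helper that accepts only a scheme,
// a hostname, and an optional port, with no path, query, or fragment.
func validateOrigin(s string) error {
	u, err := url.Parse(s)
	if err != nil {
		return err
	}
	if u.Scheme == "" || u.Host == "" {
		return fmt.Errorf("origin %q must include a scheme and a host", s)
	}
	if u.Path != "" || u.RawQuery != "" || u.Fragment != "" {
		return fmt.Errorf("origin %q must not include a path, query, or fragment", s)
	}
	return nil
}

func main() {
	// Example address only; substitute the address of a real instance.
	fmt.Println(validateOrigin("http://127.0.0.1:8000"))
	fmt.Println(validateOrigin("http://127.0.0.1:8000/v1"))
}
```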
type Commit ¶
type Commit struct {
	// If not empty, the commit will be accepted only if the value matches the
	// corresponding nonce of the target task.
	Nonce string `json:"nonce,omitempty" bson:"nonce,omitempty"`
	// If not empty, transfer the task to the specified topic.
	Topic string `json:"topic,omitempty" bson:"topic,omitempty"`
	// If not nil, set the state of the task to the specified value.
	// If nil, the state of the task will be set to "completed" by default.
	State *TaskState `json:"state,omitempty" bson:"state,omitempty"`
	// If not nil, set the scheduled time of the task to the specified value.
	Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`
	// If not nil, use this value to replace the payload of the task.
	Payload any `json:"payload,omitempty" bson:"payload,omitempty"`
	// A duration relative to the time the commit is accepted, indicating that
	// the task will be scheduled to execute after this duration. When the
	// absolute scheduled time is specified, the scheduled time will take
	// precedence. It is recommended to use relative durations whenever
	// possible to avoid clock synchronization issues. The value must be a
	// valid duration string parsable by time.ParseDuration. This field is only
	// used when creating a commit and will be cleared after converting to an
	// absolute scheduled time.
	Defer string `json:"defer,omitempty" bson:"-"`
}
Commit contains a set of updates to be applied to a task.
type Context ¶
type Context struct {
	context.Context
	// Task acquired by the polling operation.
	Task *Task
	// contains filtered or unexported fields
}
Context wraps around context.Context to carry scoped values throughout the poll-execute-commit workflow. Its deadline will be automatically set based on the execution deadline of the acquired task. It also provides chainable methods for setting up commits. Since the custom context is only used in parameters and return values, it is not considered an anti-pattern. Reference: https://github.com/golang/go/issues/22602
func (*Context) Reschedule ¶
Reschedule is equivalent to calling Abstain followed by SetScheduled(t).
func (*Context) SetPayload ¶
SetPayload sets the value for the Payload field of the commit.
func (*Context) SetScheduled ¶
SetScheduled sets the value for the Scheduled field of the commit.
type Deleted ¶
type Deleted struct {
	// Number of resources deleted by the operation.
	Deleted int64 `json:"deleted"`
}
Deleted contains the result of a delete operation.
type Error ¶
type Error struct {
	// The error object.
	Error struct {
		// Code of the error.
		Code int `json:"code"`
		// Message of the error.
		Message string `json:"message"`
	} `json:"error"`
}
Error contains an error message.
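The struct tags imply a JSON envelope with a single "error" object. The sketch below decodes such a body with encoding/json. The Error type is copied from the declaration above, while decodeError and the sample body are illustrative assumptions rather than captured server output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Error mirrors the declaration documented above.
type Error struct {
	Error struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	} `json:"error"`
}

// decodeError is a hypothetical helper that parses an error response body.
func decodeError(body []byte) (*Error, error) {
	var e Error
	if err := json.Unmarshal(body, &e); err != nil {
		return nil, err
	}
	return &e, nil
}

func main() {
	// Sample body written for this example.
	body := []byte(`{"error":{"code":404,"message":"not found"}}`)
	e, err := decodeError(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(e.Error.Code, e.Error.Message)
}
```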
type Promise ¶
type Promise struct {
	// Unique ID of the promise, which is the same as the target task ID.
	// A promise with an empty ID is considered a "wildcard promise", and
	// Ratus will assign an appropriate task based on the status of the queue.
	// A task can only be owned by a single promise at a given time.
	ID string `json:"_id,omitempty" bson:"_id" form:"_id"`
	// Identifier of the consumer instance who consumed the task.
	Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty" form:"consumer"`
	// The deadline for the completion of execution promised by the consumer.
	// Consumer code needs to commit the task before this deadline, otherwise
	// the task is determined to have timed out and will be reset to the
	// "pending" state, allowing other consumers to retry.
	Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty" form:"deadline"`
	// Timeout duration for task execution promised by the consumer. When the
	// absolute deadline time is specified, the deadline will take precedence.
	// It is recommended to use relative durations whenever possible to avoid
	// clock synchronization issues. The value must be a valid duration string
	// parsable by time.ParseDuration. This field is only used when creating a
	// promise and will be cleared after converting to an absolute deadline.
	Timeout string `json:"timeout,omitempty" bson:"-" form:"timeout"`
}
Promise represents a claim on the ownership of an active task.
type Promises ¶
type Promises struct {
Data []*Promise `json:"data"`
}
Promises contains a list of promise resources.
type SubscribeHandler ¶
SubscribeHandler defines the signature of handler functions for the Subscribe method.
type SubscribeOptions ¶
type SubscribeOptions struct {
	// A wildcard promise containing consumer and timeout settings.
	// The promise will be reused for all polling operations, thus the ID and
	// deadline fields will be ignored.
	Promise *Promise
	// Name of the topic to subscribe to.
	Topic string
	// Maximum number of tasks to be executed concurrently.
	Concurrency int
	// Delay added before starting each polling goroutine to avoid spikes.
	// If zero, DefaultConcurrencyDelay is used.
	ConcurrencyDelay time.Duration
	// Pause duration after successful polls.
	// By default will proceed to the next poll immediately without pausing.
	PollInterval time.Duration
	// Pause duration when the topic has been emptied.
	// If zero, DefaultDrainInterval is used.
	DrainInterval time.Duration
	// Pause duration when an error occurs.
	// If zero, DefaultErrorInterval is used.
	ErrorInterval time.Duration
}
SubscribeOptions contains options for subscribing to a topic.
type Task ¶
type Task struct {
	// User-defined unique ID of the task.
	// Task IDs across all topics share the same namespace.
	ID string `json:"_id" bson:"_id"`
	// Topic that the task currently belongs to. Tasks under the same topic
	// will be executed according to the scheduled time.
	Topic string `json:"topic" bson:"topic"`
	// Current state of the task. At a given moment, the state of a task may be
	// either "pending", "active", "completed" or "archived".
	State TaskState `json:"state" bson:"state"`
	// The nonce field stores a random string for implementing an optimistic
	// concurrency control (OCC) layer outside of the storage engine. Ratus
	// ensures consumers can only commit to tasks that have not changed since
	// the promise was made by verifying the nonce field.
	Nonce string `json:"nonce" bson:"nonce"`
	// Identifier of the producer instance who produced the task.
	Producer string `json:"producer,omitempty" bson:"producer,omitempty"`
	// Identifier of the consumer instance who consumed the task.
	Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty"`
	// The time the task was created.
	// Timestamps are generated by the instance running Ratus, remember to
	// perform clock synchronization before running multiple instances.
	Produced *time.Time `json:"produced,omitempty" bson:"produced,omitempty"`
	// The time the task is scheduled to be executed. Tasks will not be
	// executed until the scheduled time arrives. After the scheduled time,
	// excessive tasks will be executed in the order of the scheduled time.
	Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`
	// The time the task was claimed by a consumer.
	// Not to be confused with the time of commit, which is not recorded.
	Consumed *time.Time `json:"consumed,omitempty" bson:"consumed,omitempty"`
	// The deadline for the completion of execution promised by the consumer.
	// Consumer code needs to commit the task before this deadline, otherwise
	// the task is determined to have timed out and will be reset to the
	// "pending" state, allowing other consumers to retry.
	Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty"`
	// A minimal descriptor of the task to be executed.
	// It is not recommended to rely on Ratus as the main storage of tasks.
	// Instead, consider storing the complete task record in a database, and
	// use a minimal descriptor as the payload to reference the task.
	Payload any `json:"payload,omitempty" bson:"payload,omitempty"`
	// A duration relative to the time the task is accepted, indicating that
	// the task will be scheduled to execute after this duration. When the
	// absolute scheduled time is specified, the scheduled time will take
	// precedence. It is recommended to use relative durations whenever
	// possible to avoid clock synchronization issues. The value must be a
	// valid duration string parsable by time.ParseDuration. This field is only
	// used when creating a task and will be cleared after converting to an
	// absolute scheduled time.
	Defer string `json:"defer,omitempty" bson:"-"`
}
Task references an idempotent unit of work that should be executed asynchronously.
type TaskState ¶
type TaskState int32
TaskState indicates the state of a task.
const (
	// The "pending" state indicates that the task is ready to be executed or
	// is waiting to be executed in the future.
	TaskStatePending TaskState = iota
	// The "active" state indicates that the task is being processed by a
	// consumer. Active tasks that have timed out will be automatically reset
	// to the "pending" state. Consumer code should handle failure and set the
	// state to "pending" to retry later if necessary.
	TaskStateActive
	// The "completed" state indicates that the task has completed its execution.
	// If the storage engine implementation supports TTL, completed tasks will
	// be automatically deleted after the retention period has expired.
	TaskStateCompleted
	// The "archived" state indicates that the task is stored as an archive.
	// Archived tasks will never be deleted due to expiration.
	TaskStateArchived
)
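Because the constants are declared with iota, their numeric values follow declaration order starting at zero. The sketch below repeats the declaration and adds stateName, a hypothetical helper that maps each value to the state name used in the comments above; the package may or may not provide an equivalent.

```go
package main

import "fmt"

// TaskState and its constants mirror the declaration documented above.
type TaskState int32

const (
	TaskStatePending TaskState = iota
	TaskStateActive
	TaskStateCompleted
	TaskStateArchived
)

// stateName is a hypothetical helper that returns a readable name.
func stateName(s TaskState) string {
	switch s {
	case TaskStatePending:
		return "pending"
	case TaskStateActive:
		return "active"
	case TaskStateCompleted:
		return "completed"
	case TaskStateArchived:
		return "archived"
	default:
		return "unknown"
	}
}

func main() {
	for s := TaskStatePending; s <= TaskStateArchived; s++ {
		fmt.Println(int32(s), stateName(s))
	}
}
```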
type Tasks ¶
type Tasks struct {
Data []*Task `json:"data"`
}
Tasks contains a list of task resources.
type Topic ¶
type Topic struct {
	// User-defined unique name of the topic.
	Name string `json:"name" bson:"_id"`
	// The number of tasks that belong to the topic.
	Count int64 `json:"count,omitempty" bson:"count,omitempty"`
}
Topic refers to an ordered subset of tasks with the same topic name property.
Directories ¶
Path | Synopsis |
---|---|
cmd | |
docs | Package docs embeds documentation files. |
examples | |
internal | |
config | Package config contains configurations and command line arguments. |
controller | Package controller implements controllers for API endpoints. |
engine | Package engine defines the interface for storage engine implementations. |
engine/memdb | Package memdb implements the storage engine interface for MemDB. |
engine/mongodb | Package mongodb implements the storage engine interface for MongoDB. |
engine/stub | Package stub provides a stub engine that returns canned data for testing. |
metrics | Package metrics registers Prometheus metrics. |
middleware | Package middleware provides functions to inspect and transform requests. |
nonce | Package nonce generates random alphanumeric strings of fixed length. |
reqtest | Package reqtest provides utilities for testing requests and responses. |
router | Package router provides API endpoint routing. |