Documentation ¶
Index ¶
- Constants
- type Callback
- type ChangeStreamInterface
- type CollectionInterface
- type DbInterface
- type Meta
- type PublishOptions
- type Queue
- func (q *Queue) Ack(id string) error
- func (q *Queue) CreateIndexes() error
- func (q *Queue) Err(id string, err error) error
- func (q *Queue) GetNext(topic string) (*Task, error)
- func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error)
- func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Task, error)
- func (q *Queue) Reschedule(task *Task) (*Task, error)
- func (q *Queue) Selfcare(topic string, timeout time.Duration) error
- func (q *Queue) Subscribe(topic string, cb Callback) error
- type StdDb
- func (d *StdDb) Context() context.Context
- func (d *StdDb) CreateIndexes(indexes []mongo.IndexModel) error
- func (d *StdDb) FindOneAndUpdate(filter interface{}, update interface{}, ...) *mongo.SingleResult
- func (d *StdDb) InsertOne(document interface{}) (primitive.ObjectID, error)
- func (d *StdDb) UpdateMany(filter interface{}, update interface{}) error
- func (d *StdDb) UpdateOne(filter interface{}, update interface{}) error
- func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error)
- type Task
Constants ¶
const ( StatePending = "pending" StateRunning = "running" StateCompleted = "completed" StateError = "error" )
const ( DefaultTimeout = time.Minute * 5 DefaultMaxTries = 3 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChangeStreamInterface ¶
type CollectionInterface ¶
type CollectionInterface interface { InsertOne(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult UpdateOne(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) UpdateMany(ctx context.Context, filter interface{}, update interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) Watch(ctx context.Context, pipeline interface{}, opts ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) Indexes() mongo.IndexView }
type DbInterface ¶
type DbInterface interface { InsertOne(document interface{}) (primitive.ObjectID, error) FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult UpdateOne(filter interface{}, update interface{}) error UpdateMany(filter interface{}, update interface{}) error Watch(pipeline interface{}) (ChangeStreamInterface, error) CreateIndexes(index []mongo.IndexModel) error Context() context.Context }
type PublishOptions ¶
func NewPublishOptions ¶
func NewPublishOptions() *PublishOptions
NewPublishOptions creates a new PublishOptions with default settings.
func (*PublishOptions) SetMaxTries ¶
func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions
SetMaxTries sets the maximum number of retry attempts for publishing. Returns the updated PublishOptions instance.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(db DbInterface) *Queue
NewQueue initializes a new Queue instance with the provided DbInterface.
func (*Queue) Ack ¶
Ack acknowledges a task completion by its ID, updating its state to "completed" and setting the completion timestamp.
func (*Queue) CreateIndexes ¶
CreateIndexes creates MongoDB indexes for the task collection to improve query performance and manage TTL for completed tasks.
func (*Queue) Err ¶
Err updates the state of a task to "error" by its ID, setting the completion time and storing the error message.
func (*Queue) GetNext ¶
GetNext retrieves the next item from the queue for the given topic, marks it as running, and increments its tries count.
func (*Queue) GetNextById ¶
GetNextById retrieves the next pending task by its ID, transitions it to the running state, and increments its tries count.
func (*Queue) Publish ¶
Publish inserts a new task into the queue with the given topic, payload, and maxTries. If maxTries is zero, it defaults to DefaultMaxTries.
func (*Queue) Reschedule ¶
Reschedule republishes a task to the queue, retaining its topic, payload, tries, and maxTries settings.
func (*Queue) Selfcare ¶
Selfcare re-schedules long-running tasks and sets tasks exceeding max tries to error state. It updates tasks in an ongoing state that haven't been acknowledged within a specific timeout period. If timeout is zero, the default timeout value is used. Optionally, tasks can be filtered by topic.
type StdDb ¶
type StdDb struct {
// contains filtered or unexported fields
}
func (*StdDb) CreateIndexes ¶
func (d *StdDb) CreateIndexes(indexes []mongo.IndexModel) error
func (*StdDb) FindOneAndUpdate ¶
func (d *StdDb) FindOneAndUpdate(filter interface{}, update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
func (*StdDb) UpdateMany ¶
func (*StdDb) Watch ¶
func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error)