Versions in this module Expand all Collapse all v1 v1.0.0 Nov 13, 2024 Changes in this version + const DefaultMaxTries + const DefaultTimeout + const StateCompleted + const StateError + const StatePending + const StateRunning + type Callback func(t Task) + type ChangeStreamInterface interface + Close func(ctx context.Context) error + Decode func(v interface{}) error + Next func(ctx context.Context) bool + type CollectionInterface interface + FindOneAndUpdate func(ctx context.Context, filter interface{}, update interface{}, ...) *mongo.SingleResult + Indexes func() mongo.IndexView + InsertOne func(ctx context.Context, document interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) + UpdateMany func(ctx context.Context, filter interface{}, update interface{}, ...) (*mongo.UpdateResult, error) + UpdateOne func(ctx context.Context, filter interface{}, update interface{}, ...) (*mongo.UpdateResult, error) + Watch func(ctx context.Context, pipeline interface{}, ...) (*mongo.ChangeStream, error) + type DbInterface interface + Context func() context.Context + CreateIndexes func(index []mongo.IndexModel) error + FindOneAndUpdate func(filter interface{}, update interface{}, ...) *mongo.SingleResult + InsertOne func(document interface{}) (primitive.ObjectID, error) + UpdateMany func(filter interface{}, update interface{}) error + UpdateOne func(filter interface{}, update interface{}) error + Watch func(pipeline interface{}) (ChangeStreamInterface, error) + type Meta struct + Completed *time.Time + Created time.Time + Dispatched *time.Time + type PublishOptions struct + MaxTries uint + Tries int + func NewPublishOptions() *PublishOptions + func (p *PublishOptions) SetMaxTries(maxTries uint) *PublishOptions + type Queue struct + func NewQueue(db DbInterface) *Queue + func (q *Queue) Ack(id string) error + func (q *Queue) CreateIndexes() error + func (q *Queue) Err(id string, err error) error + func (q *Queue) GetNext(topic string) (*Task, error) + func (q *Queue) GetNextById(id primitive.ObjectID) (*Task, error) + func (q *Queue) Publish(topic string, payload any, opts ...*PublishOptions) (*Task, error) + func (q *Queue) Reschedule(task *Task) (*Task, error) + func (q *Queue) Selfcare(topic string, timeout time.Duration) error + func (q *Queue) Subscribe(topic string, cb Callback) error + type StdDb struct + func NewStdDb(collection CollectionInterface, ctx context.Context) *StdDb + func (d *StdDb) Context() context.Context + func (d *StdDb) CreateIndexes(indexes []mongo.IndexModel) error + func (d *StdDb) FindOneAndUpdate(filter interface{}, update interface{}, ...) *mongo.SingleResult + func (d *StdDb) InsertOne(document interface{}) (primitive.ObjectID, error) + func (d *StdDb) UpdateMany(filter interface{}, update interface{}) error + func (d *StdDb) UpdateOne(filter interface{}, update interface{}) error + func (d *StdDb) Watch(pipeline interface{}) (ChangeStreamInterface, error) + type Task struct + Id primitive.ObjectID + MaxTries uint + Message string + Meta Meta + Payload any + State string + Topic string + Tries uint