Documentation ¶
Index ¶
- Constants
- Variables
- func Logger(ctx context.Context) *zap.Logger
- type CompletionIDs
- type HandlerRegistration
- type JobOptions
- type LoadSaveFunc
- type MessageContext
- func (mc *MessageContext) Event(ctx context.Context, message interface{}, options ...PublishOption) error
- func (mc *MessageContext) HasParentSaga() bool
- func (mc *MessageContext) Publish(ctx context.Context, message interface{}, options ...PublishOption) error
- func (mc *MessageContext) Reply(ctx context.Context, message interface{}, options ...PublishOption) error
- func (mc *MessageContext) SagaID() SagaID
- func (mc *MessageContext) Schedule(ctx context.Context, message interface{}, duration time.Duration) error
- func (mc *MessageContext) ScheduleAt(ctx context.Context, message interface{}, duration time.Time) error
- func (mc *MessageContext) StartSaga() SagaID
- type MessageHandler
- type MessagePublisher
- type PublishOption
- func At(when time.Time) PublishOption
- func ForSaga(id SagaID) PublishOption
- func FromNowAt(duration time.Duration) PublishOption
- func PopSaga() PublishOption
- func StartSaga() PublishOption
- func ToQueue(queue string) PublishOption
- func Untransacted() PublishOption
- func WithHigherPriority() PublishOption
- func WithLowerPriority() PublishOption
- func WithTags(tags map[string][]string) PublishOption
- type QueMessagePublisher
- type QueueDef
- type Saga
- type SagaID
- type SagaOption
- type SagaRepository
- func (r *SagaRepository) Delete(ctx context.Context, saga *Saga) error
- func (r *SagaRepository) DeleteByID(ctx context.Context, id SagaID) error
- func (r *SagaRepository) FindByID(ctx context.Context, id SagaID) (*Saga, error)
- func (r *SagaRepository) LoadAndSave(ctx context.Context, id SagaID, loadSaveFunc LoadSaveFunc) error
- func (r *SagaRepository) Upsert(ctx context.Context, saga *Saga) error
- type TransportMessage
Constants ¶
View Source
const (
SagaIDTag = "saga-ids"
)
Variables ¶
View Source
var ( ErrNoOptimisticLock = errors.New("saga optimistic lock failed") ErrNoSaga = errors.New("saga not found") )
Functions ¶
Types ¶
type CompletionIDs ¶
type CompletionIDs struct {
// contains filtered or unexported fields
}
func NewCompletionIDs ¶
func NewCompletionIDs() *CompletionIDs
func (*CompletionIDs) Generate ¶
func (c *CompletionIDs) Generate() string
func (*CompletionIDs) IDs ¶
func (c *CompletionIDs) IDs() []string
type HandlerRegistration ¶
type JobOptions ¶
type LoadSaveFunc ¶
type LoadSaveFunc = func(ctx context.Context, body *json.RawMessage) (interface{}, error)
type MessageContext ¶
type MessageContext struct {
// contains filtered or unexported fields
}
func NewMessageContext ¶
func NewMessageContext(publisher MessagePublisher, handling *TransportMessage) *MessageContext
func (*MessageContext) Event ¶
func (mc *MessageContext) Event(ctx context.Context, message interface{}, options ...PublishOption) error
func (*MessageContext) HasParentSaga ¶
func (mc *MessageContext) HasParentSaga() bool
func (*MessageContext) Publish ¶
func (mc *MessageContext) Publish(ctx context.Context, message interface{}, options ...PublishOption) error
func (*MessageContext) Reply ¶
func (mc *MessageContext) Reply(ctx context.Context, message interface{}, options ...PublishOption) error
func (*MessageContext) SagaID ¶
func (mc *MessageContext) SagaID() SagaID
func (*MessageContext) ScheduleAt ¶
func (*MessageContext) StartSaga ¶
func (mc *MessageContext) StartSaga() SagaID
type MessageHandler ¶
type MessagePublisher ¶
type MessagePublisher interface {
Publish(ctx context.Context, message interface{}, options ...PublishOption) error
}
func NewDevNullMessagePublisher ¶
func NewDevNullMessagePublisher() MessagePublisher
type PublishOption ¶
type PublishOption func(*TransportMessage, *JobOptions) error
func At ¶
func At(when time.Time) PublishOption
func ForSaga ¶
func ForSaga(id SagaID) PublishOption
func FromNowAt ¶
func FromNowAt(duration time.Duration) PublishOption
func PopSaga ¶
func PopSaga() PublishOption
func StartSaga ¶
func StartSaga() PublishOption
func ToQueue ¶
func ToQueue(queue string) PublishOption
func Untransacted ¶
func Untransacted() PublishOption
func WithHigherPriority ¶
func WithHigherPriority() PublishOption
func WithLowerPriority ¶
func WithLowerPriority() PublishOption
func WithTags ¶
func WithTags(tags map[string][]string) PublishOption
type QueMessagePublisher ¶
type QueMessagePublisher struct {
// contains filtered or unexported fields
}
func NewQueMessagePublisher ¶
func NewQueMessagePublisher(metrics *logging.Metrics, db *pgxpool.Pool, q *gue.Client) *QueMessagePublisher
func (*QueMessagePublisher) Publish ¶
func (p *QueMessagePublisher) Publish(ctx context.Context, message interface{}, options ...PublishOption) error
type Saga ¶
type Saga struct { ID SagaID CreatedAt time.Time UpdatedAt time.Time ScheduledAt *time.Time Version int Tags map[string][]string Type string Body *json.RawMessage }
func NewSaga ¶
func NewSaga(options ...SagaOption) *Saga
type SagaRepository ¶
type SagaRepository struct {
// contains filtered or unexported fields
}
func NewSagaRepository ¶
func NewSagaRepository(dbpool *pgxpool.Pool) *SagaRepository
func (*SagaRepository) Delete ¶
func (r *SagaRepository) Delete(ctx context.Context, saga *Saga) error
func (*SagaRepository) DeleteByID ¶
func (r *SagaRepository) DeleteByID(ctx context.Context, id SagaID) error
func (*SagaRepository) LoadAndSave ¶
func (r *SagaRepository) LoadAndSave(ctx context.Context, id SagaID, loadSaveFunc LoadSaveFunc) error
type TransportMessage ¶
Click to show internal directories.
Click to hide internal directories.