Documentation ¶
Index ¶
- Variables
- type CustomRetry
- type EventBus
- func (e *EventBus) Commit(ctx context.Context) error
- func (e *EventBus) Dispatch(ctx context.Context, events ...*dddfirework.DomainEvent) error
- func (e *EventBus) DispatchBegin(ctx context.Context, evts ...*dddfirework.DomainEvent) (context.Context, error)
- func (e *EventBus) Options() []dddfirework.Option
- func (e *EventBus) RegisterEventHandler(cb dddfirework.DomainEventHandler)
- func (e *EventBus) RegisterEventTXChecker(checker dddfirework.DomainEventTXChecker)
- func (e *EventBus) Rollback(ctx context.Context) error
- func (e *EventBus) Start(ctx context.Context)
- type EventPO
- type EventStatus
- type FailedInfo
- type IRetryStrategy
- type IntervalRetry
- type LimitRetry
- type Option
- type Options
- type RetryInfo
- type ServicePO
- type Transaction
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrInvalidDB = fmt.Errorf("invalid db")
View Source
var ErrNoTransaction = fmt.Errorf("no transaction")
View Source
var ErrServiceNotCreate = fmt.Errorf("service not create")
Functions ¶
This section is empty.
Types ¶
type CustomRetry ¶
CustomRetry 自定义重试次数和间隔
func (*CustomRetry) Next ¶
func (c *CustomRetry) Next(info *RetryInfo) *RetryInfo
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
func NewEventBus ¶
NewEventBus 提供领域事件直接持久化到数据库,异步查询事件并推送的功能。需要在业务数据库提前创建符合 EventPO, ServicePO 描述的库表,并且使用兼容 gorm Model 的 executor。参数:serviceName 服务名,同一个服务之间只有一个消费,不同服务之间独立消费。用法:eventBus := NewEventBus("service", db); NewEngine(lock, eventBus.Options()...)
func (*EventBus) Dispatch ¶
func (e *EventBus) Dispatch(ctx context.Context, events ...*dddfirework.DomainEvent) error
Dispatch 分发领域事件:将 events 持久化到数据库,之后由事件总线异步查询并推送(参见 NewEventBus 的说明)。
func (*EventBus) DispatchBegin ¶
func (e *EventBus) DispatchBegin(ctx context.Context, evts ...*dddfirework.DomainEvent) (context.Context, error)
DispatchBegin 开启事务消息
func (*EventBus) Options ¶
func (e *EventBus) Options() []dddfirework.Option
func (*EventBus) RegisterEventHandler ¶
func (e *EventBus) RegisterEventHandler(cb dddfirework.DomainEventHandler)
func (*EventBus) RegisterEventTXChecker ¶
func (e *EventBus) RegisterEventTXChecker(checker dddfirework.DomainEventTXChecker)
type EventPO ¶
type EventPO struct { ID int64 `gorm:"primaryKey;autoIncrement"` EventID string `gorm:"column:event_id"` Event *dddfirework.DomainEvent `gorm:"serializer:json"` TransID int64 `gorm:"column:trans_id"` // 事务id EventCreatedAt time.Time `gorm:"index"` // 事件的创建时间 CreatedAt time.Time `gorm:"index"` // 记录创建时间 }
EventPO 事件存储模型
CREATE TABLE `ddd_domain_event` (
`id` bigint NOT NULL AUTO_INCREMENT, `event_id` varchar(64) NOT NULL, `event` text NOT NULL, `trans_id` bigint, `event_created_at` datetime(3) DEFAULT NULL, `created_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `idx_ddd_domain_event_event_id` (`event_id`), KEY `idx_ddd_domain_event_trans_id` (`trans_id`), KEY `idx_ddd_domain_event_created_at` (`created_at`), KEY `idx_ddd_domain_event_event_created_at` (`event_created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
type EventStatus ¶
type EventStatus int8
const ( EventStatusToSend EventStatus = 1 EventStatusSent EventStatus = 2 EventStatusFailed EventStatus = 3 )
type FailedInfo ¶
type IRetryStrategy ¶
type IntervalRetry ¶
IntervalRetry 指定固定间隔和次数
func (*IntervalRetry) Next ¶
func (c *IntervalRetry) Next(info *RetryInfo) *RetryInfo
type LimitRetry ¶
type LimitRetry struct {
Limit int
}
LimitRetry 设定最大重试次数,不指定间隔
func (*LimitRetry) Next ¶
func (c *LimitRetry) Next(info *RetryInfo) *RetryInfo
type Options ¶
type Options struct { // 重试策略:有两种方式 // 1, RetryInterval + RetryLimit 表示固定间隔重试 // 2, CustomRetry 表示自定义间隔重试 RetryLimit int // 重试次数 RetryInterval time.Duration // 重试间隔 CustomRetry []time.Duration // 自定义重试间隔 DefaultOffset *int64 // 默认起始 offset RunInterval time.Duration // 默认轮询间隔 CleanCron string // 默认清理周期 RetentionTime time.Duration // 消费完成的event在db里的保留时间 LimitPerRun int // 每次轮询最大的处理条数 ConsumeConcurrent int // 事件消费的并发数 RetryStrategy IRetryStrategy TXCheckTimeout time.Duration }
type ServicePO ¶
type ServicePO struct { Name string `gorm:"primaryKey"` Retry []*RetryInfo `gorm:"serializer:json"` // 重试信息 Failed []*RetryInfo `gorm:"serializer:json"` // 失败信息 Offset int64 `gorm:"column:offset"` // 消费位置,等于最后一次消费的事件id CreatedAt time.Time `gorm:"index"` // 记录创建时间 UpdatedAt time.Time `gorm:"index"` // 记录的更新时间 }
ServicePO 服务存储模型
CREATE TABLE `ddd_eventbus_service` ( `name` varchar(30) NOT NULL, `failed` text, `retry` text, `offset` bigint(20) DEFAULT NULL, `created_at` datetime(3) DEFAULT NULL, `updated_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`name`), KEY `idx_ddd_eventbus_service_created_at` (`created_at`), KEY `idx_ddd_eventbus_service_updated_at` (`updated_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
type Transaction ¶
type Transaction struct { ID int64 `gorm:"primaryKey;autoIncrement"` Service string `gorm:"column:service"` // 服务名 Events []*dddfirework.DomainEvent `gorm:"serializer:json"` DueTime time.Time `gorm:"column:due_time"` // 事务超时时间 CreatedAt time.Time `gorm:"index"` // 记录创建时间 }
CREATE TABLE `ddd_event_transaction` (
`id` bigint NOT NULL AUTO_INCREMENT, `service` varchar(30) NOT NULL, `events` text, `due_time` datetime(3) DEFAULT NULL, `created_at` datetime(3) DEFAULT NULL, PRIMARY KEY (`id`), KEY `idx_ddd_event_transaction_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
func (*Transaction) TableName ¶
func (o *Transaction) TableName() string
Click to show internal directories.
Click to hide internal directories.