Documentation ¶
Index ¶
- Variables
- func IgnoreIDNil(err error) error
- func IgnoreNotFound(err error) error
- func IsDryRun(v any) bool
- func NewAggBase(_opts ...AggBaseOption) *aggBase
- func NewAggBaseOptions() *aggBaseOptions
- func NewEventOptions() *eventOptions
- func NewServiceOptions[A AggBase]() *serviceOptions[A]
- func PrintAggBaseOptions(packageName string)
- func PrintEventOptions(packageName string)
- func PrintServiceOptions(packageName string)
- func WithBatchTransaction(transaction bool) func(o *batchOptions)
- func WithBatchTransactionOff() func(o *batchOptions)
- type ActionTarget
- type ActionType
- type AggBase
- type AggBaseOption
- func SkipAggBaseOption() AggBaseOption
- func WithAggBaseCreatedAt(createdAt time.Time) AggBaseOption
- func WithAggBaseId(id ID) AggBaseOption
- func WithAggBaseNow(now time.Time) AggBaseOption
- func WithAggBaseOptions(o *aggBaseOptions) AggBaseOption
- func WithAggBaseUpdatedAt(updatedAt time.Time) AggBaseOption
- func WithAggBaseVersion(version uint64) AggBaseOption
- type AggConstructor
- type AggNamer
- type BatchEntry
- type BatchOption
- type Bus
- type DryRunner
- type ErrDuplicate
- type Event
- func (e *Event) AggID() ID
- func (e *Event) AggName() string
- func (e *Event) AggVersion() uint64
- func (e *Event) CreatedAt() time.Time
- func (e *Event) Name() string
- func (e *Event) Payload() proto.Message
- func (e *Event) ProtoMessage() (*pb.Event, error)
- func (e *Event) PublishRequest() (*pb.PublishRequest, error)
- func (_e *Event) SetOptions(_opts ...EventOption) *Event
- func (e *Event) Topic() string
- func (e *Event) UUID() string
- type EventNamer
- type EventOption
- type Events
- type Handler
- type HandlerFunc
- type ID
- type IDGenFunc
- type IDGenerator
- type MultiDocuments
- type NormalBus
- type NormalPublisher
- type Repo
- type Service
- type ServiceOption
- func SkipServiceOption[A AggBase]() ServiceOption[A]
- func WithServiceBus[A AggBase](bus Bus) ServiceOption[A]
- func WithServiceIdGenFunc[A AggBase](f func(ctx context.Context) (ID, error)) ServiceOption[A]
- func WithServiceIdGenerator[A AggBase](idGenerator IDGenerator) ServiceOption[A]
- func WithServiceOptions[A AggBase](o *serviceOptions[A]) ServiceOption[A]
- type SubscribeRule
- type Subscriber
- type TopicNamer
- type TransactionBus
- type TransactionBusTracer
- type TransactionPublisher
- type Validator
- type Vid
Constants ¶
This section is empty.
Variables ¶
var (
	ErrIDNil      = errors.New("id is nil")
	ErrNotFound   = errors.New("aggregate not found")
	ErrNotMatched = errors.New("aggregate not matched")
)
var GenDefaultTopic = func(aggName string) string { return fmt.Sprintf("%sEvent", aggName) }
var TraceSlog = itrace.SlogHandler
Functions ¶
func IgnoreIDNil ¶
func IgnoreNotFound ¶
func NewAggBaseOptions ¶
func NewAggBaseOptions() *aggBaseOptions
NewAggBaseOptions returns a new aggBaseOptions struct.
func NewServiceOptions ¶
func NewServiceOptions[A AggBase]() *serviceOptions[A]
NewServiceOptions[A AggBase] returns a new serviceOptions[A] struct.
func PrintAggBaseOptions ¶
func PrintAggBaseOptions(packageName string)
func PrintEventOptions ¶
func PrintEventOptions(packageName string)
func PrintServiceOptions ¶
func PrintServiceOptions(packageName string)
func WithBatchTransaction ¶
func WithBatchTransaction(transaction bool) func(o *batchOptions)
func WithBatchTransactionOff ¶
func WithBatchTransactionOff() func(o *batchOptions)
Types ¶
type ActionTarget ¶
type ActionTarget interface {
// contains filtered or unexported methods
}
type ActionType ¶
type ActionType int
const (
	ActionCreate ActionType = iota + 1
	ActionUpdate
	ActionSave
	ActionDelete
)
type AggBase ¶
type AggBase interface {
	ID() ID
	Now() time.Time
	CreatedAt() time.Time
	UpdatedAt() time.Time
	Version() uint64
	OriginalVersion() uint64
	IsNew() bool
	AddEvent(payload proto.Message, opts ...EventOption)
	// contains filtered or unexported methods
}
AggBase 聚合基础接口
type AggBaseOption ¶
type AggBaseOption interface {
// contains filtered or unexported methods
}
AggBaseOption option interface
func SkipAggBaseOption ¶
func SkipAggBaseOption() AggBaseOption
func WithAggBaseCreatedAt ¶
func WithAggBaseCreatedAt(createdAt time.Time) AggBaseOption
WithAggBaseCreatedAt createdAt option of aggBase
func WithAggBaseNow ¶
func WithAggBaseNow(now time.Time) AggBaseOption
WithAggBaseNow now option of aggBase
func WithAggBaseOptions ¶
func WithAggBaseOptions(o *aggBaseOptions) AggBaseOption
func WithAggBaseUpdatedAt ¶
func WithAggBaseUpdatedAt(updatedAt time.Time) AggBaseOption
WithAggBaseUpdatedAt updatedAt option of aggBase
func WithAggBaseVersion ¶
func WithAggBaseVersion(version uint64) AggBaseOption
WithAggBaseVersion version option of aggBase
type AggConstructor ¶
type AggConstructor[A AggBase] interface { NewAggregate() A }
AggConstructor 构造器接口
type BatchEntry ¶
type BatchEntry[A AggBase] struct {
	Handler      Handler[A]
	ActionType   ActionType
	ActionTarget ActionTarget
}
BatchEntry 批量命令返回条目
func NewBatchEntry ¶
func NewBatchEntry[A AggBase](handler Handler[A], actionType ActionType, actionTarget ActionTarget) *BatchEntry[A]
func NewBatchEntryByFunc ¶
func NewBatchEntryByFunc[A AggBase](hf func(context.Context, A) error, actionType ActionType, actionTarget ActionTarget) *BatchEntry[A]
type BatchOption ¶
type BatchOption func(o *batchOptions)
type ErrDuplicate ¶
type ErrDuplicate[A AggBase] struct {
	// contains filtered or unexported fields
}
func NewDuplicate ¶
func NewDuplicate[A AggBase](a A) *ErrDuplicate[A]
func (*ErrDuplicate[A]) Aggregate ¶
func (e *ErrDuplicate[A]) Aggregate() A
func (*ErrDuplicate[A]) Error ¶
func (e *ErrDuplicate[A]) Error() string
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
Event 事件
func NewEvent ¶
func NewEvent(payload proto.Message, _opts ...EventOption) *Event
NewEvent constructor
func (*Event) AggVersion ¶
func (*Event) PublishRequest ¶
func (e *Event) PublishRequest() (*pb.PublishRequest, error)
func (*Event) SetOptions ¶
func (_e *Event) SetOptions(_opts ...EventOption) *Event
type EventNamer ¶
type EventNamer interface {
EventName() string
}
EventNamer 实现此接口,则事件名称为EventName()的返回值
type EventOption ¶
type EventOption interface {
// contains filtered or unexported methods
}
EventOption option interface
func SkipEventOption ¶
func SkipEventOption() EventOption
func WithEventCreatedAt ¶
func WithEventCreatedAt(createdAt time.Time) EventOption
WithEventCreatedAt createdAt option of Event
func WithEventOptions ¶
func WithEventOptions(o *eventOptions) EventOption
func WithEventTopic ¶
func WithEventTopic(topic string) EventOption
WithEventTopic topic option of Event
type HandlerFunc ¶
type IDGenerator ¶
IDGenerator ID生成接口
type MultiDocuments ¶
type MultiDocuments interface {
IsMultiDocuments()
}
MultiDocuments 该接口应由聚合实现。如果聚合的数据存储分布在 MySQL 的多个表中,或者 MongoDB 的多个集合中,或者 Redis 的多个 key 中等类似情况,那么该聚合应该实现此接口,以决定在进行仓储操作时自动区分是否启用事务。
type NormalPublisher ¶
type Repo ¶
type Repo[A AggBase] interface {
	// Get 当记录不存在时,应调用返回ErrNotFound.
	Get(ctx context.Context, id ID) (A, error)
	List(ctx context.Context, ids ...ID) ([]A, error)
	Save(ctx context.Context, a A) error
	Delete(ctx context.Context, a A) error
	SaveEvents(ctx context.Context, events Events) error
	Transaction(ctx context.Context, fn func(ctx context.Context, r Repo[A]) error) error
}
Repo 仓储接口
type Service ¶
type Service[A AggBase] interface {
	Get(ctx context.Context, id ID) (A, error)
	List(ctx context.Context, ids ...ID) ([]A, error)
	Create(ctx context.Context, h Handler[A]) (A, error)
	Delete(ctx context.Context, h Handler[A], t ActionTarget) error
	Update(ctx context.Context, h Handler[A], t ActionTarget) (A, error)
	Save(ctx context.Context, h Handler[A], t ActionTarget) (A, error)
	Batch(ctx context.Context, entries []*BatchEntry[A], opts ...BatchOption) ([]A, error)
}
func NewService ¶
func NewService[A AggBase](repo Repo[A], newAggregate func() A, opts ...ServiceOption[A]) Service[A]
type ServiceOption ¶
type ServiceOption[A AggBase] interface {
	// contains filtered or unexported methods
}
ServiceOption[A AggBase] option interface
func SkipServiceOption ¶
func SkipServiceOption[A AggBase]() ServiceOption[A]
func WithServiceBus ¶
func WithServiceBus[A AggBase](bus Bus) ServiceOption[A]
WithServiceBus bus option of service
func WithServiceIdGenFunc ¶
func WithServiceIdGenerator ¶
func WithServiceIdGenerator[A AggBase](idGenerator IDGenerator) ServiceOption[A]
WithServiceIdGenerator idGenerator option of service
func WithServiceOptions ¶
func WithServiceOptions[A AggBase](o *serviceOptions[A]) ServiceOption[A]
type Subscriber ¶
type TopicNamer ¶
type TopicNamer interface {
TopicName() string
}
TopicNamer 实现此接口,则消息的topic值为TopicName()的返回值
type TransactionBus ¶
type TransactionBus interface {
	Bus
	TransactionPublisher
}
TransactionBus 事务总线
type TransactionBusTracer ¶
type TransactionBusTracer struct {
// contains filtered or unexported fields
}
TransactionBusTracer 为TransactionBus添加trace支持
func (*TransactionBusTracer) Close ¶
func (t *TransactionBusTracer) Close() error
func (*TransactionBusTracer) Subscribe ¶
func (t *TransactionBusTracer) Subscribe(ctx context.Context, rules ...*SubscribeRule) error
func (*TransactionBusTracer) TransactionPublish ¶
func (t *TransactionBusTracer) TransactionPublish(ctx context.Context, repoTransaction func(ctx context.Context) error, requests ...*pb.PublishRequest) error