dgo

package module
v0.0.0-...-0948bf3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrIDNil is the error with message "id is nil".
	ErrIDNil      = errors.New("id is nil")
	// ErrNotFound is the error with message "aggregate not found"; the comment
	// on Repo.Get says it should be returned when the record does not exist.
	ErrNotFound   = errors.New("aggregate not found")
	// ErrNotMatched is the error with message "aggregate not matched".
	ErrNotMatched = errors.New("aggregate not matched")
)
View Source
// GenDefaultTopic builds a topic name from an aggregate name by appending the
// fixed suffix "Event" (for example "Order" becomes "OrderEvent"). It is a
// package-level variable, so it can be reassigned.
var GenDefaultTopic = func(aggName string) string {
	const suffix = "Event"
	// fmt.Sprint inserts no separator between string operands, so this is a
	// plain concatenation of the name and the suffix.
	return fmt.Sprint(aggName, suffix)
}
View Source
// TraceSlog exposes itrace.SlogHandler under this package's name.
var TraceSlog = itrace.SlogHandler

Functions

func IgnoreIDNil

func IgnoreIDNil(err error) error

func IgnoreNotFound

func IgnoreNotFound(err error) error

func IsDryRun

func IsDryRun(v any) bool

func NewAggBase

func NewAggBase(_opts ...AggBaseOption) *aggBase

NewAggBase constructor

func NewAggBaseOptions

func NewAggBaseOptions() *aggBaseOptions

NewAggBaseOptions new options struct

func NewEventOptions

func NewEventOptions() *eventOptions

NewEventOptions new options struct

func NewServiceOptions

func NewServiceOptions[A AggBase]() *serviceOptions[A]

NewServiceOptions[A AggBase] new options struct

func PrintAggBaseOptions

func PrintAggBaseOptions(packageName string)

func PrintEventOptions

func PrintEventOptions(packageName string)

func PrintServiceOptions

func PrintServiceOptions(packageName string)

func WithBatchTransaction

func WithBatchTransaction(transaction bool) func(o *batchOptions)

func WithBatchTransactionOff

func WithBatchTransactionOff() func(o *batchOptions)

Types

type ActionTarget

// ActionTarget is the target argument taken by Service.Delete, Service.Update
// and Service.Save, and carried by BatchEntry. Its method set is unexported
// in this rendering.
type ActionTarget interface {
	// contains filtered or unexported methods
}

type ActionType

// ActionType enumerates the kind of action carried by a BatchEntry.
type ActionType int
// Action values start at 1 (iota + 1), so the zero value of ActionType does
// not correspond to any named action.
const (
	ActionCreate ActionType = iota + 1 // 1
	ActionUpdate                       // 2
	ActionSave                         // 3
	ActionDelete                       // 4
)

type AggBase

// AggBase is the base interface for aggregates. It is the type constraint
// used by the package's generic types (Repo, Service, Handler, BatchEntry,
// and others). In addition to the exported methods listed here it has
// unexported methods that this rendering does not show.
type AggBase interface {
	ID() ID
	Now() time.Time
	CreatedAt() time.Time
	UpdatedAt() time.Time
	Version() uint64
	OriginalVersion() uint64
	IsNew() bool
	// AddEvent takes an event payload plus optional EventOption values.
	AddEvent(payload proto.Message, opts ...EventOption)
	// contains filtered or unexported methods
}

AggBase 聚合基础接口

type AggBaseOption

// AggBaseOption is the option interface accepted by NewAggBase. Values are
// produced by SkipAggBaseOption and the WithAggBase* functions.
type AggBaseOption interface {
	// contains filtered or unexported methods
}

AggBaseOption option interface

func SkipAggBaseOption

func SkipAggBaseOption() AggBaseOption

func WithAggBaseCreatedAt

func WithAggBaseCreatedAt(createdAt time.Time) AggBaseOption

WithAggBaseCreatedAt createdAt option of aggBase

func WithAggBaseId

func WithAggBaseId(id ID) AggBaseOption

WithAggBaseId id option of aggBase

func WithAggBaseNow

func WithAggBaseNow(now time.Time) AggBaseOption

WithAggBaseNow now option of aggBase

func WithAggBaseOptions

func WithAggBaseOptions(o *aggBaseOptions) AggBaseOption

func WithAggBaseUpdatedAt

func WithAggBaseUpdatedAt(updatedAt time.Time) AggBaseOption

WithAggBaseUpdatedAt updatedAt option of aggBase

func WithAggBaseVersion

func WithAggBaseVersion(version uint64) AggBaseOption

WithAggBaseVersion version option of aggBase

type AggConstructor

// AggConstructor is the constructor interface for aggregates of type A.
type AggConstructor[A AggBase] interface {
	// NewAggregate returns an aggregate of type A.
	NewAggregate() A
}

AggConstructor 构造器接口

type AggNamer

// AggNamer may be implemented to name an aggregate: when it is implemented,
// the aggregate's name is the value returned by AggName.
type AggNamer interface {
	AggName() string
}

AggNamer 实现此接口,则该聚合的名称为AggName()的返回值

type BatchEntry

// BatchEntry is one entry of a batch command, as passed to Service.Batch.
// It bundles a handler with the action type and action target it applies to.
type BatchEntry[A AggBase] struct {
	Handler      Handler[A]
	ActionType   ActionType
	ActionTarget ActionTarget
}

BatchEntry 批量命令返回条目

func NewBatchEntry

func NewBatchEntry[A AggBase](handler Handler[A], actionType ActionType, actionTarget ActionTarget) *BatchEntry[A]

func NewBatchEntryByFunc

func NewBatchEntryByFunc[A AggBase](hf func(context.Context, A) error, actionType ActionType, actionTarget ActionTarget) *BatchEntry[A]

type BatchOption

// BatchOption is a function that modifies the unexported batchOptions; it is
// the option type accepted by Service.Batch. WithBatchTransaction and
// WithBatchTransactionOff return functions of this shape.
type BatchOption func(o *batchOptions)

type Bus

// Bus is the event bus. It embeds Subscriber and adds no methods of its own.
type Bus interface {
	Subscriber
}

Bus 事件总线

type DryRunner

// DryRunner reports a dry-run flag through DryRun.
// NOTE(review): presumably this is what IsDryRun inspects — confirm.
type DryRunner interface {
	DryRun() bool
}

DryRunner dry runner

type ErrDuplicate

// ErrDuplicate is an error type parameterized by the aggregate type A. It is
// created with NewDuplicate from an aggregate and has Error and Aggregate
// methods; Aggregate returns a value of type A.
type ErrDuplicate[A AggBase] struct {
	// contains filtered or unexported fields
}

func NewDuplicate

func NewDuplicate[A AggBase](a A) *ErrDuplicate[A]

func (*ErrDuplicate[A]) Aggregate

func (e *ErrDuplicate[A]) Aggregate() A

func (*ErrDuplicate[A]) Error

func (e *ErrDuplicate[A]) Error() string

type Event

// Event is an event. It is created with NewEvent from a proto.Message payload
// and optional EventOption values; all of its fields are unexported and are
// read through its accessor methods.
type Event struct {
	// contains filtered or unexported fields
}

Event 事件

func NewEvent

func NewEvent(payload proto.Message, _opts ...EventOption) *Event

NewEvent constructor

func (*Event) AggID

func (e *Event) AggID() ID

func (*Event) AggName

func (e *Event) AggName() string

func (*Event) AggVersion

func (e *Event) AggVersion() uint64

func (*Event) CreatedAt

func (e *Event) CreatedAt() time.Time

func (*Event) Name

func (e *Event) Name() string

func (*Event) Payload

func (e *Event) Payload() proto.Message

func (*Event) ProtoMessage

func (e *Event) ProtoMessage() (*pb.Event, error)

func (*Event) PublishRequest

func (e *Event) PublishRequest() (*pb.PublishRequest, error)

func (*Event) SetOptions

func (_e *Event) SetOptions(_opts ...EventOption) *Event

func (*Event) Topic

func (e *Event) Topic() string

func (*Event) UUID

func (e *Event) UUID() string

type EventNamer

// EventNamer may be implemented to name an event: when it is implemented,
// the event's name is the value returned by EventName.
type EventNamer interface {
	EventName() string
}

EventNamer 实现此接口,则事件名称为EventName()的返回值

type EventOption

// EventOption is the option interface accepted by NewEvent, Event.SetOptions
// and AggBase.AddEvent. Values are produced by SkipEventOption and the
// WithEvent* functions.
type EventOption interface {
	// contains filtered or unexported methods
}

EventOption option interface

func SkipEventOption

func SkipEventOption() EventOption

func WithEventCreatedAt

func WithEventCreatedAt(createdAt time.Time) EventOption

WithEventCreatedAt createdAt option of Event

func WithEventName

func WithEventName(name string) EventOption

WithEventName name option of Event

func WithEventOptions

func WithEventOptions(o *eventOptions) EventOption

func WithEventTopic

func WithEventTopic(topic string) EventOption

WithEventTopic topic option of Event

func WithEventUuid

func WithEventUuid(uuid string) EventOption

WithEventUuid uuid option of Event

type Events

// Events is a slice of events; it is the argument type of Repo.SaveEvents.
type Events []*Event

Events 事件切片

type Handler

// Handler is the command-handling interface for aggregates of type A.
type Handler[A AggBase] interface {
	// Handle is called with a context and an aggregate and returns an error.
	Handle(ctx context.Context, a A) error
}

Handler 命令处理接口

func NewHandler

func NewHandler[A AggBase](hf func(context.Context, A) error) Handler[A]

type HandlerFunc

// HandlerFunc is a function type with the same signature as Handler.Handle.
// It has a Handle method, so a plain function can be used where a Handler[A]
// is expected.
type HandlerFunc[A AggBase] func(context.Context, A) error

func (HandlerFunc[A]) Handle

func (f HandlerFunc[A]) Handle(ctx context.Context, a A) error

type ID

// ID is the aggregate root ID, represented as a string.
type ID string

ID 聚合根ID

func GenNoHyphenUUID

func GenNoHyphenUUID(ctx context.Context) (ID, error)

func NewID

func NewID(id string) ID

func (ID) IsEmpty

func (id ID) IsEmpty() bool

func (ID) NotEmpty

func (id ID) NotEmpty() bool

func (ID) String

func (id ID) String() string

type IDGenFunc

// IDGenFunc is a function type with the same signature as
// IDGenerator.GenID. It has a GenID method, so a plain function can be used
// where an IDGenerator is expected.
type IDGenFunc func(ctx context.Context) (ID, error)

func (IDGenFunc) GenID

func (f IDGenFunc) GenID(ctx context.Context) (ID, error)

type IDGenerator

// IDGenerator is the ID generation interface.
type IDGenerator interface {
	// GenID returns an ID or an error.
	GenID(ctx context.Context) (ID, error)
}

IDGenerator ID生成接口

type MultiDocuments

// MultiDocuments should be implemented by aggregates whose stored data is
// spread across several places, such as multiple MySQL tables, multiple
// MongoDB collections, multiple Redis keys, or similar. Implementing it lets
// repository operations decide automatically whether to enable a transaction.
type MultiDocuments interface {
	IsMultiDocuments()
}

MultiDocuments 该接口应由聚合实现。如果聚合的数据存储分布在MySQL的多个表中,或者MongoDB的多个集合中,或者Redis的多个key中,或类似情况,那么该聚合应该实现此接口,以便在进行仓储操作时自动决定是否启用事务。

type NormalBus

// NormalBus is the normal (non-transactional) bus: a Bus combined with a
// NormalPublisher.
type NormalBus interface {
	Bus
	NormalPublisher
}

NormalBus 普通总线

type NormalPublisher

// NormalPublisher publishes requests and can be closed.
type NormalPublisher interface {
	io.Closer
	// Publish takes zero or more publish requests and returns an error.
	Publish(ctx context.Context, requests ...*pb.PublishRequest) error
}

type Repo

// Repo is the repository interface for aggregates of type A.
type Repo[A AggBase] interface {
	// Get should return ErrNotFound when the record does not exist.
	Get(ctx context.Context, id ID) (A, error)
	List(ctx context.Context, ids ...ID) ([]A, error)
	Save(ctx context.Context, a A) error
	Delete(ctx context.Context, a A) error
	SaveEvents(ctx context.Context, events Events) error
	// Transaction takes a callback that receives a context and a Repo[A].
	// NOTE(review): presumably fn runs inside a transaction and its error
	// decides commit or rollback — confirm against an implementation.
	Transaction(ctx context.Context, fn func(ctx context.Context, r Repo[A]) error) error
}

Repo 仓储接口

type Service

// Service exposes operations on aggregates of type A: lookups by ID (Get,
// List) and handler-driven commands (Create, Delete, Update, Save, Batch).
// It is constructed with NewService from a Repo[A] and an aggregate factory.
type Service[A AggBase] interface {
	Get(ctx context.Context, id ID) (A, error)
	List(ctx context.Context, ids ...ID) ([]A, error)
	Create(ctx context.Context, h Handler[A]) (A, error)
	Delete(ctx context.Context, h Handler[A], t ActionTarget) error
	Update(ctx context.Context, h Handler[A], t ActionTarget) (A, error)
	Save(ctx context.Context, h Handler[A], t ActionTarget) (A, error)
	// Batch takes a list of entries plus optional BatchOption values and
	// returns a slice of aggregates.
	Batch(ctx context.Context, entries []*BatchEntry[A], opts ...BatchOption) ([]A, error)
}

func NewService

func NewService[A AggBase](repo Repo[A], newAggregate func() A, opts ...ServiceOption[A]) Service[A]

type ServiceOption

// ServiceOption is the option interface accepted by NewService. Values are
// produced by SkipServiceOption and the WithService* functions.
type ServiceOption[A AggBase] interface {
	// contains filtered or unexported methods
}

ServiceOption[A AggBase] option interface

func SkipServiceOption

func SkipServiceOption[A AggBase]() ServiceOption[A]

func WithServiceBus

func WithServiceBus[A AggBase](bus Bus) ServiceOption[A]

WithServiceBus bus option of service

func WithServiceIdGenFunc

func WithServiceIdGenFunc[A AggBase](f func(ctx context.Context) (ID, error)) ServiceOption[A]

func WithServiceIdGenerator

func WithServiceIdGenerator[A AggBase](idGenerator IDGenerator) ServiceOption[A]

WithServiceIdGenerator idGenerator option of service

func WithServiceOptions

func WithServiceOptions[A AggBase](o *serviceOptions[A]) ServiceOption[A]

type SubscribeRule

// SubscribeRule is a subscription rule: it pairs a topic with the function
// that handles events for it. Rules are passed to Subscriber.Subscribe.
type SubscribeRule struct {
	Topic  string
	Handle func(ctx context.Context, e *pb.Event) error
}

SubscribeRule 订阅规则

func NewSubscribeRule

func NewSubscribeRule(topic string, handle func(ctx context.Context, e *pb.Event) error) *SubscribeRule

type Subscriber

// Subscriber accepts subscription rules and can be closed.
type Subscriber interface {
	io.Closer
	// Subscribe takes zero or more subscription rules and returns an error.
	Subscribe(ctx context.Context, rules ...*SubscribeRule) error
}

type TopicNamer

// TopicNamer may be implemented to choose a message topic: when it is
// implemented, the message's topic is the value returned by TopicName.
type TopicNamer interface {
	TopicName() string
}

TopicNamer 实现此接口,则消息的topic值为TopicName()的返回值

type TransactionBus

// TransactionBus is the transactional bus: a Bus combined with a
// TransactionPublisher.
type TransactionBus interface {
	Bus
	TransactionPublisher
}

TransactionBus 事务总线

type TransactionBusTracer

// TransactionBusTracer adds trace support to a TransactionBus. It has Close,
// Subscribe and TransactionPublish methods matching that interface.
type TransactionBusTracer struct {
	// contains filtered or unexported fields
}

TransactionBusTracer 为TransactionBus添加trace支持

func (*TransactionBusTracer) Close

func (t *TransactionBusTracer) Close() error

func (*TransactionBusTracer) Subscribe

func (t *TransactionBusTracer) Subscribe(ctx context.Context, rules ...*SubscribeRule) error

func (*TransactionBusTracer) TransactionPublish

func (t *TransactionBusTracer) TransactionPublish(ctx context.Context, repoTransaction func(ctx context.Context) error, requests ...*pb.PublishRequest) error

type TransactionPublisher

// TransactionPublisher publishes requests together with a repository
// transaction callback, and can be closed.
type TransactionPublisher interface {
	io.Closer
	// TransactionPublish takes the repository transaction as a callback plus
	// zero or more publish requests.
	// NOTE(review): how the callback and the publish are coordinated is not
	// visible here — confirm against an implementation.
	TransactionPublish(ctx context.Context, repoTransaction func(ctx context.Context) error, requests ...*pb.PublishRequest) error
}

type Validator

// Validator is implemented by values that can validate themselves; Validate
// returns an error.
type Validator interface {
	Validate() error
}

Validator validator

type Vid

// Vid pairs a version with an ID. A value can be built with NewVid.
type Vid interface {
	ID() ID
	Version() uint64
}

Vid version and ID

func NewVid

func NewVid(id ID, version uint64) Vid

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL