gddd

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2023 License: MIT Imports: 27 Imported by: 0

README

执行顺序

|              Command              |
|       Repository      | Aggregate |
|            | EventBus |
|       EventStore      |

Command(命令)

命令

Aggregate(聚合)

用于描述业务,并产生事件。定义实体和事件,所有实体的更改只能通过事件进行。

Repository(仓库)

用于存储数据的接口

EventBus(事件总线)

用于传输事件

EventStore(事件存储)

用于存储事件,并可回溯(EventSourcing)

TODO

  • 事件消费
  • 事件归档:将每日的事件归档
  • 历史事件溯源
  • go-zero 适配
  • 唯一索引:创建用户时,邮箱唯一
  • 文档

Documentation

Index

Constants

View Source
const (
	BusiPort = 8081
)

Variables

View Source
var BusiConf dtmcli.DBConf
View Source
var NodeNumber int64 = 1

Functions

func ApplyAggregateChange

func ApplyAggregateChange(ctx context.Context, aggregate Aggregate, change aggregateChange)

func BaseAddRoute

func BaseAddRoute(app *gin.Engine)

func ExecuteLocalTransaction

func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEvent) error

func NewAggregateId

func NewAggregateId() (id int64)

func NewDomainEventId

func NewDomainEventId() (id int64)

func NodeTime

func NodeTime(snowflakeId int64) time.Time

func RunHTTP

func RunHTTP(app *gin.Engine)

RunHTTP runs the HTTP server.

func SetGrpcHeaderForDomainEvent

func SetGrpcHeaderForDomainEvent(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error

SetGrpcHeaderForDomainEvent is an interceptor that sets the header for a DomainEvent.

func Startup

func Startup(conf dtmimp.DBConf) *gin.Engine

Startup starts the busi's HTTP service.

func UnaryClientInterceptor

func UnaryClientInterceptor() grpc.UnaryClientInterceptor

Types

type AbstractAggregate

type AbstractAggregate struct {
	Id int64 `json:"id"`
	// contains filtered or unexported fields
}

func (*AbstractAggregate) Applied

func (a *AbstractAggregate) Applied() (events []DomainEvent)

func (*AbstractAggregate) Apply

func (a *AbstractAggregate) Apply(agg Aggregate, aggChange aggregateChange)

func (*AbstractAggregate) Identifier

func (a *AbstractAggregate) Identifier() (id int64)

func (*AbstractAggregate) InitId

func (a *AbstractAggregate) InitId()

type AbstractCommand

type AbstractCommand struct {
	AggregateId string
}

type Aggregate

type Aggregate interface {
	InitId()
	Identifier() (id int64)
	Apply(agg Aggregate, event aggregateChange)
	Applied() (events []DomainEvent)
}

type Command

type Command interface {
}

type CommandHandle

type CommandHandle func(ctx context.Context, command Command) (result interface{}, err error)

type Context

type Context struct {
	// contains filtered or unexported fields
}

func NewContext

func NewContext(ctx context.Context, repository *Repository) Context

func (*Context) Apply

func (c *Context) Apply(aggregate Aggregate, change aggregateChange)

func (*Context) Load

func (c *Context) Load(aggregate Aggregate) (has bool, err error)

func (*Context) Save

func (c *Context) Save(aggregates ...Aggregate) (ok bool, err error)

type DBConfig

type DBConfig struct {
	SqlDataSourceName  string `json:"sql_data_source_name"`
	MaxIdleConnections int    `json:"max_idle_connections"`
	MaxOpenConnections int    `json:"max_open_connections"`
}

type DomainEvent

type DomainEvent interface {
	AggregateId() int64
	AggregateName() string
	EventId() int64
	EventName() string
	EventBody() protoreflect.ProtoMessage
	EventBodyRaw() ([]byte, error)
	EventCreateTime() time.Time
	// contains filtered or unexported methods
}

type DomainEventMessage

type DomainEventMessage struct {
	AggregateId   int64  `json:"aggregate_id"`
	AggregateName string `json:"aggregate_name"`
	EventName     string `json:"event_name"`
	EventId       int64  `json:"event_id"`
	EventBody     []byte `json:"event_body"`
}

func (*DomainEventMessage) Decode

func (msg *DomainEventMessage) Decode(byteData []byte) (err error)

func (*DomainEventMessage) TopicName

func (msg *DomainEventMessage) TopicName(eventBusName string) string

type DtmEventConsumerConfig

type DtmEventConsumerConfig struct {
	DomainName  string
	GroupName   string
	NameServers []string
}

type DtmEventProducer

type DtmEventProducer struct {
	Name       string
	Brokers    []string
	Producer   *dtmcli.Msg
	EventStore EventStore
}

func NewDtmEventProducer

func NewDtmEventProducer(ctx context.Context, config DtmEventProducerConfig) (eventProducer DtmEventProducer, err error)

func (*DtmEventProducer) Send

func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEvent) (err error)

type DtmEventProducerConfig

type DtmEventProducerConfig struct {
	DomainName    string
	SubDomainName string
	NameServers   []string
	EventStore    *EventStore
	DtmDBConf     dtmimp.DBConf
}

type EventBus

type EventBus interface {
	Name() string
	Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)
	Recv(ctx context.Context, topic string, handle EventHandle) (err error)
	Start(ctx context.Context) (err error)
	Shutdown()
	Await()
	Close(ctx context.Context) (err error)
}

func NewLocalEventBus

func NewLocalEventBus(name string) (bus EventBus)

type EventDecoder

type EventDecoder interface {
	RAW() (raw []byte)
	Decode(v interface{}) (err error)
}

type EventHandle

type EventHandle func(ctx context.Context, domainEvent DomainEvent) (err error, requeue bool)

EventHandle handles an event. If the returned err is not nil, the event is rejected; to have the event requeued, set requeue to true.

type EventStore

type EventStore interface {
	Name() (name string)
	GetDB(ctx context.Context) *sql.DB
	InitDomainEventStoreTable(ctx context.Context, aggregateName string)
	ReadEvents(ctx context.Context, aggregateName string, aggregateId int64, lastEventId int64) (events []StoredEvent, err error)
	StoreEvents(ctx context.Context, events []StoredEvent) (err error)
	CheckEvents(ctx context.Context, events []StoredEvent) (err error)
	MakeSnapshot(ctx context.Context, aggregate Aggregate) (err error)
	LoadSnapshot(ctx context.Context, aggregateId int64, aggregate Aggregate) (lastEventId int64, err error)
}

func NewMysqlEventStore

func NewMysqlEventStore(ctx context.Context, config MysqlEventStoreConfig) (es EventStore, err error)

type JsonEventDecoder

type JsonEventDecoder struct {
	// contains filtered or unexported fields
}

func NewJsonEventDecoder

func NewJsonEventDecoder(raw []byte) *JsonEventDecoder

func (*JsonEventDecoder) Decode

func (d *JsonEventDecoder) Decode(v interface{}) (err error)

func (*JsonEventDecoder) RAW

func (d *JsonEventDecoder) RAW() (raw []byte)

type LocalEventBus

type LocalEventBus struct {
	// contains filtered or unexported fields
}

func (*LocalEventBus) Await

func (bus *LocalEventBus) Await()

func (*LocalEventBus) Close

func (bus *LocalEventBus) Close(ctx context.Context) (err error)

func (*LocalEventBus) Name

func (bus *LocalEventBus) Name() string

func (*LocalEventBus) Recv

func (bus *LocalEventBus) Recv(ctx context.Context, topic string, handle EventHandle) (err error)

func (*LocalEventBus) Send

func (bus *LocalEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)

func (*LocalEventBus) Shutdown

func (bus *LocalEventBus) Shutdown()

func (*LocalEventBus) Start

func (bus *LocalEventBus) Start(ctx context.Context) (err error)

type MysqlEventStoreConfig

type MysqlEventStoreConfig struct {
	SubDomainName string
	DBConfig      DBConfig
}

type RawEventDecoder

type RawEventDecoder struct {
	// contains filtered or unexported fields
}

func NewRawEventDecoder

func NewRawEventDecoder(raw []byte) *RawEventDecoder

func (*RawEventDecoder) Decode

func (d *RawEventDecoder) Decode(v interface{}) (err error)

func (*RawEventDecoder) RAW

func (d *RawEventDecoder) RAW() (raw []byte)

type RegularEvent

type RegularEvent interface {
}

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

func NewRepository

func NewRepository(ctx context.Context, config *RepositoryConfig) (r *Repository, err error)

func (*Repository) Load

func (r *Repository) Load(ctx context.Context, aggregate Aggregate) (has bool, err error)

func (*Repository) RegisterAggregates

func (r *Repository) RegisterAggregates(ctx context.Context, aggregates ...Aggregate) (err error)

func (*Repository) Save

func (r *Repository) Save(ctx context.Context, aggregates ...Aggregate) (ok bool, err error)

func (*Repository) SetSaveListener

func (r *Repository) SetSaveListener(ctx context.Context, saveListener RepositorySaveListener) (err error)

type RepositoryConfig

type RepositoryConfig struct {
	DomainName                  string   `json:"domain_name"`
	SubDomainName               string   `json:"sub_domain_name"`
	MysqlEventStoreDBConfig     DBConfig `json:"mysql_event_store_db_config"`
	RocketMqEventBusNameServers []string `json:"rocket_mq_event_bus_name_servers"`
	SaveListener                RepositorySaveListener
	DtmDBConf                   dtmimp.DBConf
}

type RepositorySaveListener

type RepositorySaveListener interface {
	Handle(ctx context.Context, event DomainEventMessage)
}

type SampleDomainEvent

type SampleDomainEvent struct {
	// contains filtered or unexported fields
}

func (*SampleDomainEvent) AggregateId

func (s *SampleDomainEvent) AggregateId() (id int64)

func (*SampleDomainEvent) AggregateName

func (s *SampleDomainEvent) AggregateName() (name string)

func (*SampleDomainEvent) EventBody

func (s *SampleDomainEvent) EventBody() (body protoreflect.ProtoMessage)

func (*SampleDomainEvent) EventBodyRaw

func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error)

func (*SampleDomainEvent) EventCreateTime

func (s *SampleDomainEvent) EventCreateTime() (createTime time.Time)

func (*SampleDomainEvent) EventId

func (s *SampleDomainEvent) EventId() (id int64)

func (*SampleDomainEvent) EventName

func (s *SampleDomainEvent) EventName() (name string)

type StoredEvent

type StoredEvent interface {
	AggregateId() int64
	AggregateName() string
	EventId() int64
	EventName() string
	EventBodyRaw() []byte
	EventCreateTime() time.Time
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL