Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyAggregateChange(ctx context.Context, aggregate Aggregate, change aggregateChange)
- func BaseAddRoute(app *gin.Engine)
- func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEvent) error
- func NewAggregateId() (id int64)
- func NewDomainEventId() (id int64)
- func NodeTime(snowflakeId int64) time.Time
- func RunHTTP(app *gin.Engine)
- func SetGrpcHeaderForDomainEvent(ctx context.Context, method string, req, reply interface{}, ...) error
- func Startup(conf dtmimp.DBConf) *gin.Engine
- func UnaryClientInterceptor() grpc.UnaryClientInterceptor
- type AbstractAggregate
- type AbstractCommand
- type Aggregate
- type Command
- type CommandHandle
- type Context
- type DBConfig
- type DomainEvent
- type DomainEventMessage
- type DtmEventConsumerConfig
- type DtmEventProducer
- type DtmEventProducerConfig
- type EventBus
- type EventDecoder
- type EventHandle
- type EventStore
- type JsonEventDecoder
- type LocalEventBus
- func (bus *LocalEventBus) Await()
- func (bus *LocalEventBus) Close(ctx context.Context) (err error)
- func (bus *LocalEventBus) Name() string
- func (bus *LocalEventBus) Recv(ctx context.Context, topic string, handle EventHandle) (err error)
- func (bus *LocalEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)
- func (bus *LocalEventBus) Shutdown()
- func (bus *LocalEventBus) Start(ctx context.Context) (err error)
- type MysqlEventStoreConfig
- type RawEventDecoder
- type RegularEvent
- type Repository
- func (r *Repository) Load(ctx context.Context, aggregate Aggregate) (has bool, err error)
- func (r *Repository) RegisterAggregates(ctx context.Context, aggregates ...Aggregate) (err error)
- func (r *Repository) Save(ctx context.Context, aggregates ...Aggregate) (ok bool, err error)
- func (r *Repository) SetSaveListener(ctx context.Context, saveListener RepositorySaveListener) (err error)
- type RepositoryConfig
- type RepositorySaveListener
- type SampleDomainEvent
- func (s *SampleDomainEvent) AggregateId() (id int64)
- func (s *SampleDomainEvent) AggregateName() (name string)
- func (s *SampleDomainEvent) EventBody() (body protoreflect.ProtoMessage)
- func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error)
- func (s *SampleDomainEvent) EventCreateTime() (createTime time.Time)
- func (s *SampleDomainEvent) EventId() (id int64)
- func (s *SampleDomainEvent) EventName() (name string)
- type StoredEvent
Constants ¶
View Source
const (
BusiPort = 8081
)
Variables ¶
View Source
var BusiConf dtmcli.DBConf
View Source
var NodeNumber int64 = 1
Functions ¶
func ApplyAggregateChange ¶
func BaseAddRoute ¶
func ExecuteLocalTransaction ¶
func ExecuteLocalTransaction(ctx context.Context, es EventStore, eventsMessages []DomainEvent) error
func NewAggregateId ¶
func NewAggregateId() (id int64)
func NewDomainEventId ¶
func NewDomainEventId() (id int64)
func SetGrpcHeaderForDomainEvent ¶
func SetGrpcHeaderForDomainEvent(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error
SetGrpcHeaderForDomainEvent is an interceptor that sets the gRPC header for a DomainEvent.
func UnaryClientInterceptor ¶
func UnaryClientInterceptor() grpc.UnaryClientInterceptor
Types ¶
type AbstractAggregate ¶
type AbstractAggregate struct { Id int64 `json:"id"` // contains filtered or unexported fields }
func (*AbstractAggregate) Applied ¶
func (a *AbstractAggregate) Applied() (events []DomainEvent)
func (*AbstractAggregate) Apply ¶
func (a *AbstractAggregate) Apply(agg Aggregate, aggChange aggregateChange)
func (*AbstractAggregate) Identifier ¶
func (a *AbstractAggregate) Identifier() (id int64)
func (*AbstractAggregate) InitId ¶
func (a *AbstractAggregate) InitId()
type AbstractCommand ¶
type AbstractCommand struct {
AggregateId string
}
type Aggregate ¶
type Aggregate interface { InitId() Identifier() (id int64) Apply(agg Aggregate, event aggregateChange) Applied() (events []DomainEvent) }
type CommandHandle ¶
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func NewContext ¶
func NewContext(ctx context.Context, repository *Repository) Context
type DomainEvent ¶
type DomainEvent interface { AggregateId() int64 AggregateName() string EventId() int64 EventName() string EventBody() protoreflect.ProtoMessage EventBodyRaw() ([]byte, error) EventCreateTime() time.Time // contains filtered or unexported methods }
type DomainEventMessage ¶
type DomainEventMessage struct { AggregateId int64 `json:"aggregate_id"` AggregateName string `json:"aggregate_name"` EventName string `json:"event_name"` EventId int64 `json:"event_id"` EventBody []byte `json:"event_body"` }
func (*DomainEventMessage) Decode ¶
func (msg *DomainEventMessage) Decode(byteData []byte) (err error)
func (*DomainEventMessage) TopicName ¶
func (msg *DomainEventMessage) TopicName(eventBusName string) string
type DtmEventConsumerConfig ¶
type DtmEventProducer ¶
type DtmEventProducer struct { Name string Brokers []string Producer *dtmcli.Msg EventStore EventStore }
func NewDtmEventProducer ¶
func NewDtmEventProducer(ctx context.Context, config DtmEventProducerConfig) (eventProducer DtmEventProducer, err error)
func (*DtmEventProducer) Send ¶
func (p *DtmEventProducer) Send(ctx context.Context, eventMessages ...DomainEvent) (err error)
type DtmEventProducerConfig ¶
type EventBus ¶
type EventBus interface { Name() string Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error) Recv(ctx context.Context, topic string, handle EventHandle) (err error) Start(ctx context.Context) (err error) Shutdown() Await() Close(ctx context.Context) (err error) }
func NewLocalEventBus ¶
type EventDecoder ¶
type EventHandle ¶
type EventHandle func(ctx context.Context, domainEvent DomainEvent) (err error, requeue bool)
EventHandle handles an event. If the returned err is not nil, the event is rejected; to have the event requeued, return requeue as true.
type EventStore ¶
type EventStore interface { Name() (name string) GetDB(ctx context.Context) *sql.DB InitDomainEventStoreTable(ctx context.Context, aggregateName string) ReadEvents(ctx context.Context, aggregateName string, aggregateId int64, lastEventId int64) (events []StoredEvent, err error) StoreEvents(ctx context.Context, events []StoredEvent) (err error) CheckEvents(ctx context.Context, events []StoredEvent) (err error) MakeSnapshot(ctx context.Context, aggregate Aggregate) (err error) LoadSnapshot(ctx context.Context, aggregateId int64, aggregate Aggregate) (lastEventId int64, err error) }
func NewMysqlEventStore ¶
func NewMysqlEventStore(ctx context.Context, config MysqlEventStoreConfig) (es EventStore, err error)
type JsonEventDecoder ¶
type JsonEventDecoder struct {
// contains filtered or unexported fields
}
func NewJsonEventDecoder ¶
func NewJsonEventDecoder(raw []byte) *JsonEventDecoder
func (*JsonEventDecoder) Decode ¶
func (d *JsonEventDecoder) Decode(v interface{}) (err error)
func (*JsonEventDecoder) RAW ¶
func (d *JsonEventDecoder) RAW() (raw []byte)
type LocalEventBus ¶
type LocalEventBus struct {
// contains filtered or unexported fields
}
func (*LocalEventBus) Await ¶
func (bus *LocalEventBus) Await()
func (*LocalEventBus) Name ¶
func (bus *LocalEventBus) Name() string
func (*LocalEventBus) Recv ¶
func (bus *LocalEventBus) Recv(ctx context.Context, topic string, handle EventHandle) (err error)
func (*LocalEventBus) Send ¶
func (bus *LocalEventBus) Send(ctx context.Context, eventMessages ...DomainEventMessage) (err error)
func (*LocalEventBus) Shutdown ¶
func (bus *LocalEventBus) Shutdown()
type MysqlEventStoreConfig ¶
type RawEventDecoder ¶
type RawEventDecoder struct {
// contains filtered or unexported fields
}
func NewRawEventDecoder ¶
func NewRawEventDecoder(raw []byte) *RawEventDecoder
func (*RawEventDecoder) Decode ¶
func (d *RawEventDecoder) Decode(v interface{}) (err error)
func (*RawEventDecoder) RAW ¶
func (d *RawEventDecoder) RAW() (raw []byte)
type RegularEvent ¶
type RegularEvent interface { }
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
func NewRepository ¶
func NewRepository(ctx context.Context, config *RepositoryConfig) (r *Repository, err error)
func (*Repository) RegisterAggregates ¶
func (r *Repository) RegisterAggregates(ctx context.Context, aggregates ...Aggregate) (err error)
func (*Repository) SetSaveListener ¶
func (r *Repository) SetSaveListener(ctx context.Context, saveListener RepositorySaveListener) (err error)
type RepositoryConfig ¶
type RepositoryConfig struct { DomainName string `json:"domain_name"` SubDomainName string `json:"sub_domain_name"` MysqlEventStoreDBConfig DBConfig `json:"mysql_event_store_db_config"` RocketMqEventBusNameServers []string `json:"rocket_mq_event_bus_name_servers"` SaveListener RepositorySaveListener DtmDBConf dtmimp.DBConf }
type RepositorySaveListener ¶
type RepositorySaveListener interface {
Handle(ctx context.Context, event DomainEventMessage)
}
type SampleDomainEvent ¶
type SampleDomainEvent struct {
// contains filtered or unexported fields
}
func (*SampleDomainEvent) AggregateId ¶
func (s *SampleDomainEvent) AggregateId() (id int64)
func (*SampleDomainEvent) AggregateName ¶
func (s *SampleDomainEvent) AggregateName() (name string)
func (*SampleDomainEvent) EventBody ¶
func (s *SampleDomainEvent) EventBody() (body protoreflect.ProtoMessage)
func (*SampleDomainEvent) EventBodyRaw ¶
func (s *SampleDomainEvent) EventBodyRaw() (bodyRaw []byte, err error)
func (*SampleDomainEvent) EventCreateTime ¶
func (s *SampleDomainEvent) EventCreateTime() (createTime time.Time)
func (*SampleDomainEvent) EventId ¶
func (s *SampleDomainEvent) EventId() (id int64)
func (*SampleDomainEvent) EventName ¶
func (s *SampleDomainEvent) EventName() (name string)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.