Documentation ¶
Overview ¶
ref1: https://github.com/KScaesar/art?tab=readme-ov-file#example
ref2: https://github.com/KScaesar/art/blob/main/message.go
Package dataflow is a generated GoMock package.
Index ¶
- Variables
- func Gather(multiReply []Reply) (Results []any, Err error)
- func GatherWithCtx(ctx context.Context, multiReply []Reply) (Results []any, Err error)
- func PutMessage(message *Message)
- type Consumer
- type ErrorHandleFunc
- type HandleFunc
- type Message
- type Middleware
- type MockProducer
- type MockProducerMockRecorder
- type MockProducerSendCall
- type MockProducerSendWithCtxCall
- func (c *MockProducerSendWithCtxCall) Do(f func(context.Context, ...*Message) error) *MockProducerSendWithCtxCall
- func (c *MockProducerSendWithCtxCall) DoAndReturn(f func(context.Context, ...*Message) error) *MockProducerSendWithCtxCall
- func (c *MockProducerSendWithCtxCall) Return(arg0 error) *MockProducerSendWithCtxCall
- type Mux
- func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) Endpoints(action func(subject, handler string))
- func (mux *Mux) ErrorHandler(errHandler ErrorHandleFunc) *Mux
- func (mux *Mux) Group(groupName string) *Mux
- func (mux *Mux) GroupByNumber(groupName int) *Mux
- func (mux *Mux) HandleMessage(message *Message, dependency any) (err error)
- func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
- func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
- func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) Transform(transform HandleFunc) *Mux
- type Producer
- type Reply
- func (r Reply) Pull() (Result any, Err error)
- func (r Reply) PullWithCtx(ctx context.Context) (Result any, Err error)
- func (r Reply) Pulls() (Results []any, Err error)
- func (r Reply) PullsWithCtx(ctx context.Context) (Results []any, Err error)
- func (r Reply) Push(Result any, Err error)
- func (r Reply) PushWithCtx(ctx context.Context, Result any, Err error) (err error)
Constants ¶
This section is empty.
Variables ¶
var (
ErrNotFoundSubject = errors.New("not found subject mux")
)
Functions ¶
func Gather ¶
Gather 代表的意思是, Producer 透過 多個 Reply 取得 多個 Message 的 response, 其中 Message 和 Reply 是 1 對 1 關聯
func GatherWithCtx ¶
func PutMessage ¶
func PutMessage(message *Message)
Types ¶
type HandleFunc ¶
func Link ¶
func Link(handler HandleFunc, middlewares ...Middleware) HandleFunc
func (HandleFunc) Link ¶
func (h HandleFunc) Link(middlewares ...Middleware) HandleFunc
func (HandleFunc) PostMiddleware ¶
func (h HandleFunc) PostMiddleware() Middleware
func (HandleFunc) PreMiddleware ¶
func (h HandleFunc) PreMiddleware() Middleware
type Message ¶
type Message struct { Subject string Bytes []byte // ingress byte payload or egress byte payload Body any // egress golang object Mutex sync.Mutex // for egress broadcast data process // RouteParam are used to capture values from subject. // These parameters represent resources or identifiers. // // Example: // // define mux subject = "/users/{id}" // send or recv subject = "/users/1017" // // get route param: // key : value => id : 1017 RouteParam maputil.Data Metadata maputil.Data // raw message from 3rd pkg // // Example: // // fiber.Ctx or amqp.Delivery or kafka.Message RawInfra any Ctx context.Context // contains filtered or unexported fields }
Message represents a high-level abstraction data structure containing metadata (e.g. header) + body
func GetMessage ¶
func GetMessage() *Message
func NewBodyEgress ¶
func NewBytesEgress ¶
func (*Message) AckPingPong ¶
func (msg *Message) AckPingPong()
func (*Message) SetPingPong ¶
func (msg *Message) SetPingPong(pingpong chan struct{})
type Middleware ¶
type Middleware func(next HandleFunc) HandleFunc
func UseRecover ¶
func UseRecover() Middleware
type MockProducer ¶
type MockProducer struct {
// contains filtered or unexported fields
}
MockProducer is a mock of Producer interface.
func NewMockProducer ¶
func NewMockProducer(ctrl *gomock.Controller) *MockProducer
NewMockProducer creates a new mock instance.
func (*MockProducer) EXPECT ¶
func (m *MockProducer) EXPECT() *MockProducerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockProducer) Send ¶
func (m *MockProducer) Send(arg0 ...*Message) error
Send mocks base method.
func (*MockProducer) SendWithCtx ¶
func (m *MockProducer) SendWithCtx(arg0 context.Context, arg1 ...*Message) error
SendWithCtx mocks base method.
type MockProducerMockRecorder ¶
type MockProducerMockRecorder struct {
// contains filtered or unexported fields
}
MockProducerMockRecorder is the mock recorder for MockProducer.
func (*MockProducerMockRecorder) Send ¶
func (mr *MockProducerMockRecorder) Send(arg0 ...any) *MockProducerSendCall
Send indicates an expected call of Send.
func (*MockProducerMockRecorder) SendWithCtx ¶
func (mr *MockProducerMockRecorder) SendWithCtx(arg0 any, arg1 ...any) *MockProducerSendWithCtxCall
SendWithCtx indicates an expected call of SendWithCtx.
type MockProducerSendCall ¶
MockProducerSendCall wrap *gomock.Call
func (*MockProducerSendCall) Do ¶
func (c *MockProducerSendCall) Do(f func(...*Message) error) *MockProducerSendCall
Do rewrite *gomock.Call.Do
func (*MockProducerSendCall) DoAndReturn ¶
func (c *MockProducerSendCall) DoAndReturn(f func(...*Message) error) *MockProducerSendCall
DoAndReturn rewrite *gomock.Call.DoAndReturn
func (*MockProducerSendCall) Return ¶
func (c *MockProducerSendCall) Return(arg0 error) *MockProducerSendCall
Return rewrite *gomock.Call.Return
type MockProducerSendWithCtxCall ¶
MockProducerSendWithCtxCall wrap *gomock.Call
func (*MockProducerSendWithCtxCall) Do ¶
func (c *MockProducerSendWithCtxCall) Do(f func(context.Context, ...*Message) error) *MockProducerSendWithCtxCall
Do rewrite *gomock.Call.Do
func (*MockProducerSendWithCtxCall) DoAndReturn ¶
func (c *MockProducerSendWithCtxCall) DoAndReturn(f func(context.Context, ...*Message) error) *MockProducerSendWithCtxCall
DoAndReturn rewrite *gomock.Call.DoAndReturn
func (*MockProducerSendWithCtxCall) Return ¶
func (c *MockProducerSendWithCtxCall) Return(arg0 error) *MockProducerSendWithCtxCall
Return rewrite *gomock.Call.Return
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux refers to a router or multiplexer, which can be used to handle different message.
func NewMux ¶
NewMux If routeDelimiter is an empty string, Message.RouteParam cannot be used. RouteDelimiter can only be set to a string of length 1. This parameter determines different parts of the Message.Subject.
func (*Mux) DefaultHandler ¶
func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
DefaultHandler When a subject cannot be found, execute the 'Default'.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) ErrorHandler ¶
func (mux *Mux) ErrorHandler(errHandler ErrorHandleFunc) *Mux
func (*Mux) GroupByNumber ¶
func (*Mux) HandleMessage ¶
HandleMessage is also a HandleFunc, but with added routing capabilities.
func (*Mux) Handler ¶
func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) HandlerByNumber ¶
func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) Middleware ¶
func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
Middleware Before registering handler, middleware must be defined; otherwise, the handler won't be able to use middleware.
func (*Mux) NotFoundHandler ¶
func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
NotFoundHandler When a subject cannot be found, execute the 'NotFound'.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) PostMiddleware ¶
func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) PreMiddleware ¶
func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) Transform ¶
func (mux *Mux) Transform(transform HandleFunc) *Mux
Transform when executing trie.handleMessage will re-fetch Message.Subject or update Message.Bytes after executing the transform function
type Reply ¶
type Reply struct {
// contains filtered or unexported fields
}
Reply is used to push or pull a response. The response represents the result obtained after the Consumer processes the Message send by the Producer.
- Consumer use Reply.Push response.
- Producer use Reply.Pull response.
https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html https://docs.nats.io/nats-concepts/core-nats/reqreply#the-pattern