dataflow

package
v0.0.0-...-de18be0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

ref1: https://github.com/KScaesar/art?tab=readme-ov-file#example

ref2: https://github.com/KScaesar/art/blob/main/message.go

Package dataflow is a generated GoMock package.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotFoundSubject = errors.New("not found subject mux")
)

Functions

func Gather

func Gather(multiReply []Reply) (Results []any, Err error)

Gather 代表的意思是, Producer 透過 多個 Reply 取得 多個 Message 的 response, 其中 Message 和 Reply 是 1 對 1 關聯

func GatherWithCtx

func GatherWithCtx(ctx context.Context, multiReply []Reply) (Results []any, Err error)

func PutMessage

func PutMessage(message *Message)

Types

type Consumer

type Consumer interface {
	Listen() (err error)
	Stop() error
}

type ErrorHandleFunc

type ErrorHandleFunc func(message *Message, dep any, err error) error

type HandleFunc

type HandleFunc func(message *Message, dep any) error
func Link(handler HandleFunc, middlewares ...Middleware) HandleFunc
func (h HandleFunc) Link(middlewares ...Middleware) HandleFunc

func (HandleFunc) PostMiddleware

func (h HandleFunc) PostMiddleware() Middleware

func (HandleFunc) PreMiddleware

func (h HandleFunc) PreMiddleware() Middleware

type Message

type Message struct {
	Subject string

	Bytes []byte // ingress byte payload or egress byte payload
	Body  any    // egress golang object

	Mutex sync.Mutex // for egress broadcast data process

	// RouteParam are used to capture values from subject.
	// These parameters represent resources or identifiers.
	//
	// Example:
	//
	//	define mux subject = "/users/{id}"
	//	send or recv subject = "/users/1017"
	//
	//	get route param:
	//		key : value => id : 1017
	RouteParam maputil.Data

	Metadata maputil.Data

	// raw message from 3rd pkg
	//
	// Example:
	//
	// fiber.Ctx or amqp.Delivery or kafka.Message
	RawInfra any

	Ctx context.Context
	// contains filtered or unexported fields
}

Message represents a high-level abstraction data structure containing metadata (e.g. header) + body

func GetMessage

func GetMessage() *Message

func NewBodyEgress

func NewBodyEgress(subject string, body any) *Message

func NewBytesEgress

func NewBytesEgress(subject string, bMessage []byte) *Message

func (*Message) AckPingPong

func (msg *Message) AckPingPong()

func (*Message) Copy

func (msg *Message) Copy() *Message

func (*Message) MsgId

func (msg *Message) MsgId() string

func (*Message) Reply

func (msg *Message) Reply() Reply

func (*Message) SetMsgId

func (msg *Message) SetMsgId(msgId string)

func (*Message) SetPingPong

func (msg *Message) SetPingPong(pingpong chan struct{})

func (*Message) SetReply

func (msg *Message) SetReply(r Reply)

type Middleware

type Middleware func(next HandleFunc) HandleFunc

func UseRecover

func UseRecover() Middleware

type MockProducer

type MockProducer struct {
	// contains filtered or unexported fields
}

MockProducer is a mock of Producer interface.

func NewMockProducer

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance.

func (*MockProducer) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockProducer) Send

func (m *MockProducer) Send(arg0 ...*Message) error

Send mocks base method.

func (*MockProducer) SendWithCtx

func (m *MockProducer) SendWithCtx(arg0 context.Context, arg1 ...*Message) error

SendWithCtx mocks base method.

type MockProducerMockRecorder

type MockProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer.

func (*MockProducerMockRecorder) Send

Send indicates an expected call of Send.

func (*MockProducerMockRecorder) SendWithCtx

func (mr *MockProducerMockRecorder) SendWithCtx(arg0 any, arg1 ...any) *MockProducerSendWithCtxCall

SendWithCtx indicates an expected call of SendWithCtx.

type MockProducerSendCall

type MockProducerSendCall struct {
	*gomock.Call
}

MockProducerSendCall wrap *gomock.Call

func (*MockProducerSendCall) Do

Do rewrite *gomock.Call.Do

func (*MockProducerSendCall) DoAndReturn

func (c *MockProducerSendCall) DoAndReturn(f func(...*Message) error) *MockProducerSendCall

DoAndReturn rewrite *gomock.Call.DoAndReturn

func (*MockProducerSendCall) Return

Return rewrite *gomock.Call.Return

type MockProducerSendWithCtxCall

type MockProducerSendWithCtxCall struct {
	*gomock.Call
}

MockProducerSendWithCtxCall wrap *gomock.Call

func (*MockProducerSendWithCtxCall) Do

Do rewrite *gomock.Call.Do

func (*MockProducerSendWithCtxCall) DoAndReturn

DoAndReturn rewrite *gomock.Call.DoAndReturn

func (*MockProducerSendWithCtxCall) Return

Return rewrite *gomock.Call.Return

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux refers to a router or multiplexer, which can be used to handle different message.

func NewMux

func NewMux(routeDelimiter string) *Mux

NewMux If routeDelimiter is an empty string, Message.RouteParam cannot be used. RouteDelimiter can only be set to a string of length 1. This parameter determines different parts of the Message.Subject.

func (*Mux) DefaultHandler

func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux

DefaultHandler When a subject cannot be found, execute the 'Default'.

"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."

func (*Mux) Endpoints

func (mux *Mux) Endpoints(action func(subject, handler string))

Endpoints get register handler function information

func (*Mux) ErrorHandler

func (mux *Mux) ErrorHandler(errHandler ErrorHandleFunc) *Mux

func (*Mux) Group

func (mux *Mux) Group(groupName string) *Mux

func (*Mux) GroupByNumber

func (mux *Mux) GroupByNumber(groupName int) *Mux

func (*Mux) HandleMessage

func (mux *Mux) HandleMessage(message *Message, dependency any) (err error)

HandleMessage is also a HandleFunc, but with added routing capabilities.

func (*Mux) Handler

func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux

func (*Mux) HandlerByNumber

func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux

func (*Mux) Middleware

func (mux *Mux) Middleware(middlewares ...Middleware) *Mux

Middleware Before registering handler, middleware must be defined; otherwise, the handler won't be able to use middleware.

func (*Mux) NotFoundHandler

func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux

NotFoundHandler When a subject cannot be found, execute the 'NotFound'.

"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."

func (*Mux) PostMiddleware

func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux

func (*Mux) PreMiddleware

func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux

func (*Mux) Transform

func (mux *Mux) Transform(transform HandleFunc) *Mux

Transform when executing trie.handleMessage will re-fetch Message.Subject or update Message.Bytes after executing the transform function

type Producer

type Producer interface {
	Send(messages ...*Message) error
	SendWithCtx(ctx context.Context, messages ...*Message) error
}

type Reply

type Reply struct {
	// contains filtered or unexported fields
}

Reply is used to push or pull a response. The response represents the result obtained after the Consumer processes the Message send by the Producer.

- Consumer use Reply.Push response.

- Producer use Reply.Pull response.

https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html https://docs.nats.io/nats-concepts/core-nats/reqreply#the-pattern

func NewReply

func NewReply(qty int) Reply

func (Reply) Pull

func (r Reply) Pull() (Result any, Err error)

Pull 代表的意思是, Producer 透過 一個 Reply 取得 一個 Message 的 response

func (Reply) PullWithCtx

func (r Reply) PullWithCtx(ctx context.Context) (Result any, Err error)

func (Reply) Pulls

func (r Reply) Pulls() (Results []any, Err error)

Pulls 代表的意思是, Producer 透過 一個 Reply 取得 多個 Message 的 response

func (Reply) PullsWithCtx

func (r Reply) PullsWithCtx(ctx context.Context) (Results []any, Err error)

func (Reply) Push

func (r Reply) Push(Result any, Err error)

func (Reply) PushWithCtx

func (r Reply) PushWithCtx(ctx context.Context, Result any, Err error) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL