Documentation ¶
Index ¶
- type AckIndication
- type AckSeqNum
- type AlertOnErrorCB
- type ForwardMessageHandlerCB
- type JetStreamACKBroadcaster
- type JetStreamACKReceiver
- type JetStreamAckHandler
- type JetStreamInflightMsgProcessor
- type JetStreamPublisher
- type JetStreamPushSubscriber
- type MessageDispatcher
- type MsgToDeliver
- type MsgToDeliverSeq
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AckIndication ¶
type AckIndication struct { // Stream is the name of the stream Stream string `json:"stream" validate:"required,alphanum|uuid"` // Consumer is the name of the consumer Consumer string `json:"consumer" validate:"required,alphanum|uuid"` // SeqNum is the sequence number of the JetStream message SeqNum AckSeqNum `json:"seq_num" validate:"required,dive"` }
AckIndication is the ACK of a NATs JetStream message which contains its key parameters
func (AckIndication) String ¶
func (m AckIndication) String() string
String toString for ackIndication
type AckSeqNum ¶
type AckSeqNum struct { // Stream is the JetStream message sequence number for this stream Stream uint64 `json:"stream" validate:"required,gte=0"` // Consumer is the JetStream message sequence number for this consumer Consumer uint64 `json:"consumer" validate:"required,gte=0"` }
AckSeqNum are the sequence numbers of the NATs JetStream message
type AlertOnErrorCB ¶
type AlertOnErrorCB func(err error)
AlertOnErrorCB callback used to expose internal error to an outer context for handling
type ForwardMessageHandlerCB ¶
ForwardMessageHandlerCB callback used to forward new messages to the next pipeline stage
type JetStreamACKBroadcaster ¶
type JetStreamACKBroadcaster interface { // BroadcastACK broadcast a JetStream message ACK BroadcastACK(ctxt context.Context, ack AckIndication) error }
JetStreamACKBroadcaster broadcasts JetStream message ACK through NATs subjects
func GetJetStreamACKBroadcaster ¶
func GetJetStreamACKBroadcaster( natsClient *core.NatsClient, instance string, ) (JetStreamACKBroadcaster, error)
GetJetStreamACKBroadcaster define JetStreamACKBroadcaster
type JetStreamACKReceiver ¶
type JetStreamACKReceiver interface { // SubscribeForACKs start receiving JetStream message ACKs SubscribeForACKs(wg *sync.WaitGroup, handler JetStreamAckHandler) error }
JetStreamACKReceiver processes JetStream message ACKs being broadcast through NATs subjects
type JetStreamAckHandler ¶
type JetStreamAckHandler func(context.Context, AckIndication)
JetStreamAckHandler is the function signature for callback processing a JetStream ACK
type JetStreamInflightMsgProcessor ¶
type JetStreamInflightMsgProcessor interface { // RecordInflightMessage records a new JetStream message inflight awaiting ACK RecordInflightMessage(callCtxt context.Context, msg *nats.Msg, blocking bool) error // HandlerMsgACK processes a new message ACK HandlerMsgACK(callCtxt context.Context, ack AckIndication, blocking bool) error }
JetStreamInflightMsgProcessor processes inflight JetStream messages awaiting ACK
type JetStreamPublisher ¶
type JetStreamPublisher interface { // Publish publishes a new message into JetStream on a subject Publish(ctxt context.Context, subject string, msg []byte) error }
JetStreamPublisher publishes new messages into JetStream
func GetJetStreamPublisher ¶
func GetJetStreamPublisher( natsClient *core.NatsClient, instance string, ) (JetStreamPublisher, error)
GetJetStreamPublisher get new JetStreamPublisher
type JetStreamPushSubscriber ¶
type JetStreamPushSubscriber interface { // StartReading begin reading data from JetStream StartReading( forwardCB ForwardMessageHandlerCB, errorCB AlertOnErrorCB, wg *sync.WaitGroup, ) error }
JetStreamPushSubscriber is directly reading from JetStream with a push consumer
type MessageDispatcher ¶
type MessageDispatcher interface { // Start starts operations Start(msgOutput ForwardMessageHandlerCB, errorCB AlertOnErrorCB) error }
MessageDispatcher process a consumer subscription request from a client and dispatch messages to that client
func GetPushMessageDispatcher ¶
func GetPushMessageDispatcher( ctxt context.Context, natsClient *core.NatsClient, stream, subject, consumer string, deliveryGroup *string, maxInflightMsgs int, wg *sync.WaitGroup, ) (MessageDispatcher, error)
GetPushMessageDispatcher get a new push MessageDispatcher
type MsgToDeliver ¶
type MsgToDeliver struct { // Stream is the name of the stream Stream string `json:"stream" validate:"required,alphanum|uuid"` // Subject is the name of the subject / subject filter Subject string `json:"subject" validate:"required"` // Consumer is the name of the consumer Consumer string `json:"consumer" validate:"required,alphanum|uuid"` // Sequence is the sequence numbers for this JetStream message Sequence MsgToDeliverSeq `json:"sequence" validate:"required,dive"` // Message is the message body Message []byte `json:"b64_msg" validate:"required" swaggertype:"string" format:"base64" example:"SGVsbG8gV29ybGQK"` }
MsgToDeliver a structure for representing a message to send out to a subscribing client
func ConvertJSMessageDeliver ¶
func ConvertJSMessageDeliver(subject string, msg *nats.Msg) (MsgToDeliver, error)
ConvertJSMessageDeliver convert a JetStream message for delivery
func (MsgToDeliver) String ¶
func (m MsgToDeliver) String() string
String toString function for MsgToDeliver
type MsgToDeliverSeq ¶
type MsgToDeliverSeq struct { // Stream is the message sequence number within the stream Stream uint64 `json:"stream" validate:"required,gte=0"` // Consumer is the message sequence number for this consumer Consumer uint64 `json:"consumer" validate:"required,gte=0"` }
MsgToDeliverSeq sequence numbers for a JetStream message