Documentation ¶
Index ¶
- type Client
- func (c *Client) Connect() error
- func (c *Client) CountStreamMessages(ctx context.Context) (int, error)
- func (c *Client) DeleteMessageBySeq(ctx context.Context, seq uint64) error
- func (c *Client) GetMessageBySeq(ctx context.Context, seq uint64) (*MessageItem, error)
- func (c *Client) GetMessageIterator(ctx context.Context) (jetstream.MessagesContext, error)
- func (c *Client) ListUndeliveredMessages(ctx context.Context) ([]MessageItem, error)
- func (c *Client) PublishToStream(ctx context.Context, data []byte) (uint64, error)
- type Manager
- type MessageItem
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct { JS jetstream.JetStream ConnectFn func(url string, options ...nats.Option) (*nats.Conn, error) JetStreamFn func(*nats.Conn) (jetstream.JetStream, error) // contains filtered or unexported fields }
Client implementation of the Client's task operations.
func (*Client) Connect ¶
Connect establishes the connection to the NATS server and JetStream context. This method returns an error if there are any issues during connection.
func (*Client) CountStreamMessages ¶
CountStreamMessages returns the total number of messages in the specified JetStream stream (queue).
func (*Client) DeleteMessageBySeq ¶
DeleteMessageBySeq deletes a single message from the specified JetStream stream by its sequence number.
func (*Client) GetMessageBySeq ¶
GetMessageBySeq retrieves a single message from the specified JetStream stream by its sequence number.
func (*Client) GetMessageIterator ¶
GetMessageIterator retrieves the message iterator using a JetStream consumer.
func (*Client) ListUndeliveredMessages ¶
func (c *Client) ListUndeliveredMessages( ctx context.Context, ) ([]MessageItem, error)
ListUndeliveredMessages retrieves a list of undelivered messages from the JetStream consumer.
The ephemeral consumer is created specifically to "peek" into the stream and fetch undelivered messages without acknowledging them, thus allowing other consumers to still process the messages normally. This method is useful for scenarios where you want visibility into pending messages but do not want to consume them in the traditional sense.
NOTE(retr0h): This function does not implement pagination as the queue is not expected to grow large enough to warrant it. In normal operation, if the queue does become large, it indicates an operational issue that requires further investigation.
type Manager ¶
type Manager interface { // Connect establishes the connection to the NATS server and JetStream context. // This method returns an error if there are any issues during connection. Connect() error //// Connect(...jetstream.JetStreamOpt) error // CountStreamMessages returns the total number of messages in the specified JetStream stream (queue). CountStreamMessages( ctx context.Context, ) (int, error) // DeleteMessageBySeq deletes a single message from the specified JetStream // stream by its sequence number. DeleteMessageBySeq( ctx context.Context, seq uint64, ) error // GetMessageBySeq retrieves a single message from the specified JetStream // stream by its sequence number. GetMessageBySeq( ctx context.Context, seq uint64, ) (*MessageItem, error) // ListUndeliveredMessages retrieves a list of undelivered messages from the // JetStream consumer. ListUndeliveredMessages( ctx context.Context, ) ([]MessageItem, error) // PublishToStream publishes a message to the specified JetStream stream using the existing JetStream context. PublishToStream( ctx context.Context, data []byte, ) (uint64, error) // GetMessageIterator retrieves the message iterator using a JetStream consumer. GetMessageIterator( ctx context.Context, ) (jetstream.MessagesContext, error) }
Manager responsible for Client operations.
type MessageItem ¶
type MessageItem struct { // Stream sequence number. StreamSeq uint64 `json:"stream_sequence"` // Timestamp when the message was stored in the stream. StoredAt time.Time `json:"stored_at"` // Data the message (raw data). Data []byte `json:"body"` }
MessageItem represents a JetStream message with metadata and content.