client

package
v0.0.0-...-bd9c4bf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	JS          jetstream.JetStream
	ConnectFn   func(url string, options ...nats.Option) (*nats.Conn, error)
	JetStreamFn func(*nats.Conn) (jetstream.JetStream, error)
	// contains filtered or unexported fields
}

Client implementation of the Client's task operations.

func New

func New(
	appConfig config.Config,
	logger *slog.Logger,
) *Client

New initialize and configure a new Client instance.

func (*Client) Connect

func (c *Client) Connect() error

Connect establishes the connection to the NATS server and JetStream context. This method returns an error if there are any issues during connection.

func (*Client) CountStreamMessages

func (c *Client) CountStreamMessages(
	ctx context.Context,
) (int, error)

CountStreamMessages returns the total number of messages in the specified JetStream stream (queue).

func (*Client) DeleteMessageBySeq

func (c *Client) DeleteMessageBySeq(
	ctx context.Context,
	seq uint64,
) error

DeleteMessageBySeq deletes a single message from the specified JetStream stream by its sequence number.

func (*Client) GetMessageBySeq

func (c *Client) GetMessageBySeq(
	ctx context.Context,
	seq uint64,
) (*MessageItem, error)

GetMessageBySeq retrieves a single message from the specified JetStream stream by its sequence number.

func (*Client) GetMessageIterator

func (c *Client) GetMessageIterator(
	ctx context.Context,
) (jetstream.MessagesContext, error)

GetMessageIterator retrieves the message iterator using a JetStream consumer.

func (*Client) ListUndeliveredMessages

func (c *Client) ListUndeliveredMessages(
	ctx context.Context,
) ([]MessageItem, error)

ListUndeliveredMessages retrieves a list of undelivered messages from the JetStream consumer.

The ephemeral consumer is created specifically to "peek" into the stream and fetch undelivered messages without acknowledging them, thus allowing other consumers to still process the messages normally. This method is useful for scenarios where you want visibility into pending messages but do not want to consume them in the traditional sense.

NOTE(retr0h): This function does not implement pagination as the queue is not expected to grow large enough to warrant it. In normal operation, if the queue does become large, it indicates an operational issue that requires further investigation.

func (*Client) PublishToStream

func (c *Client) PublishToStream(
	ctx context.Context,
	data []byte,
) (uint64, error)

PublishToStream publishes a message to the specified JetStream stream using the existing JetStream context.

type Manager

type Manager interface {
	// Connect establishes the connection to the NATS server and JetStream context.
	// This method returns an error if there are any issues during connection.
	Connect() error
	//// Connect(...jetstream.JetStreamOpt) error
	// CountStreamMessages returns the total number of messages in the specified JetStream stream (queue).
	CountStreamMessages(
		ctx context.Context,
	) (int, error)
	// DeleteMessageBySeq deletes a single message from the specified JetStream
	// stream by its sequence number.
	DeleteMessageBySeq(
		ctx context.Context,
		seq uint64,
	) error
	// GetMessageBySeq retrieves a single message from the specified JetStream
	// stream by its sequence number.
	GetMessageBySeq(
		ctx context.Context,
		seq uint64,
	) (*MessageItem, error)
	// ListUndeliveredMessages retrieves a list of undelivered messages from the
	// JetStream consumer.
	ListUndeliveredMessages(
		ctx context.Context,
	) ([]MessageItem, error)
	// PublishToStream publishes a message to the specified JetStream stream using the existing JetStream context.
	PublishToStream(
		ctx context.Context,
		data []byte,
	) (uint64, error)
	// GetMessageIterator retrieves the message iterator using a JetStream consumer.
	GetMessageIterator(
		ctx context.Context,
	) (jetstream.MessagesContext, error)
}

Manager responsible for Client operations.

type MessageItem

type MessageItem struct {
	// Stream sequence number.
	StreamSeq uint64 `json:"stream_sequence"`
	// Timestamp when the message was stored in the stream.
	StoredAt time.Time `json:"stored_at"`
	// Data the message (raw data).
	Data []byte `json:"body"`
}

MessageItem represents a JetStream message with metadata and content.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL