Documentation
¶
Overview ¶
Package stream provides a NATS client for streaming records between clients with asynchronous response handling.
Overview:
This package implements a client leveraging NATS for sending and receiving streaming data records. It abstracts NATS connections, subscriptions, and message handling complexities, offering a simplified interface for data streaming. The client supports multiplexing multiple streams over a single NATS subscription, handling responses from different streams using a unique token-based mechanism. Additionally, the package introduces a Writer component, designed to abstract the complexities of data encoding and NATS publishing into a simple, intuitive interface.
How It Works:
The Client part of the package manages dynamic inboxes for each streaming session, facilitating the sending of data and listening for responses on dedicated subjects. It leverages the NATS publish-subscribe model for asynchronous communication, efficiently routing and correlating messages to their respective streams.
The Writer component allows for easy publishing of structured data to any NATS subject. It integrates tightly with the Client, utilizing the same connection for streamlined data streaming. The Writer simplifies the publication process, automatically handling data serialization and supporting graceful stream closure with custom codes.
Multiplexing Streams:
To efficiently handle multiple streams, the client uses a single wildcard subscription for all responses. Each request is sent with a unique response subject (derived from a base inbox prefix), with responses routed back to this subject. The client demultiplexes incoming messages by extracting a token from the response subject, identifying the correct stream (or "bucket") for the message. This approach allows managing multiple concurrent streams with minimal overhead, leveraging NATS's lightweight subjects and messaging capabilities.
Key Features:
Asynchronous Streaming: Supports asynchronous data streaming, allowing clients to send data and receive responses without blocking.
Context Support: Integrates with Go's context package for timeouts, cancellation, and deadlines for streaming requests.
Multiplexing: Efficiently multiplexes multiple streams over a single NATS subscription, using unique response subjects for message correlation.
Error Handling: Provides robust error handling, including custom error codes for stream-related errors (e.g., bad data, normal closure).
Data Publication: The Writer simplifies structured data publishing to NATS, with support for automatic serialization and stream closure signals.
Usage:
Initialize the client with a NATS connection and use the provided methods to send streaming requests and handle responses. The client manages the NATS subscription and response routing, simplifying the process of working with streaming data.
Example:
params := stream.ClientParams{Conn: natsConn} client, err := stream.NewClient(params) if err != nil { log.Fatal(err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() responseChan, err := client.OpenStream(ctx, "subject", []byte("data")) if err != nil { log.Fatal(err) } for asyncResult := range responseChan { if asyncResult.Err != nil { log.Printf("Received an error: %v", asyncResult.Err) continue } log.Printf("Received data: %s", asyncResult.Value) }
This package leverages the concurrency.AsyncResult type for handling asynchronous responses, allowing clients to distinguish between successful data responses and error conditions.
Note: This client is designed for use with NATS and requires an established NATS connection to function. It does not handle NATS connection management, which must be performed separately by the user.
Index ¶
Constants ¶
const ( // CloseNormalClosure Indicates that the stream was closed after completing its // intended purpose. No errors occurred, and the closure was planned. CloseNormalClosure = 1000 // CloseGoingAway Used when a service is shutting down or a client // is disconnecting normally but unexpectedly CloseGoingAway = 1001 // CloseUnsupportedData Indicates that data sent over an established stream is invalid, // corrupt, or cannot be processed. This is used after a stream has been successfully // established but encounters data-related issues. CloseUnsupportedData = 1003 // CloseAbnormalClosure Indicates that a stream was closed unexpectedly, // without a known reason. This might be used when a connection is dropped // due to network issues, or a service crashes. CloseAbnormalClosure = 1006 // CloseBadRequest Signifies that the initial request to establish a stream contained // invalid parameters or was malformed, preventing the stream from being established. CloseBadRequest = 4000 // CloseInternalServerErr Used when an unexpected condition was encountered // by the server, preventing it from fulfilling the request. This signals issues // that are internal to the server or the processing system. CloseInternalServerErr = 5000 )
Close error codes that follow these semantics: - 1000 Series: Normal and controlled shutdown scenarios. These are standard and expected reasons for closing a stream. - 4000 Series: Client-related errors or issues with the transmitted data, indicating problems that originate from the caller's side. - 5000 Series: Server-side errors, indicating problems that are internal to the server or the processing system.
const RequestChanLen = 16
RequestChanLen Default request channel length for buffering asynchronous results.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct { Conn *nats.Conn // contains filtered or unexported fields }
Client represents a NATS streaming client.
func NewClient ¶
func NewClient(params ClientParams) (*Client, error)
NewClient creates a new NATS client.
func (*Client) OpenStream ¶
func (nc *Client) OpenStream(ctx context.Context, subj string, data []byte) (<-chan *concurrency.AsyncResult[[]byte], error)
OpenStream takes a context, a subject and payload in bytes and expects a channel with multiple responses.
type ClientParams ¶
type ClientParams struct {
Conn *nats.Conn
}
type CloseError ¶
type CloseError struct { // Code is defined in RFC 6455, section 11.7. Code int // Text is the optional text payload. Text string }
CloseError represents a close message.
func (*CloseError) Error ¶
func (e *CloseError) Error() string
CloseError implements the error interface.
type StreamingMsg ¶
type StreamingMsg struct { // Type is the type of the message. Type StreamingMsgType `json:"type"` // Data is the optional data payload. It is only used if Type is streamingMsgTypeData. Data []byte `json:"data,omitempty"` // CloseError is the optional close message. It is only used if Type is streamingMsgTypeClose. CloseError *CloseError `json:"closeError,omitempty"` }
StreamingMsg represents a streaming message that can be sent over NATS. It can be a data message or a close message.
type StreamingMsgType ¶
type StreamingMsgType int
StreamingMsgType represents the type of a streaming message.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
func (*Writer) CloseWithCode ¶
CloseWithCode closes the stream with a specific code.
func (*Writer) WriteObject ¶
WriteObject writes an object to the stream.