stream

package
v1.2.2-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package stream provides a NATS client for streaming records between clients with asynchronous response handling.

Overview:

This package implements a client leveraging NATS for sending and receiving streaming data records. It abstracts NATS connections, subscriptions, and message handling complexities, offering a simplified interface for data streaming. The client supports multiplexing multiple streams over a single NATS subscription, handling responses from different streams using a unique token-based mechanism. Additionally, the package introduces a Writer component, designed to abstract the complexities of data encoding and NATS publishing into a simple, intuitive interface.

How It Works:

  • The Client part of the package manages dynamic inboxes for each streaming session, facilitating the sending of data and listening for responses on dedicated subjects. It leverages the NATS publish-subscribe model for asynchronous communication, efficiently routing and correlating messages to their respective streams.

  • The Writer component allows for easy publishing of structured data to any NATS subject. It integrates tightly with the Client, utilizing the same connection for streamlined data streaming. The Writer simplifies the publication process, automatically handling data serialization and supporting graceful stream closure with custom codes.

Multiplexing Streams:

To efficiently handle multiple streams, the client uses a single wildcard subscription for all responses. Each request is sent with a unique response subject (derived from a base inbox prefix), with responses routed back to this subject. The client demultiplexes incoming messages by extracting a token from the response subject, identifying the correct stream (or "bucket") for the message. This approach allows managing multiple concurrent streams with minimal overhead, leveraging NATS's lightweight subjects and messaging capabilities.

Key Features:

  • Asynchronous Streaming: Supports asynchronous data streaming, allowing clients to send data and receive responses without blocking.

  • Context Support: Integrates with Go's context package for timeouts, cancellation, and deadlines for streaming requests.

  • Multiplexing: Efficiently multiplexes multiple streams over a single NATS subscription, using unique response subjects for message correlation.

  • Error Handling: Provides robust error handling, including custom error codes for stream-related errors (e.g., bad data, normal closure).

  • Data Publication: The Writer simplifies structured data publishing to NATS, with support for automatic serialization and stream closure signals.

Usage:

Initialize the client with a NATS connection and use the provided methods to send streaming requests and handle responses. The client manages the NATS subscription and response routing, simplifying the process of working with streaming data.

Example:

params := stream.ClientParams{Conn: natsConn}
client, err := stream.NewClient(params)
if err != nil {
    log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

responseChan, err := client.OpenStream(ctx, "subject", []byte("data"))
if err != nil {
    log.Fatal(err)
}

for asyncResult := range responseChan {
    if asyncResult.Err != nil {
        log.Printf("Received an error: %v", asyncResult.Err)
        continue
    }
    log.Printf("Received data: %s", asyncResult.Value)
}

This package leverages the concurrency.AsyncResult type for handling asynchronous responses, allowing clients to distinguish between successful data responses and error conditions.

Note: This client is designed for use with NATS and requires an established NATS connection to function. It does not handle NATS connection management, which must be performed separately by the user.

Index

Constants

View Source
const (
	// CloseNormalClosure Indicates that the stream was closed after completing its
	// intended purpose. No errors occurred, and the closure was planned.
	CloseNormalClosure = 1000
	// CloseGoingAway Used when a service is shutting down or a client
	// is disconnecting normally but unexpectedly
	CloseGoingAway = 1001
	// CloseUnsupportedData Indicates that data sent over an established stream is invalid,
	// corrupt, or cannot be processed. This is used after a stream has been successfully
	// established but encounters data-related issues.
	CloseUnsupportedData = 1003
	// CloseAbnormalClosure Indicates that a stream was closed unexpectedly,
	// without a known reason. This might be used when a connection is dropped
	// due to network issues, or a service crashes.
	CloseAbnormalClosure = 1006
	// CloseBadRequest Signifies that the initial request to establish a stream contained
	// invalid parameters or was malformed, preventing the stream from being established.
	CloseBadRequest = 4000
	// CloseInternalServerErr Used when an unexpected condition was encountered
	// by the server, preventing it from fulfilling the request. This signals issues
	// that are internal to the server or the processing system.
	CloseInternalServerErr = 5000
)

Close error codes that follow these semantics: - 1000 Series: Normal and controlled shutdown scenarios. These are standard and expected reasons for closing a stream. - 4000 Series: Client-related errors or issues with the transmitted data, indicating problems that originate from the caller's side. - 5000 Series: Server-side errors, indicating problems that are internal to the server or the processing system.

View Source
const RequestChanLen = 16

RequestChanLen Default request channel length for buffering asynchronous results.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	Conn *nats.Conn
	// contains filtered or unexported fields
}

Client represents a NATS streaming client.

func NewClient

func NewClient(params ClientParams) (*Client, error)

NewClient creates a new NATS client.

func (*Client) NewWriter

func (nc *Client) NewWriter(subject string) *Writer

NewWriter creates a new streaming writer.

func (*Client) OpenStream

func (nc *Client) OpenStream(ctx context.Context, subj string, data []byte) (<-chan *concurrency.AsyncResult[[]byte], error)

OpenStream takes a context, a subject and payload in bytes and expects a channel with multiple responses.

type ClientParams

type ClientParams struct {
	Conn *nats.Conn
}

type CloseError

type CloseError struct {
	// Code is defined in RFC 6455, section 11.7.
	Code int
	// Text is the optional text payload.
	Text string
}

CloseError represents a close message.

func (*CloseError) Error

func (e *CloseError) Error() string

CloseError implements the error interface.

type StreamingMsg

type StreamingMsg struct {
	// Type is the type of the message.
	Type StreamingMsgType `json:"type"`
	// Data is the optional data payload. It is only used if Type is streamingMsgTypeData.
	Data []byte `json:"data,omitempty"`
	// CloseError is the optional close message. It is only used if Type is streamingMsgTypeClose.
	CloseError *CloseError `json:"closeError,omitempty"`
}

StreamingMsg represents a streaming message that can be sent over NATS. It can be a data message or a close message.

type StreamingMsgType

type StreamingMsgType int

StreamingMsgType represents the type of a streaming message.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(client *Client, subject string) *Writer

NewWriter creates a new Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the stream.

func (*Writer) CloseWithCode

func (w *Writer) CloseWithCode(code int, text ...string) error

CloseWithCode closes the stream with a specific code.

func (*Writer) Write

func (w *Writer) Write(data []byte) (int, error)

WriteData writes data to the stream.

func (*Writer) WriteObject

func (w *Writer) WriteObject(obj interface{}) (int, error)

WriteObject writes an object to the stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL