stream

package
v1.5.0-alpha10
Warning

This package is not in the latest version of its module.

Published: Sep 30, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package stream provides a NATS client for streaming records between clients with asynchronous response handling.

Overview:

This package implements a client leveraging NATS for sending and receiving streaming data records. It abstracts NATS connections, subscriptions, and message handling complexities, offering a simplified interface for data streaming. The client supports multiplexing multiple streams over a single NATS subscription, handling responses from different streams using a unique token-based mechanism. Additionally, the package introduces a Writer component, designed to abstract the complexities of data encoding and NATS publishing into a simple, intuitive interface.

How It Works:

  • The ConsumerClient part of the package manages dynamic inboxes for each streaming session, facilitating the sending of data and listening for responses on dedicated subjects. It leverages the NATS publish-subscribe model for asynchronous communication, efficiently routing and correlating messages to their respective streams.

  • The ProducerClient manages the heartbeat mechanism that keeps the producer and the consumer in sync. At a fixed interval it publishes a heartbeat carrying the list of active stream IDs, which tells the ConsumerClient which streams to keep open. Heartbeats are routed to the right ConsumerClient over NATS's asynchronous publish-subscribe model. The ConsumerClient must reply to each heartbeat within a timeout, and its reply lists the stream IDs that are no longer active. The ProducerClient then stops publishing to the inbox subjects that the ConsumerClient created for those streams. If no reply arrives within the timeout, the ProducerClient assumes the ConsumerClient no longer needs the data and stops publishing to the corresponding inbox subjects. This keeps the producer from spending resources on streams that nobody is reading.

  • The Writer component allows for easy publishing of structured data to any NATS subject. It integrates tightly with the ConsumerClient, utilizing the same connection for streamlined data streaming. The Writer simplifies the publication process, automatically handling data serialization and supporting graceful stream closure with custom codes.
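The heartbeat exchange described above amounts to a set difference computed on the consumer side. The following is a minimal sketch under stated assumptions: the helper name nonActive and the open-stream set are hypothetical and not part of the package; only the map shapes mirror HeartBeatRequest.ActiveStreamIds and ConsumerHeartBeatResponse.NonActiveStreamIds.

```go
package main

import (
	"fmt"
	"sort"
)

// nonActive is a hypothetical helper, not part of the package. Given the
// stream IDs a producer reports as active (keyed by request subject) and the
// set of stream IDs the consumer still has open, it returns the IDs the
// consumer no longer wants, keyed the same way.
func nonActive(active map[string][]string, open map[string]bool) map[string][]string {
	out := make(map[string][]string)
	for subject, ids := range active {
		for _, id := range ids {
			if !open[id] {
				out[subject] = append(out[subject], id)
			}
		}
		if ids, ok := out[subject]; ok {
			sort.Strings(ids) // deterministic order for display
		}
	}
	return out
}

func main() {
	active := map[string][]string{"logs.request": {"s1", "s2", "s3"}}
	open := map[string]bool{"s2": true}
	// s1 and s3 are reported as no longer active; s2 stays open.
	fmt.Println(nonActive(active, open))
}
```

A producer receiving such a reply would stop publishing to the inbox subjects of the listed streams.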

Multiplexing Streams:

To efficiently handle multiple streams, the client uses a single wildcard subscription for all responses. Each request is sent with a unique response subject (derived from a base inbox prefix), with responses routed back to this subject. The client demultiplexes incoming messages by extracting a token from the response subject, identifying the correct stream (or "bucket") for the message. This approach allows managing multiple concurrent streams with minimal overhead, leveraging NATS's lightweight subjects and messaging capabilities.
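The demultiplexing step can be illustrated with the standard library alone. This is a sketch, not the package's implementation: the subject layout "<prefix>.<token>", the helper names tokenFromSubject and demux, and the bucket map are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// tokenFromSubject strips the inbox prefix from a response subject and
// returns the remaining single token. The "<prefix>.<token>" layout is an
// assumption of this sketch.
func tokenFromSubject(prefix, subject string) (string, bool) {
	if !strings.HasPrefix(subject, prefix+".") {
		return "", false
	}
	rest := strings.TrimPrefix(subject, prefix+".")
	if rest == "" || strings.Contains(rest, ".") {
		return "", false
	}
	return rest, true
}

// demux routes a payload to the bucket registered for the subject's token.
// It reports false when the subject does not match or no stream is
// registered for the token.
func demux(buckets map[string]chan []byte, prefix, subject string, payload []byte) bool {
	token, ok := tokenFromSubject(prefix, subject)
	if !ok {
		return false
	}
	ch, found := buckets[token]
	if !found {
		return false
	}
	ch <- payload
	return true
}

func main() {
	prefix := "_INBOX.abc123"
	buckets := map[string]chan []byte{
		"t1": make(chan []byte, 1),
		"t2": make(chan []byte, 1),
	}
	// A single handler receives every response; the token picks the stream.
	demux(buckets, prefix, prefix+".t2", []byte("hello"))
	fmt.Printf("%s\n", <-buckets["t2"])
}
```

In a real client the handler would be the callback of one wildcard subscription on the prefix, so adding a stream costs one map entry rather than one subscription.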

Key Features:

  • Asynchronous Streaming: Supports asynchronous data streaming, allowing clients to send data and receive responses without blocking.

  • Context Support: Integrates with Go's context package for timeouts, cancellation, and deadlines for streaming requests.

  • Multiplexing: Efficiently multiplexes multiple streams over a single NATS subscription, using unique response subjects for message correlation.

  • Error Handling: Provides robust error handling, including custom error codes for stream-related errors (e.g., bad data, normal closure).

  • Data Publication: The Writer simplifies structured data publishing to NATS, with support for automatic serialization and stream closure signals.

  • Heartbeating: The producer client periodically sends the consumer client the list of active stream IDs.

Usage:

Initialize the client with a NATS connection and use the provided methods to send streaming requests and handle responses. The client manages the NATS subscription and response routing, simplifying the process of working with streaming data.

Example:

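// natsConn is an established *nats.Conn; connection management is left to the caller.
params := stream.ConsumerClientParams{Conn: natsConn}
client, err := stream.NewConsumerClient(params)
if err != nil {
    log.Fatal(err)
}

// The context carries a timeout for the streaming request.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Send the request and obtain a channel of asynchronous results.
responseChan, err := client.OpenStream(ctx, "subject", []byte("data"))
if err != nil {
    log.Fatal(err)
}

// Each result carries either a payload or an error.
for asyncResult := range responseChan {
    if asyncResult.Err != nil {
        log.Printf("Received an error: %v", asyncResult.Err)
        continue
    }
    log.Printf("Received data: %s", asyncResult.Value)
}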

This package leverages the concurrency.AsyncResult type for handling asynchronous responses, allowing clients to distinguish between successful data responses and error conditions.

Note: This client is designed for use with NATS and requires an established NATS connection to function. It does not handle NATS connection management, which must be performed separately by the user.

Index

Constants

const (
	// CloseNormalClosure Indicates that the stream was closed after completing its
	// intended purpose. No errors occurred, and the closure was planned.
	CloseNormalClosure = 1000
	// CloseGoingAway Used when a service is shutting down or a client
	// is disconnecting normally but unexpectedly
	CloseGoingAway = 1001
	// CloseUnsupportedData Indicates that data sent over an established stream is invalid,
	// corrupt, or cannot be processed. This is used after a stream has been successfully
	// established but encounters data-related issues.
	CloseUnsupportedData = 1003
	// CloseAbnormalClosure Indicates that a stream was closed unexpectedly,
	// without a known reason. This might be used when a connection is dropped
	// due to network issues, or a service crashes.
	CloseAbnormalClosure = 1006
	// CloseBadRequest Signifies that the initial request to establish a stream contained
	// invalid parameters or was malformed, preventing the stream from being established.
	CloseBadRequest = 4000
	// CloseInternalServerErr Used when an unexpected condition was encountered
	// by the server, preventing it from fulfilling the request. This signals issues
	// that are internal to the server or the processing system.
	CloseInternalServerErr = 5000
)

Close error codes follow these semantics:

  • 1000 Series: Normal and controlled shutdown scenarios. These are standard and expected reasons for closing a stream.

  • 4000 Series: Client-related errors or issues with the transmitted data, indicating problems that originate from the caller's side.

  • 5000 Series: Server-side errors, indicating problems that are internal to the server or the processing system.
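The series semantics can be turned into a small classifier. This is a sketch only: closeSeries is a hypothetical helper that is not part of the package, and treating each series as a 1000-wide range is an assumption.

```go
package main

import "fmt"

// closeSeries is a hypothetical helper that maps a close code to the series
// it belongs to. Each series is assumed to span 1000 codes.
func closeSeries(code int) string {
	switch {
	case code >= 1000 && code < 2000:
		return "shutdown"
	case code >= 4000 && code < 5000:
		return "client"
	case code >= 5000 && code < 6000:
		return "server"
	default:
		return "unknown"
	}
}

func main() {
	// 1000, 4000 and 5000 are the values of CloseNormalClosure,
	// CloseBadRequest and CloseInternalServerErr listed above.
	for _, code := range []int{1000, 4000, 5000} {
		fmt.Println(code, closeSeries(code))
	}
}
```

A caller could use such a classifier to decide, for example, whether retrying a stream is worthwhile.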

const (
	DefaultHeartBeatIntervalDuration        = 10 * time.Second
	DefaultHeartBeatRequestTimeout          = 5 * time.Second
	DefaultStreamCancellationBufferDuration = 10 * time.Second
)

Default Stream Configurations

const RequestChanLen = 16

RequestChanLen is the default request channel length for buffering asynchronous results.

Variables

This section is empty.

Functions

This section is empty.

Types

type CloseError

type CloseError struct {
	// Code is defined in RFC 6455, section 11.7.
	Code int
	// Text is the optional text payload.
	Text string
}

CloseError represents a close message.

func (*CloseError) Error

func (e *CloseError) Error() string

CloseError implements the error interface.

type ConsumerClient added in v1.3.2

type ConsumerClient struct {
	Conn *nats.Conn
	// contains filtered or unexported fields
}

ConsumerClient represents a NATS streaming client.

func NewConsumerClient added in v1.3.2

func NewConsumerClient(params ConsumerClientParams) (*ConsumerClient, error)

NewConsumerClient creates a new NATS client.

func (*ConsumerClient) NewWriter added in v1.3.2

func (nc *ConsumerClient) NewWriter(subject string) *Writer

NewWriter creates a new streaming writer.

func (*ConsumerClient) OpenStream added in v1.3.2

func (nc *ConsumerClient) OpenStream(
	ctx context.Context, subj string,
	data []byte) (<-chan *concurrency.AsyncResult[[]byte], error)

OpenStream takes a context, a subject, and a payload in bytes, and returns a channel that delivers multiple responses.

type ConsumerClientParams added in v1.3.2

type ConsumerClientParams struct {
	Conn   *nats.Conn
	Config StreamConsumerClientConfig
}

type ConsumerHeartBeatResponse added in v1.3.2

type ConsumerHeartBeatResponse struct {
	// NonActiveStreamIds represents a map, where key is the request subject where consumer sent
	// request for opening a stream, and value is the list of streamIDs which should no longer be
	// active.
	NonActiveStreamIds map[string][]string
}

ConsumerHeartBeatResponse represents a heart beat response from the consumer client.

type HeartBeatRequest added in v1.3.2

type HeartBeatRequest struct {
	// ActiveStreamIds is a map of the stream IDs active on the producer client, keyed by the request
	// subject on which the original request to initiate a streaming connection was sent.
	ActiveStreamIds map[string][]string
}

HeartBeatRequest sent by producer client to the consumer client.

type ProducerClient added in v1.3.2

type ProducerClient struct {
	Conn *nats.Conn
	// contains filtered or unexported fields
}

func NewProducerClient added in v1.3.2

func NewProducerClient(ctx context.Context, params ProducerClientParams) (*ProducerClient, error)

func (*ProducerClient) AddStream added in v1.3.2

func (pc *ProducerClient) AddStream(
	consumerID string,
	streamID string,
	requestSub string,
	heartBeatRequestSub string,
	cancelFunc context.CancelFunc,
) error

func (*ProducerClient) NewWriter added in v1.3.2

func (pc *ProducerClient) NewWriter(subject string) *Writer

func (*ProducerClient) RemoveStream added in v1.3.2

func (pc *ProducerClient) RemoveStream(consumerID string, streamID string) error

type ProducerClientParams added in v1.3.2

type ProducerClientParams struct {
	Conn   *nats.Conn
	Config StreamProducerClientConfig
}

type Request added in v1.3.2

type Request struct {
	// ConsumerID is the connection id of the consumer streaming client originating the request.
	ConsumerID string `json:"consumerId"`
	// StreamID is the id of the stream being created.
	StreamID string `json:"streamId"`
	// HeartBeatRequestSub is the heart beat subject where the producer client will send its heart beat.
	HeartBeatRequestSub string `json:"heartBeatRequestSub"`
	// Data carries the request for a specific stream type. Currently log requests are supported,
	// in which case Data holds an ExecutionLogRequest.
	Data []byte `json:"body"`
}
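The struct tags determine the field names a Request has when encoded with encoding/json. The sketch below redeclares the type locally to show the resulting shape; that the package uses JSON on the wire is suggested by the tags but is an assumption here, and encodeRequest is a hypothetical helper. Note that encoding/json encodes a []byte field as a base64 string.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request is a local copy of the documented type so the sketch compiles on
// its own.
type Request struct {
	ConsumerID          string `json:"consumerId"`
	StreamID            string `json:"streamId"`
	HeartBeatRequestSub string `json:"heartBeatRequestSub"`
	Data                []byte `json:"body"`
}

// encodeRequest is a hypothetical helper that marshals a Request to JSON.
func encodeRequest(r Request) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeRequest(Request{
		ConsumerID:          "c1",
		StreamID:            "s1",
		HeartBeatRequestSub: "hb.c1",
		Data:                []byte("hi"),
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// prints {"consumerId":"c1","streamId":"s1","heartBeatRequestSub":"hb.c1","body":"aGk="}
	fmt.Println(s)
}
```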

type StreamConsumerClientConfig added in v1.3.2

type StreamConsumerClientConfig struct {
	StreamCancellationBufferDuration time.Duration
}

StreamConsumerClientConfig represents the configuration of NATS based streaming client acting as a consumer.

type StreamInfo added in v1.3.2

type StreamInfo struct {
	// ID is the identifier of the stream.
	ID string
	// RequestSub is the subject on which the request for this stream was sent.
	RequestSub string
	// CreatedAt represents the time the stream was created.
	CreatedAt time.Time
	// Cancel cancels the stream. This is useful when the consumer client
	// is no longer interested in the stream. Invoking it informs the
	// producer to no longer serve the stream.
	Cancel context.CancelFunc
}

StreamInfo represents information about the stream.

type StreamProducerClientConfig added in v1.3.2

type StreamProducerClientConfig struct {
	// HeartBeatIntervalDuration represents the duration between two heart beats from the producer client
	// to consumer client.
	HeartBeatIntervalDuration time.Duration
	// HeartBeatRequestTimeout represents the time within which the producer client should receive the
	// response from the consumer client.
	HeartBeatRequestTimeout time.Duration
	// StreamCancellationBufferDuration represents the time interval for which consumer or producer client
	// should wait before killing the stream in case of race conditions on heart beats and request origination.
	StreamCancellationBufferDuration time.Duration
}

StreamProducerClientConfig represents the configuration of NATS based streaming client acting as a producer.

type StreamingMsg

type StreamingMsg struct {
	// Type is the type of the message.
	Type StreamingMsgType `json:"type"`
	// Data is the optional data payload. It is only used if Type is streamingMsgTypeData.
	Data []byte `json:"data,omitempty"`
	// CloseError is the optional close message. It is only used if Type is streamingMsgTypeClose.
	CloseError *CloseError `json:"closeError,omitempty"`
}

StreamingMsg represents a streaming message that can be sent over NATS. It can be a data message or a close message.
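A receiver would typically decode each message and branch on its type. The sketch below shows that dispatch with local copies of the types. The numeric values of the message type constants are not shown on this page, so msgTypeData and msgTypeClose are hypothetical, as is the handle function; the JSON keys for CloseError follow from its untagged field names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type StreamingMsgType int

// Hypothetical values; the package's unexported constants may differ.
const (
	msgTypeData StreamingMsgType = iota + 1
	msgTypeClose
)

type CloseError struct {
	Code int
	Text string
}

type StreamingMsg struct {
	Type       StreamingMsgType `json:"type"`
	Data       []byte           `json:"data,omitempty"`
	CloseError *CloseError      `json:"closeError,omitempty"`
}

// handle decodes one raw message. It returns the payload of a data message,
// or an error describing a close message or an unknown type.
func handle(raw []byte) ([]byte, error) {
	var m StreamingMsg
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	switch m.Type {
	case msgTypeData:
		return m.Data, nil
	case msgTypeClose:
		if m.CloseError != nil {
			return nil, fmt.Errorf("closed: code %d: %s", m.CloseError.Code, m.CloseError.Text)
		}
		return nil, errors.New("closed")
	default:
		return nil, fmt.Errorf("unknown message type %d", m.Type)
	}
}

func main() {
	data, err := handle([]byte(`{"type":1,"data":"aGk="}`))
	fmt.Printf("%s %v\n", data, err)
	_, err = handle([]byte(`{"type":2,"closeError":{"Code":1000,"Text":"done"}}`))
	fmt.Println(err)
}
```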

type StreamingMsgType

type StreamingMsgType int

StreamingMsgType represents the type of a streaming message.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(conn *nats.Conn, subject string) *Writer

NewWriter creates a new Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the stream.

func (*Writer) CloseWithCode

func (w *Writer) CloseWithCode(code int, text ...string) error

CloseWithCode closes the stream with a specific code.

func (*Writer) Write

func (w *Writer) Write(data []byte) (int, error)

Write writes data to the stream.

func (*Writer) WriteObject

func (w *Writer) WriteObject(obj interface{}) (int, error)

WriteObject writes an object to the stream.
