stream

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Feb 1, 2025 | License: GPL-3.0 | Imports: 19 | Imported by: 0

Documentation

Constants

const (

	// MetadataStreamClose can be added by producers when publishing messages to a topic, indicating no more messages will be sent to the stream.
	// This should never be sent if there is more than one producer for the stream (e.g. only send from gRPC, not from websockets).
	MetadataStreamClose = "stream-close"
)

The package also uses three unexported metadata keys. metadataID defines the ID of the entity for the stream, enabling multiple ents to be multiplexed over a single topic. metadataOrderKey defines the key that specifies a unique stream ID, by which all published messages should be ordered (one ordering per order key). metadataOrderIndex defines the index of a published message within the context of an order key.
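The following is a minimal sketch of how a consumer might use these three metadata values to filter and order messages. It is not the package's implementation: the key strings ("id", "order-key", "order-index"), the message type, and orderedBodies are all hypothetical stand-ins, since the real keys are unexported and their values are not shown here.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// Hypothetical metadata keys; the package's real key strings are unexported
// and are not shown in this documentation.
const (
	keyID         = "id"
	keyOrderKey   = "order-key"
	keyOrderIndex = "order-index"
)

// message is a stand-in for a pubsub message: a body plus string metadata.
type message struct {
	Body     string
	Metadata map[string]string
}

// orderedBodies returns the bodies of msgs that carry the given entity ID and
// order key, sorted by their order index. Messages with a missing or
// non-numeric index are skipped.
func orderedBodies(msgs []message, id, orderKey string) []string {
	type indexed struct {
		idx  int
		body string
	}
	var matched []indexed
	for _, m := range msgs {
		if m.Metadata[keyID] != id || m.Metadata[keyOrderKey] != orderKey {
			continue
		}
		idx, err := strconv.Atoi(m.Metadata[keyOrderIndex])
		if err != nil {
			continue
		}
		matched = append(matched, indexed{idx, m.Body})
	}
	sort.Slice(matched, func(i, j int) bool { return matched[i].idx < matched[j].idx })
	bodies := make([]string, 0, len(matched))
	for _, m := range matched {
		bodies = append(bodies, m.body)
	}
	return bodies
}

func main() {
	msgs := []message{
		{"world", map[string]string{keyID: "7", keyOrderKey: "a", keyOrderIndex: "1"}},
		{"other", map[string]string{keyID: "8", keyOrderKey: "b", keyOrderIndex: "0"}},
		{"hello", map[string]string{keyID: "7", keyOrderKey: "a", keyOrderIndex: "0"}},
	}
	fmt.Println(orderedBodies(msgs, "7", "a"))
}
```

Sorting on the consumer side is only one possible use of the index; whether the package itself reorders messages is not stated in this documentation.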

Variables

This section is empty.

Functions

func NewShellHandler

func NewShellHandler(graph *ent.Client, mux *Mux) http.HandlerFunc

NewShellHandler provides an HTTP handler that serves a websocket for shell I/O. It requires the query parameter "shell_id", which must be an integer. This ID identifies the Shell ent the websocket will connect to.

func PreventPubSubColdStarts

func PreventPubSubColdStarts(ctx context.Context, interval time.Duration, topicShellOutput string, topicShellInput string)

PreventPubSubColdStarts prevents pubsub cold starts by publishing noop messages on an interval. This reduces cold-start latency for GCP PubSub, which can improve the shell user experience. In other environments, this functionality may not be necessary.

Types

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

A Mux enables multiplexing subscription messages to multiple Streams. Streams will only receive a Message if their configured ID matches the incoming metadata of a Message. Additionally, new messages may be published using the Mux.

func NewMux

func NewMux(pub *pubsub.Topic, sub *pubsub.Subscription, options ...MuxOption) *Mux

NewMux initializes and returns a new Mux with the provided pubsub info.

func (*Mux) Register

func (mux *Mux) Register(s *Stream)

Register a new stream with the Mux, which will receive broadcast messages from a pubsub subscription if the Message metadata ID matches the stream ID.

func (*Mux) Start

func (mux *Mux) Start(ctx context.Context) error

Start the mux, returning an error if polling ever fails.

func (*Mux) Unregister

func (mux *Mux) Unregister(s *Stream)

Unregister a stream when it should no longer receive Messages from the Mux. Typically this is done via defer after registering a Stream. Unregistering a stream that is not registered will still close the stream channel.
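The register, dispatch, and unregister behaviour described above can be sketched with standard-library types. Every name below (msg, stream, mux, dispatch) is a hypothetical stand-in, not the package's implementation; in particular, dropping messages when a stream's buffer is full is a simplification chosen for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// msg is a stand-in for a pubsub message carrying an entity ID in its metadata.
type msg struct {
	ID   string
	Body string
}

// stream receives only messages whose ID matches its own.
type stream struct {
	id   string
	ch   chan msg
	once sync.Once
}

func newStream(id string) *stream { return &stream{id: id, ch: make(chan msg, 8)} }

// close closes the channel at most once, so repeated calls are safe.
func (s *stream) close() { s.once.Do(func() { close(s.ch) }) }

// mux fans incoming messages out to the registered streams.
type mux struct {
	mu      sync.Mutex
	streams map[*stream]struct{}
}

func newMux() *mux { return &mux{streams: make(map[*stream]struct{})} }

func (m *mux) register(s *stream) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.streams[s] = struct{}{}
}

// unregister removes the stream and closes its channel, even if the stream
// was never registered.
func (m *mux) unregister(s *stream) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.streams, s)
	s.close()
}

// dispatch delivers in to every registered stream with a matching ID and
// reports how many streams received it.
func (m *mux) dispatch(in msg) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	delivered := 0
	for s := range m.streams {
		if s.id != in.ID {
			continue
		}
		select {
		case s.ch <- in:
			delivered++
		default: // buffer full: drop rather than block (sketch simplification)
		}
	}
	return delivered
}

func main() {
	m := newMux()
	a, b := newStream("1"), newStream("2")
	m.register(a)
	defer m.unregister(a)
	m.register(b)
	defer m.unregister(b)

	m.dispatch(msg{ID: "1", Body: "ls"})
	fmt.Println((<-a.ch).Body, len(b.ch))
}
```

Pairing each register call with a deferred unregister mirrors the usage the documentation recommends and ensures the stream's channel is closed when the caller returns.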

type MuxOption

type MuxOption func(*Mux)

A MuxOption is used to provide further configuration to the Mux.
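MuxOption follows the functional-options pattern. The sketch below shows the general shape of that pattern with lowercase stand-in types; withBufferSize, the bufferSize field, and its default of 16 are hypothetical, since the options this package actually exports are not listed here.

```go
package main

import "fmt"

// mux is a stand-in for the package's Mux with one configurable field.
type mux struct {
	bufferSize int
}

// muxOption mirrors the shape of MuxOption: a function that mutates the mux.
type muxOption func(*mux)

// withBufferSize is a hypothetical option used only for illustration.
func withBufferSize(n int) muxOption {
	return func(m *mux) { m.bufferSize = n }
}

// newMux applies defaults first, then each option in order, so later options
// override earlier ones.
func newMux(options ...muxOption) *mux {
	m := &mux{bufferSize: 16}
	for _, opt := range options {
		opt(m)
	}
	return m
}

func main() {
	fmt.Println(newMux().bufferSize, newMux(withBufferSize(64)).bufferSize)
}
```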

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

A Stream is registered with a Mux to receive filtered messages from a pubsub subscription.

func New

func New(id string) *Stream

New initializes a new stream that will only receive messages with the provided ID. The stream must be registered with a Mux before it begins receiving messages. This function panics if it fails to generate a random string for the order key.

func (*Stream) Close

func (s *Stream) Close() error

Close the stream, preventing it from receiving any new messages. The Mux a stream is registered with will call Close() when it is unregistered.

func (*Stream) Messages

func (s *Stream) Messages() <-chan *pubsub.Message

Messages returns a channel for receiving new pubsub messages.
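A consumer would typically range over the returned channel until it is closed. The sketch below also stops early when a message carries the "stream-close" metadata key; treating the mere presence of that key as the close signal (and the value "true") is an assumption, and message and drain are hypothetical stand-ins for the pubsub types.

```go
package main

import "fmt"

// metadataStreamClose mirrors the value of MetadataStreamClose.
const metadataStreamClose = "stream-close"

// message is a stand-in for *pubsub.Message.
type message struct {
	Body     []byte
	Metadata map[string]string
}

// drain reads from msgs until the channel is closed or a message carries the
// stream-close key, and returns the concatenated bodies seen before that point.
func drain(msgs <-chan *message) string {
	var out []byte
	for m := range msgs {
		if _, done := m.Metadata[metadataStreamClose]; done {
			break
		}
		out = append(out, m.Body...)
	}
	return string(out)
}

func main() {
	ch := make(chan *message, 3)
	ch <- &message{Body: []byte("hello ")}
	ch <- &message{Body: []byte("world")}
	ch <- &message{Metadata: map[string]string{metadataStreamClose: "true"}}
	close(ch)
	fmt.Println(drain(ch))
}
```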

func (*Stream) SendMessage

func (s *Stream) SendMessage(ctx context.Context, msg *pubsub.Message, mux *Mux) error

SendMessage
