streambuffer

package
v0.0.0-...-b349366 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2020 License: MIT Imports: 7 Imported by: 1

Documentation

Overview

Package streambuffer implements a buffered streaming RPC that drops the oldest messages on buffer overflow.

Example
// Set up gRPC as usual
conn, err := grpc.Dial(addr, opts...)
if err != nil {
	panic(err)
}
testClient := NewTestClient(conn)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

syncBuffer := streambuffer.New(5, func() (grpc.ClientStream, error) {
	// maybe extend the context?
	return testClient.Push(ctx)
})

// From now on the syncBuffer starts buffering
syncBuffer.SendMsg(&Foo{})

// The Run func actually starts flushing the buffer out to the stream
errCh := make(chan error)
go func() {
	for {
		err := syncBuffer.Run()
		if err == nil || err == context.Canceled || errIsNotRetryable {
			errCh <- err
			close(errCh)
			return
		}
		time.Sleep(backoffTime)
	}
}()

// Just keep sending those messages
syncBuffer.SendMsg(&Foo{})
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream implements a buffered overlay on top of a streaming RPC.

If the buffer is full, the oldest items in the buffer will be dropped.

- Create a new Stream with the New() func. - You must call Run() in a separate goroutine to actually start handling the stream. Run calls the setup func you provided in New(). - The goroutine that calls Run() is responsible for handling backoff. - You can start calling SendMsg() immediately after New(), the stream will start buffering until the stream is started by Run(). - If you want to receive on the stream, Recv() must be called after New(), but before Run().

func New

func New(bufferSize int, setup func() (grpc.ClientStream, error)) *Stream

New returns a new Stream with the given buffer size and setup function.

func (*Stream) CloseRecv

func (s *Stream) CloseRecv()

CloseRecv closes the receive channel

func (*Stream) Recv

func (s *Stream) Recv(recv func() interface{}) <-chan interface{}

Recv returns a buffered channel (of the size given to New) that receives messages from the stream. The given recv func should return a new proto of the type that you want to receive If you want to receive, Recv() must be called BEFORE Run()

func (*Stream) Run

func (s *Stream) Run() (err error)

Run the stream.

This calls the underlying grpc.ClientStreams methods to send and receive messages over the stream. Run returns the error returned by any of those functions, or context.Canceled if the context is canceled.

func (*Stream) SendMsg

func (s *Stream) SendMsg(msg interface{})

SendMsg sends a message (possibly dropping a message on full buffers)

func (*Stream) SetLogger

func (s *Stream) SetLogger(log ttnlog.Interface)

SetLogger sets the logger for this streambuffer

func (*Stream) Stats

func (s *Stream) Stats() (sent, dropped uint64)

Stats of the stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL