Documentation ¶
Overview ¶
Package streambuffer implements a buffered streaming RPC that drops the oldest messages on buffer overflow.
Example ¶
// Set up gRPC as usual conn, err := grpc.Dial(addr, opts...) if err != nil { panic(err) } testClient := NewTestClient(conn) ctx, cancel := context.WithCancel(context.Background()) defer cancel() syncBuffer := streambuffer.New(5, func() (grpc.ClientStream, error) { // maybe extend the context? return testClient.Push(ctx) }) // From now on the syncBuffer starts buffering syncBuffer.SendMsg(&Foo{}) // The Run func actually starts flushing the buffer out to the stream errCh := make(chan error) go func() { for { err := syncBuffer.Run() if err == nil || err == context.Canceled || errIsNotRetryable { errCh <- err close(errCh) return } time.Sleep(backoffTime) } }() // Just keep sending those messages syncBuffer.SendMsg(&Foo{})
Output:
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream implements a buffered overlay on top of a streaming RPC.
If the buffer is full, the oldest items in the buffer will be dropped.
- Create a new Stream with the New() func. - You must call Run() in a separate goroutine to actually start handling the stream. Run calls the setup func you provided in New(). - The goroutine that calls Run() is responsible for handling backoff. - You can start calling SendMsg() immediately after New(), the stream will start buffering until the stream is started by Run(). - If you want to receive on the stream, Recv() must be called after New(), but before Run().
func New ¶
func New(bufferSize int, setup func() (grpc.ClientStream, error)) *Stream
New returns a new Stream with the given buffer size and setup function.
func (*Stream) Recv ¶
func (s *Stream) Recv(recv func() interface{}) <-chan interface{}
Recv returns a buffered channel (of the size given to New) that receives messages from the stream. The given recv func should return a new proto of the type that you want to receive If you want to receive, Recv() must be called BEFORE Run()
func (*Stream) Run ¶
Run the stream.
This calls the underlying grpc.ClientStreams methods to send and receive messages over the stream. Run returns the error returned by any of those functions, or context.Canceled if the context is canceled.
func (*Stream) SendMsg ¶
func (s *Stream) SendMsg(msg interface{})
SendMsg sends a message (possibly dropping a message on full buffers)