grpcshim

package
v0.19.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrStreamClosed = errors.New("stream closed for sending")

ErrStreamClosed is the error types embedding BidiStream should return when their Send method is called and IsClosed returns true.

Functions

This section is empty.

Types

type BidiStream

type BidiStream struct {
	// ErrCh receives errors mid-stream, and should be selected on with the
	// same priority as stream.ch and stream/send contexts' cancellations in
	// Recv methods.
	ErrCh chan error
	// contains filtered or unexported fields
}

BidiStream is a shim struct implementing both the grpc.ClientStream and grpc.ServerStream interfaces. It can be embedded into other types that need all of those methods to satisfy the compiler, but are only interested in the parameterized Send/Recv methods typically called by gRPC streaming servers and clients. For example, in the localvtctldclient:

type backupStreamAdapter struct {
	*grpcshim.BidiStream
	ch chan *vtctldatapb.BackupResponse
}

func (stream *backupStreamAdapter) Recv() (*vtctldatapb.BackupResponse, error) {
	select {
	case <-stream.Context().Done():
		return nil, stream.Context().Err()
	case <-stream.Closed():
		// Stream has been closed for future sends. If there are messages that
		// have already been sent, receive them until there are no more. After
		// all sent messages have been received, Recv will return the CloseErr.
		select {
		case msg := <-stream.ch:
			return msg, nil
		default:
			return nil, stream.CloseErr()
		}
	case err := <-stream.ErrCh:
		return nil, err
	case msg := <-stream.ch:
		return msg, nil
	}
}

func (stream *backupStreamAdapter) Send(msg *vtctldatapb.BackupResponse) error {
	select {
	case <-stream.Context().Done():
		return stream.Context().Err()
	case <-stream.Closed():
		return grpcshim.ErrStreamClosed
	case stream.ch <- msg:
		return nil
	}
}

// Backup is part of the vtctlservicepb.VtctldClient interface.
func (client *localVtctldClient) Backup(ctx context.Context, in *vtctldatapb.BackupRequest, opts ...grpc.CallOption) (vtctlservicepb.Vtctld_BackupClient, error) {
	stream := &backupStreamAdapter{
		BidiStream: grpcshim.NewBidiStream(ctx),
		ch:         make(chan *vtctldatapb.BackupResponse, 1),
	}
	go func() {
		err := client.s.Backup(in, stream)
		stream.CloseWithError(err)
	}()
	return stream, nil
}

func NewBidiStream

func NewBidiStream(ctx context.Context) *BidiStream

NewBidiStream returns a BidiStream ready for embedded use. The provided ctx will be used for the stream context, and types embedding BidiStream should check context cancellation/expiration in their respective Recv and Send methods.

See the documentation on BidiStream for example usage.

func (*BidiStream) CloseErr

func (bs *BidiStream) CloseErr() error

CloseErr returns the error set by CloseWithError, or nil if the stream is not closed.

func (*BidiStream) CloseSend

func (bs *BidiStream) CloseSend() error

func (*BidiStream) CloseWithError

func (bs *BidiStream) CloseWithError(err error)

CloseWithError closes the stream for future sends, and sets the error returned by bs.CloseErr(). If the passed err is nil, io.EOF is set instead. If the stream is already closed, this is a no-op.

func (*BidiStream) Closed

func (bs *BidiStream) Closed() <-chan struct{}

Closed returns a channel which will be itself be closed when the stream has been closed for sending.

func (*BidiStream) Context

func (bs *BidiStream) Context() context.Context

func (*BidiStream) Header

func (bs *BidiStream) Header() (metadata.MD, error)

func (*BidiStream) IsClosed

func (bs *BidiStream) IsClosed() bool

IsClosed returns true if the stream has been closed for sending.

It is a convenience function for attempting to select on the channel returned by bs.Closed().

func (*BidiStream) RecvMsg

func (bs *BidiStream) RecvMsg(m any) error

func (*BidiStream) SendHeader

func (bs *BidiStream) SendHeader(md metadata.MD) error

func (*BidiStream) SendMsg

func (bs *BidiStream) SendMsg(m any) error

func (*BidiStream) SetHeader

func (bs *BidiStream) SetHeader(md metadata.MD) error

func (*BidiStream) SetTrailer

func (bs *BidiStream) SetTrailer(md metadata.MD)

func (*BidiStream) Trailer

func (bs *BidiStream) Trailer() metadata.MD

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL