Documentation ¶
Overview ¶
Package connect is a slim RPC framework built on Protocol Buffers and net/http. In addition to supporting its own protocol, Connect handlers and clients are wire-compatible with gRPC and gRPC-Web, including streaming.
This documentation is intended to explain each type and function in isolation. Walkthroughs, FAQs, and other narrative docs are available on the Connect website, and there's a working demonstration service on GitHub.
Example (Client) ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
// To keep this example runnable, we'll use an HTTP server and client
// that communicate over in-memory pipes. The client is still a plain
// *http.Client!
var httpClient *http.Client = examplePingServer.Client()

// By default, clients use the Connect protocol. Add connect.WithGRPC() or
// connect.WithGRPCWeb() to switch protocols.
client := pingv1connect.NewPingServiceClient(
	httpClient,
	examplePingServer.URL(),
)
response, err := client.Ping(
	context.Background(),
	connect.NewRequest(&pingv1.PingRequest{Number: 42}),
)
if err != nil {
	logger.Println("error:", err)
	return
}
logger.Println("response content-type:", response.Header().Get("Content-Type"))
logger.Println("response message:", response.Msg)

Output:

response content-type: application/proto
response message: number:42
Example (Handler) ¶
package main

import (
	"context"
	"errors"
	"io"
	"net/http"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

// ExamplePingServer implements some trivial business logic. The Protobuf
// definition for this API is in proto/connect/ping/v1/ping.proto.
type ExamplePingServer struct {
	pingv1connect.UnimplementedPingServiceHandler
}

// Ping implements pingv1connect.PingServiceHandler.
func (*ExamplePingServer) Ping(
	_ context.Context,
	request *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
	return connect.NewResponse(
		&pingv1.PingResponse{
			Number: request.Msg.GetNumber(),
			Text:   request.Msg.GetText(),
		},
	), nil
}

// Sum implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) Sum(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
	var sum int64
	for stream.Receive() {
		sum += stream.Msg().GetNumber()
	}
	if stream.Err() != nil {
		return nil, stream.Err()
	}
	return connect.NewResponse(&pingv1.SumResponse{Sum: sum}), nil
}

// CountUp implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) CountUp(ctx context.Context, request *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error {
	for number := int64(1); number <= request.Msg.GetNumber(); number++ {
		if err := stream.Send(&pingv1.CountUpResponse{Number: number}); err != nil {
			return err
		}
	}
	return nil
}

// CumSum implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) CumSum(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error {
	var sum int64
	for {
		msg, err := stream.Receive()
		if errors.Is(err, io.EOF) {
			return nil
		} else if err != nil {
			return err
		}
		sum += msg.GetNumber()
		if err := stream.Send(&pingv1.CumSumResponse{Sum: sum}); err != nil {
			return err
		}
	}
}

func main() {
	// protoc-gen-connect-go generates constructors that return plain net/http
	// Handlers, so they're compatible with most Go HTTP routers and middleware
	// (for example, net/http's StripPrefix). Each handler automatically supports
	// the Connect, gRPC, and gRPC-Web protocols.
	mux := http.NewServeMux()
	mux.Handle(
		pingv1connect.NewPingServiceHandler(
			&ExamplePingServer{}, // our business logic
		),
	)
	// You can serve gRPC's health and server reflection APIs using
	// connectrpc.com/grpchealth and connectrpc.com/grpcreflect.
	_ = http.ListenAndServeTLS(
		"localhost:8080",
		"internal/testdata/server.crt",
		"internal/testdata/server.key",
		mux,
	)
	// To serve HTTP/2 requests without TLS (as many gRPC clients expect), import
	// golang.org/x/net/http2/h2c and golang.org/x/net/http2 and change to:
	// _ = http.ListenAndServe(
	// 	"localhost:8080",
	// 	h2c.NewHandler(mux, &http2.Server{}),
	// )
}
Index ¶
- Constants
- func DecodeBinaryHeader(data string) ([]byte, error)
- func EncodeBinaryHeader(data []byte) string
- func IsNotModifiedError(err error) bool
- func IsWireError(err error) bool
- type AnyRequest
- type AnyResponse
- type BidiStream
- func (b *BidiStream[Req, Res]) Conn() StreamingHandlerConn
- func (b *BidiStream[_, _]) Peer() Peer
- func (b *BidiStream[Req, Res]) Receive() (*Req, error)
- func (b *BidiStream[Req, Res]) RequestHeader() http.Header
- func (b *BidiStream[Req, Res]) ResponseHeader() http.Header
- func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header
- func (b *BidiStream[Req, Res]) Send(msg *Res) error
- func (b *BidiStream[_, _]) Spec() Spec
- type BidiStreamForClient
- func (b *BidiStreamForClient[Req, Res]) CloseRequest() error
- func (b *BidiStreamForClient[Req, Res]) CloseResponse() error
- func (b *BidiStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)
- func (b *BidiStreamForClient[_, _]) Peer() Peer
- func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)
- func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header
- func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header
- func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header
- func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error
- func (b *BidiStreamForClient[_, _]) Spec() Spec
- type Client
- func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]
- func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]
- func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Request[Req]) (*ServerStreamForClient[Res], error)
- func (c *Client[Req, Res]) CallUnary(ctx context.Context, request *Request[Req]) (*Response[Res], error)
- type ClientOption
- func WithAcceptCompression(name string, newDecompressor func() Decompressor, ...) ClientOption
- func WithClientOptions(options ...ClientOption) ClientOption
- func WithGRPC() ClientOption
- func WithGRPCWeb() ClientOption
- func WithHTTPGet() ClientOption
- func WithHTTPGetMaxURLSize(bytes int, fallback bool) ClientOption
- func WithProtoJSON() ClientOption
- func WithResponseInitializer(initializer func(spec Spec, message any) error) ClientOption
- func WithSendCompression(name string) ClientOption
- func WithSendGzip() ClientOption
- type ClientStream
- func (c *ClientStream[Req]) Conn() StreamingHandlerConn
- func (c *ClientStream[Req]) Err() error
- func (c *ClientStream[Req]) Msg() *Req
- func (c *ClientStream[_]) Peer() Peer
- func (c *ClientStream[Req]) Receive() bool
- func (c *ClientStream[Req]) RequestHeader() http.Header
- func (c *ClientStream[_]) Spec() Spec
- type ClientStreamForClient
- func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], error)
- func (c *ClientStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)
- func (c *ClientStreamForClient[_, _]) Peer() Peer
- func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header
- func (c *ClientStreamForClient[Req, Res]) Send(request *Req) error
- func (c *ClientStreamForClient[_, _]) Spec() Spec
- type Code
- type Codec
- type Compressor
- type Decompressor
- type Error
- type ErrorDetail
- type ErrorWriter
- type HTTPClient
- type Handler
- type HandlerOption
- func WithCompression(name string, newDecompressor func() Decompressor, ...) HandlerOption
- func WithConditionalHandlerOptions(conditional func(spec Spec) []HandlerOption) HandlerOption
- func WithHandlerOptions(options ...HandlerOption) HandlerOption
- func WithRecover(handle func(context.Context, Spec, http.Header, any) error) HandlerOption
- func WithRequestInitializer(initializer func(spec Spec, message any) error) HandlerOption
- func WithRequireConnectProtocolHeader() HandlerOption
- type IdempotencyLevel
- type Interceptor
- type Option
- func WithCodec(codec Codec) Option
- func WithCompressMinBytes(minBytes int) Option
- func WithIdempotency(idempotencyLevel IdempotencyLevel) Option
- func WithInterceptors(interceptors ...Interceptor) Option
- func WithOptions(options ...Option) Option
- func WithReadMaxBytes(maxBytes int) Option
- func WithSchema(schema any) Option
- func WithSendMaxBytes(maxBytes int) Option
- type Peer
- type Request
- type Response
- type ServerStream
- type ServerStreamForClient
- func (s *ServerStreamForClient[Res]) Close() error
- func (s *ServerStreamForClient[Res]) Conn() (StreamingClientConn, error)
- func (s *ServerStreamForClient[Res]) Err() error
- func (s *ServerStreamForClient[Res]) Msg() *Res
- func (s *ServerStreamForClient[Res]) Receive() bool
- func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header
- func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header
- type Spec
- type StreamType
- type StreamingClientConn
- type StreamingClientFunc
- type StreamingHandlerConn
- type StreamingHandlerFunc
- type UnaryFunc
- type UnaryInterceptorFunc
Constants ¶
const (
	IsAtLeastVersion0_0_1  = true
	IsAtLeastVersion0_1_0  = true
	IsAtLeastVersion1_7_0  = true
	IsAtLeastVersion1_13_0 = true
)
These constants are used in compile-time handshakes with connect's generated code.
const (
	ProtocolConnect = "connect"
	ProtocolGRPC    = "grpc"
	ProtocolGRPCWeb = "grpcweb"
)
The names of the Connect, gRPC, and gRPC-Web protocols (as exposed by [Peer.Protocol]). Additional protocols may be added in the future.
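Because more protocol names may appear later, code that branches on the protocol name probably wants a default case. The sketch below is one way to do that; it redeclares the three constants locally so that it compiles without the connect module, and describeProtocol is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the documented constants, so the sketch is self-contained.
const (
	ProtocolConnect = "connect"
	ProtocolGRPC    = "grpc"
	ProtocolGRPCWeb = "grpcweb"
)

// describeProtocol is a hypothetical helper that maps a protocol name (such
// as the value exposed by Peer.Protocol) to a display label. The default
// branch keeps it safe if new protocol names are added.
func describeProtocol(name string) string {
	switch name {
	case ProtocolConnect:
		return "Connect"
	case ProtocolGRPC:
		return "gRPC"
	case ProtocolGRPCWeb:
		return "gRPC-Web"
	default:
		return "unrecognized protocol: " + name
	}
}

func main() {
	for _, name := range []string{ProtocolConnect, ProtocolGRPC, ProtocolGRPCWeb, "other"} {
		fmt.Println(describeProtocol(name))
	}
}
```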
const Version = "1.17.0"
Version is the semantic version of the connect module.
Functions ¶
func DecodeBinaryHeader ¶
DecodeBinaryHeader base64-decodes the data. It can decode padded or unpadded values. Following usual HTTP semantics, multiple base64-encoded values may be joined with a comma. When receiving such comma-separated values, split them with strings.Split before calling DecodeBinaryHeader.
Binary headers sent using the Connect, gRPC, and gRPC-Web protocols have keys ending in "-Bin".
func EncodeBinaryHeader ¶
EncodeBinaryHeader base64-encodes the data. It always emits unpadded values.
In the Connect, gRPC, and gRPC-Web protocols, binary headers must have keys ending in "-Bin".
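The encoding rules described for the two functions above can be approximated with the standard library alone. This is a minimal sketch, not connect's implementation: encodeBin, decodeBin, and decodeAll are hypothetical stand-ins, and real code would call EncodeBinaryHeader and DecodeBinaryHeader instead.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// encodeBin is a hypothetical stand-in for EncodeBinaryHeader: it emits
// unpadded base64.
func encodeBin(data []byte) string {
	return base64.RawStdEncoding.EncodeToString(data)
}

// decodeBin is a hypothetical stand-in for DecodeBinaryHeader: it strips any
// trailing "=" so that padded and unpadded inputs both decode.
func decodeBin(value string) ([]byte, error) {
	return base64.RawStdEncoding.DecodeString(strings.TrimRight(value, "="))
}

// decodeAll splits a comma-joined header value with strings.Split, then
// decodes each element separately.
func decodeAll(header string) ([][]byte, error) {
	var out [][]byte
	for _, part := range strings.Split(header, ",") {
		decoded, err := decodeBin(strings.TrimSpace(part))
		if err != nil {
			return nil, err
		}
		out = append(out, decoded)
	}
	return out, nil
}

func main() {
	// One unpadded value and one padded value, joined with a comma.
	joined := encodeBin([]byte("hi")) + ",aGk="
	values, err := decodeAll(joined)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, value := range values {
		fmt.Printf("%s\n", value)
	}
}
```

The header key for such a value would end in "-Bin", as noted above.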
func IsNotModifiedError ¶ added in v1.7.0
IsNotModifiedError checks whether the supplied error indicates that the requested resource hasn't changed. It only returns true if the server used NewNotModifiedError in response to a Connect-protocol RPC made with an HTTP GET.
Example ¶
package main

import (
	"context"
	"fmt"
	"net/http"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func main() {
	// Assume that the server from NewNotModifiedError's example is running on
	// localhost:8080.
	client := pingv1connect.NewPingServiceClient(
		http.DefaultClient,
		"http://localhost:8080",
		// Enable client-side support for HTTP GETs.
		connect.WithHTTPGet(),
	)
	req := connect.NewRequest(&pingv1.PingRequest{Number: 42})
	first, err := client.Ping(context.Background(), req)
	if err != nil {
		fmt.Println(err)
		return
	}
	// If the server set an Etag, we can use it to cache the response.
	etag := first.Header().Get("Etag")
	if etag == "" {
		fmt.Println("no Etag in response headers")
		return
	}
	fmt.Println("cached response with Etag", etag)
	// Now we'd like to make the same request again, but avoid re-fetching the
	// response if possible.
	req.Header().Set("If-None-Match", etag)
	_, err = client.Ping(context.Background(), req)
	if connect.IsNotModifiedError(err) {
		fmt.Println("can reuse cached response")
	}
}
func IsWireError ¶ added in v1.1.0
IsWireError checks whether the error was returned by the server, as opposed to being synthesized by the client.
Clients may find this useful when deciding how to propagate errors. For example, an RPC-to-HTTP proxy might expose a server-sent CodeUnknown as an HTTP 500 but a client-synthesized CodeUnknown as a 503.
Handlers will strip Error.Meta headers propagated from wire errors to avoid leaking response headers. To propagate headers, recreate the error as a non-wire error.
Types ¶
type AnyRequest ¶
type AnyRequest interface {
	Any() any
	Spec() Spec
	Peer() Peer
	Header() http.Header
	HTTPMethod() string
	// contains filtered or unexported methods
}
AnyRequest is the common method set of every Request, regardless of type parameter. It's used in unary interceptors.
Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.
To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.
type AnyResponse ¶
type AnyResponse interface {
	Any() any
	Header() http.Header
	Trailer() http.Header
	// contains filtered or unexported methods
}
AnyResponse is the common method set of every Response, regardless of type parameter. It's used in unary interceptors.
Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.
To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyResponse.
type BidiStream ¶
type BidiStream[Req, Res any] struct {
// contains filtered or unexported fields
}
BidiStream is the handler's view of a bidirectional streaming RPC.
It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.
func (*BidiStream[Req, Res]) Conn ¶ added in v0.4.0
func (b *BidiStream[Req, Res]) Conn() StreamingHandlerConn
Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*BidiStream[_, _]) Peer ¶ added in v0.5.0
func (b *BidiStream[_, _]) Peer() Peer
Peer describes the client for this RPC.
func (*BidiStream[Req, Res]) Receive ¶
func (b *BidiStream[Req, Res]) Receive() (*Req, error)
Receive a message. When the client is done sending messages, Receive will return an error that wraps io.EOF.
func (*BidiStream[Req, Res]) RequestHeader ¶
func (b *BidiStream[Req, Res]) RequestHeader() http.Header
RequestHeader returns the headers received from the client.
func (*BidiStream[Req, Res]) ResponseHeader ¶
func (b *BidiStream[Req, Res]) ResponseHeader() http.Header
ResponseHeader returns the response headers. Headers are sent with the first call to Send.
Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*BidiStream[Req, Res]) ResponseTrailer ¶
func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header
ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.
Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*BidiStream[Req, Res]) Send ¶
func (b *BidiStream[Req, Res]) Send(msg *Res) error
Send a message to the client. The first call to Send also sends the response headers.
func (*BidiStream[_, _]) Spec ¶ added in v0.5.0
func (b *BidiStream[_, _]) Spec() Spec
Spec returns the specification for the RPC.
type BidiStreamForClient ¶
type BidiStreamForClient[Req, Res any] struct {
// contains filtered or unexported fields
}
BidiStreamForClient is the client's view of a bidirectional streaming RPC.
It's returned from Client.CallBidiStream, but doesn't currently have an exported constructor function.
func (*BidiStreamForClient[Req, Res]) CloseRequest ¶ added in v0.2.0
func (b *BidiStreamForClient[Req, Res]) CloseRequest() error
CloseRequest closes the send side of the stream.
func (*BidiStreamForClient[Req, Res]) CloseResponse ¶ added in v0.2.0
func (b *BidiStreamForClient[Req, Res]) CloseResponse() error
CloseResponse closes the receive side of the stream.
func (*BidiStreamForClient[Req, Res]) Conn ¶ added in v0.4.0
func (b *BidiStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)
Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*BidiStreamForClient[_, _]) Peer ¶ added in v0.5.0
func (b *BidiStreamForClient[_, _]) Peer() Peer
Peer describes the server for the RPC.
func (*BidiStreamForClient[Req, Res]) Receive ¶
func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)
Receive a message. When the server is done sending messages and no other errors have occurred, Receive will return an error that wraps io.EOF.
func (*BidiStreamForClient[Req, Res]) RequestHeader ¶
func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header
RequestHeader returns the request headers. Headers are sent with the first call to Send.
Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*BidiStreamForClient[Req, Res]) ResponseHeader ¶
func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header
ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.
func (*BidiStreamForClient[Req, Res]) ResponseTrailer ¶
func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header
ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.
func (*BidiStreamForClient[Req, Res]) Send ¶
func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error
Send a message to the server. The first call to Send also sends the request headers. To send just the request headers, without a body, call Send with a nil pointer.
If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for EOF using the standard library's errors.Is and call Receive to retrieve the error.
func (*BidiStreamForClient[_, _]) Spec ¶ added in v0.5.0
func (b *BidiStreamForClient[_, _]) Spec() Spec
Spec returns the specification for the RPC.
type Client ¶
type Client[Req, Res any] struct {
// contains filtered or unexported fields
}
Client is a reusable, concurrency-safe client for a single procedure. Depending on the procedure's type, use the CallUnary, CallClientStream, CallServerStream, or CallBidiStream method.
By default, clients use the Connect protocol with the binary Protobuf Codec, ask for gzipped responses, and send uncompressed requests. To use the gRPC or gRPC-Web protocols, use the WithGRPC or WithGRPCWeb options.
func NewClient ¶
func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...ClientOption) *Client[Req, Res]
NewClient constructs a new Client.
func (*Client[Req, Res]) CallBidiStream ¶
func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]
CallBidiStream calls a bidirectional streaming procedure.
func (*Client[Req, Res]) CallClientStream ¶
func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]
CallClientStream calls a client streaming procedure.
func (*Client[Req, Res]) CallServerStream ¶
func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Request[Req]) (*ServerStreamForClient[Res], error)
CallServerStream calls a server streaming procedure.
type ClientOption ¶
type ClientOption interface {
// contains filtered or unexported methods
}
A ClientOption configures a Client.
In addition to any options grouped in the documentation below, remember that any Option is also a valid ClientOption.
func WithAcceptCompression ¶
func WithAcceptCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) ClientOption
WithAcceptCompression makes a compression algorithm available to a client. Clients ask servers to compress responses using any of the registered algorithms. The first registered algorithm is treated as the least preferred, and the last registered algorithm is the most preferred.
It's safe to use this option liberally: servers will ignore any compression algorithms they don't support. To compress requests, pair this option with WithSendCompression. To remove support for a previously-registered compression algorithm, use WithAcceptCompression with nil decompressor and compressor constructors.
Clients accept gzipped responses by default, using a compressor backed by the standard library's gzip package with the default compression level. Use WithSendGzip to compress requests with gzip.
Calling WithAcceptCompression with an empty name is a no-op.
func WithClientOptions ¶
func WithClientOptions(options ...ClientOption) ClientOption
WithClientOptions composes multiple ClientOptions into one.
func WithGRPC ¶
func WithGRPC() ClientOption
WithGRPC configures clients to use the HTTP/2 gRPC protocol.
func WithGRPCWeb ¶
func WithGRPCWeb() ClientOption
WithGRPCWeb configures clients to use the gRPC-Web protocol.
func WithHTTPGet ¶ added in v1.7.0
func WithHTTPGet() ClientOption
WithHTTPGet allows Connect-protocol clients to use HTTP GET requests for side-effect free unary RPC calls. Typically, the service schema indicates which procedures are idempotent (see WithIdempotency for an example protobuf schema). The gRPC and gRPC-Web protocols are POST-only, so this option has no effect when combined with WithGRPC or WithGRPCWeb.
Using HTTP GET requests makes it easier to take advantage of CDNs, caching reverse proxies, and browsers' built-in caching. Note, however, that servers don't automatically set any cache headers; you can set cache headers using interceptors or by adding headers in individual procedure implementations.
By default, all requests are made as HTTP POSTs.
func WithHTTPGetMaxURLSize ¶ added in v1.9.1
func WithHTTPGetMaxURLSize(bytes int, fallback bool) ClientOption
WithHTTPGetMaxURLSize sets the maximum allowable URL length for GET requests made using the Connect protocol. It has no effect on gRPC or gRPC-Web clients, since those protocols are POST-only.
Limiting the URL size is useful because most user agents, proxies, and servers limit the allowable length of a URL. For example, Apache and Nginx limit the size of a request line to around 8 KiB, meaning that the maximum length of a URL is a bit smaller than this. If you run into URL size limitations imposed by your network infrastructure and don't know the maximum allowable size, or if you'd prefer to be cautious from the start, a 4096 byte (4 KiB) limit works with most common proxies and CDNs.
If fallback is set to true and the URL would be longer than the configured maximum value, the request will be sent as an HTTP POST instead. If fallback is set to false, the request will fail with CodeResourceExhausted.
By default, Connect-protocol clients with GET requests enabled may send a URL of any size.
func WithProtoJSON ¶
func WithProtoJSON() ClientOption
WithProtoJSON configures a client to send JSON-encoded data instead of binary Protobuf. It uses the standard Protobuf JSON mapping as implemented by google.golang.org/protobuf/encoding/protojson: fields are named using lowerCamelCase, zero values are omitted, missing required fields are errors, enums are emitted as strings, etc.
func WithResponseInitializer ¶ added in v1.13.0
func WithResponseInitializer(initializer func(spec Spec, message any) error) ClientOption
WithResponseInitializer provides a function that initializes a new message. It may be used to dynamically construct response messages. It is called when the client receives a message, to construct the message that the payload is unmarshaled into. The message will be a non-nil pointer to the type created by the client. Use the Schema field of the Spec to determine the type of the message.
func WithSendCompression ¶
func WithSendCompression(name string) ClientOption
WithSendCompression configures the client to use the specified algorithm to compress request messages. If the algorithm has not been registered using WithAcceptCompression, the client will return errors at runtime.
Because some servers don't support compression, clients default to sending uncompressed requests.
func WithSendGzip ¶
func WithSendGzip() ClientOption
WithSendGzip configures the client to gzip requests. Since clients have access to a gzip compressor by default, WithSendGzip doesn't require WithSendCompression.
Some servers don't support gzip, so clients default to sending uncompressed requests.
type ClientStream ¶
type ClientStream[Req any] struct {
// contains filtered or unexported fields
}
ClientStream is the handler's view of a client streaming RPC.
It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.
func (*ClientStream[Req]) Conn ¶ added in v0.4.0
func (c *ClientStream[Req]) Conn() StreamingHandlerConn
Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*ClientStream[Req]) Err ¶
func (c *ClientStream[Req]) Err() error
Err returns the first non-EOF error that was encountered by Receive.
func (*ClientStream[Req]) Msg ¶
func (c *ClientStream[Req]) Msg() *Req
Msg returns the most recent message unmarshaled by a call to Receive.
func (*ClientStream[_]) Peer ¶ added in v0.5.0
func (c *ClientStream[_]) Peer() Peer
Peer describes the client for this RPC.
func (*ClientStream[Req]) Receive ¶
func (c *ClientStream[Req]) Receive() bool
Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.
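The Receive, Msg, and Err trio follows the same iterator shape as the standard library's bufio.Scanner (Scan, Text, Err). As a rough analogy only, the sketch below sums numbers with a Scanner; sumLines is a hypothetical helper, and a real handler would loop over stream.Receive() and read stream.Msg() instead.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// sumLines is a hypothetical helper that adds up one integer per line. The
// loop has the same structure as a client-streaming handler: advance, read
// the current item, and check for an error once the loop ends.
func sumLines(input string) (int64, error) {
	scanner := bufio.NewScanner(strings.NewReader(input))
	var sum int64
	for scanner.Scan() { // analogous to stream.Receive()
		number, err := strconv.ParseInt(scanner.Text(), 10, 64) // analogous to stream.Msg()
		if err != nil {
			return 0, err
		}
		sum += number
	}
	// Analogous to stream.Err(): nil on a clean end of input.
	if err := scanner.Err(); err != nil {
		return 0, err
	}
	return sum, nil
}

func main() {
	sum, err := sumLines("1\n2\n3\n")
	fmt.Println(sum, err)
}
```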
func (*ClientStream[Req]) RequestHeader ¶
func (c *ClientStream[Req]) RequestHeader() http.Header
RequestHeader returns the headers received from the client.
func (*ClientStream[_]) Spec ¶ added in v0.5.0
func (c *ClientStream[_]) Spec() Spec
Spec returns the specification for the RPC.
type ClientStreamForClient ¶
type ClientStreamForClient[Req, Res any] struct {
// contains filtered or unexported fields
}
ClientStreamForClient is the client's view of a client streaming RPC.
It's returned from Client.CallClientStream, but doesn't currently have an exported constructor function.
func (*ClientStreamForClient[Req, Res]) CloseAndReceive ¶
func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], error)
CloseAndReceive closes the send side of the stream and waits for the response.
func (*ClientStreamForClient[Req, Res]) Conn ¶ added in v0.4.0
func (c *ClientStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)
Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*ClientStreamForClient[_, _]) Peer ¶ added in v0.5.0
func (c *ClientStreamForClient[_, _]) Peer() Peer
Peer describes the server for the RPC.
func (*ClientStreamForClient[Req, Res]) RequestHeader ¶
func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header
RequestHeader returns the request headers. Headers are sent to the server with the first call to Send.
Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*ClientStreamForClient[Req, Res]) Send ¶
func (c *ClientStreamForClient[Req, Res]) Send(request *Req) error
Send a message to the server. The first call to Send also sends the request headers.
If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for this case using the standard library's errors.Is and unmarshal the error using CloseAndReceive.
func (*ClientStreamForClient[_, _]) Spec ¶ added in v0.5.0
func (c *ClientStreamForClient[_, _]) Spec() Spec
Spec returns the specification for the RPC.
type Code ¶
type Code uint32
A Code is one of the Connect protocol's error codes. There are no user-defined codes, so only the codes enumerated below are valid. In both name and semantics, these codes match the gRPC status codes.
The descriptions below are optimized for brevity rather than completeness. See the Connect protocol specification for detailed descriptions of each code and example usage.
const (
	// CodeCanceled indicates that the operation was canceled, typically by the
	// caller.
	CodeCanceled Code = 1

	// CodeUnknown indicates that the operation failed for an unknown reason.
	CodeUnknown Code = 2

	// CodeInvalidArgument indicates that the client supplied an invalid argument.
	CodeInvalidArgument Code = 3

	// CodeDeadlineExceeded indicates that the deadline expired before the
	// operation could complete.
	CodeDeadlineExceeded Code = 4

	// CodeNotFound indicates that some requested entity (for example, a file or
	// directory) was not found.
	CodeNotFound Code = 5

	// CodeAlreadyExists indicates that the client attempted to create an entity
	// (for example, a file or directory) that already exists.
	CodeAlreadyExists Code = 6

	// CodePermissionDenied indicates that the caller doesn't have permission to
	// execute the specified operation.
	CodePermissionDenied Code = 7

	// CodeResourceExhausted indicates that some resource has been exhausted. For
	// example, a per-user quota may be exhausted or the entire file system may
	// be full.
	CodeResourceExhausted Code = 8

	// CodeFailedPrecondition indicates that the system is not in a state
	// required for the operation's execution.
	CodeFailedPrecondition Code = 9

	// CodeAborted indicates that the operation was aborted by the system, usually
	// because of a concurrency issue such as a sequencer check failure or
	// transaction abort.
	CodeAborted Code = 10

	// CodeOutOfRange indicates that the operation was attempted past the valid
	// range (for example, seeking past end-of-file).
	CodeOutOfRange Code = 11

	// CodeUnimplemented indicates that the operation isn't implemented,
	// supported, or enabled in this service.
	CodeUnimplemented Code = 12

	// CodeInternal indicates that some invariants expected by the underlying
	// system have been broken. This code is reserved for serious errors.
	CodeInternal Code = 13

	// CodeUnavailable indicates that the service is currently unavailable. This
	// is usually temporary, so clients can back off and retry idempotent
	// operations.
	CodeUnavailable Code = 14

	// CodeDataLoss indicates that the operation has resulted in unrecoverable
	// data loss or corruption.
	CodeDataLoss Code = 15

	// CodeUnauthenticated indicates that the request does not have valid
	// authentication credentials for the operation.
	CodeUnauthenticated Code = 16
)
func CodeOf ¶
CodeOf returns the error's status code if it is or wraps an *Error and CodeUnknown otherwise.
func (Code) MarshalText ¶
MarshalText implements encoding.TextMarshaler.
func (*Code) UnmarshalText ¶
UnmarshalText implements encoding.TextUnmarshaler.
type Codec ¶
type Codec interface {
	// Name returns the name of the Codec.
	//
	// This may be used as part of the Content-Type within HTTP. For example,
	// with gRPC this is the content subtype, so "application/grpc+proto" will
	// map to the Codec with name "proto".
	//
	// Names must not be empty.
	Name() string
	// Marshal marshals the given message.
	//
	// Marshal may expect a specific type of message, and will error if this type
	// is not given.
	Marshal(any) ([]byte, error)
	// Unmarshal unmarshals the given message.
	//
	// Unmarshal may expect a specific type of message, and will error if this
	// type is not given.
	Unmarshal([]byte, any) error
}
Codec marshals structs (typically generated from a schema) to and from bytes.
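To make the interface concrete, here is a minimal sketch of a custom codec backed by encoding/json. The Codec interface is redeclared locally so the example compiles on its own; stdJSONCodec, its "stdjson" name, and the ping struct are hypothetical. With the connect module available, a value like this would presumably be supplied through the WithCodec option.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a local copy of the interface documented above.
type Codec interface {
	Name() string
	Marshal(any) ([]byte, error)
	Unmarshal([]byte, any) error
}

// stdJSONCodec is a hypothetical codec that delegates to encoding/json.
type stdJSONCodec struct{}

// Name must be non-empty; "stdjson" is an illustrative choice.
func (stdJSONCodec) Name() string { return "stdjson" }

func (stdJSONCodec) Marshal(message any) ([]byte, error) {
	return json.Marshal(message)
}

func (stdJSONCodec) Unmarshal(data []byte, message any) error {
	return json.Unmarshal(data, message)
}

// ping is a hypothetical message type used only for this round trip.
type ping struct {
	Number int64 `json:"number"`
}

// roundTrip marshals a message and unmarshals it into a fresh value.
func roundTrip(codec Codec, in ping) (ping, error) {
	data, err := codec.Marshal(&in)
	if err != nil {
		return ping{}, err
	}
	var out ping
	if err := codec.Unmarshal(data, &out); err != nil {
		return ping{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(stdJSONCodec{}, ping{Number: 42})
	fmt.Println(out.Number, err)
}
```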
type Compressor ¶
type Compressor interface {
	io.Writer
	// Close flushes any buffered data to the underlying sink, then closes the
	// Compressor. It must not close the underlying sink.
	Close() error
	// Reset discards the Compressor's internal state, if any, and prepares it to
	// write compressed data to a new sink.
	Reset(io.Writer)
}
A Compressor is a reusable wrapper that compresses data written to an underlying sink. The standard library's *gzip.Writer implements Compressor.
type Decompressor ¶
type Decompressor interface {
	io.Reader
	// Close closes the Decompressor, but not the underlying data source. It may
	// return an error if the Decompressor wasn't read to EOF.
	Close() error
	// Reset discards the Decompressor's internal state, if any, and prepares it
	// to read from a new source of compressed data.
	Reset(io.Reader) error
}
A Decompressor is a reusable wrapper that decompresses an underlying data source. The standard library's *gzip.Reader implements Decompressor.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
An Error captures four key pieces of information: a Code, an underlying Go error, a map of metadata, and an optional collection of arbitrary Protobuf messages called "details" (more on those below). Servers send the code, the underlying error's Error() output, the metadata, and details over the wire to clients. Remember that the underlying error's message will be sent to clients - take care not to leak sensitive information from public APIs!
Service implementations and interceptors should return errors that can be cast to an *Error (using the standard library's errors.As). If the returned error can't be cast to an *Error, connect will use CodeUnknown and the returned error's message.
Error details are an optional mechanism for servers, interceptors, and proxies to attach arbitrary Protobuf messages to the error code and message. They're a clearer and more performant alternative to HTTP header microformats. See the documentation on errors for more details.
func NewNotModifiedError ¶ added in v1.7.0
NewNotModifiedError indicates that the requested resource hasn't changed. It should be used only when handlers wish to respond to conditional HTTP GET requests with a 304 Not Modified. In all other circumstances, including all RPCs using the gRPC or gRPC-Web protocols, it's equivalent to sending an error with CodeUnknown. The supplied headers should include Etag, Cache-Control, or any other headers required by RFC 9110 § 15.4.5.
Clients should check for this error using IsNotModifiedError.
Example ¶
package main

import (
	"context"
	"net/http"
	"strconv"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

// ExampleCachingPingServer is an example of how servers can take advantage of
// the Connect protocol's support for HTTP-level caching. The Protobuf
// definition for this API is in proto/connect/ping/v1/ping.proto.
type ExampleCachingPingServer struct {
	pingv1connect.UnimplementedPingServiceHandler
}

// Ping is idempotent and free of side effects (and the Protobuf schema
// indicates this), so clients using the Connect protocol may call it with HTTP
// GET requests. This implementation uses Etags to manage client-side caching.
func (*ExampleCachingPingServer) Ping(
	_ context.Context,
	req *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
	resp := connect.NewResponse(&pingv1.PingResponse{
		Number: req.Msg.GetNumber(),
	})
	// Our hashing logic is simple: we use the number in the PingResponse.
	hash := strconv.FormatInt(resp.Msg.GetNumber(), 10)
	// If the request was an HTTP GET, we'll need to check if the client already
	// has the response cached.
	if req.HTTPMethod() == http.MethodGet && req.Header().Get("If-None-Match") == hash {
		return nil, connect.NewNotModifiedError(http.Header{
			"Etag": []string{hash},
		})
	}
	resp.Header().Set("Etag", hash)
	return resp, nil
}

func main() {
	mux := http.NewServeMux()
	mux.Handle(pingv1connect.NewPingServiceHandler(&ExampleCachingPingServer{}))
	_ = http.ListenAndServe("localhost:8080", mux)
}
func NewWireError ¶ added in v1.5.0
NewWireError is similar to NewError, but the resulting *Error returns true when tested with IsWireError.
This is useful for clients trying to propagate partial failures from streaming RPCs. Often, these RPCs include error information in their response messages (for example, gRPC server reflection and OpenTelemetry's OTLP). Clients propagating these errors up the stack should use NewWireError to clarify that the error code, message, and details (if any) were explicitly sent by the server rather than inferred from a lower-level networking error or timeout.
func (*Error) AddDetail ¶
func (e *Error) AddDetail(d *ErrorDetail)
AddDetail appends to the error's details.
func (*Error) Details ¶
func (e *Error) Details() []*ErrorDetail
Details returns the error's details.
func (*Error) Message ¶
Message returns the underlying error message. It may be empty if the original error was created with a status code and a nil error.
Example ¶
package main import ( "errors" "fmt" connect "connectrpc.com/connect" ) func main() { err := fmt.Errorf( "another: %w", connect.NewError(connect.CodeUnavailable, errors.New("failed to foo")), ) if connectErr := (&connect.Error{}); errors.As(err, &connectErr) { fmt.Println("underlying error message:", connectErr.Message()) } }
Output: underlying error message: failed to foo
func (*Error) Meta ¶
Meta allows the error to carry additional information as key-value pairs.
Metadata attached to errors returned by unary handlers is always sent as HTTP headers, regardless of the protocol. Metadata attached to errors returned by streaming handlers may be sent as HTTP headers, HTTP trailers, or a block of in-body metadata, depending on the protocol in use and whether or not the handler has already written messages to the stream.
Protocol-specific headers and trailers may be removed to avoid breaking protocol semantics. For example, Content-Length and Content-Type headers won't be propagated. See the documentation for each protocol for more details.
When clients receive errors, the metadata contains the union of the HTTP headers and the protocol-specific trailers (either HTTP trailers or in-body metadata).
type ErrorDetail ¶
type ErrorDetail struct {
// contains filtered or unexported fields
}
An ErrorDetail is a self-describing Protobuf message attached to an *Error. Error details are sent over the network to clients, which can then work with strongly-typed data rather than trying to parse a complex error message. For example, you might use details to send a localized error message or retry parameters to the client.
The google.golang.org/genproto/googleapis/rpc/errdetails package contains a variety of Protobuf messages commonly used as error details.
func NewErrorDetail ¶ added in v0.3.0
func NewErrorDetail(msg proto.Message) (*ErrorDetail, error)
NewErrorDetail constructs a new error detail. If msg is an *anypb.Any then it is used as is. Otherwise, it is first marshalled into an *anypb.Any value. This returns an error if msg cannot be marshalled.
func (*ErrorDetail) Bytes ¶ added in v0.3.0
func (d *ErrorDetail) Bytes() []byte
Bytes returns a copy of the Protobuf-serialized detail.
func (*ErrorDetail) Type ¶ added in v0.3.0
func (d *ErrorDetail) Type() string
Type is the fully-qualified name of the detail's Protobuf message (for example, acme.foo.v1.FooDetail).
func (*ErrorDetail) Value ¶ added in v0.3.0
func (d *ErrorDetail) Value() (proto.Message, error)
Value uses the Protobuf runtime's package-global registry to unmarshal the Detail into a strongly-typed message. Typically, clients use Go type assertions to cast from the proto.Message interface to concrete types.
type ErrorWriter ¶ added in v0.5.0
type ErrorWriter struct {
// contains filtered or unexported fields
}
An ErrorWriter writes errors to an http.ResponseWriter in the format expected by an RPC client. This is especially useful in server-side net/http middleware, where you may wish to handle requests from RPC and non-RPC clients with the same code.
ErrorWriters are safe to use concurrently.
Example ¶
package main

import (
	"errors"
	"io"
	"log"
	"net/http"

	connect "connectrpc.com/connect"
)

// NewHelloHandler is an example HTTP handler. In a real application, it might
// handle RPCs, requests for HTML, or anything else.
func NewHelloHandler() http.Handler {
	return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
		io.WriteString(response, "Hello, world!")
	})
}

// NewAuthenticatedHandler is an example of middleware that works with both RPC
// and non-RPC clients.
func NewAuthenticatedHandler(handler http.Handler) http.Handler {
	errorWriter := connect.NewErrorWriter()
	return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
		// Dummy authentication logic.
		if request.Header.Get("Token") == "super-secret" {
			handler.ServeHTTP(response, request)
			return
		}
		defer request.Body.Close()
		defer io.Copy(io.Discard, request.Body)
		if errorWriter.IsSupported(request) {
			// Send a protocol-appropriate error to RPC clients, so that they receive
			// the right code, message, and any metadata or error details.
			unauthenticated := connect.NewError(connect.CodeUnauthenticated, errors.New("invalid token"))
			errorWriter.Write(response, request, unauthenticated)
		} else {
			// Send an error to non-RPC clients.
			response.WriteHeader(http.StatusUnauthorized)
			io.WriteString(response, "invalid token")
		}
	})
}

func main() {
	mux := http.NewServeMux()
	mux.Handle("/", NewHelloHandler())
	srv := &http.Server{
		Addr:    ":8080",
		Handler: NewAuthenticatedHandler(mux),
	}
	if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Fatalln(err)
	}
}
func NewErrorWriter ¶ added in v0.5.0
func NewErrorWriter(opts ...HandlerOption) *ErrorWriter
NewErrorWriter constructs an ErrorWriter. Handler options may be passed to configure the error writer's behavior to match the handlers. [WithRequireConnectProtocolHeader] will assert that Connect protocol requests include the version header, allowing the error writer to correctly classify the request. Options supplied via WithConditionalHandlerOptions are ignored.
func (*ErrorWriter) IsSupported ¶ added in v0.5.0
func (w *ErrorWriter) IsSupported(request *http.Request) bool
IsSupported checks whether a request is using one of the ErrorWriter's supported RPC protocols.
func (*ErrorWriter) Write ¶ added in v0.5.0
func (w *ErrorWriter) Write(response http.ResponseWriter, request *http.Request, err error) error
Write an error, using the format appropriate for the RPC protocol in use. Callers should first use IsSupported to verify that the request is using one of the ErrorWriter's supported RPC protocols. If the protocol is unknown, Write will send the error as unprefixed, Connect-formatted JSON.
Write does not read or close the request body.
type HTTPClient ¶
HTTPClient is the interface connect expects HTTP clients to implement. The standard library's *http.Client implements HTTPClient.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
A Handler is the server-side implementation of a single RPC defined by a service schema.
By default, Handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf and JSON codecs. They support gzip compression using the standard library's compress/gzip.
func NewBidiStreamHandler ¶
func NewBidiStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *BidiStream[Req, Res]) error, options ...HandlerOption, ) *Handler
NewBidiStreamHandler constructs a Handler for a bidirectional streaming procedure.
func NewClientStreamHandler ¶
func NewClientStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *ClientStream[Req]) (*Response[Res], error), options ...HandlerOption, ) *Handler
NewClientStreamHandler constructs a Handler for a client streaming procedure.
func NewServerStreamHandler ¶
func NewServerStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *Request[Req], *ServerStream[Res]) error, options ...HandlerOption, ) *Handler
NewServerStreamHandler constructs a Handler for a server streaming procedure.
func NewUnaryHandler ¶
func NewUnaryHandler[Req, Res any]( procedure string, unary func(context.Context, *Request[Req]) (*Response[Res], error), options ...HandlerOption, ) *Handler
NewUnaryHandler constructs a Handler for a request-response procedure.
func (*Handler) ServeHTTP ¶
func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request)
ServeHTTP implements http.Handler.
type HandlerOption ¶
type HandlerOption interface {
// contains filtered or unexported methods
}
A HandlerOption configures a Handler.
In addition to any options grouped in the documentation below, remember that any Option is also a HandlerOption.
func WithCompression ¶
func WithCompression( name string, newDecompressor func() Decompressor, newCompressor func() Compressor, ) HandlerOption
WithCompression configures handlers to support a compression algorithm. Clients may send messages compressed with that algorithm and/or request compressed responses. The Compressor and Decompressor produced by the supplied constructors must use the same algorithm. Internally, Connect pools compressors and decompressors.
By default, handlers support gzip using the standard library's compress/gzip package at the default compression level. To remove support for a previously-registered compression algorithm, use WithCompression with nil decompressor and compressor constructors.
Calling WithCompression with an empty name is a no-op.
func WithConditionalHandlerOptions ¶ added in v1.10.1
func WithConditionalHandlerOptions(conditional func(spec Spec) []HandlerOption) HandlerOption
WithConditionalHandlerOptions allows procedures in the same service to have different configurations: for example, one procedure may need a much larger WithReadMaxBytes setting than the others.
WithConditionalHandlerOptions takes a function which may inspect each procedure's Spec before deciding which options to apply. Returning a nil slice is safe.
Example ¶
package main import ( connect "connectrpc.com/connect" "connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect" ) func main() { connect.WithConditionalHandlerOptions(func(spec connect.Spec) []connect.HandlerOption { var options []connect.HandlerOption if spec.Procedure == pingv1connect.PingServicePingProcedure { options = append(options, connect.WithReadMaxBytes(1024)) } return options }) }
func WithHandlerOptions ¶
func WithHandlerOptions(options ...HandlerOption) HandlerOption
WithHandlerOptions composes multiple HandlerOptions into one.
func WithRecover ¶ added in v0.2.0
WithRecover adds an interceptor that recovers from panics. The supplied function receives the context, Spec, request headers, and the recovered value (which may be nil). It must return an error to send back to the client. It may also log the panic, emit metrics, or execute other error-handling logic. Handler functions must be safe to call concurrently.
To preserve compatibility with net/http's semantics, this interceptor doesn't handle panics with http.ErrAbortHandler.
By default, handlers don't recover from panics. Because the standard library's http.Server recovers from panics by default, this option isn't usually necessary to prevent crashes. Instead, it helps servers collect RPC-specific data during panics and send a more detailed error to clients.
func WithRequestInitializer ¶ added in v1.13.0
func WithRequestInitializer(initializer func(spec Spec, message any) error) HandlerOption
WithRequestInitializer provides a function that initializes a new message. It may be used to dynamically construct request messages. The handler calls it each time it receives a request, before unmarshaling, to prepare the message that the request will be unmarshaled into. The message will be a non-nil pointer to the type created by the handler. Use the Schema field of the Spec to determine the type of the message.
func WithRequireConnectProtocolHeader ¶ added in v1.4.0
func WithRequireConnectProtocolHeader() HandlerOption
WithRequireConnectProtocolHeader configures the Handler to require requests using the Connect RPC protocol to include the Connect-Protocol-Version header. This ensures that HTTP proxies and net/http middleware can easily identify valid Connect requests, even if they use a common Content-Type like application/json. However, it makes ad-hoc requests with tools like cURL more laborious. Streaming requests are not affected by this option.
This option has no effect if the client uses the gRPC or gRPC-Web protocols.
type IdempotencyLevel ¶ added in v1.7.0
type IdempotencyLevel int
An IdempotencyLevel is a value that declares how "idempotent" an RPC is. This value can affect RPC behaviors, such as determining whether it is safe to retry a request, or what kinds of request modalities are allowed for a given procedure.
const (
	// IdempotencyUnknown is the default idempotency level. A procedure with
	// this idempotency level may not be idempotent. This is appropriate for
	// any kind of procedure.
	IdempotencyUnknown IdempotencyLevel = 0

	// IdempotencyNoSideEffects is the idempotency level that specifies that a
	// given call has no side-effects. This is equivalent to [RFC 9110 § 9.2.1]
	// "safe" methods in terms of semantics. This procedure should not mutate
	// any state. This idempotency level is appropriate for queries, or anything
	// that would be suitable for an HTTP GET request. In addition, due to the
	// lack of side-effects, such a procedure would be suitable to retry and
	// expect that the results will not be altered by preceding attempts.
	//
	// [RFC 9110 § 9.2.1]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.1
	IdempotencyNoSideEffects IdempotencyLevel = 1

	// IdempotencyIdempotent is the idempotency level that specifies that a
	// given call is "idempotent", such that multiple instances of the same
	// request to this procedure would have the same side-effects as a single
	// request. This is equivalent to [RFC 9110 § 9.2.2] "idempotent" methods.
	// This level is a subset of the previous level. This idempotency level is
	// appropriate for any procedure that is safe to retry multiple times
	// and be guaranteed that the response and side-effects will not be altered
	// as a result of multiple attempts, for example, entity deletion requests.
	//
	// [RFC 9110 § 9.2.2]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.2
	IdempotencyIdempotent IdempotencyLevel = 2
)
func (IdempotencyLevel) String ¶ added in v1.7.0
func (i IdempotencyLevel) String() string
type Interceptor ¶
type Interceptor interface { WrapUnary(UnaryFunc) UnaryFunc WrapStreamingClient(StreamingClientFunc) StreamingClientFunc WrapStreamingHandler(StreamingHandlerFunc) StreamingHandlerFunc }
An Interceptor adds logic to a generated handler or client, like the decorators or middleware you may have seen in other libraries. Interceptors may replace the context, mutate requests and responses, handle errors, retry, recover from panics, emit logs and metrics, or do nearly anything else.
The returned functions must be safe to call concurrently.
type Option ¶
type Option interface { ClientOption HandlerOption }
Option implements both ClientOption and HandlerOption, so it can be applied both client-side and server-side.
func WithCodec ¶
WithCodec registers a serialization method with a client or handler. Handlers may have multiple codecs registered, and use whichever the client chooses. Clients may only have a single codec.
By default, handlers and clients support binary Protocol Buffer data using google.golang.org/protobuf/proto. Handlers also support JSON by default, using the standard Protobuf JSON mapping. Users with more specialized needs may override the default codecs by registering a new codec under the "proto" or "json" names. When supplying a custom "proto" codec, keep in mind that some unexported, protocol-specific messages are serialized using Protobuf - take care to fall back to the standard Protobuf implementation if necessary.
Registering a codec with an empty name is a no-op.
func WithCompressMinBytes ¶
WithCompressMinBytes sets a minimum size threshold for compression: regardless of compressor configuration, messages smaller than the configured minimum are sent uncompressed.
The default minimum is zero. Setting a minimum compression threshold may improve overall performance, because the CPU cost of compressing very small messages usually isn't worth the small reduction in network I/O.
func WithIdempotency ¶ added in v1.7.0
func WithIdempotency(idempotencyLevel IdempotencyLevel) Option
WithIdempotency declares the idempotency of the procedure. This can determine whether a procedure call can safely be retried, and may affect which request modalities are allowed for a given procedure call.
In most cases, you should not need to manually set this. It is normally set by the code generator for your schema. For protobuf schemas, it can be set like this:
rpc Ping(PingRequest) returns (PingResponse) { option idempotency_level = NO_SIDE_EFFECTS; }
func WithInterceptors ¶
func WithInterceptors(interceptors ...Interceptor) Option
WithInterceptors configures a client or handler's interceptor stack. Repeated WithInterceptors options are applied in order, so
WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)
Unary interceptors compose like an onion. The first interceptor provided is the outermost layer of the onion: it acts first on the context and request, and last on the response and error.
Stream interceptors also behave like an onion: the first interceptor provided is the outermost wrapper for the StreamingClientConn or StreamingHandlerConn. It's the first to see sent messages and the last to see received messages.
Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:
 client.Send()       client.Receive()
       |                   ^
       v                   |
    A ---               --- A
    B ---               --- B
    : ...               ... :
    Y ---               --- Y
    Z ---               --- Z
       |                   ^
       v                   |
  = = = = = = = = = = = = = = = =
               network
  = = = = = = = = = = = = = = = =
       |                   ^
       v                   |
    A ---               --- A
    B ---               --- B
    : ...               ... :
    Y ---               --- Y
    Z ---               --- Z
       |                   ^
       v                   |
handler.Receive()   handler.Send()
       |                   ^
       |                   |
       '-> handler logic >-'
Note that in clients, Send handles the request message(s) and Receive handles the response message(s). For handlers, it's the reverse. Depending on your interceptor's logic, you may need to wrap one method in clients and the other in handlers.
Example ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */) outer := connect.UnaryInterceptorFunc( func(next connect.UnaryFunc) connect.UnaryFunc { return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { logger.Println("outer interceptor: before call") res, err := next(ctx, req) logger.Println("outer interceptor: after call") return res, err }) }, ) inner := connect.UnaryInterceptorFunc( func(next connect.UnaryFunc) connect.UnaryFunc { return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { logger.Println("inner interceptor: before call") res, err := next(ctx, req) logger.Println("inner interceptor: after call") return res, err }) }, ) client := pingv1connect.NewPingServiceClient( examplePingServer.Client(), examplePingServer.URL(), connect.WithInterceptors(outer, inner), ) if _, err := client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{})); err != nil { logger.Println("error:", err) return }
Output: outer interceptor: before call inner interceptor: before call inner interceptor: after call outer interceptor: after call
func WithOptions ¶
WithOptions composes multiple Options into one.
func WithReadMaxBytes ¶ added in v0.2.0
WithReadMaxBytes limits the performance impact of pathologically large messages sent by the other party. For handlers, WithReadMaxBytes limits the size of a message that the client can send. For clients, WithReadMaxBytes limits the size of a message that the server can respond with. Limits apply to each Protobuf message, not to the stream as a whole.
Setting WithReadMaxBytes to zero allows any message size. Both clients and handlers default to allowing any message size.
Handlers may also use http.MaxBytesHandler to limit the total size of the HTTP request stream (rather than the per-message size). Connect handles http.MaxBytesError specially, so clients still receive errors with the appropriate error code and informative messages.
func WithSchema ¶ added in v1.13.0
WithSchema provides a parsed representation of the schema for an RPC to a client or handler. The supplied schema is exposed as [Spec.Schema]. This option is typically added by generated code.
For services using protobuf schemas, the supplied schema should be a [protoreflect.MethodDescriptor].
func WithSendMaxBytes ¶ added in v0.4.0
WithSendMaxBytes prevents sending messages too large for the client/handler to handle without significant performance overhead. For handlers, WithSendMaxBytes limits the size of a message that the handler can respond with. For clients, WithSendMaxBytes limits the size of a message that the client can send. Limits apply to each message, not to the stream as a whole.
Setting WithSendMaxBytes to zero allows any message size. Both clients and handlers default to allowing any message size.
type Peer ¶ added in v0.5.0
Peer describes the other party to an RPC.
When accessed client-side, Addr contains the host or host:port from the server's URL. When accessed server-side, Addr contains the client's address in IP:port format.
On both the client and the server, Protocol is the RPC protocol in use. Currently, it's either ProtocolConnect, ProtocolGRPC, or ProtocolGRPCWeb, but additional protocols may be added in the future.
Query contains the query parameters for the request. For the server, this will reflect the actual query parameters sent. For the client, it is unset.
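Server-side code often needs only the IP portion of an IP:port address. A minimal sketch using net.SplitHostPort follows; clientIP is a hypothetical helper, and the addresses are documentation-range examples rather than output from a real Peer.

```go
package main

import (
	"fmt"
	"net"
)

// clientIP extracts the IP from an address in IP:port form, which is the
// format described above for server-side peers.
func clientIP(addr string) (string, error) {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return "", err
	}
	return host, nil
}

func main() {
	for _, addr := range []string{"192.0.2.10:51234", "[2001:db8::1]:443"} {
		ip, err := clientIP(addr)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(ip)
	}
	// Prints:
	// 192.0.2.10
	// 2001:db8::1
}
```

The same helper would fail on a client-side Addr that is a bare host, because net.SplitHostPort requires a port.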
type Request ¶
type Request[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}
Request is a wrapper around a generated request message. It provides access to metadata like headers and the RPC specification, as well as strongly-typed access to the message itself.
func NewRequest ¶
NewRequest wraps a generated request message.
func (*Request[_]) Any ¶
Any returns the concrete request message as an empty interface, so that *Request implements the AnyRequest interface.
func (*Request[_]) HTTPMethod ¶ added in v1.8.0
HTTPMethod returns the HTTP method for this request. This is nearly always POST, but side-effect-free unary RPCs could be made via a GET.
On a request newly created via NewRequest, this returns the empty string until the request is actually sent and the HTTP method is determined. This means that client interceptor functions will see the empty string until *after* they delegate to the handler they wrapped. This may return the empty string even after such delegation, if the request was never actually sent to the server (and thus no determination was ever made about the HTTP method).
func (*Request[_]) Header ¶
Header returns the HTTP headers for this request. Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.
type Response ¶
type Response[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}
Response is a wrapper around a generated response message. It provides access to metadata like headers and trailers, as well as strongly-typed access to the message itself.
func NewResponse ¶
NewResponse wraps a generated response message.
func (*Response[_]) Any ¶
Any returns the concrete response message as an empty interface, so that *Response implements the AnyResponse interface.
func (*Response[_]) Header ¶
Header returns the HTTP headers for this response. Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.
func (*Response[_]) Trailer ¶
Trailer returns the trailers for this response. Depending on the underlying RPC protocol, trailers may be sent as HTTP trailers or a protocol-specific block of in-body metadata.
Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.
type ServerStream ¶
type ServerStream[Res any] struct {
	// contains filtered or unexported fields
}
ServerStream is the handler's view of a server streaming RPC.
It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.
func (*ServerStream[Res]) Conn ¶ added in v0.4.0
func (s *ServerStream[Res]) Conn() StreamingHandlerConn
Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*ServerStream[Res]) ResponseHeader ¶
func (s *ServerStream[Res]) ResponseHeader() http.Header
ResponseHeader returns the response headers. Headers are sent with the first call to Send.
Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*ServerStream[Res]) ResponseTrailer ¶
func (s *ServerStream[Res]) ResponseTrailer() http.Header
ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.
Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.
func (*ServerStream[Res]) Send ¶
func (s *ServerStream[Res]) Send(msg *Res) error
Send a message to the client. The first call to Send also sends the response headers.
type ServerStreamForClient ¶
type ServerStreamForClient[Res any] struct {
	// contains filtered or unexported fields
}
ServerStreamForClient is the client's view of a server streaming RPC.
It's returned from Client.CallServerStream, but doesn't currently have an exported constructor function.
func (*ServerStreamForClient[Res]) Close ¶
func (s *ServerStreamForClient[Res]) Close() error
Close the receive side of the stream.
func (*ServerStreamForClient[Res]) Conn ¶ added in v0.4.0
func (s *ServerStreamForClient[Res]) Conn() (StreamingClientConn, error)
Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.
func (*ServerStreamForClient[Res]) Err ¶
func (s *ServerStreamForClient[Res]) Err() error
Err returns the first non-EOF error that was encountered by Receive.
func (*ServerStreamForClient[Res]) Msg ¶
func (s *ServerStreamForClient[Res]) Msg() *Res
Msg returns the most recent message unmarshaled by a call to Receive.
func (*ServerStreamForClient[Res]) Receive ¶
func (s *ServerStreamForClient[Res]) Receive() bool
Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.
func (*ServerStreamForClient[Res]) ResponseHeader ¶
func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header
ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.
func (*ServerStreamForClient[Res]) ResponseTrailer ¶
func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header
ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.
type Spec ¶
type Spec struct {
	StreamType       StreamType
	Schema           any    // for protobuf RPCs, a protoreflect.MethodDescriptor
	Procedure        string // for example, "/acme.foo.v1.FooService/Bar"
	IsClient         bool   // otherwise we're in a handler
	IdempotencyLevel IdempotencyLevel
}
Spec is a description of a client call or a handler invocation.
If you're using Protobuf, protoc-gen-connect-go generates a constant for the fully-qualified Procedure corresponding to each RPC in your schema.
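Interceptors that emit logs or metrics often want the service and method names separately. A minimal sketch, assuming the "/package.Service/Method" shape shown in the Spec comment above, is below; splitProcedure is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// splitProcedure splits a fully-qualified procedure such as
// "/acme.foo.v1.FooService/Bar" into its service and method names. It reports
// ok=false if the input doesn't have that shape.
func splitProcedure(procedure string) (service, method string, ok bool) {
	trimmed := strings.TrimPrefix(procedure, "/")
	service, method, ok = strings.Cut(trimmed, "/")
	if !ok || service == "" || method == "" {
		return "", "", false
	}
	return service, method, true
}

func main() {
	service, method, ok := splitProcedure("/acme.foo.v1.FooService/Bar")
	fmt.Println(service, method, ok) // acme.foo.v1.FooService Bar true
}
```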
type StreamType ¶
type StreamType uint8
StreamType describes whether the client, server, neither, or both is streaming.
const (
	StreamTypeUnary  StreamType = 0b00
	StreamTypeClient StreamType = 0b01
	StreamTypeServer StreamType = 0b10
	StreamTypeBidi              = StreamTypeClient | StreamTypeServer
)
func (StreamType) String ¶ added in v1.8.0
func (s StreamType) String() string
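Because the StreamType constants are bit flags, the direction of streaming can be tested with a bitwise AND. The sketch below copies the constants locally so that it runs without importing connect. clientStreams and serverStreams are hypothetical helper names, not part of the package.

```go
package main

import "fmt"

// StreamType and its constants are local copies of the declarations in the
// documentation, so the sketch runs without importing connect.
type StreamType uint8

const (
	StreamTypeUnary  StreamType = 0b00
	StreamTypeClient StreamType = 0b01
	StreamTypeServer StreamType = 0b10
	StreamTypeBidi              = StreamTypeClient | StreamTypeServer
)

// clientStreams reports whether the client-streaming bit is set.
// It is a hypothetical helper, not part of connect.
func clientStreams(s StreamType) bool { return s&StreamTypeClient != 0 }

// serverStreams reports whether the server-streaming bit is set.
// It is a hypothetical helper, not part of connect.
func serverStreams(s StreamType) bool { return s&StreamTypeServer != 0 }

func main() {
	all := []StreamType{StreamTypeUnary, StreamTypeClient, StreamTypeServer, StreamTypeBidi}
	for _, s := range all {
		fmt.Printf("%02b client=%t server=%t\n", s, clientStreams(s), serverStreams(s))
	}
}
```

A bidirectional stream sets both bits, so both helpers report true for StreamTypeBidi and both report false for StreamTypeUnary.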
type StreamingClientConn ¶ added in v0.2.0
type StreamingClientConn interface {
	// Spec and Peer must be safe to call concurrently with all other methods.
	Spec() Spec
	Peer() Peer

	// Send, RequestHeader, and CloseRequest may race with each other, but must
	// be safe to call concurrently with all other methods.
	Send(any) error
	RequestHeader() http.Header
	CloseRequest() error

	// Receive, ResponseHeader, ResponseTrailer, and CloseResponse may race with
	// each other, but must be safe to call concurrently with all other methods.
	Receive(any) error
	ResponseHeader() http.Header
	ResponseTrailer() http.Header
	CloseResponse() error
}
StreamingClientConn is the client's view of a bidirectional message exchange. Interceptors for streaming RPCs may wrap StreamingClientConns.
StreamingClientConns write request headers to the network with the first call to Send. Any subsequent mutations of the request headers are effectively no-ops. When the server is done sending data, the StreamingClientConn's Receive method returns an error wrapping io.EOF. Clients should check for this using the standard library's errors.Is. If the server encounters an error during processing, subsequent calls to the StreamingClientConn's Send method will return an error wrapping io.EOF; clients may then call Receive to unmarshal the error.
Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.
StreamingClientConn implementations provided by this module guarantee that all returned errors can be cast to *Error using the standard library's errors.As.
In order to support bidirectional streaming RPCs, all StreamingClientConn implementations must support limited concurrent use. See the comments on each group of methods for details.
type StreamingClientFunc ¶ added in v0.2.0
type StreamingClientFunc func(context.Context, Spec) StreamingClientConn
StreamingClientFunc is the generic signature of a streaming RPC from the client's perspective. Interceptors may wrap StreamingClientFuncs.
type StreamingHandlerConn ¶ added in v0.2.0
type StreamingHandlerConn interface {
	Spec() Spec
	Peer() Peer

	Receive(any) error
	RequestHeader() http.Header

	Send(any) error
	ResponseHeader() http.Header
	ResponseTrailer() http.Header
}
StreamingHandlerConn is the server's view of a bidirectional message exchange. Interceptors for streaming RPCs may wrap StreamingHandlerConns.
Like the standard library's http.ResponseWriter, StreamingHandlerConns write response headers to the network with the first call to Send. Any subsequent mutations of the response headers are effectively no-ops. Handlers may mutate response trailers at any time before returning. When the client has finished sending data, Receive returns an error wrapping io.EOF. Handlers should check for this using the standard library's errors.Is.
Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.
StreamingHandlerConn implementations provided by this module guarantee that all returned errors can be cast to *Error using the standard library's errors.As.
StreamingHandlerConn implementations do not need to be safe for concurrent use.
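The http.ResponseWriter behavior that the description compares against can be observed with the standard library alone. The sketch below uses net/http/httptest to show that a header set after the first write never reaches the client. It demonstrates the standard library analogy only, not a StreamingHandlerConn; handle, recordedHeaders, and the X-Early and X-Late header names are illustrative.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// handle sets one header before the first write and one after it.
func handle(w http.ResponseWriter, _ *http.Request) {
	w.Header().Set("X-Early", "sent")
	fmt.Fprint(w, "first message")      // headers are committed by this write
	w.Header().Set("X-Late", "dropped") // too late to reach the client
}

// recordedHeaders runs handle against an in-memory recorder and returns the
// headers a client would have received.
func recordedHeaders() http.Header {
	rec := httptest.NewRecorder()
	handle(rec, httptest.NewRequest(http.MethodPost, "/", nil))
	return rec.Result().Header
}

func main() {
	h := recordedHeaders()
	fmt.Printf("X-Early=%q X-Late=%q\n", h.Get("X-Early"), h.Get("X-Late"))
}
```

By the same reasoning, a streaming handler should finish setting response headers before its first call to Send and use trailers for anything that is only known later.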
type StreamingHandlerFunc ¶ added in v0.2.0
type StreamingHandlerFunc func(context.Context, StreamingHandlerConn) error
StreamingHandlerFunc is the generic signature of a streaming RPC from the handler's perspective. Interceptors may wrap StreamingHandlerFuncs.
type UnaryFunc ¶
type UnaryFunc func(context.Context, AnyRequest) (AnyResponse, error)
UnaryFunc is the generic signature of a unary RPC. Interceptors may wrap UnaryFuncs.
The types of the request and response structs depend on the codec being used. When using Protobuf, request.Any() and response.Any() will always be proto.Message implementations.
type UnaryInterceptorFunc ¶
type UnaryInterceptorFunc func(UnaryFunc) UnaryFunc
UnaryInterceptorFunc is a simple Interceptor implementation that only wraps unary RPCs. It has no effect on streaming RPCs.
Example ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
loggingInterceptor := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("calling:", request.Spec().Procedure)
			logger.Println("request:", request.Any())
			response, err := next(ctx, request)
			if err != nil {
				logger.Println("error:", err)
			} else {
				logger.Println("response:", response.Any())
			}
			return response, err
		})
	},
)
client := pingv1connect.NewPingServiceClient(
	examplePingServer.Client(),
	examplePingServer.URL(),
	connect.WithInterceptors(loggingInterceptor),
)
if _, err := client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{Number: 42})); err != nil {
	logger.Println("error:", err)
	return
}
Output:
calling: /connect.ping.v1.PingService/Ping
request: number:42
response: number:42
func (UnaryInterceptorFunc) WrapStreamingClient ¶ added in v0.2.0
func (f UnaryInterceptorFunc) WrapStreamingClient(next StreamingClientFunc) StreamingClientFunc
WrapStreamingClient implements Interceptor with a no-op.
func (UnaryInterceptorFunc) WrapStreamingHandler ¶ added in v0.2.0
func (f UnaryInterceptorFunc) WrapStreamingHandler(next StreamingHandlerFunc) StreamingHandlerFunc
WrapStreamingHandler implements Interceptor with a no-op.
func (UnaryInterceptorFunc) WrapUnary ¶
func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc
WrapUnary implements Interceptor by applying the interceptor function.
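The three methods above follow a common Go adapter pattern: a function type satisfies an interface by applying itself in one method and returning its argument unchanged in the others. The sketch below shows that pattern with deliberately simplified stand-in types (unaryFunc, streamFunc, interceptor, unaryOnly), all of which are hypothetical. It is not the module's source, and the real signatures carry contexts, requests, and connections rather than strings.

```go
package main

import "fmt"

// unaryFunc and streamFunc are simplified stand-ins. The real connect
// function types take contexts, requests, and connections.
type unaryFunc func(string) string
type streamFunc func(string) string

// interceptor is a reduced, hypothetical version of an interceptor interface
// with one unary hook and one streaming hook.
type interceptor interface {
	WrapUnary(unaryFunc) unaryFunc
	WrapStream(streamFunc) streamFunc
}

// unaryOnly adapts a plain function into an interceptor that only affects
// unary calls.
type unaryOnly func(unaryFunc) unaryFunc

// WrapUnary applies the adapted function.
func (f unaryOnly) WrapUnary(next unaryFunc) unaryFunc { return f(next) }

// WrapStream is a no-op: it returns next unchanged.
func (f unaryOnly) WrapStream(next streamFunc) streamFunc { return next }

func main() {
	logging := unaryOnly(func(next unaryFunc) unaryFunc {
		return func(req string) string { return "logged(" + next(req) + ")" }
	})
	echo := func(s string) string { return s }

	var i interceptor = logging
	fmt.Println(i.WrapUnary(echo)("ping"))
	fmt.Println(i.WrapStream(echo)("ping"))
}
```

The benefit of this design is that callers who only care about unary RPCs can write a single closure and still pass it anywhere a full interceptor is expected.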
Source Files ¶
- buffer_pool.go
- client.go
- client_stream.go
- code.go
- codec.go
- compression.go
- connect.go
- duplex_http_call.go
- envelope.go
- error.go
- error_writer.go
- handler.go
- handler_stream.go
- header.go
- idempotency_level.go
- interceptor.go
- option.go
- protobuf_util.go
- protocol.go
- protocol_connect.go
- protocol_grpc.go
- recover.go
Directories ¶
Path | Synopsis |
---|---|
cmd/protoc-gen-connect-go | protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code. |
internal/assert | Package assert is a minimal assert package using generics. |
internal/gen/connect/ping/v1/pingv1connect | The connect.ping.v1 package contains an echo service designed to test the connect-go implementation. |