connect

package module
v1.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2024 License: Apache-2.0 Imports: 28 Imported by: 2,664

README

Connect

Build Report Card GoDoc Slack OpenSSF Best Practices

Connect is a slim library for building browser and gRPC-compatible HTTP APIs. You write a short Protocol Buffer schema and implement your application logic, and Connect generates code to handle marshaling, routing, compression, and content type negotiation. It also generates an idiomatic, type-safe client. Handlers and clients support three protocols: gRPC, gRPC-Web, and Connect's own protocol.

The Connect protocol is a simple protocol that works over HTTP/1.1 or HTTP/2. It takes the best portions of gRPC and gRPC-Web, including streaming, and packages them into a protocol that works equally well in browsers, monoliths, and microservices. Calling a Connect API is as easy as using curl. Try it with our live demo:

curl \
    --header "Content-Type: application/json" \
    --data '{"sentence": "I feel happy."}' \
    https://demo.connectrpc.com/connectrpc.eliza.v1.ElizaService/Say

Handlers and clients also support the gRPC and gRPC-Web protocols, including streaming, headers, trailers, and error details. gRPC-compatible server reflection and health checks are available as standalone packages. Instead of cURL, we could call our API with a gRPC client:

go install github.com/bufbuild/buf/cmd/buf@latest
buf curl --protocol grpc \
    --data '{"sentence": "I feel happy."}' \
    https://demo.connectrpc.com/connectrpc.eliza.v1.ElizaService/Say

Under the hood, Connect is just Protocol Buffers and the standard library: no custom HTTP implementation, no new name resolution or load balancing APIs, and no surprises. Everything you already know about net/http still applies, and any package that works with an http.Server, http.Client, or http.Handler also works with Connect.

For more on Connect, see the announcement blog post, the documentation on connectrpc.com (especially the Getting Started guide for Go), the demo service, or the protocol specification.

A small example

Curious what all this looks like in practice? From a Protobuf schema, we generate a small RPC package. Using that package, we can build a server:

package main

import (
  "context"
  "log"
  "net/http"

  "connectrpc.com/connect"
  pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
  "connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
  "golang.org/x/net/http2"
  "golang.org/x/net/http2/h2c"
)

type PingServer struct {
  pingv1connect.UnimplementedPingServiceHandler // returns errors from all methods
}

func (ps *PingServer) Ping(
  ctx context.Context,
  req *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
  // connect.Request and connect.Response give you direct access to headers and
  // trailers. No context-based nonsense!
  log.Println(req.Header().Get("Some-Header"))
  res := connect.NewResponse(&pingv1.PingResponse{
    // req.Msg is a strongly-typed *pingv1.PingRequest, so we can access its
    // fields without type assertions.
    Number: req.Msg.Number,
  })
  res.Header().Set("Some-Other-Header", "hello!")
  return res, nil
}

func main() {
  mux := http.NewServeMux()
  // The generated constructors return a path and a plain net/http
  // handler.
  mux.Handle(pingv1connect.NewPingServiceHandler(&PingServer{}))
  err := http.ListenAndServe(
    "localhost:8080",
    // For gRPC clients, it's convenient to support HTTP/2 without TLS. You can
    // avoid x/net/http2 by using http.ListenAndServeTLS.
    h2c.NewHandler(mux, &http2.Server{}),
  )
  log.Fatalf("listen failed: %v", err)
}

With that server running, you can make requests with any gRPC or Connect client. To write a client using Connect,

package main

import (
  "context"
  "log"
  "net/http"

  "connectrpc.com/connect"
  pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
  "connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func main() {
  client := pingv1connect.NewPingServiceClient(
    http.DefaultClient,
    "http://localhost:8080/",
  )
  req := connect.NewRequest(&pingv1.PingRequest{
    Number: 42,
  })
  req.Header().Set("Some-Header", "hello from connect")
  res, err := client.Ping(context.Background(), req)
  if err != nil {
    log.Fatalln(err)
  }
  log.Println(res.Msg)
  log.Println(res.Header().Get("Some-Other-Header"))
}

Of course, http.ListenAndServe and http.DefaultClient aren't fit for production use! See Connect's deployment docs for a guide to configuring timeouts, connection pools, observability, and h2c.

Ecosystem

  • grpchealth: gRPC-compatible health checks
  • grpcreflect: gRPC-compatible server reflection
  • examples-go: service powering demo.connectrpc.com, including bidi streaming
  • connect-es: Type-safe APIs with Protobuf and TypeScript
  • Buf Studio: web UI for ad-hoc RPCs
  • conformance: Connect, gRPC, and gRPC-Web interoperability tests

Status: Stable

This module is stable. It supports:

Within those parameters, connect follows semantic versioning. We will not make breaking changes in the 1.x series of releases.

Offered under the Apache 2 license.

Documentation

Overview

Package connect is a slim RPC framework built on Protocol Buffers and net/http. In addition to supporting its own protocol, Connect handlers and clients are wire-compatible with gRPC and gRPC-Web, including streaming.

This documentation is intended to explain each type and function in isolation. Walkthroughs, FAQs, and other narrative docs are available on the Connect website, and there's a working demonstration service on Github.

Example (Client)
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
// To keep this example runnable, we'll use an HTTP server and client
// that communicate over in-memory pipes. The client is still a plain
// *http.Client!
var httpClient *http.Client = examplePingServer.Client()

// By default, clients use the Connect protocol. Add connect.WithGRPC() or
// connect.WithGRPCWeb() to switch protocols.
client := pingv1connect.NewPingServiceClient(
	httpClient,
	examplePingServer.URL(),
)
response, err := client.Ping(
	context.Background(),
	connect.NewRequest(&pingv1.PingRequest{Number: 42}),
)
if err != nil {
	logger.Println("error:", err)
	return
}
logger.Println("response content-type:", response.Header().Get("Content-Type"))
logger.Println("response message:", response.Msg)
Output:

response content-type: application/proto
response message: number:42
Example (Handler)
package main

import (
	"context"
	"errors"
	"io"
	"net/http"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

// ExamplePingServer implements some trivial business logic. The Protobuf
// definition for this API is in proto/connect/ping/v1/ping.proto.
type ExamplePingServer struct {
	pingv1connect.UnimplementedPingServiceHandler
}

// Ping implements pingv1connect.PingServiceHandler.
func (*ExamplePingServer) Ping(
	_ context.Context,
	request *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
	return connect.NewResponse(
		&pingv1.PingResponse{
			Number: request.Msg.GetNumber(),
			Text:   request.Msg.GetText(),
		},
	), nil
}

// Sum implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) Sum(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
	var sum int64
	for stream.Receive() {
		sum += stream.Msg().GetNumber()
	}
	if stream.Err() != nil {
		return nil, stream.Err()
	}
	return connect.NewResponse(&pingv1.SumResponse{Sum: sum}), nil
}

// CountUp implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) CountUp(ctx context.Context, request *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error {
	for number := int64(1); number <= request.Msg.GetNumber(); number++ {
		if err := stream.Send(&pingv1.CountUpResponse{Number: number}); err != nil {
			return err
		}
	}
	return nil
}

// CumSum implements pingv1connect.PingServiceHandler.
func (p *ExamplePingServer) CumSum(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error {
	var sum int64
	for {
		msg, err := stream.Receive()
		if errors.Is(err, io.EOF) {
			return nil
		} else if err != nil {
			return err
		}
		sum += msg.GetNumber()
		if err := stream.Send(&pingv1.CumSumResponse{Sum: sum}); err != nil {
			return err
		}
	}
}

func main() {
	// protoc-gen-connect-go generates constructors that return plain net/http
	// Handlers, so they're compatible with most Go HTTP routers and middleware
	// (for example, net/http's StripPrefix). Each handler automatically supports
	// the Connect, gRPC, and gRPC-Web protocols.
	mux := http.NewServeMux()
	mux.Handle(
		pingv1connect.NewPingServiceHandler(
			&ExamplePingServer{}, // our business logic
		),
	)
	// You can serve gRPC's health and server reflection APIs using
	// connectrpc.com/grpchealth and connectrpc.com/grpcreflect.
	_ = http.ListenAndServeTLS(
		"localhost:8080",
		"internal/testdata/server.crt",
		"internal/testdata/server.key",
		mux,
	)
	// To serve HTTP/2 requests without TLS (as many gRPC clients expect), import
	// golang.org/x/net/http2/h2c and golang.org/x/net/http2 and change to:
	// _ = http.ListenAndServe(
	// 	"localhost:8080",
	// 	h2c.NewHandler(mux, &http2.Server{}),
	// )
}
Output:

Index

Examples

Constants

View Source
const (
	IsAtLeastVersion0_0_1  = true
	IsAtLeastVersion0_1_0  = true
	IsAtLeastVersion1_7_0  = true
	IsAtLeastVersion1_13_0 = true
)

These constants are used in compile-time handshakes with connect's generated code.

View Source
const (
	ProtocolConnect = "connect"
	ProtocolGRPC    = "grpc"
	ProtocolGRPCWeb = "grpcweb"
)

The names of the Connect, gRPC, and gRPC-Web protocols (as exposed by [Peer.Protocol]). Additional protocols may be added in the future.

View Source
const Version = "1.17.0"

Version is the semantic version of the connect module.

Variables

This section is empty.

Functions

func DecodeBinaryHeader

func DecodeBinaryHeader(data string) ([]byte, error)

DecodeBinaryHeader base64-decodes the data. It can decode padded or unpadded values. Following usual HTTP semantics, multiple base64-encoded values may be joined with a comma. When receiving such comma-separated values, split them with strings.Split before calling DecodeBinaryHeader.

Binary headers sent using the Connect, gRPC, and gRPC-Web protocols have keys ending in "-Bin".

func EncodeBinaryHeader

func EncodeBinaryHeader(data []byte) string

EncodeBinaryHeader base64-encodes the data. It always emits unpadded values.

In the Connect, gRPC, and gRPC-Web protocols, binary headers must have keys ending in "-Bin".

func IsNotModifiedError added in v1.7.0

func IsNotModifiedError(err error) bool

IsNotModifiedError checks whether the supplied error indicates that the requested resource hasn't changed. It only returns true if the server used NewNotModifiedError in response to a Connect-protocol RPC made with an HTTP GET.

Example
package main

import (
	"context"
	"fmt"
	"net/http"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func main() {
	// Assume that the server from NewNotModifiedError's example is running on
	// localhost:8080.
	client := pingv1connect.NewPingServiceClient(
		http.DefaultClient,
		"http://localhost:8080",
		// Enable client-side support for HTTP GETs.
		connect.WithHTTPGet(),
	)
	req := connect.NewRequest(&pingv1.PingRequest{Number: 42})
	first, err := client.Ping(context.Background(), req)
	if err != nil {
		fmt.Println(err)
		return
	}
	// If the server set an Etag, we can use it to cache the response.
	etag := first.Header().Get("Etag")
	if etag == "" {
		fmt.Println("no Etag in response headers")
		return
	}
	fmt.Println("cached response with Etag", etag)
	// Now we'd like to make the same request again, but avoid re-fetching the
	// response if possible.
	req.Header().Set("If-None-Match", etag)
	_, err = client.Ping(context.Background(), req)
	if connect.IsNotModifiedError(err) {
		fmt.Println("can reuse cached response")
	}
}
Output:

func IsWireError added in v1.1.0

func IsWireError(err error) bool

IsWireError checks whether the error was returned by the server, as opposed to being synthesized by the client.

Clients may find this useful when deciding how to propagate errors. For example, an RPC-to-HTTP proxy might expose a server-sent CodeUnknown as an HTTP 500 but a client-synthesized CodeUnknown as a 503.

Handlers will strip Error.Meta headers propagated from wire errors to avoid leaking response headers. To propagate headers recreate the error as a non-wire error.

Types

type AnyRequest

type AnyRequest interface {
	Any() any
	Spec() Spec
	Peer() Peer
	Header() http.Header
	HTTPMethod() string
	// contains filtered or unexported methods
}

AnyRequest is the common method set of every Request, regardless of type parameter. It's used in unary interceptors.

Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.

To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.

type AnyResponse

type AnyResponse interface {
	Any() any
	Header() http.Header
	Trailer() http.Header
	// contains filtered or unexported methods
}

AnyResponse is the common method set of every Response, regardless of type parameter. It's used in unary interceptors.

Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.

To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyResponse.

type BidiStream

type BidiStream[Req, Res any] struct {
	// contains filtered or unexported fields
}

BidiStream is the handler's view of a bidirectional streaming RPC.

It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.

func (*BidiStream[Req, Res]) Conn added in v0.4.0

func (b *BidiStream[Req, Res]) Conn() StreamingHandlerConn

Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*BidiStream[_, _]) Peer added in v0.5.0

func (b *BidiStream[_, _]) Peer() Peer

Peer describes the client for this RPC.

func (*BidiStream[Req, Res]) Receive

func (b *BidiStream[Req, Res]) Receive() (*Req, error)

Receive a message. When the client is done sending messages, Receive will return an error that wraps io.EOF.

func (*BidiStream[Req, Res]) RequestHeader

func (b *BidiStream[Req, Res]) RequestHeader() http.Header

RequestHeader returns the headers received from the client.

func (*BidiStream[Req, Res]) ResponseHeader

func (b *BidiStream[Req, Res]) ResponseHeader() http.Header

ResponseHeader returns the response headers. Headers are sent with the first call to Send.

Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*BidiStream[Req, Res]) ResponseTrailer

func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header

ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.

Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*BidiStream[Req, Res]) Send

func (b *BidiStream[Req, Res]) Send(msg *Res) error

Send a message to the client. The first call to Send also sends the response headers.

func (*BidiStream[_, _]) Spec added in v0.5.0

func (b *BidiStream[_, _]) Spec() Spec

Spec returns the specification for the RPC.

type BidiStreamForClient

type BidiStreamForClient[Req, Res any] struct {
	// contains filtered or unexported fields
}

BidiStreamForClient is the client's view of a bidirectional streaming RPC.

It's returned from Client.CallBidiStream, but doesn't currently have an exported constructor function.

func (*BidiStreamForClient[Req, Res]) CloseRequest added in v0.2.0

func (b *BidiStreamForClient[Req, Res]) CloseRequest() error

CloseRequest closes the send side of the stream.

func (*BidiStreamForClient[Req, Res]) CloseResponse added in v0.2.0

func (b *BidiStreamForClient[Req, Res]) CloseResponse() error

CloseResponse closes the receive side of the stream.

func (*BidiStreamForClient[Req, Res]) Conn added in v0.4.0

func (b *BidiStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)

Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*BidiStreamForClient[_, _]) Peer added in v0.5.0

func (b *BidiStreamForClient[_, _]) Peer() Peer

Peer describes the server for the RPC.

func (*BidiStreamForClient[Req, Res]) Receive

func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)

Receive a message. When the server is done sending messages and no other errors have occurred, Receive will return an error that wraps io.EOF.

func (*BidiStreamForClient[Req, Res]) RequestHeader

func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header

RequestHeader returns the request headers. Headers are sent with the first call to Send.

Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*BidiStreamForClient[Req, Res]) ResponseHeader

func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header

ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.

func (*BidiStreamForClient[Req, Res]) ResponseTrailer

func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header

ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.

func (*BidiStreamForClient[Req, Res]) Send

func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error

Send a message to the server. The first call to Send also sends the request headers. To send just the request headers, without a body, call Send with a nil pointer.

If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for EOF using the standard library's errors.Is and call Receive to retrieve the error.

func (*BidiStreamForClient[_, _]) Spec added in v0.5.0

func (b *BidiStreamForClient[_, _]) Spec() Spec

Spec returns the specification for the RPC.

type Client

type Client[Req, Res any] struct {
	// contains filtered or unexported fields
}

Client is a reusable, concurrency-safe client for a single procedure. Depending on the procedure's type, use the CallUnary, CallClientStream, CallServerStream, or CallBidiStream method.

By default, clients use the Connect protocol with the binary Protobuf Codec, ask for gzipped responses, and send uncompressed requests. To use the gRPC or gRPC-Web protocols, use the WithGRPC or WithGRPCWeb options.

func NewClient

func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...ClientOption) *Client[Req, Res]

NewClient constructs a new Client.

func (*Client[Req, Res]) CallBidiStream

func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]

CallBidiStream calls a bidirectional streaming procedure.

func (*Client[Req, Res]) CallClientStream

func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]

CallClientStream calls a client streaming procedure.

func (*Client[Req, Res]) CallServerStream

func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Request[Req]) (*ServerStreamForClient[Res], error)

CallServerStream calls a server streaming procedure.

func (*Client[Req, Res]) CallUnary

func (c *Client[Req, Res]) CallUnary(ctx context.Context, request *Request[Req]) (*Response[Res], error)

CallUnary calls a request-response procedure.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

A ClientOption configures a Client.

In addition to any options grouped in the documentation below, remember that any Option is also a valid ClientOption.

func WithAcceptCompression

func WithAcceptCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) ClientOption

WithAcceptCompression makes a compression algorithm available to a client. Clients ask servers to compress responses using any of the registered algorithms. The first registered algorithm is treated as the least preferred, and the last registered algorithm is the most preferred.

It's safe to use this option liberally: servers will ignore any compression algorithms they don't support. To compress requests, pair this option with WithSendCompression. To remove support for a previously-registered compression algorithm, use WithAcceptCompression with nil decompressor and compressor constructors.

Clients accept gzipped responses by default, using a compressor backed by the standard library's gzip package with the default compression level. Use WithSendGzip to compress requests with gzip.

Calling WithAcceptCompression with an empty name is a no-op.

func WithClientOptions

func WithClientOptions(options ...ClientOption) ClientOption

WithClientOptions composes multiple ClientOptions into one.

func WithGRPC

func WithGRPC() ClientOption

WithGRPC configures clients to use the HTTP/2 gRPC protocol.

func WithGRPCWeb

func WithGRPCWeb() ClientOption

WithGRPCWeb configures clients to use the gRPC-Web protocol.

func WithHTTPGet added in v1.7.0

func WithHTTPGet() ClientOption

WithHTTPGet allows Connect-protocol clients to use HTTP GET requests for side-effect free unary RPC calls. Typically, the service schema indicates which procedures are idempotent (see WithIdempotency for an example protobuf schema). The gRPC and gRPC-Web protocols are POST-only, so this option has no effect when combined with WithGRPC or WithGRPCWeb.

Using HTTP GET requests makes it easier to take advantage of CDNs, caching reverse proxies, and browsers' built-in caching. Note, however, that servers don't automatically set any cache headers; you can set cache headers using interceptors or by adding headers in individual procedure implementations.

By default, all requests are made as HTTP POSTs.

func WithHTTPGetMaxURLSize added in v1.9.1

func WithHTTPGetMaxURLSize(bytes int, fallback bool) ClientOption

WithHTTPGetMaxURLSize sets the maximum allowable URL length for GET requests made using the Connect protocol. It has no effect on gRPC or gRPC-Web clients, since those protocols are POST-only.

Limiting the URL size is useful as most user agents, proxies, and servers have limits on the allowable length of a URL. For example, Apache and Nginx limit the size of a request line to around 8 KiB, meaning that maximum length of a URL is a bit smaller than this. If you run into URL size limitations imposed by your network infrastructure and don't know the maximum allowable size, or if you'd prefer to be cautious from the start, a 4096 byte (4 KiB) limit works with most common proxies and CDNs.

If fallback is set to true and the URL would be longer than the configured maximum value, the request will be sent as an HTTP POST instead. If fallback is set to false, the request will fail with CodeResourceExhausted.

By default, Connect-protocol clients with GET requests enabled may send a URL of any size.

func WithProtoJSON

func WithProtoJSON() ClientOption

WithProtoJSON configures a client to send JSON-encoded data instead of binary Protobuf. It uses the standard Protobuf JSON mapping as implemented by google.golang.org/protobuf/encoding/protojson: fields are named using lowerCamelCase, zero values are omitted, missing required fields are errors, enums are emitted as strings, etc.

func WithResponseInitializer added in v1.13.0

func WithResponseInitializer(initializer func(spec Spec, message any) error) ClientOption

WithResponseInitializer provides a function that initializes a new message. It may be used to dynamically construct response messages. It is called on client receives to construct the message to be unmarshaled into. The message will be a non nil pointer to the type created by the client. Use the Schema field of the Spec to determine the type of the message.

func WithSendCompression

func WithSendCompression(name string) ClientOption

WithSendCompression configures the client to use the specified algorithm to compress request messages. If the algorithm has not been registered using WithAcceptCompression, the client will return errors at runtime.

Because some servers don't support compression, clients default to sending uncompressed requests.

func WithSendGzip

func WithSendGzip() ClientOption

WithSendGzip configures the client to gzip requests. Since clients have access to a gzip compressor by default, WithSendGzip doesn't require WithSendCompression.

Some servers don't support gzip, so clients default to sending uncompressed requests.

type ClientStream

type ClientStream[Req any] struct {
	// contains filtered or unexported fields
}

ClientStream is the handler's view of a client streaming RPC.

It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.

func (*ClientStream[Req]) Conn added in v0.4.0

func (c *ClientStream[Req]) Conn() StreamingHandlerConn

Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*ClientStream[Req]) Err

func (c *ClientStream[Req]) Err() error

Err returns the first non-EOF error that was encountered by Receive.

func (*ClientStream[Req]) Msg

func (c *ClientStream[Req]) Msg() *Req

Msg returns the most recent message unmarshaled by a call to Receive.

func (*ClientStream[_]) Peer added in v0.5.0

func (c *ClientStream[_]) Peer() Peer

Peer describes the client for this RPC.

func (*ClientStream[Req]) Receive

func (c *ClientStream[Req]) Receive() bool

Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.

func (*ClientStream[Req]) RequestHeader

func (c *ClientStream[Req]) RequestHeader() http.Header

RequestHeader returns the headers received from the client.

func (*ClientStream[_]) Spec added in v0.5.0

func (c *ClientStream[_]) Spec() Spec

Spec returns the specification for the RPC.

type ClientStreamForClient

type ClientStreamForClient[Req, Res any] struct {
	// contains filtered or unexported fields
}

ClientStreamForClient is the client's view of a client streaming RPC.

It's returned from Client.CallClientStream, but doesn't currently have an exported constructor function.

func (*ClientStreamForClient[Req, Res]) CloseAndReceive

func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], error)

CloseAndReceive closes the send side of the stream and waits for the response.

func (*ClientStreamForClient[Req, Res]) Conn added in v0.4.0

func (c *ClientStreamForClient[Req, Res]) Conn() (StreamingClientConn, error)

Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*ClientStreamForClient[_, _]) Peer added in v0.5.0

func (c *ClientStreamForClient[_, _]) Peer() Peer

Peer describes the server for the RPC.

func (*ClientStreamForClient[Req, Res]) RequestHeader

func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header

RequestHeader returns the request headers. Headers are sent to the server with the first call to Send.

Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*ClientStreamForClient[Req, Res]) Send

func (c *ClientStreamForClient[Req, Res]) Send(request *Req) error

Send a message to the server. The first call to Send also sends the request headers.

If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for case using the standard library's errors.Is and unmarshal the error using CloseAndReceive.

func (*ClientStreamForClient[_, _]) Spec added in v0.5.0

func (c *ClientStreamForClient[_, _]) Spec() Spec

Spec returns the specification for the RPC.

type Code

type Code uint32

A Code is one of the Connect protocol's error codes. There are no user-defined codes, so only the codes enumerated below are valid. In both name and semantics, these codes match the gRPC status codes.

The descriptions below are optimized for brevity rather than completeness. See the Connect protocol specification for detailed descriptions of each code and example usage.

const (

	// CodeCanceled indicates that the operation was canceled, typically by the
	// caller.
	CodeCanceled Code = 1

	// CodeUnknown indicates that the operation failed for an unknown reason.
	CodeUnknown Code = 2

	// CodeInvalidArgument indicates that client supplied an invalid argument.
	CodeInvalidArgument Code = 3

	// CodeDeadlineExceeded indicates that deadline expired before the operation
	// could complete.
	CodeDeadlineExceeded Code = 4

	// CodeNotFound indicates that some requested entity (for example, a file or
	// directory) was not found.
	CodeNotFound Code = 5

	// CodeAlreadyExists indicates that client attempted to create an entity (for
	// example, a file or directory) that already exists.
	CodeAlreadyExists Code = 6

	// CodePermissionDenied indicates that the caller doesn't have permission to
	// execute the specified operation.
	CodePermissionDenied Code = 7

	// CodeResourceExhausted indicates that some resource has been exhausted. For
	// example, a per-user quota may be exhausted or the entire file system may
	// be full.
	CodeResourceExhausted Code = 8

	// CodeFailedPrecondition indicates that the system is not in a state
	// required for the operation's execution.
	CodeFailedPrecondition Code = 9

	// CodeAborted indicates that operation was aborted by the system, usually
	// because of a concurrency issue such as a sequencer check failure or
	// transaction abort.
	CodeAborted Code = 10

	// CodeOutOfRange indicates that the operation was attempted past the valid
	// range (for example, seeking past end-of-file).
	CodeOutOfRange Code = 11

	// CodeUnimplemented indicates that the operation isn't implemented,
	// supported, or enabled in this service.
	CodeUnimplemented Code = 12

	// CodeInternal indicates that some invariants expected by the underlying
	// system have been broken. This code is reserved for serious errors.
	CodeInternal Code = 13

	// CodeUnavailable indicates that the service is currently unavailable. This
	// is usually temporary, so clients can back off and retry idempotent
	// operations.
	CodeUnavailable Code = 14

	// CodeDataLoss indicates that the operation has resulted in unrecoverable
	// data loss or corruption.
	CodeDataLoss Code = 15

	// CodeUnauthenticated indicates that the request does not have valid
	// authentication credentials for the operation.
	CodeUnauthenticated Code = 16
)

func CodeOf

func CodeOf(err error) Code

CodeOf returns the error's status code if it is or wraps an *Error and CodeUnknown otherwise.

func (Code) MarshalText

func (c Code) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler.

func (Code) String

func (c Code) String() string

func (*Code) UnmarshalText

func (c *Code) UnmarshalText(data []byte) error

UnmarshalText implements encoding.TextUnmarshaler.

type Codec

type Codec interface {
	// Name returns the name of the Codec.
	//
	// This may be used as part of the Content-Type within HTTP. For example,
	// with gRPC this is the content subtype, so "application/grpc+proto" will
	// map to the Codec with name "proto".
	//
	// Names must not be empty.
	Name() string
	// Marshal marshals the given message.
	//
	// Marshal may expect a specific type of message, and will error if this type
	// is not given.
	Marshal(any) ([]byte, error)
	// Unmarshal unmarshals the given message.
	//
	// Unmarshal may expect a specific type of message, and will error if this
	// type is not given.
	Unmarshal([]byte, any) error
}

Codec marshals structs (typically generated from a schema) to and from bytes.

type Compressor

type Compressor interface {
	io.Writer

	// Close flushes any buffered data to the underlying sink, then closes the
	// Compressor. It must not close the underlying sink.
	Close() error

	// Reset discards the Compressor's internal state, if any, and prepares it to
	// write compressed data to a new sink.
	Reset(io.Writer)
}

A Compressor is a reusable wrapper that compresses data written to an underlying sink. The standard library's *gzip.Writer implements Compressor.

type Decompressor

type Decompressor interface {
	io.Reader

	// Close closes the Decompressor, but not the underlying data source. It may
	// return an error if the Decompressor wasn't read to EOF.
	Close() error

	// Reset discards the Decompressor's internal state, if any, and prepares it
	// to read from a new source of compressed data.
	Reset(io.Reader) error
}

A Decompressor is a reusable wrapper that decompresses an underlying data source. The standard library's *gzip.Reader implements Decompressor.

type Error

type Error struct {
	// contains filtered or unexported fields
}

An Error captures four key pieces of information: a Code, an underlying Go error, a map of metadata, and an optional collection of arbitrary Protobuf messages called "details" (more on those below). Servers send the code, the underlying error's Error() output, the metadata, and details over the wire to clients. Remember that the underlying error's message will be sent to clients - take care not to leak sensitive information from public APIs!

Service implementations and interceptors should return errors that can be cast to an *Error (using the standard library's errors.As). If the returned error can't be cast to an *Error, connect will use CodeUnknown and the returned error's message.

Error details are an optional mechanism for servers, interceptors, and proxies to attach arbitrary Protobuf messages to the error code and message. They're a clearer and more performant alternative to HTTP header microformats. See the documentation on errors for more details.

func NewError

func NewError(c Code, underlying error) *Error

NewError annotates any Go error with a status code.

func NewNotModifiedError added in v1.7.0

func NewNotModifiedError(headers http.Header) *Error

NewNotModifiedError indicates that the requested resource hasn't changed. It should be used only when handlers wish to respond to conditional HTTP GET requests with a 304 Not Modified. In all other circumstances, including all RPCs using the gRPC or gRPC-Web protocols, it's equivalent to sending an error with CodeUnknown. The supplied headers should include Etag, Cache-Control, or any other headers required by RFC 9110 § 15.4.5.

Clients should check for this error using IsNotModifiedError.

Example
package main

import (
	"context"
	"net/http"
	"strconv"

	connect "connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

// ExampleCachingServer is an example of how servers can take advantage the
// Connect protocol's support for HTTP-level caching. The Protobuf
// definition for this API is in proto/connect/ping/v1/ping.proto.
type ExampleCachingPingServer struct {
	pingv1connect.UnimplementedPingServiceHandler
}

// Ping is idempotent and free of side effects (and the Protobuf schema
// indicates this), so clients using the Connect protocol may call it with HTTP
// GET requests. This implementation uses Etags to manage client-side caching.
func (*ExampleCachingPingServer) Ping(
	_ context.Context,
	req *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
	resp := connect.NewResponse(&pingv1.PingResponse{
		Number: req.Msg.GetNumber(),
	})
	// Our hashing logic is simple: we use the number in the PingResponse.
	hash := strconv.FormatInt(resp.Msg.GetNumber(), 10)
	// If the request was an HTTP GET, we'll need to check if the client already
	// has the response cached.
	if req.HTTPMethod() == http.MethodGet && req.Header().Get("If-None-Match") == hash {
		return nil, connect.NewNotModifiedError(http.Header{
			"Etag": []string{hash},
		})
	}
	resp.Header().Set("Etag", hash)
	return resp, nil
}

func main() {
	mux := http.NewServeMux()
	mux.Handle(pingv1connect.NewPingServiceHandler(&ExampleCachingPingServer{}))
	_ = http.ListenAndServe("localhost:8080", mux)
}
Output:

func NewWireError added in v1.5.0

func NewWireError(c Code, underlying error) *Error

NewWireError is similar to NewError, but the resulting *Error returns true when tested with IsWireError.

This is useful for clients trying to propagate partial failures from streaming RPCs. Often, these RPCs include error information in their response messages (for example, gRPC server reflection and OpenTelemetry's OTLP). Clients propagating these errors up the stack should use NewWireError to clarify that the error code, message, and details (if any) were explicitly sent by the server rather than inferred from a lower-level networking error or timeout.

func (*Error) AddDetail

func (e *Error) AddDetail(d *ErrorDetail)

AddDetail appends to the error's details.

func (*Error) Code

func (e *Error) Code() Code

Code returns the error's status code.

func (*Error) Details

func (e *Error) Details() []*ErrorDetail

Details returns the error's details.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Message

func (e *Error) Message() string

Message returns the underlying error message. It may be empty if the original error was created with a status code and a nil error.

Example
package main

import (
	"errors"
	"fmt"

	connect "connectrpc.com/connect"
)

func main() {
	err := fmt.Errorf(
		"another: %w",
		connect.NewError(connect.CodeUnavailable, errors.New("failed to foo")),
	)
	if connectErr := (&connect.Error{}); errors.As(err, &connectErr) {
		fmt.Println("underlying error message:", connectErr.Message())
	}

}
Output:

underlying error message: failed to foo

func (*Error) Meta

func (e *Error) Meta() http.Header

Meta allows the error to carry additional information as key-value pairs.

Metadata attached to errors returned by unary handlers is always sent as HTTP headers, regardless of the protocol. Metadata attached to errors returned by streaming handlers may be sent as HTTP headers, HTTP trailers, or a block of in-body metadata, depending on the protocol in use and whether or not the handler has already written messages to the stream.

Protocol-specific headers and trailers may be removed to avoid breaking protocol semantics. For example, Content-Length and Content-Type headers won't be propagated. See the documentation for each protocol for more datails.

When clients receive errors, the metadata contains the union of the HTTP headers and the protocol-specific trailers (either HTTP trailers or in-body metadata).

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap allows errors.Is and errors.As access to the underlying error.

type ErrorDetail

type ErrorDetail struct {
	// contains filtered or unexported fields
}

An ErrorDetail is a self-describing Protobuf message attached to an *Error. Error details are sent over the network to clients, which can then work with strongly-typed data rather than trying to parse a complex error message. For example, you might use details to send a localized error message or retry parameters to the client.

The google.golang.org/genproto/googleapis/rpc/errdetails package contains a variety of Protobuf messages commonly used as error details.

func NewErrorDetail added in v0.3.0

func NewErrorDetail(msg proto.Message) (*ErrorDetail, error)

NewErrorDetail constructs a new error detail. If msg is an *anypb.Any then it is used as is. Otherwise, it is first marshalled into an *anypb.Any value. This returns an error if msg cannot be marshalled.

func (*ErrorDetail) Bytes added in v0.3.0

func (d *ErrorDetail) Bytes() []byte

Bytes returns a copy of the Protobuf-serialized detail.

func (*ErrorDetail) Type added in v0.3.0

func (d *ErrorDetail) Type() string

Type is the fully-qualified name of the detail's Protobuf message (for example, acme.foo.v1.FooDetail).

func (*ErrorDetail) Value added in v0.3.0

func (d *ErrorDetail) Value() (proto.Message, error)

Value uses the Protobuf runtime's package-global registry to unmarshal the Detail into a strongly-typed message. Typically, clients use Go type assertions to cast from the proto.Message interface to concrete types.

type ErrorWriter added in v0.5.0

type ErrorWriter struct {
	// contains filtered or unexported fields
}

An ErrorWriter writes errors to an http.ResponseWriter in the format expected by an RPC client. This is especially useful in server-side net/http middleware, where you may wish to handle requests from RPC and non-RPC clients with the same code.

ErrorWriters are safe to use concurrently.

Example
package main

import (
	"errors"
	"io"
	"log"
	"net/http"

	connect "connectrpc.com/connect"
)

// NewHelloHandler is an example HTTP handler. In a real application, it might
// handle RPCs, requests for HTML, or anything else.
func NewHelloHandler() http.Handler {
	return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
		io.WriteString(response, "Hello, world!")
	})
}

// NewAuthenticatedHandler is an example of middleware that works with both RPC
// and non-RPC clients.
func NewAuthenticatedHandler(handler http.Handler) http.Handler {
	errorWriter := connect.NewErrorWriter()
	return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
		// Dummy authentication logic.
		if request.Header.Get("Token") == "super-secret" {
			handler.ServeHTTP(response, request)
			return
		}
		defer request.Body.Close()
		defer io.Copy(io.Discard, request.Body)
		if errorWriter.IsSupported(request) {
			// Send a protocol-appropriate error to RPC clients, so that they receive
			// the right code, message, and any metadata or error details.
			unauthenticated := connect.NewError(connect.CodeUnauthenticated, errors.New("invalid token"))
			errorWriter.Write(response, request, unauthenticated)
		} else {
			// Send an error to non-RPC clients.
			response.WriteHeader(http.StatusUnauthorized)
			io.WriteString(response, "invalid token")
		}
	})
}

func main() {
	mux := http.NewServeMux()
	mux.Handle("/", NewHelloHandler())
	srv := &http.Server{
		Addr:    ":8080",
		Handler: NewAuthenticatedHandler(mux),
	}
	if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Fatalln(err)
	}
}
Output:

func NewErrorWriter added in v0.5.0

func NewErrorWriter(opts ...HandlerOption) *ErrorWriter

NewErrorWriter constructs an ErrorWriter. Handler options may be passed to configure the error writer behaviour to match the handlers. [WithRequiredConnectProtocolHeader] will assert that Connect protocol requests include the version header allowing the error writer to correctly classify the request. Options supplied via WithConditionalHandlerOptions are ignored.

func (*ErrorWriter) IsSupported added in v0.5.0

func (w *ErrorWriter) IsSupported(request *http.Request) bool

IsSupported checks whether a request is using one of the ErrorWriter's supported RPC protocols.

func (*ErrorWriter) Write added in v0.5.0

func (w *ErrorWriter) Write(response http.ResponseWriter, request *http.Request, err error) error

Write an error, using the format appropriate for the RPC protocol in use. Callers should first use IsSupported to verify that the request is using one of the ErrorWriter's supported RPC protocols. If the protocol is unknown, Write will send the error as unprefixed, Connect-formatted JSON.

Write does not read or close the request body.

type HTTPClient

type HTTPClient interface {
	Do(*http.Request) (*http.Response, error)
}

HTTPClient is the interface connect expects HTTP clients to implement. The standard library's *http.Client implements HTTPClient.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

A Handler is the server-side implementation of a single RPC defined by a service schema.

By default, Handlers support the Connect, gRPC, and gRPC-Web protocols with the binary Protobuf and JSON codecs. They support gzip compression using the standard library's compress/gzip.

func NewBidiStreamHandler

func NewBidiStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *BidiStream[Req, Res]) error,
	options ...HandlerOption,
) *Handler

NewBidiStreamHandler constructs a Handler for a bidirectional streaming procedure.

func NewClientStreamHandler

func NewClientStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *ClientStream[Req]) (*Response[Res], error),
	options ...HandlerOption,
) *Handler

NewClientStreamHandler constructs a Handler for a client streaming procedure.

func NewServerStreamHandler

func NewServerStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *Request[Req], *ServerStream[Res]) error,
	options ...HandlerOption,
) *Handler

NewServerStreamHandler constructs a Handler for a server streaming procedure.

func NewUnaryHandler

func NewUnaryHandler[Req, Res any](
	procedure string,
	unary func(context.Context, *Request[Req]) (*Response[Res], error),
	options ...HandlerOption,
) *Handler

NewUnaryHandler constructs a Handler for a request-response procedure.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request)

ServeHTTP implements http.Handler.

type HandlerOption

type HandlerOption interface {
	// contains filtered or unexported methods
}

A HandlerOption configures a Handler.

In addition to any options grouped in the documentation below, remember that any Option is also a HandlerOption.

func WithCompression

func WithCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) HandlerOption

WithCompression configures handlers to support a compression algorithm. Clients may send messages compressed with that algorithm and/or request compressed responses. The Compressor and Decompressor produced by the supplied constructors must use the same algorithm. Internally, Connect pools compressors and decompressors.

By default, handlers support gzip using the standard library's compress/gzip package at the default compression level. To remove support for a previously-registered compression algorithm, use WithCompression with nil decompressor and compressor constructors.

Calling WithCompression with an empty name is a no-op.

func WithConditionalHandlerOptions added in v1.10.1

func WithConditionalHandlerOptions(conditional func(spec Spec) []HandlerOption) HandlerOption

WithConditionalHandlerOptions allows procedures in the same service to have different configurations: for example, one procedure may need a much larger WithReadMaxBytes setting than the others.

WithConditionalHandlerOptions takes a function which may inspect each procedure's Spec before deciding which options to apply. Returning a nil slice is safe.

Example
package main

import (
	connect "connectrpc.com/connect"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func main() {
	connect.WithConditionalHandlerOptions(func(spec connect.Spec) []connect.HandlerOption {
		var options []connect.HandlerOption
		if spec.Procedure == pingv1connect.PingServicePingProcedure {
			options = append(options, connect.WithReadMaxBytes(1024))
		}
		return options
	})
}
Output:

func WithHandlerOptions

func WithHandlerOptions(options ...HandlerOption) HandlerOption

WithHandlerOptions composes multiple HandlerOptions into one.

func WithRecover added in v0.2.0

func WithRecover(handle func(context.Context, Spec, http.Header, any) error) HandlerOption

WithRecover adds an interceptor that recovers from panics. The supplied function receives the context, Spec, request headers, and the recovered value (which may be nil). It must return an error to send back to the client. It may also log the panic, emit metrics, or execute other error-handling logic. Handler functions must be safe to call concurrently.

To preserve compatibility with net/http's semantics, this interceptor doesn't handle panics with http.ErrAbortHandler.

By default, handlers don't recover from panics. Because the standard library's http.Server recovers from panics by default, this option isn't usually necessary to prevent crashes. Instead, it helps servers collect RPC-specific data during panics and send a more detailed error to clients.

func WithRequestInitializer added in v1.13.0

func WithRequestInitializer(initializer func(spec Spec, message any) error) HandlerOption

WithRequestInitializer provides a function that initializes a new message. It may be used to dynamically construct request messages. It is called on server receives to construct the message to be unmarshaled into. The message will be a non nil pointer to the type created by the handler. Use the Schema field of the Spec to determine the type of the message.

func WithRequireConnectProtocolHeader added in v1.4.0

func WithRequireConnectProtocolHeader() HandlerOption

WithRequireConnectProtocolHeader configures the Handler to require requests using the Connect RPC protocol to include the Connect-Protocol-Version header. This ensures that HTTP proxies and net/http middleware can easily identify valid Connect requests, even if they use a common Content-Type like application/json. However, it makes ad-hoc requests with tools like cURL more laborious. Streaming requests are not affected by this option.

This option has no effect if the client uses the gRPC or gRPC-Web protocols.

type IdempotencyLevel added in v1.7.0

type IdempotencyLevel int

An IdempotencyLevel is a value that declares how "idempotent" an RPC is. This value can affect RPC behaviors, such as determining whether it is safe to retry a request, or what kinds of request modalities are allowed for a given procedure.

const (
	// IdempotencyUnknown is the default idempotency level. A procedure with
	// this idempotency level may not be idempotent. This is appropriate for
	// any kind of procedure.
	IdempotencyUnknown IdempotencyLevel = 0

	// IdempotencyNoSideEffects is the idempotency level that specifies that a
	// given call has no side-effects. This is equivalent to [RFC 9110 § 9.2.1]
	// "safe" methods in terms of semantics. This procedure should not mutate
	// any state. This idempotency level is appropriate for queries, or anything
	// that would be suitable for an HTTP GET request. In addition, due to the
	// lack of side-effects, such a procedure would be suitable to retry and
	// expect that the results will not be altered by preceding attempts.
	//
	// [RFC 9110 § 9.2.1]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.1
	IdempotencyNoSideEffects IdempotencyLevel = 1

	// IdempotencyIdempotent is the idempotency level that specifies that a
	// given call is "idempotent", such that multiple instances of the same
	// request to this procedure would have the same side-effects as a single
	// request. This is equivalent to [RFC 9110 § 9.2.2] "idempotent" methods.
	// This level is a subset of the previous level. This idempotency level is
	// appropriate for any procedure that is safe to retry multiple times
	// and be guaranteed that the response and side-effects will not be altered
	// as a result of multiple attempts, for example, entity deletion requests.
	//
	// [RFC 9110 § 9.2.2]: https://www.rfc-editor.org/rfc/rfc9110.html#section-9.2.2
	IdempotencyIdempotent IdempotencyLevel = 2
)

func (IdempotencyLevel) String added in v1.7.0

func (i IdempotencyLevel) String() string

type Interceptor

type Interceptor interface {
	WrapUnary(UnaryFunc) UnaryFunc
	WrapStreamingClient(StreamingClientFunc) StreamingClientFunc
	WrapStreamingHandler(StreamingHandlerFunc) StreamingHandlerFunc
}

An Interceptor adds logic to a generated handler or client, like the decorators or middleware you may have seen in other libraries. Interceptors may replace the context, mutate requests and responses, handle errors, retry, recover from panics, emit logs and metrics, or do nearly anything else.

The returned functions must be safe to call concurrently.

type Option

type Option interface {
	ClientOption
	HandlerOption
}

Option implements both ClientOption and HandlerOption, so it can be applied both client-side and server-side.

func WithCodec

func WithCodec(codec Codec) Option

WithCodec registers a serialization method with a client or handler. Handlers may have multiple codecs registered, and use whichever the client chooses. Clients may only have a single codec.

By default, handlers and clients support binary Protocol Buffer data using google.golang.org/protobuf/proto. Handlers also support JSON by default, using the standard Protobuf JSON mapping. Users with more specialized needs may override the default codecs by registering a new codec under the "proto" or "json" names. When supplying a custom "proto" codec, keep in mind that some unexported, protocol-specific messages are serialized using Protobuf - take care to fall back to the standard Protobuf implementation if necessary.

Registering a codec with an empty name is a no-op.

func WithCompressMinBytes

func WithCompressMinBytes(minBytes int) Option

WithCompressMinBytes sets a minimum size threshold for compression: regardless of compressor configuration, messages smaller than the configured minimum are sent uncompressed.

The default minimum is zero. Setting a minimum compression threshold may improve overall performance, because the CPU cost of compressing very small messages usually isn't worth the small reduction in network I/O.

func WithIdempotency added in v1.7.0

func WithIdempotency(idempotencyLevel IdempotencyLevel) Option

WithIdempotency declares the idempotency of the procedure. This can determine whether a procedure call can safely be retried, and may affect which request modalities are allowed for a given procedure call.

In most cases, you should not need to manually set this. It is normally set by the code generator for your schema. For protobuf schemas, it can be set like this:

rpc Ping(PingRequest) returns (PingResponse) {
  option idempotency_level = NO_SIDE_EFFECTS;
}

func WithInterceptors

func WithInterceptors(interceptors ...Interceptor) Option

WithInterceptors configures a client or handler's interceptor stack. Repeated WithInterceptors options are applied in order, so

WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)

Unary interceptors compose like an onion. The first interceptor provided is the outermost layer of the onion: it acts first on the context and request, and last on the response and error.

Stream interceptors also behave like an onion: the first interceptor provided is the outermost wrapper for the StreamingClientConn or StreamingHandlerConn. It's the first to see sent messages and the last to see received messages.

Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:

 client.Send()       client.Receive()
       |                   ^
       v                   |
    A ---                 --- A
    B ---                 --- B
    : ...                 ... :
    Y ---                 --- Y
    Z ---                 --- Z
       |                   ^
       v                   |
  = = = = = = = = = = = = = = = =
               network
  = = = = = = = = = = = = = = = =
       |                   ^
       v                   |
    A ---                 --- A
    B ---                 --- B
    : ...                 ... :
    Y ---                 --- Y
    Z ---                 --- Z
       |                   ^
       v                   |
handler.Receive()   handler.Send()
       |                   ^
       |                   |
       '-> handler logic >-'

Note that in clients, Send handles the request message(s) and Receive handles the response message(s). For handlers, it's the reverse. Depending on your interceptor's logic, you may need to wrap one method in clients and the other in handlers.

Example
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
outer := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("outer interceptor: before call")
			res, err := next(ctx, req)
			logger.Println("outer interceptor: after call")
			return res, err
		})
	},
)
inner := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("inner interceptor: before call")
			res, err := next(ctx, req)
			logger.Println("inner interceptor: after call")
			return res, err
		})
	},
)
client := pingv1connect.NewPingServiceClient(
	examplePingServer.Client(),
	examplePingServer.URL(),
	connect.WithInterceptors(outer, inner),
)
if _, err := client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{})); err != nil {
	logger.Println("error:", err)
	return
}
Output:

outer interceptor: before call
inner interceptor: before call
inner interceptor: after call
outer interceptor: after call

func WithOptions

func WithOptions(options ...Option) Option

WithOptions composes multiple Options into one.

func WithReadMaxBytes added in v0.2.0

func WithReadMaxBytes(maxBytes int) Option

WithReadMaxBytes limits the performance impact of pathologically large messages sent by the other party. For handlers, WithReadMaxBytes limits the size of a message that the client can send. For clients, WithReadMaxBytes limits the size of a message that the server can respond with. Limits apply to each Protobuf message, not to the stream as a whole.

Setting WithReadMaxBytes to zero allows any message size. Both clients and handlers default to allowing any request size.

Handlers may also use http.MaxBytesHandler to limit the total size of the HTTP request stream (rather than the per-message size). Connect handles http.MaxBytesError specially, so clients still receive errors with the appropriate error code and informative messages.

func WithSchema added in v1.13.0

func WithSchema(schema any) Option

WithSchema provides a parsed representation of the schema for an RPC to a client or handler. The supplied schema is exposed as [Spec.Schema]. This option is typically added by generated code.

For services using protobuf schemas, the supplied schema should be a [protoreflect.MethodDescriptor].

func WithSendMaxBytes added in v0.4.0

func WithSendMaxBytes(maxBytes int) Option

WithSendMaxBytes prevents sending messages too large for the client/handler to handle without significant performance overhead. For handlers, WithSendMaxBytes limits the size of a message that the handler can respond with. For clients, WithSendMaxBytes limits the size of a message that the client can send. Limits apply to each message, not to the stream as a whole.

Setting WithSendMaxBytes to zero allows any message size. Both clients and handlers default to allowing any message size.

type Peer added in v0.5.0

type Peer struct {
	Addr     string
	Protocol string
	Query    url.Values // server-only
}

Peer describes the other party to an RPC.

When accessed client-side, Addr contains the host or host:port from the server's URL. When accessed server-side, Addr contains the client's address in IP:port format.

On both the client and the server, Protocol is the RPC protocol in use. Currently, it's either ProtocolConnect, ProtocolGRPC, or ProtocolGRPCWeb, but additional protocols may be added in the future.

Query contains the query parameters for the request. For the server, this will reflect the actual query parameters sent. For the client, it is unset.

type Request

type Request[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}

Request is a wrapper around a generated request message. It provides access to metadata like headers and the RPC specification, as well as strongly-typed access to the message itself.

func NewRequest

func NewRequest[T any](message *T) *Request[T]

NewRequest wraps a generated request message.

func (*Request[_]) Any

func (r *Request[_]) Any() any

Any returns the concrete request message as an empty interface, so that *Request implements the AnyRequest interface.

func (*Request[_]) HTTPMethod added in v1.8.0

func (r *Request[_]) HTTPMethod() string

HTTPMethod returns the HTTP method for this request. This is nearly always POST, but side-effect-free unary RPCs could be made via a GET.

On a newly created request, via NewRequest, this will return the empty string until the actual request is actually sent and the HTTP method determined. This means that client interceptor functions will see the empty string until *after* they delegate to the handler they wrapped. It is even possible for this to return the empty string after such delegation, if the request was never actually sent to the server (and thus no determination ever made about the HTTP method).

func (*Request[_]) Header

func (r *Request[_]) Header() http.Header

Header returns the HTTP headers for this request. Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.

func (*Request[_]) Peer added in v0.5.0

func (r *Request[_]) Peer() Peer

Peer describes the other party for this RPC.

func (*Request[_]) Spec

func (r *Request[_]) Spec() Spec

Spec returns a description of this RPC.

type Response

type Response[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}

Response is a wrapper around a generated response message. It provides access to metadata like headers and trailers, as well as strongly-typed access to the message itself.

func NewResponse

func NewResponse[T any](message *T) *Response[T]

NewResponse wraps a generated response message.

func (*Response[_]) Any

func (r *Response[_]) Any() any

Any returns the concrete response message as an empty interface, so that *Response implements the AnyResponse interface.

func (*Response[_]) Header

func (r *Response[_]) Header() http.Header

Header returns the HTTP headers for this response. Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.

func (*Response[_]) Trailer

func (r *Response[_]) Trailer() http.Header

Trailer returns the trailers for this response. Depending on the underlying RPC protocol, trailers may be sent as HTTP trailers or a protocol-specific block of in-body metadata.

Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols: applications may read them but shouldn't write them.

type ServerStream

type ServerStream[Res any] struct {
	// contains filtered or unexported fields
}

ServerStream is the handler's view of a server streaming RPC.

It's constructed as part of Handler invocation, but doesn't currently have an exported constructor.

func (*ServerStream[Res]) Conn added in v0.4.0

func (s *ServerStream[Res]) Conn() StreamingHandlerConn

Conn exposes the underlying StreamingHandlerConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*ServerStream[Res]) ResponseHeader

func (s *ServerStream[Res]) ResponseHeader() http.Header

ResponseHeader returns the response headers. Headers are sent with the first call to Send.

Headers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*ServerStream[Res]) ResponseTrailer

func (s *ServerStream[Res]) ResponseTrailer() http.Header

ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.

Trailers beginning with "Connect-" and "Grpc-" are reserved for use by the Connect and gRPC protocols. Applications shouldn't write them.

func (*ServerStream[Res]) Send

func (s *ServerStream[Res]) Send(msg *Res) error

Send a message to the client. The first call to Send also sends the response headers.

type ServerStreamForClient

type ServerStreamForClient[Res any] struct {
	// contains filtered or unexported fields
}

ServerStreamForClient is the client's view of a server streaming RPC.

It's returned from Client.CallServerStream, but doesn't currently have an exported constructor function.

func (*ServerStreamForClient[Res]) Close

func (s *ServerStreamForClient[Res]) Close() error

Close the receive side of the stream.

func (*ServerStreamForClient[Res]) Conn added in v0.4.0

Conn exposes the underlying StreamingClientConn. This may be useful if you'd prefer to wrap the connection in a different high-level API.

func (*ServerStreamForClient[Res]) Err

func (s *ServerStreamForClient[Res]) Err() error

Err returns the first non-EOF error that was encountered by Receive.

func (*ServerStreamForClient[Res]) Msg

func (s *ServerStreamForClient[Res]) Msg() *Res

Msg returns the most recent message unmarshaled by a call to Receive.

func (*ServerStreamForClient[Res]) Receive

func (s *ServerStreamForClient[Res]) Receive() bool

Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.

func (*ServerStreamForClient[Res]) ResponseHeader

func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header

ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.

func (*ServerStreamForClient[Res]) ResponseTrailer

func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header

ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.

type Spec

type Spec struct {
	StreamType       StreamType
	Schema           any    // for protobuf RPCs, a protoreflect.MethodDescriptor
	Procedure        string // for example, "/acme.foo.v1.FooService/Bar"
	IsClient         bool   // otherwise we're in a handler
	IdempotencyLevel IdempotencyLevel
}

Spec is a description of a client call or a handler invocation.

If you're using Protobuf, protoc-gen-connect-go generates a constant for the fully-qualified Procedure corresponding to each RPC in your schema.

type StreamType

type StreamType uint8

StreamType describes whether the client, server, neither, or both is streaming.

const (
	StreamTypeUnary  StreamType = 0b00
	StreamTypeClient StreamType = 0b01
	StreamTypeServer StreamType = 0b10
	StreamTypeBidi              = StreamTypeClient | StreamTypeServer
)

func (StreamType) String added in v1.8.0

func (s StreamType) String() string

type StreamingClientConn added in v0.2.0

type StreamingClientConn interface {
	// Spec and Peer must be safe to call concurrently with all other methods.
	Spec() Spec
	Peer() Peer

	// Send, RequestHeader, and CloseRequest may race with each other, but must
	// be safe to call concurrently with all other methods.
	Send(any) error
	RequestHeader() http.Header
	CloseRequest() error

	// Receive, ResponseHeader, ResponseTrailer, and CloseResponse may race with
	// each other, but must be safe to call concurrently with all other methods.
	Receive(any) error
	ResponseHeader() http.Header
	ResponseTrailer() http.Header
	CloseResponse() error
}

StreamingClientConn is the client's view of a bidirectional message exchange. Interceptors for streaming RPCs may wrap StreamingClientConns.

StreamingClientConns write request headers to the network with the first call to Send. Any subsequent mutations are effectively no-ops. When the server is done sending data, the StreamingClientConn's Receive method returns an error wrapping io.EOF. Clients should check for this using the standard library's errors.Is. If the server encounters an error during processing, subsequent calls to the StreamingClientConn's Send method will return an error wrapping io.EOF; clients may then call Receive to unmarshal the error.

Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.

StreamingClientConn implementations provided by this module guarantee that all returned errors can be cast to *Error using the standard library's errors.As.

In order to support bidirectional streaming RPCs, all StreamingClientConn implementations must support limited concurrent use. See the comments on each group of methods for details.

type StreamingClientFunc added in v0.2.0

type StreamingClientFunc func(context.Context, Spec) StreamingClientConn

StreamingClientFunc is the generic signature of a streaming RPC from the client's perspective. Interceptors may wrap StreamingClientFuncs.

type StreamingHandlerConn added in v0.2.0

type StreamingHandlerConn interface {
	Spec() Spec
	Peer() Peer

	Receive(any) error
	RequestHeader() http.Header

	Send(any) error
	ResponseHeader() http.Header
	ResponseTrailer() http.Header
}

StreamingHandlerConn is the server's view of a bidirectional message exchange. Interceptors for streaming RPCs may wrap StreamingHandlerConns.

Like the standard library's http.ResponseWriter, StreamingHandlerConns write response headers to the network with the first call to Send. Any subsequent mutations are effectively no-ops. Handlers may mutate response trailers at any time before returning. When the client has finished sending data, Receive returns an error wrapping io.EOF. Handlers should check for this using the standard library's errors.Is.

Headers and trailers beginning with "Connect-" and "Grpc-" are reserved for use by the gRPC and Connect protocols: applications may read them but shouldn't write them.

StreamingHandlerConn implementations provided by this module guarantee that all returned errors can be cast to *Error using the standard library's errors.As.

StreamingHandlerConn implementations do not need to be safe for concurrent use.

type StreamingHandlerFunc added in v0.2.0

type StreamingHandlerFunc func(context.Context, StreamingHandlerConn) error

StreamingHandlerFunc is the generic signature of a streaming RPC from the handler's perspective. Interceptors may wrap StreamingHandlerFuncs.

type UnaryFunc

type UnaryFunc func(context.Context, AnyRequest) (AnyResponse, error)

UnaryFunc is the generic signature of a unary RPC. Interceptors may wrap Funcs.

The type of the request and response structs depend on the codec being used. When using Protobuf, request.Any() and response.Any() will always be proto.Message implementations.

type UnaryInterceptorFunc

type UnaryInterceptorFunc func(UnaryFunc) UnaryFunc

UnaryInterceptorFunc is a simple Interceptor implementation that only wraps unary RPCs. It has no effect on streaming RPCs.

Example
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
loggingInterceptor := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, request connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("calling:", request.Spec().Procedure)
			logger.Println("request:", request.Any())
			response, err := next(ctx, request)
			if err != nil {
				logger.Println("error:", err)
			} else {
				logger.Println("response:", response.Any())
			}
			return response, err
		})
	},
)
client := pingv1connect.NewPingServiceClient(
	examplePingServer.Client(),
	examplePingServer.URL(),
	connect.WithInterceptors(loggingInterceptor),
)
if _, err := client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{Number: 42})); err != nil {
	logger.Println("error:", err)
	return
}
Output:

calling: /connect.ping.v1.PingService/Ping
request: number:42
response: number:42

func (UnaryInterceptorFunc) WrapStreamingClient added in v0.2.0

func (f UnaryInterceptorFunc) WrapStreamingClient(next StreamingClientFunc) StreamingClientFunc

WrapStreamingClient implements Interceptor with a no-op.

func (UnaryInterceptorFunc) WrapStreamingHandler added in v0.2.0

func (f UnaryInterceptorFunc) WrapStreamingHandler(next StreamingHandlerFunc) StreamingHandlerFunc

WrapStreamingHandler implements Interceptor with a no-op.

func (UnaryInterceptorFunc) WrapUnary

func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc

WrapUnary implements Interceptor by applying the interceptor function.

Directories

Path Synopsis
cmd
protoc-gen-connect-go
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code.
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code.
internal
assert
Package assert is a minimal assert package using generics.
Package assert is a minimal assert package using generics.
gen/connect/ping/v1/pingv1connect
The connect.ping.v1 package contains an echo service designed to test the connect-go implementation.
The connect.ping.v1 package contains an echo service designed to test the connect-go implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL