fxgrpcserver

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2024 License: MIT Imports: 22 Imported by: 0

README

Fx gRPC Server Module

ci go report codecov Deps PkgGoDev

Fx module for grpcserver.

Installation

go get github.com/ankorstore/yokai/fxgrpcserver

Features

This module provides the possibility to provide to your Fx application a gRPC server with:

  • automatic panic recovery
  • automatic reflection
  • automatic logging and tracing (method, duration, status, ...)
  • automatic metrics
  • automatic healthcheck
  • possibility to register gRPC server options, interceptors and services

Documentation

Dependencies

This module is intended to be used alongside:

Loading

To load the module in your Fx application:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"go.uber.org/fx"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule, // load the module
	).Run()
}
Configuration

Configuration reference:

# ./configs/config.yaml
app:
  name: app
  env: dev
  version: 0.1.0
  debug: true
modules:
  log:
    level: info
    output: stdout
  trace:
    processor:
      type: stdout
  grpc:
    server:
      address: ":50051"             # gRPC server listener address (default :50051)
      log:
        metadata:                   # list of gRPC metadata to add to logs on top of x-request-id, empty by default
          x-foo: foo                # to log for example the metadata x-foo in the log field foo
          x-bar: bar
        exclude:                    # list of gRPC methods to exclude from logging, empty by default
          - /test.Service/Unary
      trace:
        enabled: true               # to trace gRPC calls, disabled by default
        exclude:                    # list of gRPC methods to exclude from tracing, empty by default
          - /test.Service/Bidi
      metrics:
        collect:
          enabled: true             # to collect gRPC server metrics, disabled by default
          namespace: foo            # gRPC server metrics namespace (empty by default)
          subsystem: bar            # gRPC server metrics subsystem (empty by default)
        buckets: 0.1, 1, 10         # to override default request duration buckets (default prometheus.DefBuckets)
      reflection:
        enabled: true               # to expose gRPC reflection service, disabled by default
      healthcheck:
        enabled: true               # to expose gRPC healthcheck service, disabled by default
      test:
        bufconn:
          size: 1048576             # test gRPC bufconn size, 1024*1024 by default

Notes:

  • the gRPC calls logging will be based on the fxlog module configuration
  • the gRPC calls tracing will be based on the fxtrace module configuration
  • if a request to an excluded gRPC method fails, the gRPC server will still log for observability purposes.
Registration

This module offers the possibility to easily register gRPC server options, interceptors and services.

gRPC server options

This module offers the AsGrpcServerOptions() function to easily register your gRPC server options.

For example:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/proto"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/service"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"go.uber.org/fx"
	"google.golang.org/grpc"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule, // load the module
		fx.Provide(
			// configure the server send and receive max message size
			fxgrpcserver.AsGrpcServerOptions(
				grpc.MaxSendMsgSize(1000),
				grpc.MaxRecvMsgSize(1000),
			),
		),
	).Run()
}
gRPC server interceptors

This module offers the possibility to easily register your gRPC server interceptors:

  • AsGrpcServerUnaryInterceptor() to register a server unary interceptor
  • AsGrpcServerStreamInterceptor() to register a server stream interceptor

For example, with UnaryInterceptor and StreamInterceptor interceptors:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/interceptor"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/proto"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/service"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"go.uber.org/fx"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule, // load the module
		fx.Provide(
			// registers UnaryInterceptor as server unary interceptor
			fxgrpcserver.AsGrpcServerUnaryInterceptor(interceptor.NewUnaryInterceptor),
			// registers StreamInterceptor as server stream interceptor
			fxgrpcserver.AsGrpcServerStreamInterceptor(interceptor.NewStreamInterceptor), 
		),
	).Run()
}
gRPC server services

This module offers the AsGrpcServerService() function to easily register your gRPC server services and their definitions.

For example, with the TestService, server implementation for the test.proto:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/proto"
	"github.com/ankorstore/yokai/fxgrpcserver/testdata/service"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"go.uber.org/fx"
)

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule, // load the module
		fx.Provide(
			fxgrpcserver.AsGrpcServerService(service.NewTestServiceServer, &proto.Service_ServiceDesc), // register the TestServiceServer for the proto.Service_ServiceDesc
		),
	).Run()
}
Reflection

This module provides the possibility to enable gRPC server reflection with modules.grpc.server.reflection.enabled=true.

Reflection usage is helpful for developing or testing your gRPC services, but it is not recommended for production usage (disabled by default).

Healthcheck

This module automatically expose the GrpcHealthCheckService with modules.grpc.server.healthcheck.enabled=true, to offer the Check and Watch RPCs, suitable for k8s gRPC startup, readiness or liveness probes.

You can use the fxhealthcheck.AsCheckerProbe() function to register several CheckerProbe (more details on the fxhealthcheck module documentation).

package main

import (
	"context"

	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"github.com/ankorstore/yokai/healthcheck"
	"go.uber.org/fx"
)

// success probe
type SuccessProbe struct{}

func NewSuccessProbe() *SuccessProbe {
	return &SuccessProbe{}
}

func (p *SuccessProbe) Name() string {
	return "successProbe"
}

func (p *SuccessProbe) Check(ctx context.Context) *healthcheck.CheckerProbeResult {
	return healthcheck.NewCheckerProbeResult(true, "success")
}

// failure probe
type FailureProbe struct{}

func NewFailureProbe() *FailureProbe {
	return &FailureProbe{}
}

func (p *FailureProbe) Name() string {
	return "failureProbe"
}

func (p *FailureProbe) Check(ctx context.Context) *healthcheck.CheckerProbeResult {
	return healthcheck.NewCheckerProbeResult(false, "failure")
}

// usage
func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule, // load the module
		fx.Provide(
			fxhealthcheck.AsCheckerProbe(NewSuccessProbe),                       // register the SuccessProbe probe for startup, liveness and readiness checks
			fxhealthcheck.AsCheckerProbe(NewFailureProbe, healthcheck.Liveness), // register the FailureProbe probe for liveness checks only
		),
	).Run()
}

In this example, the GrpcHealthCheckService will:

  • run the liveness probes checks if the request service name contains liveness (like kubernetes::liveness) and will return a check failure
  • or run the readiness probes checks if the request service name contains readiness (like kubernetes::readiness) and will return a check success
  • or run the startup probes checks otherwise, and will return a check success
Override

By default, the grpc.Server is created by the DefaultGrpcServerFactory.

If needed, you can provide your own factory and override the module:

package main

import (
	"github.com/ankorstore/yokai/fxconfig"
	"github.com/ankorstore/yokai/fxgenerate"
	"github.com/ankorstore/yokai/fxgrpcserver"
	"github.com/ankorstore/yokai/fxhealthcheck"
	"github.com/ankorstore/yokai/fxlog"
	"github.com/ankorstore/yokai/fxmetrics"
	"github.com/ankorstore/yokai/fxtrace"
	"github.com/ankorstore/yokai/grpcserver"
	"go.uber.org/fx"
	"google.golang.org/grpc"
)

type CustomGrpcServerFactory struct{}

func NewCustomGrpcServerFactory() grpcserver.GrpcServerFactory {
	return &CustomGrpcServerFactory{}
}

func (f *CustomGrpcServerFactory) Create(options ...grpcserver.GrpcServerOption) (*grpc.Server, error) {
	return grpc.NewServer(...), nil
}

func main() {
	fx.New(
		fxconfig.FxConfigModule, // load the module dependencies
		fxlog.FxLogModule,
		fxtrace.FxTraceModule,
		fxgenerate.FxGenerateModule,
		fxmetrics.FxMetricsModule,
		fxhealthcheck.FxHealthcheckModule,
		fxgrpcserver.FxGrpcServerModule,          // load the module
		fx.Decorate(NewCustomGrpcServerFactory),  // override the module with a custom factory
		fx.Invoke(func(grpcServer *grpc.Server) { // invoke the gRPC server
			// ...
		}),
	).Run()
}
Testing

This module provides a *bufconn.Listener that will automatically be used by the gRPC server in test mode.

You can create connections for your gRPC clients, using this listener, with the TestBufconnConnectionFactory.

You can find tests examples in this module own tests.

Documentation

Index

Constants

View Source
const (
	ModuleName         = "grpcserver"
	DefaultAddress     = ":50051"
	DefaultBufconnSize = 1024 * 1024
)

Variables

FxGrpcServerModule is the Fx grpcserver module.

Functions

func AsGrpcServerOptions

func AsGrpcServerOptions(options ...grpc.ServerOption) fx.Option

AsGrpcServerOptions registers a list of grpc server options into Fx.

func AsGrpcServerService

func AsGrpcServerService(constructor any, description *grpc.ServiceDesc) fx.Option

AsGrpcServerService registers a grpc server service into Fx.

func AsGrpcServerStreamInterceptor

func AsGrpcServerStreamInterceptor(constructor any) fx.Option

AsGrpcServerStreamInterceptor registers a grpc server stream interceptor into Fx.

func AsGrpcServerUnaryInterceptor

func AsGrpcServerUnaryInterceptor(constructor any) fx.Option

AsGrpcServerUnaryInterceptor registers a grpc server unary interceptor into Fx.

func GetReturnType

func GetReturnType(target any) string

GetReturnType returns the return type of a target.

func GetType

func GetType(target any) string

GetType returns the type of a target.

func NewFxGrpcDefaultTestBufconnConnectionFactory added in v1.3.0

NewFxGrpcDefaultTestBufconnConnectionFactory returns a new grpcservertest.DefaultTestBufconnConnectionFactory.

func NewFxGrpcServer

func NewFxGrpcServer(p FxGrpcServerParam) (*grpc.Server, error)

NewFxGrpcServer returns a new grpc.Server.

func NewFxGrpcTestBufconnListener added in v1.3.0

func NewFxGrpcTestBufconnListener(p FxGrpcTestBufconnListenerParam) *bufconn.Listener

NewFxGrpcTestBufconnListener returns a new bufconn.Listener.

func Sanitize

func Sanitize(str string) string

Sanitize transforms a given string to not contain spaces or dashes, and to be in lower case.

func Split

func Split(str string) []string

Split trims and splits a provided string by comma.

Types

type FxGrpcServerModuleInfo

type FxGrpcServerModuleInfo struct {
	Address  string
	Services map[string]grpc.ServiceInfo
}

FxGrpcServerModuleInfo is a module info collector for fxcore.

func NewFxGrpcServerModuleInfo

func NewFxGrpcServerModuleInfo(grpcServer *grpc.Server, cfg *config.Config) *FxGrpcServerModuleInfo

NewFxGrpcServerModuleInfo returns a new FxGrpcServerModuleInfo.

func (*FxGrpcServerModuleInfo) Data

func (i *FxGrpcServerModuleInfo) Data() map[string]interface{}

Data return the data of the module info.

func (*FxGrpcServerModuleInfo) Name

func (i *FxGrpcServerModuleInfo) Name() string

Name return the name of the module info.

type FxGrpcServerParam

type FxGrpcServerParam struct {
	fx.In
	LifeCycle       fx.Lifecycle
	Factory         grpcserver.GrpcServerFactory
	Generator       uuid.UuidGenerator
	Listener        *bufconn.Listener
	Registry        *GrpcServerRegistry
	Config          *config.Config
	Logger          *log.Logger
	Checker         *healthcheck.Checker
	TracerProvider  trace.TracerProvider
	MetricsRegistry *prometheus.Registry
}

FxGrpcServerParam allows injection of the required dependencies in [NewFxGrpcBufconnListener].

type FxGrpcServiceRegistryParam

type FxGrpcServiceRegistryParam struct {
	fx.In
	Options            []grpc.ServerOption           `group:"grpc-server-options"`
	UnaryInterceptors  []GrpcServerUnaryInterceptor  `group:"grpc-server-unary-interceptors"`
	StreamInterceptors []GrpcServerStreamInterceptor `group:"grpc-server-stream-interceptors"`
	Services           []any                         `group:"grpc-server-services"`
	Definitions        []GrpcServerServiceDefinition `group:"grpc-server-service-definitions"`
}

FxGrpcServiceRegistryParam allows injection of the required dependencies in NewFxGrpcServerRegistry.

type FxGrpcTestBufconnConnectionFactoryParam added in v1.3.0

type FxGrpcTestBufconnConnectionFactoryParam struct {
	fx.In
	Listener *bufconn.Listener
}

FxGrpcTestBufconnConnectionFactoryParam allows injection of the required dependencies in NewFxGrpcDefaultTestBufconnConnectionFactory.

type FxGrpcTestBufconnListenerParam added in v1.3.0

type FxGrpcTestBufconnListenerParam struct {
	fx.In
	Config *config.Config
}

FxGrpcTestBufconnListenerParam allows injection of the required dependencies in NewFxGrpcTestBufconnListener.

type GrpcServerRegistry

type GrpcServerRegistry struct {
	// contains filtered or unexported fields
}

GrpcServerRegistry is the registry collecting grpc server options, interceptors, services and their definitions.

func NewFxGrpcServerRegistry

func NewFxGrpcServerRegistry(p FxGrpcServiceRegistryParam) *GrpcServerRegistry

NewFxGrpcServerRegistry returns as new GrpcServerRegistry.

func (*GrpcServerRegistry) ResolveGrpcServerOptions

func (r *GrpcServerRegistry) ResolveGrpcServerOptions() []grpc.ServerOption

ResolveGrpcServerOptions resolves a list of grpc server options.

func (*GrpcServerRegistry) ResolveGrpcServerServices

func (r *GrpcServerRegistry) ResolveGrpcServerServices() ([]*ResolvedGrpcServerService, error)

ResolveGrpcServerServices resolves a list of ResolvedGrpcServerService from their definitions.

func (*GrpcServerRegistry) ResolveGrpcServerStreamInterceptors

func (r *GrpcServerRegistry) ResolveGrpcServerStreamInterceptors() []GrpcServerStreamInterceptor

ResolveGrpcServerStreamInterceptors resolves a list of grpc server stream interceptors.

func (*GrpcServerRegistry) ResolveGrpcServerUnaryInterceptors

func (r *GrpcServerRegistry) ResolveGrpcServerUnaryInterceptors() []GrpcServerUnaryInterceptor

ResolveGrpcServerUnaryInterceptors resolves a list of grpc server unary interceptors.

type GrpcServerServiceDefinition

type GrpcServerServiceDefinition interface {
	ReturnType() string
	Description() *grpc.ServiceDesc
}

GrpcServerServiceDefinition is the interface for grpc server services definitions.

func NewGrpcServiceDefinition

func NewGrpcServiceDefinition(returnType string, description *grpc.ServiceDesc) GrpcServerServiceDefinition

NewGrpcServiceDefinition returns a new GrpcServerServiceDefinition instance.

type GrpcServerStreamInterceptor

type GrpcServerStreamInterceptor interface {
	HandleStream() grpc.StreamServerInterceptor
}

GrpcServerStreamInterceptor is the interface for grpc server stream interceptors.

type GrpcServerUnaryInterceptor

type GrpcServerUnaryInterceptor interface {
	HandleUnary() grpc.UnaryServerInterceptor
}

GrpcServerUnaryInterceptor is the interface for grpc server unary interceptors.

type ResolvedGrpcServerService

type ResolvedGrpcServerService struct {
	// contains filtered or unexported fields
}

ResolvedGrpcServerService is an interface for the resolved grpc server services.

func NewResolvedGrpcService

func NewResolvedGrpcService(implementation any, description *grpc.ServiceDesc) *ResolvedGrpcServerService

NewResolvedGrpcService returns a new ResolvedGrpcServerService.

func (*ResolvedGrpcServerService) Description

func (r *ResolvedGrpcServerService) Description() *grpc.ServiceDesc

Description return the resolved grpc server service description.

func (*ResolvedGrpcServerService) Implementation

func (r *ResolvedGrpcServerService) Implementation() any

Implementation return the resolved grpc server service implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL