grpcdispatcher

Version: v0.0.2
Published: Sep 17, 2024 License: Apache-2.0 Imports: 23 Imported by: 4

README

gRPC-Dispatcher

gRPC-Dispatcher is a Go library that facilitates sending queries to multiple gRPC servers running on Kubernetes simultaneously.

Introduction

gRPC-Dispatcher simplifies managing connections to gRPC servers hosted on Kubernetes pods. Its dispatcher keeps track of pod IP addresses and sends fanout queries to all servers simultaneously. It supports both instantaneous fanout queries (sent to all currently available servers) and subscription-based fanout queries (sent to all currently available servers and to new servers as they become available).

Currently, the library supports gRPC servers running behind a Kubernetes service. It has the following features:

  • Uses the Kubernetes API to get pod IPs using highly efficient EndpointSlices
  • Connects to new servers eagerly in the background so new queries do not wait on connection setup
  • Supports long-running streaming queries to ephemeral servers

Try it out and let us know what you think! If you notice any bugs or have any feature requests, please create a GitHub Issue.

Quickstart

First, add the library to your Go project:

go get github.com/kubetail-org/grpc-dispatcher-go

Next, write your dispatch code:

package main

import (
  "context"
  "fmt"
  "time"

  grpcdispatcher "github.com/kubetail-org/grpc-dispatcher-go"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"

  // placeholder path: replace with the import path of your generated protobuf package
  examplepb "example.com/your-module/examplepb"
)

func main() {
  // initialize new dispatcher
  dispatcher, err := grpcdispatcher.NewDispatcher(
    "kubernetes://my-service.my-namespace",
    grpcdispatcher.WithDialOptions(
      grpc.WithTransportCredentials(insecure.NewCredentials()),
    ),
  )
  if err != nil {
    panic(err)
  }
  defer dispatcher.Shutdown()

  // start background processes and initialize connections to servers
  dispatcher.Start()

  // wait until dispatcher is ready
  ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  defer cancel()
  dispatcher.Ready(ctx)

  // all handler contexts will inherit from this context
  rootCtx := context.Background()

  // send query to all current grpc servers
  dispatcher.Fanout(rootCtx, func(ctx context.Context, conn *grpc.ClientConn) {
    // init grpc client
    client := examplepb.NewExampleServiceClient(conn)

    // execute grpc request
    resp, err := client.Echo(ctx, &examplepb.EchoRequest{Message: "hello"})
    if err != nil {
      // do something with error
      fmt.Println(err)
      return
    }

    // do something with response
    fmt.Println(resp)
  })

  // send query to all current and future grpc servers
  // (the handler matches DispatchHandler, so it has no return value)
  sub, err := dispatcher.FanoutSubscribe(rootCtx, func(ctx context.Context, conn *grpc.ClientConn) {
    // init grpc client
    client := examplepb.NewExampleServiceClient(conn)

    // execute grpc request
    resp, err := client.Echo(ctx, &examplepb.EchoRequest{Message: "hello"})
    if err != nil {
      // do something with error
      fmt.Println(err)
      return
    }

    // do something with response
    fmt.Println(resp)
  })
  if err != nil {
    panic(err)
  }
  defer sub.Unsubscribe()
}

Important: gRPC-Dispatcher needs list and watch permissions for the endpointslices resource (API group discovery.k8s.io) in the Kubernetes API.

Docs

See Go docs for library documentation.

Example

You can see an example implementation in the example/ directory. To run the example in a Kubernetes environment see the Develop section below.

Develop

To develop gRPC-Dispatcher, first create a Kubernetes dev cluster using a dev cluster tool that works with Tilt. To automate the process you can also use ctlptl and one of the configs available in the hack/ctlptl directory. For example, to create a dev cluster using minikube you can use this command:

ctlptl apply -f hack/ctlptl/minikube.yaml

Once the dev cluster is running and kubectl is pointing to it, you can bring up the dev environment using Tilt. This will create a web app that queries multiple gRPC servers using the code in the example directory:

tilt up

Once the web app is running, you can access it at http://localhost:4000.

To tear down the dev environment, run these commands:

tilt down
ctlptl delete -f hack/ctlptl/minikube.yaml

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DispatchHandler

type DispatchHandler func(ctx context.Context, conn *grpc.ClientConn)

DispatchHandler is the callback passed to the dispatch methods (Fanout and FanoutSubscribe).

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

A Dispatcher is a utility that facilitates sending queries to multiple gRPC servers simultaneously. It maintains an up-to-date list of all pod IPs that are part of a Kubernetes service and directs queries to those IPs.

func NewDispatcher

func NewDispatcher(connectUrl string, options ...DispatcherOption) (*Dispatcher, error)
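The Quickstart passes a connect URL of the form kubernetes://my-service.my-namespace. How the library parses this string internally is not documented here, so the following is only a sketch of how such a URL decomposes using the standard library. The target type, the parseConnectURL helper, and the rule of splitting the host on the first dot are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// target holds the pieces of a connect URL. The field names are illustrative.
type target struct {
	Scheme    string
	Service   string
	Namespace string
}

// parseConnectURL splits a URL such as "kubernetes://my-service.my-namespace"
// into scheme, service, and namespace. Splitting the host on the first dot is
// an assumption of this sketch, not documented library behavior.
func parseConnectURL(raw string) (target, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return target{}, err
	}
	if u.Scheme != "kubernetes" {
		return target{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	service, namespace, ok := strings.Cut(u.Hostname(), ".")
	if !ok || service == "" || namespace == "" {
		return target{}, fmt.Errorf("expected <service>.<namespace>, got %q", u.Hostname())
	}
	return target{Scheme: u.Scheme, Service: service, Namespace: namespace}, nil
}

func main() {
	got, err := parseConnectURL("kubernetes://my-service.my-namespace")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s %s %s\n", got.Scheme, got.Service, got.Namespace)
}
```

Validating the URL before calling NewDispatcher can give an earlier, clearer error message, but the value returned by NewDispatcher remains the authoritative check.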

func (*Dispatcher) Fanout

func (d *Dispatcher) Fanout(ctx context.Context, fn DispatchHandler)

Fanout sends queries to all IPs that are available at query time.
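As a rough mental model of a fanout, the sketch below runs one handler per address concurrently and waits for all of them. It is not the library's implementation: the handler type takes an address string instead of a *grpc.ClientConn so that it needs no external modules, the fanout and collect helpers are hypothetical, and whether the real Fanout blocks until every handler returns is an assumption here.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// handler is a stand-in for DispatchHandler; it receives an address string
// instead of a *grpc.ClientConn so the sketch needs no external modules.
type handler func(ctx context.Context, addr string)

// fanout runs fn once per address, each in its own goroutine, and returns
// after every call has finished.
func fanout(ctx context.Context, addrs []string, fn handler) {
	var wg sync.WaitGroup
	for _, addr := range addrs {
		wg.Add(1)
		go func(a string) {
			defer wg.Done()
			fn(ctx, a)
		}(addr)
	}
	wg.Wait()
}

// collect fans out over addrs and returns the visited addresses in sorted order.
func collect(addrs []string) []string {
	var mu sync.Mutex
	var seen []string
	fanout(context.Background(), addrs, func(ctx context.Context, addr string) {
		// handlers run concurrently, so shared state needs a lock
		mu.Lock()
		defer mu.Unlock()
		seen = append(seen, addr)
	})
	sort.Strings(seen)
	return seen
}

func main() {
	fmt.Println(collect([]string{"10.0.0.2:50051", "10.0.0.1:50051"}))
}
```

The point to carry over to real handlers is that they may run concurrently, so any state they share (such as a results slice) should be protected by a mutex or sent over a channel.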

func (*Dispatcher) FanoutSubscribe

func (d *Dispatcher) FanoutSubscribe(ctx context.Context, fn DispatchHandler) (*Subscription, error)

FanoutSubscribe sends queries to all IPs that are available at query time and to all subsequent IPs as they become available, until Unsubscribe() is called.
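The subscription semantics described above (current servers first, then each new server until Unsubscribe) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the library's code: the registry and subscription types and the demo function are hypothetical, addresses are plain strings, and callbacks run synchronously to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the dispatcher's internal state:
// known addresses plus active subscriptions.
type registry struct {
	mu    sync.Mutex
	addrs []string
	subs  map[int]func(addr string)
	next  int
}

func newRegistry() *registry {
	return &registry{subs: make(map[int]func(string))}
}

// subscription mirrors the shape of Subscription: it only knows how to end itself.
type subscription struct {
	unsubscribe func()
}

func (s *subscription) Unsubscribe() { s.unsubscribe() }

// subscribe calls fn for every current address and registers it for future ones.
func (r *registry) subscribe(fn func(addr string)) *subscription {
	r.mu.Lock()
	id := r.next
	r.next++
	r.subs[id] = fn
	current := append([]string(nil), r.addrs...)
	r.mu.Unlock()

	for _, a := range current {
		fn(a)
	}
	return &subscription{unsubscribe: func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.subs, id)
	}}
}

// add records a new address and notifies every active subscriber.
func (r *registry) add(addr string) {
	r.mu.Lock()
	r.addrs = append(r.addrs, addr)
	fns := make([]func(string), 0, len(r.subs))
	for _, fn := range r.subs {
		fns = append(fns, fn)
	}
	r.mu.Unlock()

	for _, fn := range fns {
		fn(addr)
	}
}

// demo returns the addresses a subscriber sees across one subscribe/unsubscribe cycle.
func demo() []string {
	r := newRegistry()
	r.add("10.0.0.1:50051") // known before subscribing

	var seen []string
	sub := r.subscribe(func(addr string) { seen = append(seen, addr) })

	r.add("10.0.0.2:50051") // delivered: subscription is active
	sub.Unsubscribe()
	r.add("10.0.0.3:50051") // not delivered: subscription has ended
	return seen
}

func main() {
	fmt.Println(demo())
}
```

In real use, calling Unsubscribe (typically via defer) is what stops the handler from being invoked for servers that appear later.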

func (*Dispatcher) Ready

func (d *Dispatcher) Ready(ctx context.Context)

Ready waits until the informer has synced with the Kubernetes API.

func (*Dispatcher) Shutdown

func (d *Dispatcher) Shutdown() error

Shutdown closes the connection, which also stops the resolver's background processes.

func (*Dispatcher) Start

func (d *Dispatcher) Start()

Start starts background processes and initializes subconns.

type DispatcherOption

type DispatcherOption func(*dispatcherOptions)

DispatcherOption configures how we set up the dispatcher

func WithDialOptions

func WithDialOptions(dialOpts ...grpc.DialOption) DispatcherOption

WithDialOptions configures the DialOptions to use when initializing a new gRPC connection

func WithKubernetesClientset

func WithKubernetesClientset(clientset kubernetes.Interface) DispatcherOption

WithKubernetesClientset configures the dispatcher to use the clientset when querying the Kubernetes API

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription represents interest in pod IPs that are part of a Kubernetes service.

func (*Subscription) Unsubscribe

func (sub *Subscription) Unsubscribe()

Unsubscribe ends the subscription.

Directories

Path Synopsis
example module
