grpc

package
v0.0.3
Published: Sep 14, 2023 License: MIT Imports: 25 Imported by: 5

README

gRPC module

The package contains a basic implementation of a gRPC server and client.

Usage

The usage of the package is described through an example that implements a time service.

To import the module in your code, add the following line:

import "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
Protocol

First, write a proto file for your protocol. For example:

syntax = "proto3";

package proto;

option go_package = "github.com/dipdup-net/indexer-sdk/examples/grpc/pb";

import "github.com/dipdup-net/indexer-sdk/pkg/modules/grpc/proto/general.proto";

service TimeService {
    rpc SubscribeOnTime(Request) returns (stream Response);
    rpc UnsubscribeFromTime(UnsubscribeRequest) returns (UnsubscribeResponse);
}

message Request {}

message Response{
    string time = 1;
}

After that, generate the pb files from your proto file:

protoc -I=. -I=${GOPATH}/src --go-grpc_out=${GOPATH}/src --go_out=${GOPATH}/src ${GOPATH}/src/github.com/dipdup-net/indexer-sdk/examples/grpc/proto/*.proto
Server

Embed the Server module to implement a custom gRPC server module. It implements the default Module interface. You also have to embed the unimplemented server generated from your proto file. For example:

type Server struct {
	*grpc.Server
	pb.UnimplementedTimeServiceServer

	subscriptions *grpc.Subscriptions[time.Time, *pb.Response]

	wg *sync.WaitGroup
}

Subscriptions is a structure declared as Subscriptions[T any, P any]. It is a generic struct containing a map with all subscriptions for the stream. It filters models of type T according to each subscription's pattern and sends a message to the stream if the model matches the pattern. To customize message filtering and notification, implement the Subscription interface. Example is here.
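To make the filtering idea concrete, here is a minimal standalone sketch of a type that satisfies the Subscription interface as listed in the Documentation section. The interface is redeclared locally so the snippet compiles without the SDK. EvenSecondSubscription, deliver and demo are hypothetical names used only for illustration, and the response type is simplified to a string.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// Subscription mirrors the interface shown in the Documentation section.
// It is redeclared here only to keep the sketch self-contained.
type Subscription[T any, P any] interface {
	Filter(typ T) bool
	Send(msg P)
	Listen() <-chan P
	io.Closer
}

// EvenSecondSubscription is a hypothetical subscription that accepts
// only timestamps whose second is even.
type EvenSecondSubscription struct {
	ch chan string
}

func NewEvenSecondSubscription() *EvenSecondSubscription {
	// The buffer size is arbitrary; Send blocks once the buffer is full.
	return &EvenSecondSubscription{ch: make(chan string, 16)}
}

func (s *EvenSecondSubscription) Filter(t time.Time) bool { return t.Second()%2 == 0 }
func (s *EvenSecondSubscription) Send(msg string)         { s.ch <- msg }
func (s *EvenSecondSubscription) Listen() <-chan string   { return s.ch }
func (s *EvenSecondSubscription) Close() error            { close(s.ch); return nil }

// Compile-time check that the type satisfies the interface.
var _ Subscription[time.Time, string] = (*EvenSecondSubscription)(nil)

// deliver applies the filter and forwards the value only when it matches.
func deliver(s Subscription[time.Time, string], t time.Time) bool {
	if !s.Filter(t) {
		return false
	}
	s.Send(t.Format(time.RFC3339))
	return true
}

// demo sends one matching and one non-matching timestamp.
func demo() (bool, bool, string) {
	sub := NewEvenSecondSubscription()
	defer sub.Close()
	even := time.Date(2023, time.September, 14, 12, 0, 2, 0, time.UTC)
	odd := even.Add(time.Second)
	sentEven := deliver(sub, even)
	sentOdd := deliver(sub, odd)
	return sentEven, sentOdd, <-sub.Listen()
}

func main() {
	fmt.Println(demo()) // true false 2023-09-14T12:00:02Z
}
```

In a real server the P type would be the generated *pb.Response rather than a string.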

After creating the server, register the generated server in your Start function. For example:

func (server *Server) Start(ctx context.Context) {
	pb.RegisterTimeServiceServer(server.Server.Server(), server)

	server.Server.Start(ctx)

	server.wg.Add(1)
	go server.listen(ctx)
}

listen is a function that waits for events and notifies subscribers. In the example, it sends the time to subscribers every 5 seconds.
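The body of listen is not shown in this README. A minimal sketch of such a loop, assuming a ticker-driven design, could look like the code below. The notify callback stands in for the call that notifies subscribers, and countTicks and demo are hypothetical helpers that exist only to run the sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// listen calls notify on every tick until ctx is cancelled.
// notify is a stand-in for notifying all subscribers.
func listen(ctx context.Context, wg *sync.WaitGroup, interval time.Duration, notify func(time.Time)) {
	defer wg.Done()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			notify(now)
		}
	}
}

// countTicks runs listen for the given total duration and returns
// how many notifications were produced.
func countTicks(interval, total time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()

	var wg sync.WaitGroup
	count := 0

	wg.Add(1)
	go listen(ctx, &wg, interval, func(time.Time) { count++ })
	wg.Wait() // count is read only after the goroutine has finished

	return count
}

// demo uses short durations so the sketch finishes quickly.
func demo() int {
	return countTicks(10*time.Millisecond, 105*time.Millisecond)
}

func main() {
	fmt.Println("notifications:", demo())
}
```

The WaitGroup lets Close wait for the loop to exit after the context is cancelled, which matches the wg.Add(1) call in Start above.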

The next step is to define the SubscribeOnXXX and UnsubscribeFromXXX methods, which implement a custom stream by calling the default implementations of these functions. For example:

// SubscribeOnTime -
func (server *Server) SubscribeOnTime(req *pb.Request, stream pb.TimeService_SubscribeOnTimeServer) error {
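	return grpc.DefaultSubscribeOn[time.Time, *pb.Response](
		stream,
		server.subscriptions,
		NewTimeSubscription(),
		nil, // handler: assumes no synchronization step is needed
		nil, // onEndOfSync: assumes no end-of-sync callback is needed
	)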
}

// UnsubscribeFromTime -
func (server *Server) UnsubscribeFromTime(ctx context.Context, req *generalPB.UnsubscribeRequest) (*generalPB.UnsubscribeResponse, error) {
	return grpc.DefaultUnsubscribe(ctx, server.subscriptions, req.Id)
}

DefaultSubscribeOn has the signature:

func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Subscriptions[T, P], subscription Subscription[T, P], handler func(id uint64) error, onEndOfSync func(id uint64) error) error

handler is a function that is called after the subscription response is sent and before real-time notifications begin. You can use it to implement a synchronization process.
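As an illustration of what a synchronization handler might do, the following standalone sketch builds a function of the shape func(id uint64) error that replays stored items before live notifications start. newSyncHandler, the history slice and the send callback are all hypothetical; the SDK does not provide them.

```go
package main

import "fmt"

// newSyncHandler returns a function with the handler shape
// func(id uint64) error. It replays every item in history to the
// subscriber through send and stops on the first error.
// Both history and send are assumptions made for this sketch.
func newSyncHandler(history []string, send func(id uint64, item string) error) func(id uint64) error {
	return func(id uint64) error {
		for _, item := range history {
			if err := send(id, item); err != nil {
				return fmt.Errorf("sync subscription %d: %w", id, err)
			}
		}
		return nil
	}
}

// demo records what the handler sends for subscription 7.
func demo() ([]string, error) {
	var sent []string
	handler := newSyncHandler([]string{"a", "b"}, func(id uint64, item string) error {
		sent = append(sent, fmt.Sprintf("%d:%s", id, item))
		return nil
	})
	err := handler(7)
	return sent, err
}

func main() {
	sent, err := demo()
	fmt.Println(sent, err) // [7:a 7:b] <nil>
}
```

A closure keeps the handler signature unchanged while still giving it access to whatever state the synchronization needs.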

DefaultUnsubscribe has the signature:

func DefaultUnsubscribe[T any, P any](ctx context.Context, subscriptions *Subscriptions[T, P], subscriptionID uint64) (*pb.UnsubscribeResponse, error)
Client

Embed the Client module and the generated client to implement a custom gRPC client module. You should also create an output for this module so it can be added to the workflow.

type Client struct {
	*grpc.Client

	output *modules.Output

	client pb.TimeServiceClient
	wg     *sync.WaitGroup
}

After that, implement the Start method, which initializes the generated client. The generated client has to be initialized in Start because it requires an open connection to the server, which is opened by calling Connect.

// Start -
func (client *Client) Start(ctx context.Context) {
	client.client = pb.NewTimeServiceClient(client.Connection())
}

Then implement the SubscribeOnXXX and UnsubscribeFromXXX methods. The code below shows how to subscribe to the server and notify all of the client's subscribers about a new event.

// SubscribeOnTime -
func (client *Client) SubscribeOnTime(ctx context.Context) (uint64, error) {
	stream, err := client.client.SubscribeOnTime(ctx, new(pb.Request))
	if err != nil {
		return 0, err
	}

	return grpc.Subscribe[*pb.Response](
		stream,
		client.handleTime,
		client.wg,
	)
}

// handleTime pushes every received response to the module's output.
func (client *Client) handleTime(ctx context.Context, data *pb.Response, id uint64) error {
	client.output.Push(data)
	return nil
}

// UnsubscribeFromTime -
func (client *Client) UnsubscribeFromTime(ctx context.Context, id uint64) error {
	if _, err := client.client.UnsubscribeFromTime(ctx, &generalPB.UnsubscribeRequest{
		Id: id,
	}); err != nil {
		return err
	}
	return nil
}

handleTime is a handler that is called when a new event is received from the server.

You also need to implement the Module interface. It's described here. For example:

// Input -
func (client *Client) Input(name string) (*modules.Input, error) {
	return nil, errors.Wrap(modules.ErrUnknownInput, name)
}

// Output -
func (client *Client) Output(name string) (*modules.Output, error) {
	if name != "time" {
		return nil, errors.Wrap(modules.ErrUnknownOutput, name)
	}
	return client.output, nil
}

// AttachTo -
func (client *Client) AttachTo(name string, input *modules.Input) error {
	output, err := client.Output(name)
	if err != nil {
		return err
	}
	output.Attach(input)
	return nil
}
Inputs and outputs

The inputs and outputs of your server and client modules must be defined by the developer, because the data structure and the notification or processing logic differ from project to project.

Program

The full code of the main file is below.

package main

import (
	"context"
	"os"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	"github.com/dipdup-net/indexer-sdk/pkg/modules"
	"github.com/dipdup-net/indexer-sdk/pkg/modules/grpc"
)

func main() {
	log.Logger = log.Output(zerolog.ConsoleWriter{
		Out:        os.Stdout,
		TimeFormat: "2006-01-02 15:04:05",
	})

	bind := "127.0.0.1:8889"
	serverCfg := grpc.ServerConfig{
		Bind: bind,
	}

	// create server module
	server, err := NewServer(&serverCfg)
	if err != nil {
		log.Panic().Err(err).Msg("creating server error")
		return
	}

	// creating client module
	client := NewClient(bind)

	ctx, cancel := context.WithCancel(context.Background())

	// starting all modules
	server.Start(ctx)

	if err := client.Connect(ctx); err != nil {
		log.Panic().Err(err).Msg("connecting to server error")
		return
	}
	client.Start(ctx)

	// subscribing to time
	subscriptionID, err := client.SubscribeOnTime(ctx)
	if err != nil {
		log.Panic().Err(err).Msg("subscribing error")
		return
	}
	log.Info().Uint64("subscription_id", subscriptionID).Msg("subscribed")

	// creating a custom module which receives notifications from the client and logs them to the console
	module := NewCustomModule()

	if err := modules.Connect(client, module, "time", "input"); err != nil {
		log.Panic().Err(err).Msg("module connection error")
		return
	}

	module.Start(ctx)

	time.Sleep(time.Minute)

	if err := client.UnsubscribeFromTime(ctx, subscriptionID); err != nil {
		log.Panic().Err(err).Msg("unsubscribing error")
		return
	}
	log.Info().Msg("unsubscribed")

	cancel()

	// closing all modules
	if err := client.Close(); err != nil {
		log.Panic().Err(err).Msg("closing client error")
	}
	if err := module.Close(); err != nil {
		log.Panic().Err(err).Msg("closing custom module error")
	}
	if err := server.Close(); err != nil {
		log.Panic().Err(err).Msg("closing server error")
	}
}

Documentation

Constants

const (
	SuccessMessage = "success"
)

Default messages

Variables

This section is empty.

Functions

func DefaultSubscribeOn

func DefaultSubscribeOn[T any, P any](stream ServerStream[P], subscriptions *Subscriptions[T, P], subscription Subscription[T, P], handler func(id uint64) error, onEndOfSync func(id uint64) error) error

DefaultSubscribeOn - default subscribe server handler

func DefaultUnsubscribe

func DefaultUnsubscribe[T any, P any](ctx context.Context, subscriptions *Subscriptions[T, P], subscriptionID uint64) (*pb.UnsubscribeResponse, error)

DefaultUnsubscribe - default unsubscribe server handler

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client - the structure which is responsible for connection to server

func NewClient

func NewClient(server string) *Client

NewClient - constructor of client structure

func (*Client) Close

func (client *Client) Close() error

Close - closes authentication client module

func (*Client) Connect

func (client *Client) Connect(ctx context.Context, opts ...ConnectOption) error

Connect - connects to server

func (*Client) Connection

func (client *Client) Connection() *grpc.ClientConn

Connection - receives connection entity

func (*Client) Name

func (client *Client) Name() string

Name -

func (*Client) Reconnect

func (client *Client) Reconnect() <-chan struct{}

Reconnect - returns channel with reconnection events
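One plausible use of the reconnection channel is to re-create subscriptions after the connection is restored. The sketch below shows that pattern with a plain channel in place of client.Reconnect(). watchReconnects and the resubscribe callback are hypothetical and not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// watchReconnects calls resubscribe for every event received on reconnect.
// It returns when ctx is cancelled, when the channel is closed,
// or when resubscribe fails.
func watchReconnects(ctx context.Context, reconnect <-chan struct{}, resubscribe func(context.Context) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case _, ok := <-reconnect:
			if !ok {
				return nil // channel closed, nothing more to watch
			}
			if err := resubscribe(ctx); err != nil {
				return err
			}
		}
	}
}

// demo simulates two reconnection events followed by channel closure.
func demo() (int, error) {
	events := make(chan struct{}, 2)
	events <- struct{}{}
	events <- struct{}{}
	close(events)

	calls := 0
	err := watchReconnects(context.Background(), events, func(context.Context) error {
		calls++
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(demo()) // 2 <nil>
}
```

In a client module this loop would typically run in its own goroutine started from Start, with resubscribe calling a method such as SubscribeOnTime.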

func (*Client) Start

func (client *Client) Start(ctx context.Context)

Start - starts authentication client module

type ConnectOption

type ConnectOption func(opts *ConnectOptions)

ConnectOption -

func WaitServer

func WaitServer() ConnectOption

WaitServer -

func WithReconnectTimeout

func WithReconnectTimeout(duration time.Duration) ConnectOption

WithReconnectTimeout -

func WithReconnectionTime

func WithReconnectionTime(duration time.Duration) ConnectOption

WithReconnectionTime -

func WithTlsFromCert

func WithTlsFromCert(domain string) ConnectOption

WithTlsFromCert -

func WithUserAgent

func WithUserAgent(s string) ConnectOption

WithUserAgent -

type ConnectOptions

type ConnectOptions struct {
	// contains filtered or unexported fields
}

ConnectOptions -
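ConnectOption follows the functional options pattern: each option is a function that mutates an options struct before the connection is made. Since the fields of ConnectOptions are unexported and not documented here, the sketch below uses a hypothetical lower-case copy with invented fields and an invented default, purely to show how such options compose.

```go
package main

import (
	"fmt"
	"time"
)

// connectOptions is a hypothetical stand-in for ConnectOptions.
// The real struct has unexported fields that are not documented here.
type connectOptions struct {
	userAgent        string
	reconnectTimeout time.Duration
	waitServer       bool
}

// connectOption has the same shape as the documented ConnectOption type.
type connectOption func(opts *connectOptions)

func withUserAgent(s string) connectOption {
	return func(opts *connectOptions) { opts.userAgent = s }
}

func withReconnectTimeout(d time.Duration) connectOption {
	return func(opts *connectOptions) { opts.reconnectTimeout = d }
}

func waitServer() connectOption {
	return func(opts *connectOptions) { opts.waitServer = true }
}

// applyOptions starts from defaults and applies each option in order,
// so later options override earlier ones.
func applyOptions(opts ...connectOption) connectOptions {
	result := connectOptions{reconnectTimeout: 5 * time.Second} // assumed default
	for _, opt := range opts {
		opt(&result)
	}
	return result
}

// demo formats the resulting options for inspection.
func demo() string {
	o := applyOptions(withUserAgent("time-client"), waitServer())
	return fmt.Sprintf("%s %s %t", o.userAgent, o.reconnectTimeout, o.waitServer)
}

func main() {
	fmt.Println(demo()) // time-client 5s true
}
```

Based on the signatures listed above, the equivalent call with the real package would presumably be client.Connect(ctx, grpc.WaitServer(), grpc.WithUserAgent("time-client")).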

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server - basic server structure which implements the module interface and handles stats endpoints.

func NewServer

func NewServer(cfg *ServerConfig) (*Server, error)

NewServer - constructor of server structure

func (*Server) Close

func (module *Server) Close() error

Close - closes server module

func (*Server) Name

func (*Server) Name() string

Name -

func (*Server) Server

func (module *Server) Server() *gogrpc.Server

Server - returns current grpc.Server to register handlers

func (*Server) Start

func (module *Server) Start(ctx context.Context)

Start - starts server module

type ServerConfig

type ServerConfig struct {
	Bind    string `yaml:"bind" validate:"required,hostname_port"`
	Log     bool   `yaml:"log" validate:"omitempty"`
	Metrics bool   `yaml:"metrics" validate:"omitempty"`
	RPS     int    `yaml:"rps" validate:"omitempty,min=1"`
}

ServerConfig - config for server
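The validate tags suggest that Bind must be a host:port pair and that RPS, when set, must be at least 1. The standalone sketch below approximates those two rules with the standard library only. It is not the validation the package performs; serverConfig and checkConfig are hypothetical names, and the exact behaviour of the hostname_port rule may differ.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// serverConfig is a local copy of the two fields checked in this sketch.
type serverConfig struct {
	Bind string
	RPS  int
}

// checkConfig approximates the "required,hostname_port" and
// "omitempty,min=1" rules using only the standard library.
func checkConfig(cfg serverConfig) error {
	if cfg.Bind == "" {
		return fmt.Errorf("bind is required")
	}
	_, port, err := net.SplitHostPort(cfg.Bind)
	if err != nil {
		return fmt.Errorf("bind %q: %w", cfg.Bind, err)
	}
	p, err := strconv.Atoi(port)
	if err != nil || p < 1 || p > 65535 {
		return fmt.Errorf("bind %q: invalid port", cfg.Bind)
	}
	// Zero means "not set"; any other value must be at least 1.
	if cfg.RPS != 0 && cfg.RPS < 1 {
		return fmt.Errorf("rps must be at least 1, got %d", cfg.RPS)
	}
	return nil
}

func main() {
	fmt.Println(checkConfig(serverConfig{Bind: "127.0.0.1:8889"})) // <nil>
	fmt.Println(checkConfig(serverConfig{Bind: "127.0.0.1"}) != nil)
	fmt.Println(checkConfig(serverConfig{Bind: "127.0.0.1:8889", RPS: -1}) != nil)
}
```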

type ServerStream

type ServerStream[T any] interface {
	Send(T) error
	grpc.ServerStream
}

ServerStream -

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream -

func NewStream

func NewStream[T any](stream grpc.ClientStream) *Stream[T]

NewStream - creates new stream

func (*Stream[T]) Close

func (s *Stream[T]) Close() error

Close - closes stream

func (*Stream[T]) Context

func (s *Stream[T]) Context() context.Context

Context -

func (*Stream[T]) Listen

func (s *Stream[T]) Listen() <-chan *T

Listen - channel with received messages

func (*Stream[T]) Subscribe

func (s *Stream[T]) Subscribe(ctx context.Context) (uint64, error)

Subscribe - generic function to subscribe on service stream

func (*Stream[T]) Unsubscribe

func (s *Stream[T]) Unsubscribe(ctx context.Context, id uint64) error

Unsubscribe -

type Subscription

type Subscription[T any, P any] interface {
	Filter(typ T) bool
	Send(msg P)
	Listen() <-chan P
	io.Closer
}

Subscription - general interface for subscription

type Subscriptions

type Subscriptions[T any, P any] struct {
	// contains filtered or unexported fields
}

Subscriptions -

func NewSubscriptions

func NewSubscriptions[T any, P any]() *Subscriptions[T, P]

NewSubscriptions -

func (*Subscriptions[T, P]) Add

func (s *Subscriptions[T, P]) Add(id uint64, subscription Subscription[T, P])

Add -

func (*Subscriptions[T, P]) Close

func (s *Subscriptions[T, P]) Close() error

Close -

func (*Subscriptions[T, P]) Get

func (s *Subscriptions[T, P]) Get(id uint64) (Subscription[T, P], bool)

Get -

func (*Subscriptions[T, P]) NotifyAll

func (s *Subscriptions[T, P]) NotifyAll(typ T, converter func(uint64, T) P)

NotifyAll -

func (*Subscriptions[T, P]) Remove

func (s *Subscriptions[T, P]) Remove(id uint64) error

Remove -
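The method descriptions above are empty, so the following standalone sketch shows one way a registry with the same Add and NotifyAll signatures could behave: NotifyAll runs each subscription's Filter and, on a match, sends the value produced by the converter. registry, minSubscription and demo are hypothetical, and the real Subscriptions type may work differently.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is the subset of the Subscription interface used here.
type subscription[T any, P any] interface {
	Filter(typ T) bool
	Send(msg P)
}

// registry is a hypothetical, simplified counterpart of Subscriptions.
type registry[T any, P any] struct {
	mx   sync.RWMutex
	subs map[uint64]subscription[T, P]
}

func newRegistry[T any, P any]() *registry[T, P] {
	return &registry[T, P]{subs: make(map[uint64]subscription[T, P])}
}

// Add stores the subscription under its identifier.
func (r *registry[T, P]) Add(id uint64, s subscription[T, P]) {
	r.mx.Lock()
	defer r.mx.Unlock()
	r.subs[id] = s
}

// NotifyAll converts typ for every subscription whose filter accepts it.
// The converter receives the subscription id so that the resulting
// message can carry it.
func (r *registry[T, P]) NotifyAll(typ T, converter func(uint64, T) P) {
	r.mx.RLock()
	defer r.mx.RUnlock()
	for id, s := range r.subs {
		if s.Filter(typ) {
			s.Send(converter(id, typ))
		}
	}
}

// minSubscription accepts values that are at least min and records them.
type minSubscription struct {
	min int
	got []string
}

func (s *minSubscription) Filter(v int) bool { return v >= s.min }
func (s *minSubscription) Send(msg string)   { s.got = append(s.got, msg) }

// demo notifies two subscriptions with different thresholds.
func demo() ([]string, []string) {
	r := newRegistry[int, string]()
	low, high := &minSubscription{min: 1}, &minSubscription{min: 10}
	r.Add(1, low)
	r.Add(2, high)

	conv := func(id uint64, v int) string { return fmt.Sprintf("sub=%d value=%d", id, v) }
	r.NotifyAll(5, conv)
	r.NotifyAll(20, conv)
	return low.got, high.got
}

func main() {
	low, high := demo()
	fmt.Println(low)  // [sub=1 value=5 sub=1 value=20]
	fmt.Println(high) // [sub=2 value=20]
}
```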
