rpcx-postgres

module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: MIT

README

Features

  • PostgreSQL-based service discovery for rpcx by @smallnest
  • Real-time service updates using PostgreSQL NOTIFY/LISTEN
  • Automatic cleanup of stale services

Requirements

  • PostgreSQL 13 or higher

Configuration

The plugin can be configured with the following options:

Server Plugin

type PostgresRegisterPlugin struct {
    Table          string        // Table name for service registration (Default: "rpcx_services")
    ServicePath    string        // Service registration path
    ServiceAddress string        // Service address (e.g., "tcp@localhost:8972")
    UpdateInterval time.Duration // Interval for updating service TTL
}

Client Discovery

type PostgresDiscoveryOption struct {
    Table      string                         // Table name for service discovery (Default: "rpcx_services")
    RetryCount int                            // Retry count for watch operations (-1 for infinite)
    Filter     client.ServiceDiscoveryFilter  // Optional filter for services
}

Example

Here's a complete example showing both server and client:

package main

import (
    "context"
    "flag"
    "log"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/smallnest/rpcx/server"
    "github.com/smallnest/rpcx/client"
    pgregistry "github.com/dapona/rpcx-postgres"
)

type Arith struct{}

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    return nil
}

type Args struct {
    A int `msg:"a"`
    B int `msg:"b"`
}

type Reply struct {
    C int `msg:"c"`
}

// Server example
func runServer(pool *pgxpool.Pool) error {
    s := server.NewServer()

    // Create the plugin
    plugin, err := pgregistry.NewPostgresRegisterPlugin(
        context.Background(),
        pool,
        "tcp@localhost:8972",    // service address
        "examples/arith",        // service path
        "rpcx_services",         // table name
        30*time.Second,          // update interval
    )
    if err != nil {
        return err
    }

    err = plugin.Start(context.Background())
    if err != nil {
        return err
    }
    defer plugin.Stop()

    // Add plugin to server
    s.Plugins.Add(plugin)

    // Register service
    s.RegisterName("Arith", new(Arith), "")

    // Start server
    return s.Serve("tcp", ":8972")
}

// Client example
func runClient(pool *pgxpool.Pool) error {
    // Create service discovery
    discovery, err := pgregistry.NewPostgresDiscoveryWithPool(
        context.Background(),
        "examples/arith",         // service path
        pool,
        "tcp@localhost:8973",     // client address
        &pgregistry.PostgresDiscoveryOption{
            RetryCount: -1,       // infinite retries
            "rpcx_services",      // table name
        },
    )
    if err != nil {
        return err
    }
    defer discovery.Close()

    // Create rpcx client
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, discovery, client.DefaultOption)
    defer xclient.Close()

    // Call service
    args := &Args{A: 10, B: 20}
    reply := &Reply{}

    err = xclient.Call(context.Background(), "Mul", args, reply)
    if err != nil {
        return err
    }

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    return nil
}

func main() {
    flag.Parse()

    // Create PostgreSQL connection pool
    pool, err := pgxpool.New(context.Background(), *postgresURL)
    if err != nil {
        log.Fatalf("Failed to create pool: %v", err)
    }
    defer pool.Close()

    // Run server in a goroutine
    go func() {
        if err := runServer(pool); err != nil {
            log.Printf("Server error: %v", err)
        }
    }()

    // Wait for server to start
    time.Sleep(time.Second)

    // Run client
    if err := runClient(pool); err != nil {
        log.Printf("Client error: %v", err)
    }
}

License

MIT License

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL