rpcmq

package module
v0.0.0-...-0bb4887 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2015 License: BSD-3-Clause Imports: 11 Imported by: 0

README

rpcmq GoDoc

Introduction

The package rpcmq implements an RPC protocol over AMQP.

Installation

go get github.com/jroimartin/rpcmq

Documentation

godoc github.com/jroimartin/rpcmq

Examples

The following snippets show how easy is to implement both an RPC client and server using rpcmq. In this example the server registers a new method called "toUpper" that takes an string and convert it to uppercase. On the other hand, the RPC client will invoke this method remotely. Then, after 10 seconds, the client and the server will shutdown.

server.go

package main

import (
	"log"
	"strings"
	"time"

	"github.com/jroimartin/rpcmq"
)

func main() {
	s := rpcmq.NewServer("amqp://amqp_broker:5672",
		"rcp-queue", "rpc-exchange", "direct")
	if err := s.Register("toUpper", toUpper); err != nil {
		log.Fatalf("Register: %v", err)
	}
	if err := s.Init(); err != nil {
		log.Fatalf("Init: %v", err)
	}
	defer s.Shutdown()

	time.Sleep(1 * time.Minute)
}

func toUpper(data []byte) ([]byte, error) {
	log.Printf("Received: toUpper(%v)\n", string(data))
	return []byte(strings.ToUpper(string(data))), nil
}

client.go

package main

import (
	"log"
	"time"

	"github.com/jroimartin/rpcmq"
)

func main() {
	c := rpcmq.NewClient("amqp://amqp_broker:5672",
		"rcp-queue", "rpc-client", "rpc-exchange", "direct")
	if err := c.Init(); err != nil {
		log.Fatalf("Init: %v", err)
	}
	defer c.Shutdown()

	go func() {
		for {
			data := []byte("Hello gophers!")
			uuid, err := c.Call("toUpper", data, 0)
			if err != nil {
				log.Println("Call:", err)
			}
			log.Printf("Sent: toUpper(%v) (%v)\n", string(data), uuid)
			time.Sleep(500 * time.Millisecond)
		}
	}()

	go func() {
		for r := range c.Results() {
			if r.Err != "" {
				log.Printf("Received error: %v (%v)", r.Err, r.UUID)
				continue
			}
			log.Printf("Received: %v (%v)\n", string(r.Data), r.UUID)
		}
	}()

	time.Sleep(1 * time.Minute)
}

This code will generate the following output:

screen shot 2015-03-03 at 11 56 33

Documentation

Overview

Package rpcmq implements an RPC protocol over AMQP.

Client/Server initialization

It is important to note that both clients and servers must be initialized before being used. Also, any configuration parameter (e.g. TLSConfig or Parallel) should be set up before calling Init().

SSL/TLS

When connecting to the broker via amqps protocol, the TLS configuration can be set up using the TLSConfig parameter present in the Client and Server objects. For more information, see the documentation of the package "crypto/tls".

Index

Constants

This section is empty.

Variables

Log is the logger used to register warnings and info messages. If it is nil, no messages will be logged.

View Source
var RetrySleep time.Duration = 2 * time.Second

RetrySleep is the time between retries if the connection with the broker is lost.

Functions

This section is empty.

Types

type Client

type Client struct {

	// TLSConfig allows to configure the TLS parameters used to connect to
	// the broker via amqps
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

A Client is an RPC client, which is used to invoke remote procedures.

func NewClient

func NewClient(uri, msgsQueue, repliesQueue, exchange, kind string) *Client

NewClient returns a reference to a Client object. The paremeter uri is the network address of the broker and msgsQueue/repliesQueue are the names of queues that will be created to exchange the messages between clients and servers. On the other hand, the parameters exchange and kind determine the type of exchange that will be created. In fanout mode the queue name is ignored, so each queue has its own unique id.

func (*Client) Call

func (c *Client) Call(method string, data []byte, ttl time.Duration) (id string, err error)

Call invokes the remote procedure specified by the parameter method, being the parameter data the input passed to it. On the other hand, ttl is the time that this task will remain in the queue before being considered dead. The returned id can be used to identify the result corresponding to each invokation. If ttl is 0, the message will not expire.

func (*Client) Init

func (c *Client) Init() error

Init initializes the Client object. It establishes the connection with the broker, creating a channel and the queues that will be used under the hood.

func (*Client) Results

func (c *Client) Results() <-chan Result

Results returns a channel used to receive the results returned by the invoked procedures.

func (*Client) Shutdown

func (c *Client) Shutdown()

Shutdown shuts down the client gracefully. Using this method will ensure that all replies sent by the RPC servers to the client will be received by the latter.

type Function

type Function func(id string, data []byte) ([]byte, error)

The type Function declares the signature of the methods that can be registered by an RPC server. The id parameter contains the uuid of the task being executed.

type Result

type Result struct {
	UUID string
	Data []byte
	Err  string
}

A Result contains the data returned by the invoked procedure or an error message, in case that it finished with error. The UUID allows to link the result with the procedure call.

type Server

type Server struct {

	// Parallel allows to define the number of methods to be run in
	// parallel
	Parallel int
	// Prefetch allows to define the number of tasks to be "cached"
	Prefetch int
	// TLSConfig allows to configure the TLS parameters used to connect to
	// the broker via amqps
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

A Server is an RPC sever, which is used to register the methods than can be invoked remotely.

func NewServer

func NewServer(uri, msgsQueue, exchange, kind string) *Server

NewServer returns a reference to a Server object. The paremeter uri is the network address of the broker and msgsQueue is the name of queue that will be created to exchange the messages between clients and servers. On the other hand, the parameters exchange and kind determine the type of exchange that will be created. In fanout mode the queue name is ignored, so each queue has its own unique id.

func (*Server) Init

func (s *Server) Init() error

Init initializes the Server object. It establishes the connection with the broker, creating a channel and the queues that will be used under the hood.

func (*Server) Register

func (s *Server) Register(method string, f Function) error

Register registers a method with the name given by the parameter method and links the function f to it. Register should be called before Init() to avoid dropping messages due to "not registered method" errors.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown shuts down the server gracefully. Using this method will ensure that all requests sent by the RPC clients to the server will be handled by the latter.

Directories

Path Synopsis
_examples
sum
tls

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL