arpc

package module
v1.2.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2024 License: MIT Imports: 16 Imported by: 16

README

ARPC - More Effective Network Communication

Slack

Mentioned in Awesome Go MIT licensed Build Status Go Report Card Coverage Status

Contents

Features

  • Two-Way Calling
  • Two-Way Notify
  • Sync and Async Calling
  • Sync and Async Response
  • Batch Write | Writev | net.Buffers
  • Broadcast
  • Middleware
  • Pub/Sub
  • Opentracing
Pattern Interactive Directions Description
call two-way:
c -> s
s -> c
request and response
notify two-way:
c -> s
s -> c
request without response

Performance

Here are some third-party benchmarks that include arpc. Although these repos provide performance reports, I suggest you run the code yourself and get the real results rather than just trusting other people's docs:

Header Layout

  • LittleEndian
bodyLen reserved cmd flag methodLen sequence method body
4 bytes 1 byte 1 byte 1 bytes 1 bytes 8 bytes methodLen bytes bodyLen-methodLen bytes

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  2. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() {
	server := arpc.NewServer()

	// register router
	server.Handler.Handle("/echo", func(ctx *arpc.Context) {
		str := ""
		if err := ctx.Bind(&str); err == nil {
			ctx.Write(str)
		}
	})

	server.Run("localhost:8888")
}
package main

import (
	"log"
	"net"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	client, err := arpc.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	req := "hello"
	rsp := ""
	err = client.Call("/echo", &req, &rsp, time.Second*5)
	if err != nil {
		log.Fatalf("Call failed: %v", err)
	} else {
		log.Printf("Call Response: \"%v\"", rsp)
	}
}

API Examples

Register Routers
var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// by default, messages are handled one by one in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this makes the message be handled in a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)
Router Middleware

See router middleware, it's easy to implement middlewares yourself

import "github.com/lesismal/arpc/extension/middleware/router"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })
Coder Middleware
  • Coder Middleware is used for converting message data to your designed format, e.g. encrypt/decrypt and compress/decompress
import "github.com/lesismal/arpc/extension/middleware/coder/gzip"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
Client Call, CallAsync, Notify
  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  2. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
	response := &Echo{}
	ctx.Bind(response)
	...	
}, timeout)
  3. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)
Server Call, CallAsync, Notify
  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
	client = ctx.Client
	// release client
	client.OnDisconnected(func(c *arpc.Client){
		client = nil
	})
})

go func() {
	for {
		time.Sleep(time.Second)
		if client != nil {
			client.Call(...)
			client.CallAsync(...)
			client.Notify(...)
		}
	}
}()
  2. Then Call/CallAsync/Notify
Broadcast - Notify
var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

func broadcast() {
	var svr *arpc.Server = ... 
	msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
	mux.RLock()
	for client := range clientMap {
		client.PushMsg(msg, arpc.TimeZero)
	}
	mux.RUnlock()
}
Async Response
var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
	req := ...
	err := ctx.Bind(req)
	if err == nil {
		ctx.Write(data)
	}
}, asyncResponse)
Handle New Connection
// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
	...
})
Handle Disconnected
// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})
Handle Client's send queue overstock
// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})
Custom Net Protocol
// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
	return ... 
}
client, err := arpc.NewClient(dialer)
Custom Codec
import "github.com/lesismal/arpc/codec"

var codec arpc.Codec = ...

// package
codec.Defaultcodec = codec

// server
svr := arpc.NewServer()
svr.Codec = codec

// client
client, err := arpc.NewClient(...)
client.Codec = codec
Custom Logger
import "github.com/lesismal/arpc/log"

var logger arpc.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger
Custom operations before conn's recv and send
arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error) {
	// ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error) {
	// ...
})
Custom arpc.Client's Reader by wrapping net.Conn
arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader) {
	// ...
})
Custom arpc.Client's send queue capacity
arpc.DefaultHandler.SetSendQueueSize(4096)

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	s := pubsub.NewServer()
	s.Password = password

	// server publish to all clients
	go func() {
		for i := 0; true; i++ {
			time.Sleep(time.Second)
			s.Publish(topicName, fmt.Sprintf("message from server %v", i))
		}
	}()

	s.Run(address)
}
  • start a subscribe client
import "github.com/lesismal/arpc/log"
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func onTopic(topic *pubsub.Topic) {
	log.Info("[OnTopic] [%v] \"%v\", [%v]",
		topic.Name,
		string(topic.Data),
		time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	// subscribe topic
	if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
		panic(err)
	}

	<-make(chan int)
}
  • start a publish client
import "github.com/lesismal/arpc/extension/pubsub"

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	for i := 0; true; i++ {
		if i%5 == 0 {
			// publish msg to all clients
			client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		} else {
			// publish msg to only one client
			client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		}
		time.Sleep(time.Second)
	}
}

More Examples

Documentation

Index

Constants

View Source
const (
	// TimeZero represents zero time.
	TimeZero time.Duration = 0
	// TimeForever represents forever time.
	TimeForever time.Duration = 1<<63 - 1
)
View Source
const (
	// CmdNone is invalid
	CmdNone byte = 0

	// CmdRequest the other side should respond to a request message
	CmdRequest byte = 1

	// CmdResponse the other side should not respond to a response message
	CmdResponse byte = 2

	// CmdNotify the other side should not respond to a notify message
	CmdNotify byte = 3

	// CmdPing .
	CmdPing byte = 4

	// CmdPong .
	CmdPong byte = 5

	// CmdStream .
	CmdStream byte = 6
)
View Source
const (
	// HeaderIndexBodyLenBegin .
	HeaderIndexBodyLenBegin = 0
	// HeaderIndexBodyLenEnd .
	HeaderIndexBodyLenEnd = 4
	// HeaderIndexReserved .
	HeaderIndexReserved = 4
	// HeaderIndexCmd .
	HeaderIndexCmd = 5
	// HeaderIndexFlag .
	HeaderIndexFlag = 6
	// HeaderIndexMethodLen .
	HeaderIndexMethodLen = 7
	// HeaderIndexSeqBegin .
	HeaderIndexSeqBegin = 8
	// HeaderIndexSeqEnd .
	HeaderIndexSeqEnd = 16
	// HeaderFlagMaskError .
	HeaderFlagMaskError byte = 0x01
	// HeaderFlagMaskAsync .
	HeaderFlagMaskAsync byte = 0x02

	HeaderStreamLocalBitIndex = 7
	HeaderStreamEOFBitIndex   = 6
	HeaderStreamLocalBit      = byte(0x1) << HeaderStreamLocalBitIndex
	HeaderStreamEOFBit        = byte(0x1) << HeaderStreamEOFBitIndex
	HeaderStreamFlagBitMask   = HeaderStreamLocalBit | HeaderStreamEOFBit
	HeaderCmdBitMask          = ^HeaderStreamFlagBitMask
)
View Source
const (
	// HeadLen represents Message head length.
	HeadLen int = 16

	// MaxMethodLen limits Message method length.
	MaxMethodLen int = 127

	// DefaultMaxBodyLen limits Message body length.
	DefaultMaxBodyLen int = 1024*1024*64 - 16
)

Variables

View Source
var (
	// ErrClientTimeout represents a timeout error because of timer or context.
	ErrClientTimeout = errors.New("timeout")

	// ErrClientInvalidTimeoutZero represents an error of 0 time parameter.
	ErrClientInvalidTimeoutZero = errors.New("invalid timeout, should not be 0")

	// ErrClientInvalidTimeoutLessThanZero represents an error of less than 0 time parameter.
	ErrClientInvalidTimeoutLessThanZero = errors.New("invalid timeout, should not be < 0")

	// ErrClientInvalidTimeoutZeroWithNonNilCallback represents an error with 0 time parameter but with non-nil callback.
	ErrClientInvalidTimeoutZeroWithNonNilCallback = errors.New("invalid timeout 0 with non-nil callback")

	// ErrClientOverstock represents an error of Client's send queue is full.
	ErrClientOverstock = errors.New("timeout: rpc Client's send queue is full")

	// ErrClientReconnecting represents an error that Client is reconnecting.
	ErrClientReconnecting = errors.New("client reconnecting")

	// ErrClientStopped represents an error that Client is stopped.
	ErrClientStopped = errors.New("client stopped")

	// ErrClientInvalidPoolDialers represents an error of empty dialer array.
	ErrClientInvalidPoolDialers = errors.New("invalid dialers: empty array")

	// ErrClientInvalidAsyncHandler represents an error of invalid(nil) async handler.
	ErrClientInvalidAsyncHandler = errors.New("invalid async handler: should not be nil")
)

client error

View Source
var (
	// ErrInvalidRspMessage represents an error of invalid message CMD.
	ErrInvalidRspMessage = errors.New("invalid response message cmd")

	// ErrMethodNotFound represents an error of method not found.
	ErrMethodNotFound = errors.New("method not found")

	// ErrInvalidFlagBitIndex represents an error of invalid flag bit index.
	ErrInvalidFlagBitIndex = errors.New("invalid index, should be 0-7")
)

message error

View Source
var (
	// PingMessage .
	PingMessage = newMessage(CmdPing, "", nil, false, false, 0, nil, nil, nil)

	// PongMessage .
	PongMessage = newMessage(CmdPong, "", nil, false, false, 0, nil, nil, nil)
)
View Source
var (
	// ErrContextResponseToNotify represents an error that response to a notify message.
	ErrContextResponseToNotify = errors.New("should not response to a context with notify message")
)

context error

View Source
var (
	// ErrStreamClosedSend represents an error of stream closed send.
	ErrStreamClosedSend = errors.New("stream has closed send")
)

stream errors

View Source
var (
	// ErrTimeout represents an error of timeout.
	ErrTimeout = errors.New("timeout")
)

general errors

Functions

func Append added in v1.2.7

func Append(buf []byte, more ...byte) []byte

Append exports default package method.

func AppendString added in v1.2.7

func AppendString(buf []byte, more string) []byte

AppendString exports default package method.

func AsyncExecute added in v1.1.10

func AsyncExecute(f func())

AsyncExecute executes a func.

func AsyncResponse

func AsyncResponse() bool

AsyncResponse returns default AsyncResponse flag.

func BatchRecv

func BatchRecv() bool

BatchRecv returns default BatchRecv flag.

func BatchSend

func BatchSend() bool

BatchSend returns default BatchSend flag.

func BeforeRecv

func BeforeRecv(h func(net.Conn) error)

BeforeRecv registers default handler which will be called before Recv.

func BeforeSend

func BeforeSend(h func(net.Conn) error)

BeforeSend registers default handler which will be called before Send.

func EnablePool added in v1.1.9

func EnablePool(enable bool)

EnablePool registers handlers for pool operation for Context and Message and Message.Buffer

func Free added in v1.1.9

func Free(buf []byte)

Free exports default package method.

func Handle

func Handle(m string, h HandlerFunc, args ...interface{})

Handle registers default method/router handler.

If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine; otherwise the handler will be called synchronously in the client's reading goroutine one by one.

func HandleConnected

func HandleConnected(onConnected func(*Client))

HandleConnected registers default handler which will be called when client connected.

func HandleDisconnected

func HandleDisconnected(onDisConnected func(*Client))

HandleDisconnected registers default handler which will be called when client disconnected.

func HandleMalloc added in v1.1.9

func HandleMalloc(f func(int) []byte)

HandleMalloc registers default buffer maker.

func HandleMessageDropped

func HandleMessageDropped(onOverstock func(c *Client, m *Message))

HandleMessageDropped registers default handler which will be called when message dropped.

func HandleNotFound

func HandleNotFound(h HandlerFunc)

HandleNotFound registers default "" method/router handler. It will be called when method/router is not found.

func HandleOverstock

func HandleOverstock(onOverstock func(c *Client, m *Message))

HandleOverstock registers default handler which will be called when client send queue is overstock.

func HandleSessionMiss

func HandleSessionMiss(onSessionMiss func(c *Client, m *Message))

HandleSessionMiss registers default handler which will be called when async message seq not found.

func LogDebugInfo added in v1.2.7

func LogDebugInfo()

LogDebugInfo .

func Malloc added in v1.1.9

func Malloc(size int) []byte

Malloc exports default package method.

func MaxBodyLen

func MaxBodyLen() int

func ReadTimeout added in v1.2.11

func ReadTimeout() time.Duration

ReadTimeout returns client's read timeout.

func Realloc added in v1.1.9

func Realloc(buf []byte, size int) []byte

Realloc exports default package method.

func RecvBufferSize

func RecvBufferSize() int

RecvBufferSize returns default client's read buffer size.

func SendBufferSize added in v1.1.9

func SendBufferSize() int

SendBufferSize returns default client's send buffer size.

func SendQueueSize

func SendQueueSize() int

SendQueueSize returns default client's send queue channel capacity.

func SetAsyncExecutor added in v1.1.10

func SetAsyncExecutor(executor func(f func()))

SetAsyncExecutor sets executor. AsyncExecute executes a func

func SetAsyncResponse

func SetAsyncResponse(async bool)

SetAsyncResponse sets default AsyncResponse flag.

func SetBatchRecv

func SetBatchRecv(batch bool)

SetBatchRecv sets default BatchRecv flag.

func SetBatchSend

func SetBatchSend(batch bool)

SetBatchSend sets default BatchSend flag.

func SetDebug added in v1.2.7

func SetDebug(enable bool)

SetDebug .

func SetHandler

func SetHandler(h Handler)

SetHandler sets default Handler.

func SetLogTag

func SetLogTag(tag string)

SetLogTag sets DefaultHandler's log tag.

func SetMaxBodyLen added in v1.1.11

func SetMaxBodyLen(l int)

func SetReadTimeout added in v1.2.11

func SetReadTimeout(timeout time.Duration)

SetReadTimeout sets client's read timeout.

func SetReaderWrapper

func SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)

SetReaderWrapper registers default reader wrapper for net.Conn.

func SetRecvBufferSize

func SetRecvBufferSize(size int)

SetRecvBufferSize sets default client's read buffer size.

func SetSendBufferSize added in v1.1.9

func SetSendBufferSize(size int)

SetSendBufferSize sets default client's send buffer size.

func SetSendQueueSize

func SetSendQueueSize(size int)

SetSendQueueSize sets default client's send queue channel capacity.

func SetStreamQueueSize added in v1.2.15

func SetStreamQueueSize(size int)

SetStreamQueueSize sets default stream queue channel capacity.

func SetWriteTimeout added in v1.2.11

func SetWriteTimeout(timeout time.Duration)

SetWriteTimeout sets client's write timeout.

func StreamQueueSize added in v1.2.15

func StreamQueueSize() int

StreamQueueSize returns default stream queue channel capacity.

func Use

func Use(h HandlerFunc)

Use registers default method/router handler middleware.

func UseCoder

func UseCoder(coder MessageCoder)

UseCoder registers default message coding middleware, coder.Encode will be called before message send, coder.Decode will be called after message recv.

func WriteTimeout added in v1.2.11

func WriteTimeout() time.Duration

WriteTimeout returns client's write timeout.

Types

type Allocator added in v1.2.7

type Allocator interface {
	Malloc(size int) []byte
	Realloc(buf []byte, size int) []byte
	Append(buf []byte, more ...byte) []byte
	AppendString(buf []byte, more string) []byte
	Free(buf []byte)
}
var DefaultAllocator Allocator = New(64, 64)

DefaultAllocator .

func New added in v1.1.9

func New(smallSize, bigSize int) Allocator

New .

type AsyncHandlerFunc added in v1.2.12

type AsyncHandlerFunc func(*Context, error)

AsyncHandlerFunc defines callback of Client.CallAsync.

type BufferPool added in v1.1.9

type BufferPool struct {
	Debug bool
	// contains filtered or unexported fields
}

BufferPool .

func (*BufferPool) Append added in v1.2.7

func (bp *BufferPool) Append(buf []byte, more ...byte) []byte

Append .

func (*BufferPool) AppendString added in v1.2.7

func (bp *BufferPool) AppendString(buf []byte, more string) []byte

AppendString .

func (*BufferPool) Free added in v1.2.7

func (bp *BufferPool) Free(buf []byte)

Free .

func (*BufferPool) LogDebugInfo added in v1.2.7

func (bp *BufferPool) LogDebugInfo()

func (*BufferPool) Malloc added in v1.2.7

func (bp *BufferPool) Malloc(size int) []byte

Malloc .

func (*BufferPool) Realloc added in v1.2.7

func (bp *BufferPool) Realloc(buf []byte, size int) []byte

Realloc .

type Client

type Client struct {
	Conn    net.Conn
	Codec   codec.Codec
	Handler Handler
	Reader  io.Reader
	Dialer  DialerFunc
	Head    Header
	// contains filtered or unexported fields
}

Client represents an arpc Client. There may be multiple outstanding Calls or Notifys associated with a single Client, and a Client may be used by multiple goroutines simultaneously.

func NewClient

func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error)

NewClient creates a Client.

func (*Client) Call

func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, args ...interface{}) error

Call makes an rpc call with a timeout. Call will block waiting for the server's response until timeout.

func (*Client) CallAsync

func (c *Client) CallAsync(method string, req interface{}, handler AsyncHandlerFunc, timeout time.Duration, args ...interface{}) error

CallAsync makes an asynchronous rpc call with timeout. CallAsync will not block waiting for the server's response, but the handler will be called if the response arrives before the timeout.

func (*Client) CallContext added in v1.2.12

func (c *Client) CallContext(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error

CallContext uses context to make rpc call. CallContext blocks to wait for a response from the server until it times out.

func (*Client) CallWith

func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error

CallWith uses context to make rpc call. CallWith blocks to wait for a response from the server until it times out.

func (*Client) CheckState

func (c *Client) CheckState() error

CheckState checks Client's state.

func (*Client) Delete

func (c *Client) Delete(key interface{})

Delete deletes key-value pair

func (*Client) Get

func (c *Client) Get(key interface{}) (interface{}, bool)

Get returns value for key.

func (*Client) Keepalive added in v1.2.11

func (c *Client) Keepalive(interval time.Duration)

Keepalive .

func (*Client) NewMessage

func (c *Client) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message

NewMessage creates a Message by client's seq, handler and codec.

func (*Client) NewStream added in v1.2.15

func (client *Client) NewStream(method string) *Stream

NewStream creates a stream.

func (*Client) Notify

func (c *Client) Notify(method string, data interface{}, timeout time.Duration, args ...interface{}) error

Notify makes a notify with timeout. A notify does not need a response from the server.

func (*Client) NotifyContext added in v1.2.12

func (c *Client) NotifyContext(ctx context.Context, method string, data interface{}, args ...interface{}) error

NotifyContext uses context to make rpc notify. A notify does not need a response from the server.

func (*Client) NotifyWith

func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}, args ...interface{}) error

NotifyWith uses context to make rpc notify. A notify does not need a response from the server.

func (*Client) Ping added in v1.2.11

func (c *Client) Ping()

Ping .

func (*Client) Pong added in v1.2.11

func (c *Client) Pong()

Pong .

func (*Client) PushMsg

func (c *Client) PushMsg(msg *Message, timeout time.Duration) error

PushMsg pushes a msg to Client's send queue with timeout.

func (*Client) Restart

func (c *Client) Restart() error

Restart stops and restarts a Client.

func (*Client) Set

func (c *Client) Set(key interface{}, value interface{})

Set sets key-value pair.

func (*Client) SetState added in v1.2.7

func (c *Client) SetState(running bool)

SetState sets running state, should be used only for non-blocking conn.

func (*Client) Stop

func (c *Client) Stop()

Stop stops a Client.

type ClientPool

type ClientPool struct {
	// contains filtered or unexported fields
}

ClientPool represents an arpc Client Pool.

func NewClientPool

func NewClientPool(dialer DialerFunc, size int, args ...interface{}) (*ClientPool, error)

NewClientPool creates a ClientPool.

func NewClientPoolFromDialers

func NewClientPoolFromDialers(dialers []DialerFunc, args ...interface{}) (*ClientPool, error)

NewClientPoolFromDialers creates a ClientPool by multiple dialers.

func (*ClientPool) Get

func (pool *ClientPool) Get(index int) *Client

Get returns a Client by index.

func (*ClientPool) Handler

func (pool *ClientPool) Handler() Handler

Handler returns Handler.

func (*ClientPool) Next

func (pool *ClientPool) Next() *Client

Next returns a Client by round robin.

func (*ClientPool) Size

func (pool *ClientPool) Size() int

Size returns Client number.

func (*ClientPool) Stop

func (pool *ClientPool) Stop()

Stop stops all clients.

type Context

type Context struct {
	Client  *Client
	Message *Message
	// contains filtered or unexported fields
}

Context represents an arpc Call's context.

func (*Context) Abort added in v1.1.0

func (ctx *Context) Abort()

Abort stops the one-by-one-calling of middlewares and method/router handler.

func (*Context) Bind

func (ctx *Context) Bind(v interface{}) error

Bind parses the body data and stores the result in the value pointed to by v.

func (*Context) Body

func (ctx *Context) Body() []byte

Body returns body.

func (*Context) Deadline added in v1.1.0

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

Deadline implements stdlib's Context.

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

Done implements stdlib's Context.

func (*Context) Err added in v1.1.0

func (ctx *Context) Err() error

Err implements stdlib's Context.

func (*Context) Error

func (ctx *Context) Error(v interface{}) error

Error responses an error Message to the Client.

func (*Context) Get

func (ctx *Context) Get(key interface{}) (interface{}, bool)

Get returns value for key.

func (*Context) Next

func (ctx *Context) Next()

Next calls next middleware or method/router handler.

func (*Context) Release added in v1.1.9

func (ctx *Context) Release()

func (*Context) ResponseError added in v1.2.16

func (ctx *Context) ResponseError() interface{}

func (*Context) Set

func (ctx *Context) Set(key interface{}, value interface{})

Set sets key-value pair.

func (*Context) Value added in v1.1.0

func (ctx *Context) Value(key interface{}) interface{}

Value returns the value associated with this context for key, implements stdlib's Context.

func (*Context) Values

func (ctx *Context) Values() map[interface{}]interface{}

Values returns values.

func (*Context) Write

func (ctx *Context) Write(v interface{}) error

Write responses a Message to the Client.

func (*Context) WriteWithTimeout

func (ctx *Context) WriteWithTimeout(v interface{}, timeout time.Duration) error

WriteWithTimeout responses a Message to the Client with timeout.

type DialerFunc

type DialerFunc func() (net.Conn, error)

DialerFunc defines the dialer used by arpc Client to connect to the server.

type Handler

type Handler interface {
	// Clone returns a copy of Handler.
	Clone() Handler

	// LogTag returns log tag value.
	LogTag() string
	// SetLogTag sets log tag.
	SetLogTag(tag string)

	// HandleConnected registers handler which will be called when client connected.
	HandleConnected(onConnected func(*Client))
	// OnConnected will be called when client is connected.
	OnConnected(c *Client)

	// HandleDisconnected registers handler which will be called when client is disconnected.
	HandleDisconnected(onDisConnected func(*Client))
	// OnDisconnected will be called when client is disconnected.
	OnDisconnected(c *Client)

	// MaxReconnectTimes returns client's max reconnect times.
	MaxReconnectTimes() int
	// SetMaxReconnectTimes sets client's max reconnect times.
	SetMaxReconnectTimes(n int)

	// HandleOverstock registers handler which will be called when client send queue is overstock.
	HandleOverstock(onOverstock func(c *Client, m *Message))
	// OnOverstock will be called when client chSend is full.
	OnOverstock(c *Client, m *Message)

	// HandleMessageDone registers handler which will be called when message dropped.
	HandleMessageDone(onMessageDone func(c *Client, m *Message))
	// OnMessageDone will be called when message is dropped.
	OnMessageDone(c *Client, m *Message)

	// HandleMessageDropped registers handler which will be called when message dropped.
	HandleMessageDropped(onOverstock func(c *Client, m *Message))
	// OnMessageDropped will be called when message is dropped.
	OnMessageDropped(c *Client, m *Message)

	// HandleSessionMiss registers handler which will be called when async message seq not found.
	HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
	// OnSessionMiss will be called when async message seq not found.
	OnSessionMiss(c *Client, m *Message)

	// HandleContextDone registers handler which will be called when message dropped.
	HandleContextDone(onContextDone func(ctx *Context))
	// OnContextDone will be called when message is dropped.
	OnContextDone(ctx *Context)

	// BeforeRecv registers handler which will be called before Recv.
	BeforeRecv(h func(net.Conn) error)
	// BeforeSend registers handler which will be called before Send.
	BeforeSend(h func(net.Conn) error)

	// BatchRecv returns BatchRecv flag.
	BatchRecv() bool
	// SetBatchRecv sets BatchRecv flag.
	SetBatchRecv(batch bool)
	// BatchSend returns BatchSend flag.
	BatchSend() bool
	// SetBatchSend sets BatchSend flag.
	SetBatchSend(batch bool)

	// AsyncWrite returns AsyncWrite flag.
	AsyncWrite() bool
	// SetAsyncWrite sets AsyncWrite flag.
	SetAsyncWrite(async bool)

	// AsyncResponse returns AsyncResponse flag.
	AsyncResponse() bool
	// SetAsyncResponse sets AsyncResponse flag.
	SetAsyncResponse(async bool)

	// WrapReader wraps net.Conn to Read data with io.Reader.
	WrapReader(conn net.Conn) io.Reader
	// SetReaderWrapper registers reader wrapper for net.Conn.
	SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)

	// Recv reads a message from a client.
	Recv(c *Client) (*Message, error)
	// Send writes buffer data to a connection.
	Send(c net.Conn, buffer []byte) (int, error)
	// SendN writes multiple buffer data to a connection.
	SendN(conn net.Conn, buffers net.Buffers) (int, error)

	// RecvBufferSize returns client's recv buffer size.
	RecvBufferSize() int
	// SetRecvBufferSize sets client's recv buffer size.
	SetRecvBufferSize(size int)

	// SendBufferSize returns client's send buffer size.
	SendBufferSize() int
	// SetSendBufferSize sets client's send buffer size.
	SetSendBufferSize(size int)

	// ReadTimeout returns client's read timeout.
	ReadTimeout() time.Duration
	// SetReadTimeout sets client's read timeout.
	SetReadTimeout(timeout time.Duration)

	// WriteTimeout returns client's write timeout.
	WriteTimeout() time.Duration
	// SetWriteTimeout sets client's write timeout.
	SetWriteTimeout(timeout time.Duration)

	// SendQueueSize returns client's send queue channel capacity.
	SendQueueSize() int
	// SetSendQueueSize sets client's send queue channel capacity.
	SetSendQueueSize(size int)

	// StreamQueueSize returns stream queue channel capacity.
	StreamQueueSize() int
	// SetStreamQueueSize sets stream queue channel capacity.
	SetStreamQueueSize(size int)

	// MaxBodyLen returns max body length of a message.
	MaxBodyLen() int
	// SetMaxBodyLen sets max body length of a message.
	SetMaxBodyLen(l int)

	// Use registers method/router handler middleware.
	Use(h HandlerFunc)

	// UseCoder registers message coding middleware,
	// coder.Encode will be called before message send,
	// coder.Decode will be called after message recv.
	UseCoder(coder MessageCoder)

	// Coders returns coding middlewares.
	Coders() []MessageCoder

	// Handle registers method/router handler.
	//
	// If pass a Boolean value of "true", the handler will be called asynchronously in a new goroutine,
	// Else the handler will be called synchronously in the client's reading goroutine one by one.
	Handle(m string, h HandlerFunc, args ...interface{})

	// HandleNotFound registers "" method/router handler,
	// It will be called when method/router is not found.
	HandleNotFound(h HandlerFunc)

	// HandleStream registers method/router stream handler.
	HandleStream(m string, h StreamHandlerFunc, args ...interface{})

	// OnMessage finds method/router middlewares and handler, then call them one by one.
	OnMessage(c *Client, m *Message)

	// Malloc makes a buffer by size.
	Malloc(size int) []byte
	// HandleMalloc registers buffer maker.
	HandleMalloc(f func(size int) []byte)

	// Append appends bytes to buffer.
	Append(b []byte, more ...byte) []byte
	// HandleAppend registers buffer appender.
	HandleAppend(f func(b []byte, more ...byte) []byte)

	// Free releases a buffer.
	Free([]byte)
	// HandleFree registers buffer releaser.
	HandleFree(f func(buf []byte))

	// EnablePool registers handlers for pool operation for Context and Message and Message.Buffer
	EnablePool(enable bool)

	Context() (context.Context, context.CancelFunc)
	SetContext(ctx context.Context, cancel context.CancelFunc)
	Cancel()

	// NewMessage creates a Message.
	NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message

	// NewMessageWithBuffer creates a message with the buffer and manage the message by the pool.
	// The buffer arg should be managed by a pool if EnablePool(true) .
	NewMessageWithBuffer(buffer []byte) *Message

	// SetAsyncExecutor sets executor.
	SetAsyncExecutor(executor func(f func()))
	// AsyncExecute executes a func
	AsyncExecute(f func())
}

Handler defines net message handler interface.

var DefaultHandler Handler = NewHandler()

DefaultHandler is the default Handler used by arpc

func NewHandler

func NewHandler() Handler

NewHandler returns a default Handler implementation.

type HandlerFunc

type HandlerFunc func(*Context)

HandlerFunc defines message handler.

type Header []byte

Header defines Message head

func (Header) BodyLen

func (h Header) BodyLen() int

BodyLen returns Message body length.

type Message

type Message struct {
	Buffer []byte
	// contains filtered or unexported fields
}

Message represents an arpc Message.

func NewMessage added in v1.1.9

func NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, h Handler, codec codec.Codec, values map[interface{}]interface{}) *Message

NewMessage creates a Message.

func (*Message) BodyLen

func (m *Message) BodyLen() int

BodyLen returns body length.

func (*Message) Cmd

func (m *Message) Cmd() byte

Cmd returns cmd.

func (*Message) Data

func (m *Message) Data() []byte

Data returns payload data after method.

func (*Message) Error

func (m *Message) Error() error

Error returns error.

func (*Message) Get

func (m *Message) Get(key interface{}) (interface{}, bool)

Get returns value for key.

func (*Message) IsAsync

func (m *Message) IsAsync() bool

IsAsync returns async flag.

func (*Message) IsError

func (m *Message) IsError() bool

IsError returns error flag.

func (*Message) IsFlagBitSet

func (m *Message) IsFlagBitSet(index int) bool

IsFlagBitSet returns flag bit value.

func (*Message) IsStreamEOF added in v1.2.15

func (m *Message) IsStreamEOF() bool

IsStreamEOF reports whether it's a stream's last message and the stream is EOF and closed.

func (*Message) IsStreamLocal added in v1.2.15

func (m *Message) IsStreamLocal() bool

IsStreamLocal reports whether it's a stream message.

func (*Message) Len

func (m *Message) Len() int

Len returns total length of buffer.

func (*Message) Method

func (m *Message) Method() string

Method returns method.

func (*Message) MethodLen

func (m *Message) MethodLen() int

MethodLen returns method length.

func (*Message) Payback added in v1.1.9

func (m *Message) Payback()

Payback puts the Message back into the pool.

func (*Message) Release added in v1.1.9

func (m *Message) Release() int32

Release decrements the reference count and returns the current value.

func (*Message) ResetAttrs added in v1.1.9

func (m *Message) ResetAttrs()

ResetAttrs resets reserved/cmd/flag/methodLen to 0.

func (*Message) Retain added in v1.1.9

func (m *Message) Retain() int32

Retain increments the reference count and returns the current value.

func (*Message) Seq

func (m *Message) Seq() uint64

Seq returns sequence number.

func (*Message) Set

func (m *Message) Set(key interface{}, value interface{})

Set sets key-value pair.

func (*Message) SetAsync

func (m *Message) SetAsync(isAsync bool)

SetAsync sets async flag.

func (*Message) SetBodyLen

func (m *Message) SetBodyLen(l int)

SetBodyLen sets body length.

func (*Message) SetCmd

func (m *Message) SetCmd(cmd byte)

SetCmd sets cmd.

func (*Message) SetError

func (m *Message) SetError(isError bool)

SetError sets error flag.

func (*Message) SetFlagBit

func (m *Message) SetFlagBit(index int, value bool) error

SetFlagBit sets flag bit value by index.

func (*Message) SetMethodLen

func (m *Message) SetMethodLen(l int)

SetMethodLen sets method length.

func (*Message) SetSeq

func (m *Message) SetSeq(seq uint64)

SetSeq sets sequence number.

func (*Message) SetStreamEOF added in v1.2.15

func (m *Message) SetStreamEOF(eof bool)

SetStreamEOF sets the flag for a stream's last message and marks the stream as EOF and closed.

func (*Message) SetStreamLocal added in v1.2.15

func (m *Message) SetStreamLocal(local bool)

SetStreamLocal sets the flag for a stream message.

func (*Message) Values

func (m *Message) Values() map[interface{}]interface{}

Values returns values.

type MessageCoder

type MessageCoder interface {
	// Encode wraps the message before it is sent to the client.
	Encode(*Client, *Message) *Message
	// Decode unwraps the message between receiving and handling it.
	Decode(*Client, *Message) *Message
}

MessageCoder defines Message coding middleware interface.

type NativeAllocator added in v1.1.9

type NativeAllocator struct{}

NativeAllocator definition.

func (*NativeAllocator) Free added in v1.1.9

func (a *NativeAllocator) Free(buf []byte)

Free .

func (*NativeAllocator) Malloc added in v1.1.9

func (a *NativeAllocator) Malloc(size int) []byte

Malloc .

func (*NativeAllocator) Realloc added in v1.1.9

func (a *NativeAllocator) Realloc(buf []byte, size int) []byte

Realloc .

type Server

type Server struct {
	Accepted int64
	CurrLoad int64
	MaxLoad  int64

	Codec   codec.Codec
	Handler Handler

	Listener net.Listener
	// contains filtered or unexported fields
}

Server represents an arpc Server.

func NewServer

func NewServer() *Server

NewServer creates an arpc Server.

func (*Server) Broadcast added in v1.2.2

func (s *Server) Broadcast(method string, v interface{}, args ...interface{})

func (*Server) BroadcastWithFilter added in v1.2.2

func (s *Server) BroadcastWithFilter(method string, v interface{}, filter func(*Client) bool, args ...interface{})

func (*Server) ForEach added in v1.2.2

func (s *Server) ForEach(h func(*Client))

func (*Server) ForEachWithFilter added in v1.2.2

func (s *Server) ForEachWithFilter(h func(*Client), filter func(*Client) bool)

func (*Server) NewMessage

func (s *Server) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message

NewMessage creates a Message.

func (*Server) Run

func (s *Server) Run(addr string) error

Run starts tcp service on addr.

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve starts service with listener.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown shuts down the service.

func (*Server) Stop

func (s *Server) Stop() error

Stop stops service.

type Stream added in v1.2.15

type Stream struct {
	// contains filtered or unexported fields
}

Stream .

func (*Stream) CloseRecv added in v1.2.15

func (s *Stream) CloseRecv()

func (*Stream) CloseRecvContext added in v1.2.15

func (s *Stream) CloseRecvContext(ctx context.Context)

func (*Stream) CloseSend added in v1.2.15

func (s *Stream) CloseSend()

func (*Stream) CloseSendContext added in v1.2.15

func (s *Stream) CloseSendContext(ctx context.Context)

func (*Stream) Id added in v1.2.15

func (s *Stream) Id() uint64

func (*Stream) Recv added in v1.2.15

func (s *Stream) Recv(v interface{}) error

func (*Stream) RecvContext added in v1.2.15

func (s *Stream) RecvContext(ctx context.Context, v interface{}) error

func (*Stream) RecvWith added in v1.2.15

func (s *Stream) RecvWith(ctx context.Context, v interface{}) error

func (*Stream) Send added in v1.2.15

func (s *Stream) Send(v interface{}, args ...interface{}) error

func (*Stream) SendAndClose added in v1.2.15

func (s *Stream) SendAndClose(v interface{}, args ...interface{}) error

func (*Stream) SendAndCloseContext added in v1.2.15

func (s *Stream) SendAndCloseContext(ctx context.Context, v interface{}, args ...interface{}) error

func (*Stream) SendAndCloseWith added in v1.2.15

func (s *Stream) SendAndCloseWith(ctx context.Context, v interface{}, args ...interface{}) error

func (*Stream) SendContext added in v1.2.15

func (s *Stream) SendContext(ctx context.Context, v interface{}, args ...interface{}) error

func (*Stream) SendWith added in v1.2.15

func (s *Stream) SendWith(ctx context.Context, v interface{}, args ...interface{}) error

type StreamHandlerFunc added in v1.2.15

type StreamHandlerFunc func(*Stream)

StreamHandlerFunc defines stream handler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL