rsocket

Version: v0.8.14
Published: Jan 5, 2025 License: Apache-2.0 Imports: 23 Imported by: 30

README

rsocket-go

rsocket-go is an implementation of the RSocket protocol in Go.

Features

  • Designed for Go.
  • Thin reactive-streams implementation.
  • API modeled on the Java SDK.
  • Fast CLI (compatible with https://github.com/rsocket/rsocket-cli).
    • Installation: go install github.com/rsocket/rsocket-go/cmd/rsocket-cli@latest
    • Example: rsocket-cli --request -i hello_world --setup setup_me tcp://127.0.0.1:7878

Install

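The minimum supported Go version is 1.11. Installing the CLI with the @latest suffix, as shown below, requires Go 1.16 or newer.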

$ go install github.com/rsocket/rsocket-go/cmd/rsocket-cli@latest

Quick Start

Start an echo server

package main

import (
	"context"
	"log"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	err := rsocket.Receive().
		Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			// bind responder
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
					return mono.Just(msg)
				}),
			), nil
		}).
		Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
		Serve(context.Background())
	log.Fatalln(err)
}

Connect to echo server

package main

import (
	"context"
	"log"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
)

func main() {
	// Connect to server
	cli, err := rsocket.Connect().
		SetupPayload(payload.NewString("Hello", "World")).
		Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()).
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Send request
	result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
	if err != nil {
		panic(err)
	}
	log.Println("response:", result)
}

NOTICE: more server examples are available in the examples directory of the repository.

Advanced

rsocket-go provides TCP and WebSocket transport implementations by default. Since v0.6.0, you can use the core package to implement your own RSocket transport. I created an example project that shows how to implement an unofficial QUIC transport; see rsocket-transport-quic if you are interested.

TODO

  • Wiki
  • UT: 90% coverage

Documentation

Overview

Example
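// Start a server. Serve blocks until the server stops, so in a real program
// the client code further down runs in a separate goroutine or process.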
err := rsocket.Receive().
	Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
		return rsocket.NewAbstractSocket(
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				log.Println("incoming request:", msg)
				return mono.Just(payload.NewString("Pong", time.Now().String()))
			}),
		), nil
	}).
	Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
	Serve(context.Background())
if err != nil {
	panic(err)
}

// Connect to a server.
cli, err := rsocket.Connect().
	SetupPayload(payload.NewString("Hello World", "From Golang")).
	Transport(rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()).
	Start(context.Background())
if err != nil {
	panic(err)
}
defer cli.Close()
cli.RequestResponse(payload.NewString("Ping", time.Now().String())).
	DoOnSuccess(func(elem payload.Payload) error {
		log.Println("incoming response:", elem)
		return nil
	}).
	Subscribe(context.Background())

Constants

const (
	// ErrorCodeInvalidSetup means the setup frame is invalid for the server.
	ErrorCodeInvalidSetup = core.ErrorCodeInvalidSetup
	// ErrorCodeUnsupportedSetup means some (or all) of the parameters specified by the client are unsupported by the server.
	ErrorCodeUnsupportedSetup = core.ErrorCodeUnsupportedSetup
	// ErrorCodeRejectedSetup means server rejected the setup, it can specify the reason in the payload.
	ErrorCodeRejectedSetup = core.ErrorCodeRejectedSetup
	// ErrorCodeRejectedResume means server rejected the resume, it can specify the reason in the payload.
	ErrorCodeRejectedResume = core.ErrorCodeRejectedResume
	// ErrorCodeConnectionError means the connection is being terminated; either side may close it immediately without waiting for outstanding streams.
	ErrorCodeConnectionError = core.ErrorCodeConnectionError
	// ErrorCodeConnectionClose means the connection is being terminated once outstanding streams have finished.
	ErrorCodeConnectionClose = core.ErrorCodeConnectionClose
	// ErrorCodeApplicationError means application layer logic generating a Reactive Streams onError event.
	ErrorCodeApplicationError = core.ErrorCodeApplicationError
	// ErrorCodeRejected means the Responder rejected the request.
	ErrorCodeRejected = core.ErrorCodeRejected
	// ErrorCodeCanceled means the Responder canceled the request but may have started processing it (similar to REJECTED but doesn't guarantee lack of side-effects).
	ErrorCodeCanceled = core.ErrorCodeCanceled
	// ErrorCodeInvalid means the request is invalid.
	ErrorCodeInvalid = core.ErrorCodeInvalid
)
const (
	// DefaultUnixSockPath is the default UDS sock file path.
	DefaultUnixSockPath = "/var/run/rsocket.sock"
	// DefaultPort is the default port RSocket used.
	DefaultPort = 7878
)

Functions

func GetAddr added in v0.7.13

func GetAddr(rs RSocket) (string, bool)

GetAddr returns the address info of given RSocket. Normally, the format is "IP:PORT".
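
Because the returned string normally has the form "IP:PORT", callers often want the two parts separately. The sketch below uses only the standard library; splitAddr is a hypothetical helper, not part of rsocket-go, and it assumes the string came from GetAddr with the boolean result set to true. An address without a port, such as a Unix socket path, is reported as an error.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitAddr is a hypothetical helper that splits an "IP:PORT" string,
// such as the one GetAddr normally returns, into host and numeric port.
func splitAddr(addr string) (string, int, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, err
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, err
	}
	return host, port, nil
}

func main() {
	host, port, err := splitAddr("127.0.0.1:7878")
	fmt.Println(host, port, err)
}
```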

Types

type Client added in v0.2.0

type Client interface {
	CloseableRSocket
}

Client is the client side of an RSocket connection. It sends frames to an RSocket server.

type ClientBuilder

type ClientBuilder interface {
	ToClientStarter
	// Scheduler sets the schedulers for requests and responses.
	// A nil scheduler keeps the default scheduler setting.
	Scheduler(requestScheduler scheduler.Scheduler, responseScheduler scheduler.Scheduler) ClientBuilder
	// Fragment sets the fragmentation size; the default is 16_777_215 bytes (about 16 MB).
	// A zero mtu also selects the default fragmentation size.
	Fragment(mtu int) ClientBuilder
	// KeepAlive defines the keepalive settings of the client.
	KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
	// Resume enables the resume feature.
	Resume(opts ...ClientResumeOptions) ClientBuilder
	// Lease enables the lease feature.
	Lease() ClientBuilder
	// DataMimeType sets the MIME type of payload data.
	// The default MIME type is `application/binary`.
	DataMimeType(mime string) ClientBuilder
	// MetadataMimeType sets the MIME type of payload metadata.
	// The default MIME type is `application/binary`.
	MetadataMimeType(mime string) ClientBuilder
	// SetupPayload sets the setup payload.
	SetupPayload(setup payload.Payload) ClientBuilder
	// ConnectTimeout sets the connect timeout.
	ConnectTimeout(timeout time.Duration) ClientBuilder
	// OnClose registers a handler that is called when the client socket is closed.
	OnClose(func(error)) ClientBuilder
	// OnConnect registers a handler that is called when the client socket is connected.
	OnConnect(func(Client, error)) ClientBuilder
	// Acceptor sets the acceptor for the RSocket client.
	Acceptor(acceptor ClientSocketAcceptor) ToClientStarter
}

ClientBuilder can be used to build a RSocket client.
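
The default fragmentation size mentioned for Fragment, 16_777_215, equals 2^24 - 1, the largest value that fits in 24 bits. As a rough illustration of what a smaller mtu means, the sketch below estimates how many fragments a payload would be split into. fragmentCount and maxFragment are hypothetical names, and the estimate deliberately ignores frame headers, which a real implementation has to account for.

```go
package main

import "fmt"

// maxFragment is the default fragmentation size quoted in the Fragment
// documentation: 2^24 - 1 bytes.
const maxFragment = 1<<24 - 1

// fragmentCount estimates how many fragments carry n payload bytes when each
// fragment holds at most mtu bytes. A zero (or, in this sketch, any
// non-positive) mtu selects the default size. Header overhead is ignored.
func fragmentCount(n, mtu int) int {
	if mtu <= 0 {
		mtu = maxFragment
	}
	if n <= 0 {
		return 1 // an empty payload still needs one frame
	}
	return (n + mtu - 1) / mtu // ceiling division
}

func main() {
	fmt.Println(maxFragment)
	fmt.Println(fragmentCount(10000, 4096))
	fmt.Println(fragmentCount(10000, 0))
}
```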

func Connect

func Connect() ClientBuilder

Connect creates a new RSocket client builder with default settings.

Example
cli, err := rsocket.Connect().
	Resume(). // Enable RESUME.
	Lease().  // Enable LEASE.
	Fragment(4096).
	SetupPayload(payload.NewString("Hello", "World")).
	Acceptor(func(ctx context.Context, socket rsocket.RSocket) rsocket.RSocket {
		return rsocket.NewAbstractSocket(
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				return mono.Just(payload.NewString("Pong", time.Now().String()))
			}),
		)
	}).
	Transport(rsocket.TCPClient().SetAddr("127.0.0.1:7878").Build()).
	Start(context.Background())
if err != nil {
	panic(err)
}
defer func() {
	_ = cli.Close()
}()
// Simple FireAndForget.
cli.FireAndForget(payload.NewString("This is a FNF message.", ""))
// Simple RequestResponse.
cli.RequestResponse(payload.NewString("This is a RequestResponse message.", "")).
	DoOnSuccess(func(elem payload.Payload) error {
		log.Println("response:", elem)
		return nil
	}).
	Subscribe(context.Background())
var s rx.Subscription
// RequestStream with backpressure. (one by one)
cli.RequestStream(payload.NewString("This is a RequestStream message.", "")).
	DoOnNext(func(elem payload.Payload) error {
		log.Println("next element in stream:", elem)
		// Ask for the next element only after handling the current one.
		s.Request(1)
		return nil
	}).
	Subscribe(context.Background(), rx.OnSubscribe(func(ctx context.Context, su rx.Subscription) {
		// Store the subscription so that DoOnNext can request further elements.
		s = su
		s.Request(1)
	}))
// Simple RequestChannel.
sendFlux := flux.Create(func(ctx context.Context, s flux.Sink) {
	for i := 0; i < 3; i++ {
		s.Next(payload.NewString(fmt.Sprintf("This is a RequestChannel message #%d.", i), ""))
	}
	s.Complete()
})
cli.RequestChannel(sendFlux).
	DoOnNext(func(elem payload.Payload) error {
		log.Println("next element in channel:", elem)
		return nil
	}).
	Subscribe(context.Background())

type ClientResumeOptions added in v0.2.0

type ClientResumeOptions func(opts *resumeOpts)

ClientResumeOptions represents resume options for client.

func WithClientResumeToken added in v0.2.0

func WithClientResumeToken(gen func() []byte) ClientResumeOptions

WithClientResumeToken creates a resume token generator.
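
The generator passed to WithClientResumeToken has the signature func() []byte. A minimal sketch of such a generator, using random bytes from the standard library, is shown below. newResumeToken is a hypothetical name, and the 16-byte length is an assumption made for illustration, not a value taken from rsocket-go.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newResumeToken returns 16 random bytes. Its signature matches the
// generator expected by WithClientResumeToken, so it could be passed as
// rsocket.WithClientResumeToken(newResumeToken).
func newResumeToken() []byte {
	token := make([]byte, 16) // length chosen for illustration only
	if _, err := rand.Read(token); err != nil {
		panic(err)
	}
	return token
}

func main() {
	fmt.Println(hex.EncodeToString(newResumeToken()))
}
```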

type ClientSocketAcceptor

type ClientSocketAcceptor = func(ctx context.Context, socket RSocket) RSocket

ClientSocketAcceptor is an alias for the RSocket handler function.

type ClientStarter

type ClientStarter interface {
	// Start starts a client socket.
	Start(ctx context.Context) (Client, error)
}

ClientStarter can be used to start a client.

type CloseableRSocket added in v0.2.0

type CloseableRSocket interface {
	socket.Closeable
	RSocket
}

CloseableRSocket is an RSocket that can be closed and can register handlers for the close event.

type Error added in v0.5.10

type Error = core.CustomError

Error provides methods for accessing the error code and data.

type ErrorCode added in v0.5.10

type ErrorCode = core.ErrorCode

ErrorCode is code for RSocket error.
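
The error codes listed under Constants fall into two groups: setup and connection errors concern the whole connection, while the remaining codes concern a single request stream. The sketch below shows that grouping with plain integers. The numeric values are taken from the ERROR frame section of the RSocket protocol specification, not from this page, and the names codeInvalidSetup, scope and so on are hypothetical stand-ins rather than rsocket-go identifiers.

```go
package main

import "fmt"

// Error code values as given in the RSocket protocol specification
// (assumed here; this page does not list the numeric values).
const (
	codeInvalidSetup     uint32 = 0x00000001
	codeUnsupportedSetup uint32 = 0x00000002
	codeRejectedSetup    uint32 = 0x00000003
	codeRejectedResume   uint32 = 0x00000004
	codeConnectionError  uint32 = 0x00000101
	codeConnectionClose  uint32 = 0x00000102
	codeApplicationError uint32 = 0x00000201
	codeRejected         uint32 = 0x00000202
	codeCanceled         uint32 = 0x00000203
	codeInvalid          uint32 = 0x00000204
)

// scope reports whether a code applies to the whole connection or to a
// single stream; codes outside the list above are reported as "unknown".
func scope(code uint32) string {
	switch code {
	case codeInvalidSetup, codeUnsupportedSetup, codeRejectedSetup,
		codeRejectedResume, codeConnectionError, codeConnectionClose:
		return "connection"
	case codeApplicationError, codeRejected, codeCanceled, codeInvalid:
		return "stream"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(scope(codeRejectedSetup))
	fmt.Println(scope(codeCanceled))
}
```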

type OpServerResume added in v0.2.0

type OpServerResume func(o *serverResumeOptions)

OpServerResume represents resume options for RSocket server.

func WithServerResumeSessionDuration added in v0.2.0

func WithServerResumeSessionDuration(duration time.Duration) OpServerResume

WithServerResumeSessionDuration sets resume session duration for RSocket server.

type OptAbstractSocket

type OptAbstractSocket func(*socket.AbstractRSocket)

OptAbstractSocket is option for abstract socket.

func FireAndForget

func FireAndForget(fn func(request payload.Payload)) OptAbstractSocket

FireAndForget registers the request handler for FireAndForget.

func MetadataPush

func MetadataPush(fn func(request payload.Payload)) OptAbstractSocket

MetadataPush registers the request handler for MetadataPush.

func RequestChannel

func RequestChannel(fn func(requests flux.Flux) (responses flux.Flux)) OptAbstractSocket

RequestChannel registers the request handler for RequestChannel.

func RequestResponse

func RequestResponse(fn func(request payload.Payload) (response mono.Mono)) OptAbstractSocket

RequestResponse registers the request handler for RequestResponse.

func RequestStream

func RequestStream(fn func(request payload.Payload) (responses flux.Flux)) OptAbstractSocket

RequestStream registers the request handler for RequestStream.

type RSocket

type RSocket interface {
	// FireAndForget is a single one-way message.
	FireAndForget(message payload.Payload)
	// MetadataPush sends an asynchronous metadata frame.
	MetadataPush(message payload.Payload)
	// RequestResponse requests a single response.
	RequestResponse(message payload.Payload) mono.Mono
	// RequestStream requests a completable stream.
	RequestStream(message payload.Payload) flux.Flux
	// RequestChannel requests a completable stream in both directions.
	RequestChannel(messages flux.Flux) flux.Flux
}

RSocket is a contract providing different interaction models for RSocket protocol.

func NewAbstractSocket

func NewAbstractSocket(opts ...OptAbstractSocket) RSocket

NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.

type ServerAcceptor

type ServerAcceptor = func(ctx context.Context, setup payload.SetupPayload, socket CloseableRSocket) (RSocket, error)

ServerAcceptor is an alias for the server-side acceptor function.

type ServerBuilder

type ServerBuilder interface {
	// Scheduler sets the schedulers for requests and responses.
	// A nil scheduler keeps the default scheduler setting.
	Scheduler(requestScheduler, responseScheduler scheduler.Scheduler) ServerBuilder
	// Fragment sets the fragmentation size; the default is 16_777_215 bytes (about 16 MB).
	Fragment(mtu int) ServerBuilder
	// Lease enables the lease feature.
	Lease(leases lease.Factory) ServerBuilder
	// Resume enables resume for the current server.
	Resume(opts ...OpServerResume) ServerBuilder
	// Acceptor registers the server acceptor, which handles incoming RSockets.
	Acceptor(acceptor ServerAcceptor) ToServerStarter
	// OnStart registers a handler that is called when the server starts successfully.
	OnStart(onStart func()) ServerBuilder
}

ServerBuilder can be used to build a RSocket server.

func Receive

func Receive() ServerBuilder

Receive receives server connections from client RSockets.

Example
err := rsocket.Receive().
	Resume(rsocket.WithServerResumeSessionDuration(30 * time.Second)).
	Fragment(65535).
	Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
		// Handle close.
		sendingSocket.OnClose(func(err error) {
			log.Println("sending socket is closed")
		})

		// You can reject the connection here, for example after an authorization check.
		// return nil, errors.New("ACCESS_DENY")

		// Request to client.
		sendingSocket.RequestResponse(payload.NewString("Ping", time.Now().String())).
			DoOnSuccess(func(elem payload.Payload) error {
				log.Println("response of Ping from client:", elem)
				return nil
			}).
			SubscribeOn(scheduler.Parallel()).
			Subscribe(context.Background())
		// Return a responder that simply echoes.
		return rsocket.NewAbstractSocket(
			rsocket.FireAndForget(func(msg payload.Payload) {
				log.Println("receive fnf:", msg)
			}),
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				return mono.Just(msg)
			}),
			rsocket.RequestStream(func(msg payload.Payload) flux.Flux {
				return flux.Create(func(ctx context.Context, s flux.Sink) {
					for i := 0; i < 3; i++ {
						s.Next(payload.NewString(msg.DataUTF8(), fmt.Sprintf("This is response #%04d", i)))
					}
					s.Complete()
				})
			}),
			rsocket.RequestChannel(func(requests flux.Flux) flux.Flux {
				return requests
			}),
		), nil
	}).
	Transport(rsocket.TCPServer().SetHostAndPort("127.0.0.1", 7878).Build()).
	Serve(context.Background())
panic(err)

type Start

type Start interface {
	// Serve serves the RSocket server.
	Serve(ctx context.Context) error
}

Start starts an RSocket server.

type TCPClientBuilder added in v0.6.0

type TCPClientBuilder struct {
	// contains filtered or unexported fields
}

TCPClientBuilder provides builder which can be used to create a client-side TCP transport easily.

func TCPClient added in v0.6.0

func TCPClient() *TCPClientBuilder

TCPClient creates a new TCPClientBuilder.

func (*TCPClientBuilder) Build added in v0.6.0

Build builds and returns a new TCP ClientTransporter.

func (*TCPClientBuilder) SetAddr added in v0.6.0

func (tc *TCPClientBuilder) SetAddr(addr string) *TCPClientBuilder

SetAddr sets the address.

func (*TCPClientBuilder) SetHostAndPort added in v0.6.0

func (tc *TCPClientBuilder) SetHostAndPort(host string, port int) *TCPClientBuilder

SetHostAndPort sets the host and port.

func (*TCPClientBuilder) SetTLSConfig added in v0.6.0

func (tc *TCPClientBuilder) SetTLSConfig(c *tls.Config) *TCPClientBuilder

SetTLSConfig sets the tls config.

Here's an example:

tc := &tls.Config{
	InsecureSkipVerify: true,
}
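
InsecureSkipVerify disables certificate verification, which is convenient for local testing only. As an alternative for a client that should trust one specific certificate (for example a locally generated cert.pem), the sketch below builds a tls.Config around a certificate pool. clientTLSConfig is a hypothetical helper that uses only the standard library; its result could be handed to SetTLSConfig.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// clientTLSConfig builds a client-side TLS configuration that trusts only
// the certificates contained in caPEM and verifies the given server name.
func clientTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in PEM data")
	}
	return &tls.Config{
		MinVersion: tls.VersionTLS12,
		RootCAs:    pool,
		ServerName: serverName,
	}, nil
}

func main() {
	// Invalid input is rejected instead of silently producing an empty pool.
	_, err := clientTLSConfig([]byte("not a certificate"), "localhost")
	fmt.Println("error for invalid PEM:", err != nil)
}
```

In a real program, caPEM would typically be read from the certificate file with os.ReadFile.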

type TCPServerBuilder added in v0.6.0

type TCPServerBuilder struct {
	// contains filtered or unexported fields
}

TCPServerBuilder provides builder which can be used to create a server-side TCP transport easily.

func TCPServer added in v0.6.0

func TCPServer() *TCPServerBuilder

TCPServer creates a new TCPServerBuilder

func (*TCPServerBuilder) Build added in v0.6.0

Build builds and returns a new TCP ServerTransporter.

func (*TCPServerBuilder) SetAddr added in v0.6.0

func (ts *TCPServerBuilder) SetAddr(addr string) *TCPServerBuilder

SetAddr sets the addr.

func (*TCPServerBuilder) SetHostAndPort added in v0.6.0

func (ts *TCPServerBuilder) SetHostAndPort(host string, port int) *TCPServerBuilder

SetHostAndPort sets the host and port.

func (*TCPServerBuilder) SetTLSConfig added in v0.6.0

func (ts *TCPServerBuilder) SetTLSConfig(c *tls.Config) *TCPServerBuilder

SetTLSConfig sets the tls config.

You can generate cert.pem and key.pem for local testing:

go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost

// Load the X509 key pair.
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
	panic(err)
}
// Init TLS configuration.
tc := &tls.Config{
	MinVersion:               tls.VersionTLS12,
	CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
	PreferServerCipherSuites: true,
	CipherSuites: []uint16{
		tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
		tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_RSA_WITH_AES_256_CBC_SHA,
	},
	Certificates: []tls.Certificate{cert},
}
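
For tests that should not depend on files, a certificate can also be generated in memory instead of running generate_cert.go. The following is a minimal sketch under that assumption; selfSignedTLSConfig is a hypothetical helper built only on the standard library, and the ECDSA P-256 key and 24-hour validity are illustrative choices, not settings used by rsocket-go.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"
)

// selfSignedTLSConfig creates a throwaway self-signed certificate for host
// and wraps it in a server-side TLS configuration.
func selfSignedTLSConfig(host string) (*tls.Config, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: host},
		DNSNames:              []string{host},
		NotBefore:             time.Now().Add(-time.Minute),
		NotAfter:              time.Now().Add(24 * time.Hour),
		KeyUsage:              x509.KeyUsageDigitalSignature,
		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		BasicConstraintsValid: true,
	}
	// The template signs itself: parent and template are the same certificate.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	cert := tls.Certificate{Certificate: [][]byte{der}, PrivateKey: key}
	return &tls.Config{
		MinVersion:   tls.VersionTLS12,
		Certificates: []tls.Certificate{cert},
	}, nil
}

func main() {
	cfg, err := selfSignedTLSConfig("localhost")
	if err != nil {
		panic(err)
	}
	fmt.Println("certificates loaded:", len(cfg.Certificates))
}
```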

type ToClientStarter added in v0.6.0

type ToClientStarter interface {
	// Transport set generator func for current RSocket client.
	//
	// Examples:
	//
	// rsocket.TCPClient().SetHostAndPort("127.0.0.1", 7878).Build()
	// rsocket.WebsocketClient().SetURL("ws://127.0.0.1:8080/hello").Build()
	// rsocket.UnixClient().SetPath("/var/run/rsocket.sock").Build()
	Transport(transport.ClientTransporter) ClientStarter
}

ToClientStarter is used to build a RSocket client with custom Transport.

type ToServerStarter added in v0.6.0

type ToServerStarter interface {
	// Transport specify transport generator func.
	// Example:
	// rsocket.TCPServer().SetAddr(":8888").Build()
	Transport(t transport.ServerTransporter) Start
}

ToServerStarter is used to build an RSocket server with a custom transport.

type UnixClientBuilder added in v0.6.0

type UnixClientBuilder struct {
	// contains filtered or unexported fields
}

UnixClientBuilder provides builder which can be used to create a client-side UDS transport easily.

func UnixClient added in v0.6.0

func UnixClient() *UnixClientBuilder

UnixClient creates a new UnixClientBuilder.

func (UnixClientBuilder) Build added in v0.6.0

Build builds and returns a new ClientTransporter.

func (*UnixClientBuilder) SetPath added in v0.6.0

func (uc *UnixClientBuilder) SetPath(path string) *UnixClientBuilder

SetPath sets UDS sock file path.

type UnixServerBuilder added in v0.6.0

type UnixServerBuilder struct {
	// contains filtered or unexported fields
}

UnixServerBuilder provides builder which can be used to create a server-side UDS transport easily.

func UnixServer added in v0.6.0

func UnixServer() *UnixServerBuilder

UnixServer creates a new UnixServerBuilder.

func (*UnixServerBuilder) Build added in v0.6.0

Build builds and returns a new ServerTransporter.

func (*UnixServerBuilder) SetPath added in v0.6.0

func (us *UnixServerBuilder) SetPath(path string) *UnixServerBuilder

SetPath sets UDS sock file path.

type WebsocketClientBuilder added in v0.6.0

type WebsocketClientBuilder struct {
	// contains filtered or unexported fields
}

WebsocketClientBuilder provides builder which can be used to create a client-side Websocket transport easily.

func WebsocketClient added in v0.6.0

func WebsocketClient() *WebsocketClientBuilder

WebsocketClient creates a new WebsocketClientBuilder.

func (*WebsocketClientBuilder) Build added in v0.6.0

Build builds and returns a new websocket ClientTransporter.

func (*WebsocketClientBuilder) SetHeader added in v0.6.0

SetHeader sets header.

func (*WebsocketClientBuilder) SetProxy added in v0.7.13

func (wc *WebsocketClientBuilder) SetProxy(proxy func(*http.Request) (*url.URL, error)) *WebsocketClientBuilder

SetProxy sets proxy.
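
The proxy parameter has the same shape as the selector functions in net/http, so http.ProxyFromEnvironment or a function returned by http.ProxyURL should fit. The sketch below builds a selector that always answers with one fixed proxy; fixedProxy is a hypothetical helper and the proxy address is a placeholder.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// fixedProxy parses raw and returns a selector that always yields that
// proxy URL. The result has the type expected by SetProxy.
func fixedProxy(raw string) (func(*http.Request) (*url.URL, error), error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	return http.ProxyURL(u), nil
}

func main() {
	proxy, err := fixedProxy("http://127.0.0.1:3128") // placeholder address
	if err != nil {
		panic(err)
	}
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:7878/hello", nil)
	if err != nil {
		panic(err)
	}
	u, err := proxy(req)
	fmt.Println(u, err)
}
```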

func (*WebsocketClientBuilder) SetTLSConfig added in v0.6.0

SetTLSConfig sets the tls config.

Here's an example:

tc := &tls.Config{
	InsecureSkipVerify: true,
}

func (*WebsocketClientBuilder) SetURL added in v0.6.0

SetURL sets the target url. Example: ws://127.0.0.1:7878/hello/world
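
Since the target is passed as a plain string, it can help to validate it before building the transport. The sketch below is one way to do that with net/url; checkWebsocketURL is a hypothetical helper, and accepting exactly the ws and wss schemes is an assumption of this example rather than documented behavior of SetURL.

```go
package main

import (
	"fmt"
	"net/url"
)

// checkWebsocketURL reports an error unless raw parses as a URL with a
// ws or wss scheme and a non-empty host.
func checkWebsocketURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return err
	}
	if u.Scheme != "ws" && u.Scheme != "wss" {
		return fmt.Errorf("unsupported scheme %q, want ws or wss", u.Scheme)
	}
	if u.Host == "" {
		return fmt.Errorf("missing host in %q", raw)
	}
	return nil
}

func main() {
	fmt.Println(checkWebsocketURL("ws://127.0.0.1:7878/hello/world"))
	fmt.Println(checkWebsocketURL("tcp://127.0.0.1:7878"))
}
```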

type WebsocketServerBuilder added in v0.6.0

type WebsocketServerBuilder struct {
	// contains filtered or unexported fields
}

WebsocketServerBuilder provides builder which can be used to create a server-side Websocket transport easily.

func WebsocketServer added in v0.6.0

func WebsocketServer() *WebsocketServerBuilder

WebsocketServer creates a new WebsocketServerBuilder.

func (*WebsocketServerBuilder) Build added in v0.6.0

Build builds and returns a new websocket ServerTransporter.

func (*WebsocketServerBuilder) SetAddr added in v0.6.0

SetAddr sets the websocket listen addr. Default addr is "127.0.0.1:7878".

func (*WebsocketServerBuilder) SetPath added in v0.6.0

SetPath sets the path of websocket.

func (*WebsocketServerBuilder) SetTLSConfig added in v0.6.0

SetTLSConfig sets the tls config.

You can generate cert.pem and key.pem for local testing:

go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost

// Load the X509 key pair.
cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
	panic(err)
}
// Init TLS configuration.
tc := &tls.Config{
	MinVersion:               tls.VersionTLS12,
	CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
	PreferServerCipherSuites: true,
	CipherSuites: []uint16{
		tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
		tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
		tls.TLS_RSA_WITH_AES_256_CBC_SHA,
	},
	Certificates: []tls.Certificate{cert},
}

func (*WebsocketServerBuilder) SetUpgrader added in v0.6.0

SetUpgrader sets websocket upgrader. You can customize your own websocket upgrader instead of the default upgrader.

Example(also the default value):

upgrader := &websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}
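
A CheckOrigin function that always returns true accepts upgrade requests from any origin. Where that is too permissive, a custom upgrader can restrict origins. The sketch below shows one possible CheckOrigin implementation using only the standard library; allowOrigins and originAllowed are hypothetical helpers, and letting requests without an Origin header through is a design choice of this example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// allowOrigins returns a function with the CheckOrigin signature that accepts
// only requests whose Origin header names one of the given hosts.
func allowOrigins(hosts ...string) func(r *http.Request) bool {
	allowed := make(map[string]bool, len(hosts))
	for _, h := range hosts {
		allowed[h] = true
	}
	return func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		if origin == "" {
			return true // non-browser clients usually send no Origin header
		}
		u, err := url.Parse(origin)
		if err != nil {
			return false
		}
		return allowed[u.Host]
	}
}

// originAllowed runs check against a synthetic request carrying origin.
func originAllowed(check func(*http.Request) bool, origin string) bool {
	r := &http.Request{Header: http.Header{}}
	if origin != "" {
		r.Header.Set("Origin", origin)
	}
	return check(r)
}

func main() {
	check := allowOrigins("example.com")
	fmt.Println(originAllowed(check, "https://example.com"))
	fmt.Println(originAllowed(check, "https://evil.test"))
}
```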

Directories

Path        Synopsis
balancer    Package balancer defines APIs for load balancing in RSocket.
cmd
examples
internal
buffer      Package buffer provides an implementation of an unbounded buffer.
u24
rx
