Documentation ¶
Overview ¶
Example ¶
// Serve a server
err := Receive().
	Acceptor(func(setup SetupPayload, sendingSocket CloseableRSocket) RSocket {
		return NewAbstractSocket(
			RequestResponse(func(msg Payload) Mono {
				log.Println("incoming request:", msg)
				return JustMono(NewString("Pong", time.Now().String()))
			}),
		)
	}).
	Transport("tcp://127.0.0.1:7878").
	Serve(context.Background())
if err != nil {
	panic(err)
}

// Connect to a server.
cli, err := Connect().
	SetupPayload(NewString("Hello World", "From Golang")).
	Transport("tcp://127.0.0.1:7878").
	Start(context.Background())
if err != nil {
	panic(err)
}
defer func() {
	_ = cli.Close()
}()
cli.RequestResponse(NewString("Ping", time.Now().String())).
	DoOnSuccess(func(ctx context.Context, s Subscription, elem Payload) {
		log.Println("incoming response:", elem)
	}).
	Subscribe(context.Background())
Index ¶
- type Client
- type ClientBuilder
- type ClientResumeOptions
- type ClientSocketAcceptor
- type ClientStarter
- type ClientTransportBuilder
- type CloseableRSocket
- type OpServerResume
- type OptAbstractSocket
- func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket
- func MetadataPush(fn func(msg payload.Payload)) OptAbstractSocket
- func RequestChannel(fn func(msgs rx.Publisher) rx.Flux) OptAbstractSocket
- func RequestResponse(fn func(msg payload.Payload) rx.Mono) OptAbstractSocket
- func RequestStream(fn func(msg payload.Payload) rx.Flux) OptAbstractSocket
- type RSocket
- type ServerAcceptor
- type ServerBuilder
- type ServerTransportBuilder
- type Start
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶ added in v0.2.0
type Client interface { CloseableRSocket }
Client is the client side of an RSocket socket. It sends frames to an RSocket server.
type ClientBuilder ¶
type ClientBuilder interface {
	ClientTransportBuilder
	// Fragment sets the fragmentation size; the default is 16_777_215 (16MB).
	Fragment(mtu int) ClientBuilder
	// KeepAlive defines the keepalive settings for the current client.
	KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
	// Resume enables resume for the current RSocket.
	Resume(opts ...ClientResumeOptions) ClientBuilder
	// DataMimeType sets the payload data MIME type.
	// The default MIME type is `application/binary`.
	DataMimeType(mime string) ClientBuilder
	// MetadataMimeType sets the payload metadata MIME type.
	// The default MIME type is `application/binary`.
	MetadataMimeType(mime string) ClientBuilder
	// SetupPayload sets the setup payload.
	SetupPayload(setup payload.Payload) ClientBuilder
	// OnClose registers a handler that is called when the client socket is closed.
	OnClose(fn func()) ClientBuilder
	// Acceptor sets the acceptor for the RSocket client.
	Acceptor(acceptor ClientSocketAcceptor) ClientTransportBuilder
}
ClientBuilder can be used to build an RSocket client.
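To make the Fragment(mtu int) setting more concrete, the following is a minimal, standard-library-only sketch of what splitting a payload by a maximum size looks like. The helper splitPayload is hypothetical and is not part of this package; real RSocket fragmentation also has to account for frame headers, which this sketch ignores.

```go
package main

import "fmt"

// splitPayload is a hypothetical helper (not part of rsocket-go). It cuts data
// into consecutive chunks of at most mtu bytes. A non-positive mtu is treated
// as "do not split", which is an assumption made for this sketch.
func splitPayload(data []byte, mtu int) [][]byte {
	if mtu <= 0 {
		return [][]byte{data}
	}
	var chunks [][]byte
	for len(data) > mtu {
		chunks = append(chunks, data[:mtu])
		data = data[mtu:]
	}
	// The remainder (possibly shorter than mtu) is the last chunk.
	return append(chunks, data)
}

func main() {
	for i, c := range splitPayload([]byte("hello rsocket"), 5) {
		fmt.Printf("fragment %d: %q\n", i, c)
	}
}
```

A smaller mtu produces more, shorter fragments, while a payload that already fits within mtu is returned as a single chunk.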
func Connect ¶
func Connect() ClientBuilder
Connect creates a new RSocket client builder with default settings.
Example ¶
cli, err := Connect().
	Resume().
	Fragment(65535).
	SetupPayload(NewString("Hello", "World")).
	Acceptor(func(socket RSocket) RSocket {
		return NewAbstractSocket(RequestResponse(func(msg Payload) Mono {
			return JustMono(NewString("Pong", time.Now().String()))
		}))
	}).
	Transport("tcp://127.0.0.1:7878").
	Start(context.Background())
if err != nil {
	panic(err)
}
defer func() {
	_ = cli.Close()
}()
// Simple FireAndForget.
cli.FireAndForget(NewString("This is a FNF message.", ""))
// Simple RequestResponse.
cli.RequestResponse(NewString("This is a RequestResponse message.", "")).
	DoOnSuccess(func(ctx context.Context, s Subscription, elem Payload) {
		log.Println("response:", elem)
	}).
	Subscribe(context.Background())
// RequestStream with backpressure. (one by one)
cli.RequestStream(NewString("This is a RequestStream message.", "")).
	DoOnNext(func(ctx context.Context, s Subscription, elem Payload) {
		log.Println("next element in stream:", elem)
		s.Request(1)
	}).
	DoOnSubscribe(func(ctx context.Context, s Subscription) {
		s.Request(1)
	}).
	Subscribe(context.Background())
// Simple RequestChannel.
sendFlux := Range(0, 3).
	Map(func(n int) Payload {
		return NewString(fmt.Sprintf("This is a RequestChannel message #%d.", n), "")
	})
cli.RequestChannel(sendFlux).
	DoOnNext(func(ctx context.Context, s Subscription, elem Payload) {
		log.Println("next element in channel:", elem)
	}).
	Subscribe(context.Background())
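The "one by one" backpressure used in the RequestStream call above can be illustrated without the library. The sketch below models demand with a plain channel; produce and consumeOneByOne are hypothetical names, and the channel-based design is an assumption chosen for illustration, not how the rx package is implemented.

```go
package main

import "fmt"

// produce is a hypothetical publisher: it emits items only after the
// subscriber has signalled demand, loosely mirroring Subscription.Request(n).
func produce(items []string, demand <-chan int, out chan<- string) {
	defer close(out)
	next := 0
	for n := range demand {
		for ; n > 0 && next < len(items); n-- {
			out <- items[next]
			next++
		}
		if next == len(items) {
			return // nothing left to emit; complete the stream
		}
	}
}

// consumeOneByOne requests one element up front and one more after each
// element it receives, so at most one element is in flight at a time.
func consumeOneByOne(items []string) []string {
	demand := make(chan int, 1) // buffer of 1 so the final request never blocks
	out := make(chan string)
	go produce(items, demand, out)

	var got []string
	demand <- 1 // initial request, like s.Request(1) in DoOnSubscribe
	for v := range out {
		got = append(got, v)
		demand <- 1 // ask for the next element, like s.Request(1) in DoOnNext
	}
	return got
}

func main() {
	fmt.Println(consumeOneByOne([]string{"a", "b", "c"}))
}
```

The producer never runs ahead of the consumer: each element is sent only after a matching request, which is the property the DoOnSubscribe and DoOnNext callbacks establish in the example.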
type ClientResumeOptions ¶ added in v0.2.0
type ClientResumeOptions func(opts *resumeOpts)
ClientResumeOptions represents resume options for client.
func WithClientResumeToken ¶ added in v0.2.0
func WithClientResumeToken(gen func() []byte) ClientResumeOptions
WithClientResumeToken sets the generator function used to produce the resume token.
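A generator passed to WithClientResumeToken only needs the func() []byte shape. The following is a minimal sketch of one possible generator; newResumeToken is a hypothetical name, and the 16-byte random token is an assumption made for illustration rather than a requirement stated by this package.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newResumeToken is a hypothetical generator with the func() []byte shape
// that WithClientResumeToken accepts. It returns 16 random bytes.
func newResumeToken() []byte {
	token := make([]byte, 16)
	if _, err := rand.Read(token); err != nil {
		panic(err) // the system random source is unavailable
	}
	return token
}

func main() {
	fmt.Println(hex.EncodeToString(newResumeToken()))
	// With this package imported, the generator could be passed as:
	//   Connect().Resume(WithClientResumeToken(newResumeToken))
}
```

Using a random token per client keeps resume sessions from colliding; a deterministic generator would also satisfy the signature if the application needs stable tokens.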
type ClientSocketAcceptor ¶
ClientSocketAcceptor is an alias for the RSocket handler function.
type ClientStarter ¶
type ClientStarter interface {
	// Start starts a client socket.
	Start(ctx context.Context) (Client, error)
}
ClientStarter can be used to start a client.
type ClientTransportBuilder ¶
type ClientTransportBuilder interface {
	// Transport sets the transport for the current RSocket client.
	// The URI is used to create the RSocket transport.
	// Examples:
	// "tcp://127.0.0.1:7878" means a TCP RSocket transport.
	// "ws://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport. (NOTICE: Websocket will be supported in the future).
	Transport(uri string) ClientStarter
}
ClientTransportBuilder is used to build an RSocket client with a custom Transport string.
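The transport string is an ordinary URI whose scheme selects the transport. As a rough illustration, the sketch below uses the standard net/url package to split such a string; describeTransport is a hypothetical helper and does not reflect how this package parses the URI internally.

```go
package main

import (
	"fmt"
	"net/url"
)

// describeTransport is a hypothetical helper (not part of rsocket-go). It
// parses a transport URI and reports which kind of transport the scheme names.
func describeTransport(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	switch u.Scheme {
	case "tcp":
		return "TCP transport to " + u.Host, nil
	case "ws":
		return "WebSocket transport to " + u.Host + u.Path, nil
	default:
		return "", fmt.Errorf("unsupported transport scheme %q", u.Scheme)
	}
}

func main() {
	for _, uri := range []string{"tcp://127.0.0.1:7878", "ws://127.0.0.1:8080/a/b/c"} {
		desc, err := describeTransport(uri)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(desc)
	}
}
```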
type CloseableRSocket ¶ added in v0.2.0
CloseableRSocket is an RSocket that supports additional events.
type OpServerResume ¶ added in v0.2.0
type OpServerResume func(o *serverResumeOptions)
OpServerResume represents resume options for RSocket server.
func WithServerResumeSessionDuration ¶ added in v0.2.0
func WithServerResumeSessionDuration(duration time.Duration) OpServerResume
WithServerResumeSessionDuration sets resume session duration for RSocket server.
type OptAbstractSocket ¶
type OptAbstractSocket func(*socket.AbstractRSocket)
OptAbstractSocket is an option for the abstract socket.
func FireAndForget ¶
func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket
FireAndForget registers a request handler for FireAndForget.
func MetadataPush ¶
func MetadataPush(fn func(msg payload.Payload)) OptAbstractSocket
MetadataPush registers a request handler for MetadataPush.
func RequestChannel ¶
func RequestChannel(fn func(msgs rx.Publisher) rx.Flux) OptAbstractSocket
RequestChannel registers a request handler for RequestChannel.
func RequestResponse ¶
func RequestResponse(fn func(msg payload.Payload) rx.Mono) OptAbstractSocket
RequestResponse registers a request handler for RequestResponse.
func RequestStream ¶
func RequestStream(fn func(msg payload.Payload) rx.Flux) OptAbstractSocket
RequestStream registers a request handler for RequestStream.
type RSocket ¶
type RSocket interface {
	// FireAndForget sends a single one-way message.
	FireAndForget(msg payload.Payload)
	// MetadataPush sends an asynchronous metadata frame.
	MetadataPush(msg payload.Payload)
	// RequestResponse requests a single response.
	RequestResponse(msg payload.Payload) rx.Mono
	// RequestStream requests a completable stream.
	RequestStream(msg payload.Payload) rx.Flux
	// RequestChannel requests a completable stream in both directions.
	RequestChannel(msgs rx.Publisher) rx.Flux
}
RSocket is a contract providing different interaction models for the RSocket protocol.
func NewAbstractSocket ¶
func NewAbstractSocket(opts ...OptAbstractSocket) RSocket
NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.
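NewAbstractSocket follows the functional options pattern: each OptAbstractSocket is a function that fills in one handler. The sketch below shows that pattern in isolation using only the standard library. All names in it (handlers, option, withRequestResponse, newHandlers) are hypothetical stand-ins, strings replace payloads, and returning an error for an unregistered handler is an assumption, since the source does not say how the library treats that case.

```go
package main

import (
	"errors"
	"fmt"
)

// handlers is a hypothetical stand-in for the abstract socket: each field
// holds the implementation of one interaction model, or nil if unset.
type handlers struct {
	requestResponse func(msg string) string
	fireAndForget   func(msg string)
}

// option mirrors the shape of OptAbstractSocket: a function that mutates
// the value being built.
type option func(*handlers)

func withRequestResponse(fn func(msg string) string) option {
	return func(h *handlers) { h.requestResponse = fn }
}

func withFireAndForget(fn func(msg string)) option {
	return func(h *handlers) { h.fireAndForget = fn }
}

// newHandlers applies every option in order, like NewAbstractSocket(opts...).
func newHandlers(opts ...option) *handlers {
	h := &handlers{}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

// RequestResponse dispatches to the registered handler. Reporting an error
// when none is registered is an assumption made for this sketch.
func (h *handlers) RequestResponse(msg string) (string, error) {
	if h.requestResponse == nil {
		return "", errors.New("request-response handler not registered")
	}
	return h.requestResponse(msg), nil
}

func main() {
	h := newHandlers(withRequestResponse(func(msg string) string {
		return "echo: " + msg
	}))
	fmt.Println(h.RequestResponse("ping"))
}
```

The benefit of this design is that callers implement only the interaction models they need and leave the rest unset.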
type ServerAcceptor ¶
type ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) RSocket
ServerAcceptor is an alias for the server acceptor function.
type ServerBuilder ¶
type ServerBuilder interface {
	// Fragment sets the fragmentation size; the default is 16_777_215 (16MB).
	Fragment(mtu int) ServerBuilder
	// Resume enables resume for the current server.
	Resume(opts ...OpServerResume) ServerBuilder
	// Acceptor registers the server acceptor, which is used to handle incoming RSockets.
	Acceptor(acceptor ServerAcceptor) ServerTransportBuilder
}
ServerBuilder can be used to build an RSocket server.
func Receive ¶
func Receive() ServerBuilder
Receive receives server connections from client RSockets.
Example ¶
err := Receive().
	Resume(WithServerResumeSessionDuration(30 * time.Second)).
	Fragment(65535).
	Acceptor(func(setup SetupPayload, sendingSocket CloseableRSocket) RSocket {
		// Handle close.
		sendingSocket.OnClose(func() {
			log.Println("sending socket is closed")
		})
		// Request to client.
		sendingSocket.RequestResponse(NewString("Ping", time.Now().String())).
			DoOnSuccess(func(ctx context.Context, s Subscription, elem Payload) {
				log.Println("response of Ping from client:", elem)
			}).
			SubscribeOn(ElasticScheduler()).
			Subscribe(context.Background())
		// Return a responder that just echoes.
		return NewAbstractSocket(
			FireAndForget(func(msg Payload) {
				log.Println("receive fnf:", msg)
			}),
			RequestResponse(func(msg Payload) Mono {
				return JustMono(msg)
			}),
			RequestStream(func(msg Payload) Flux {
				return Range(0, 3).Map(func(n int) Payload {
					return NewString(msg.DataUTF8(), fmt.Sprintf("This is response #%04d", n))
				})
			}),
			RequestChannel(func(msgs Publisher) Flux {
				return ToFlux(msgs)
			}),
		)
	}).
	Transport("tcp://0.0.0.0:7878").
	Serve(context.Background())
panic(err)
type ServerTransportBuilder ¶
type ServerTransportBuilder interface {
	// Transport specifies the transport string.
	Transport(transport string) Start
}
ServerTransportBuilder is used to build an RSocket server with a custom Transport string.