Documentation ¶
Index ¶
- func IsSocketClosedError(err error) bool
- func ToIntRequestN(n uint32) int
- func ToUint32RequestN(n int) uint32
- type AbstractRSocket
- func (a AbstractRSocket) FireAndForget(message payload.Payload)
- func (a AbstractRSocket) MetadataPush(message payload.Payload)
- func (a AbstractRSocket) RequestChannel(messages flux.Flux) flux.Flux
- func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
- func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
- type BaseSocket
- func (p *BaseSocket) Addr() (string, bool)
- func (p *BaseSocket) Close() error
- func (p *BaseSocket) FireAndForget(message payload.Payload)
- func (p *BaseSocket) MetadataPush(message payload.Payload)
- func (p *BaseSocket) OnClose(fn func(error))
- func (p *BaseSocket) RequestChannel(messages flux.Flux) flux.Flux
- func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono
- func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux
- func (p *BaseSocket) SetAddr(addr string)
- type ClientSocket
- type Closeable
- type DuplexConnection
- func (dc *DuplexConnection) Close() error
- func (dc *DuplexConnection) FireAndForget(req payload.Payload)
- func (dc *DuplexConnection) GetError() (err error)
- func (dc *DuplexConnection) LoopWrite(ctx context.Context) error
- func (dc *DuplexConnection) MetadataPush(payload payload.Payload)
- func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux)
- func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono)
- func (dc *DuplexConnection) RequestStream(sending payload.Payload) (ret flux.Flux)
- func (dc *DuplexConnection) SetError(err error)
- func (dc *DuplexConnection) SetResponder(responder Responder)
- func (dc *DuplexConnection) SetTransport(tp *transport.Transport) (ok bool)
- type Keepaliver
- type Responder
- type ServerSocket
- type SetupInfo
- type StreamID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsSocketClosedError ¶ added in v0.6.0
IsSocketClosedError reports whether err indicates that the socket has been closed.
func ToIntRequestN ¶ added in v0.6.0
ToIntRequestN converts the uint32 value n to a valid int request n.
func ToUint32RequestN ¶ added in v0.6.0
ToUint32RequestN converts the int value n to a valid uint32 request n.
Types ¶
type AbstractRSocket ¶
type AbstractRSocket struct {
	FF func(payload.Payload)
	MP func(payload.Payload)
	RR func(payload.Payload) mono.Mono
	RS func(payload.Payload) flux.Flux
	RC func(flux.Flux) flux.Flux
}
AbstractRSocket represents an abstract RSocket.
func (AbstractRSocket) FireAndForget ¶
func (a AbstractRSocket) FireAndForget(message payload.Payload)
FireAndForget starts a request of FireAndForget.
func (AbstractRSocket) MetadataPush ¶
func (a AbstractRSocket) MetadataPush(message payload.Payload)
MetadataPush starts a request of MetadataPush.
func (AbstractRSocket) RequestChannel ¶
func (a AbstractRSocket) RequestChannel(messages flux.Flux) flux.Flux
RequestChannel starts a request of RequestChannel.
func (AbstractRSocket) RequestResponse ¶
func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse starts a request of RequestResponse.
func (AbstractRSocket) RequestStream ¶
func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream starts a request of RequestStream.
type BaseSocket ¶ added in v0.6.0
type BaseSocket struct {
// contains filtered or unexported fields
}
BaseSocket is a basic socket.
func NewBaseSocket ¶ added in v0.6.0
func NewBaseSocket(rawSocket *DuplexConnection) *BaseSocket
NewBaseSocket creates a new BaseSocket.
func (*BaseSocket) Addr ¶ added in v0.7.13
func (p *BaseSocket) Addr() (string, bool)
func (*BaseSocket) FireAndForget ¶ added in v0.6.0
func (p *BaseSocket) FireAndForget(message payload.Payload)
FireAndForget sends FireAndForget request.
func (*BaseSocket) MetadataPush ¶ added in v0.6.0
func (p *BaseSocket) MetadataPush(message payload.Payload)
MetadataPush sends MetadataPush request.
func (*BaseSocket) OnClose ¶ added in v0.6.0
func (p *BaseSocket) OnClose(fn func(error))
OnClose registers a handler that is called when the socket is closed.
func (*BaseSocket) RequestChannel ¶ added in v0.6.0
func (p *BaseSocket) RequestChannel(messages flux.Flux) flux.Flux
RequestChannel sends RequestChannel request.
func (*BaseSocket) RequestResponse ¶ added in v0.6.0
func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse sends RequestResponse request.
func (*BaseSocket) RequestStream ¶ added in v0.6.0
func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream sends RequestStream request.
func (*BaseSocket) SetAddr ¶ added in v0.7.13
func (p *BaseSocket) SetAddr(addr string)
type ClientSocket ¶
type ClientSocket interface {
	Closeable
	Responder
	// Setup sets up the current socket.
	Setup(ctx context.Context, connectTimeout time.Duration, setup *SetupInfo) error
}
ClientSocket represents a client-side socket.
func NewClient ¶
func NewClient(tp transport.ClientTransporter, socket *DuplexConnection) ClientSocket
NewClient creates a simple client-side socket.
func NewResumableClientSocket ¶ added in v0.6.0
func NewResumableClientSocket(tp transport.ClientTransporter, socket *DuplexConnection) ClientSocket
NewResumableClientSocket creates a client-side socket with resume support.
type Closeable ¶
type Closeable interface {
	io.Closer
	// OnClose binds a handler that is called on close.
	OnClose(closer func(error))
}
Closeable represents a closeable target.
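To show how a type might satisfy this contract, here is a minimal sketch using only the standard library. The `closeable` interface mirrors the declaration above, while the `conn` type is hypothetical. Running the handlers only once, and passing them a nil error on a clean close, are assumptions about the intended semantics.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// closeable mirrors the Closeable interface declared above.
type closeable interface {
	io.Closer
	OnClose(closer func(error))
}

// conn is a hypothetical implementation that collects close handlers.
type conn struct {
	mu       sync.Mutex
	closed   bool
	handlers []func(error)
}

// OnClose stores fn so that Close can invoke it later.
func (c *conn) OnClose(fn func(error)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.handlers = append(c.handlers, fn)
}

// Close marks the connection closed and runs every handler exactly once.
// Repeated calls are no-ops.
func (c *conn) Close() error {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return nil
	}
	c.closed = true
	hs := c.handlers
	c.mu.Unlock()

	// Handlers run outside the lock so they may call back into conn.
	for _, fn := range hs {
		fn(nil) // assumption: a clean close reports a nil error
	}
	return nil
}

func main() {
	var c closeable = &conn{}
	c.OnClose(func(err error) { fmt.Println("closed, err =", err) })
	_ = c.Close()
	_ = c.Close() // second call does not re-run the handler
}
```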
type DuplexConnection ¶ added in v0.6.0
type DuplexConnection struct {
// contains filtered or unexported fields
}
DuplexConnection represents a socket of RSocket which can be a requester or a responder.
func NewClientDuplexConnection ¶ added in v0.6.0
func NewClientDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Scheduler, mtu int, keepaliveInterval time.Duration) *DuplexConnection
NewClientDuplexConnection creates a new client-side DuplexConnection.
func NewServerDuplexConnection ¶ added in v0.6.0
func NewServerDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Scheduler, mtu int, leases lease.Factory) *DuplexConnection
NewServerDuplexConnection creates a new server-side DuplexConnection.
func (*DuplexConnection) Close ¶ added in v0.6.0
func (dc *DuplexConnection) Close() error
Close closes the current socket.
func (*DuplexConnection) FireAndForget ¶ added in v0.6.0
func (dc *DuplexConnection) FireAndForget(req payload.Payload)
FireAndForget starts a FireAndForget request.
func (*DuplexConnection) GetError ¶ added in v0.6.0
func (dc *DuplexConnection) GetError() (err error)
GetError returns the error previously set on the socket.
func (*DuplexConnection) LoopWrite ¶ added in v0.6.0
func (dc *DuplexConnection) LoopWrite(ctx context.Context) error
LoopWrite starts the write loop.
func (*DuplexConnection) MetadataPush ¶ added in v0.6.0
func (dc *DuplexConnection) MetadataPush(payload payload.Payload)
MetadataPush starts a MetadataPush request.
func (*DuplexConnection) RequestChannel ¶ added in v0.6.0
func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux)
RequestChannel starts a RequestChannel request.
func (*DuplexConnection) RequestResponse ¶ added in v0.6.0
func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono)
RequestResponse starts a RequestResponse request.
func (*DuplexConnection) RequestStream ¶ added in v0.6.0
func (dc *DuplexConnection) RequestStream(sending payload.Payload) (ret flux.Flux)
RequestStream starts a RequestStream request.
func (*DuplexConnection) SetError ¶ added in v0.6.0
func (dc *DuplexConnection) SetError(err error)
SetError sets the error for the current socket.
func (*DuplexConnection) SetResponder ¶ added in v0.6.0
func (dc *DuplexConnection) SetResponder(responder Responder)
SetResponder sets a responder for the current socket.
func (*DuplexConnection) SetTransport ¶ added in v0.6.0
func (dc *DuplexConnection) SetTransport(tp *transport.Transport) (ok bool)
SetTransport sets a transport for the current socket.
type Keepaliver ¶ added in v0.6.0
type Keepaliver struct {
// contains filtered or unexported fields
}
Keepaliver controls connection keepalive.
func NewKeepaliver ¶ added in v0.6.0
func NewKeepaliver(interval time.Duration) *Keepaliver
NewKeepaliver creates a new keepaliver.
func (Keepaliver) Done ¶ added in v0.6.0
func (p Keepaliver) Done() <-chan struct{}
Done returns the done channel.
type Responder ¶
type Responder interface {
	// FireAndForget is a single one-way message.
	FireAndForget(message payload.Payload)
	// MetadataPush sends an asynchronous Metadata frame.
	MetadataPush(message payload.Payload)
	// RequestResponse requests a single response.
	RequestResponse(message payload.Payload) mono.Mono
	// RequestStream requests a completable stream.
	RequestStream(message payload.Payload) flux.Flux
	// RequestChannel requests a completable stream in both directions.
	RequestChannel(messages flux.Flux) flux.Flux
}
Responder is a contract providing the different interaction models of the RSocket protocol.
type ServerSocket ¶
type ServerSocket interface {
	Closeable
	Responder
	// SetResponder sets a responder for the current socket.
	SetResponder(responder Responder)
	// SetTransport sets a transport for the current socket.
	SetTransport(tp *transport.Transport)
	// Pause pauses the current socket.
	Pause() bool
	// Start starts the current socket.
	Start(ctx context.Context) error
	// Token returns the token of the socket.
	Token() (token []byte, ok bool)
	// SetAddr sets the address info.
	SetAddr(addr string)
}
ServerSocket represents a server-side socket.
func NewResumableServerSocket ¶ added in v0.6.0
func NewResumableServerSocket(socket *DuplexConnection, token []byte) ServerSocket
NewResumableServerSocket creates a new server-side socket with resume support.
func NewSimpleServerSocket ¶ added in v0.6.0
func NewSimpleServerSocket(socket *DuplexConnection) ServerSocket
NewSimpleServerSocket creates a new server-side socket.
Source Files ¶
- abstract_socket.go
- base_socket.go
- callback.go
- duplex.go
- keepaliver.go
- leaser.go
- misc.go
- resumable_client_socket.go
- resumable_server_socket.go
- simple_client_socket.go
- simple_server_socket.go
- stream_id.go
- subscriber_request_channel.go
- subscriber_request_response.go
- subscriber_request_stream.go
- types.go