socket

package
v0.8.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2025 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsSocketClosedError added in v0.6.0

func IsSocketClosedError(err error) bool

IsSocketClosedError reports whether the given error indicates that the socket is closed.

func ToIntRequestN added in v0.6.0

func ToIntRequestN(n uint32) int

ToIntRequestN converts n to a valid request n.

func ToUint32RequestN added in v0.6.0

func ToUint32RequestN(n int) uint32

ToUint32RequestN converts n to a valid request n.

Types

type AbstractRSocket

type AbstractRSocket struct {
	FF func(payload.Payload)
	MP func(payload.Payload)
	RR func(payload.Payload) mono.Mono
	RS func(payload.Payload) flux.Flux
	RC func(flux.Flux) flux.Flux
}

AbstractRSocket represents an abstract RSocket.

func (AbstractRSocket) FireAndForget

func (a AbstractRSocket) FireAndForget(message payload.Payload)

FireAndForget starts a request of FireAndForget.

func (AbstractRSocket) MetadataPush

func (a AbstractRSocket) MetadataPush(message payload.Payload)

MetadataPush starts a request of MetadataPush.

func (AbstractRSocket) RequestChannel

func (a AbstractRSocket) RequestChannel(messages flux.Flux) flux.Flux

RequestChannel starts a request of RequestChannel.

func (AbstractRSocket) RequestResponse

func (a AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono

RequestResponse starts a request of RequestResponse.

func (AbstractRSocket) RequestStream

func (a AbstractRSocket) RequestStream(message payload.Payload) flux.Flux

RequestStream starts a request of RequestStream.

type BaseSocket added in v0.6.0

type BaseSocket struct {
	// contains filtered or unexported fields
}

BaseSocket is a basic socket.

func NewBaseSocket added in v0.6.0

func NewBaseSocket(rawSocket *DuplexConnection) *BaseSocket

NewBaseSocket creates a new BaseSocket.

func (*BaseSocket) Addr added in v0.7.13

func (p *BaseSocket) Addr() (string, bool)

func (*BaseSocket) Close added in v0.6.0

func (p *BaseSocket) Close() error

Close closes socket.

func (*BaseSocket) FireAndForget added in v0.6.0

func (p *BaseSocket) FireAndForget(message payload.Payload)

FireAndForget sends FireAndForget request.

func (*BaseSocket) MetadataPush added in v0.6.0

func (p *BaseSocket) MetadataPush(message payload.Payload)

MetadataPush sends MetadataPush request.

func (*BaseSocket) OnClose added in v0.6.0

func (p *BaseSocket) OnClose(fn func(error))

OnClose registers a handler to be invoked when the socket is closed.

func (*BaseSocket) RequestChannel added in v0.6.0

func (p *BaseSocket) RequestChannel(messages flux.Flux) flux.Flux

RequestChannel sends RequestChannel request.

func (*BaseSocket) RequestResponse added in v0.6.0

func (p *BaseSocket) RequestResponse(message payload.Payload) mono.Mono

RequestResponse sends RequestResponse request.

func (*BaseSocket) RequestStream added in v0.6.0

func (p *BaseSocket) RequestStream(message payload.Payload) flux.Flux

RequestStream sends RequestStream request.

func (*BaseSocket) SetAddr added in v0.7.13

func (p *BaseSocket) SetAddr(addr string)

type ClientSocket

type ClientSocket interface {
	Closeable
	Responder
	// Setup sets up the current socket.
	Setup(ctx context.Context, connectTimeout time.Duration, setup *SetupInfo) error
}

ClientSocket represents a client-side socket.

func NewClient

NewClient creates a simple client-side socket.

func NewResumableClientSocket added in v0.6.0

func NewResumableClientSocket(tp transport.ClientTransporter, socket *DuplexConnection) ClientSocket

NewResumableClientSocket creates a client-side socket with resume support.

type Closeable

type Closeable interface {
	io.Closer
	// OnClose binds a handler to be invoked when closing.
	OnClose(closer func(error))
}

Closeable represents a closeable target.

type DuplexConnection added in v0.6.0

type DuplexConnection struct {
	// contains filtered or unexported fields
}

DuplexConnection represents a socket of RSocket which can be a requester or a responder.

func NewClientDuplexConnection added in v0.6.0

func NewClientDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Scheduler, mtu int, keepaliveInterval time.Duration) *DuplexConnection

NewClientDuplexConnection creates a new client-side DuplexConnection.

func NewServerDuplexConnection added in v0.6.0

func NewServerDuplexConnection(ctx context.Context, reqSche, resSche scheduler.Scheduler, mtu int, leases lease.Factory) *DuplexConnection

NewServerDuplexConnection creates a new server-side DuplexConnection.

func (*DuplexConnection) Close added in v0.6.0

func (dc *DuplexConnection) Close() error

Close closes the current socket.

func (*DuplexConnection) FireAndForget added in v0.6.0

func (dc *DuplexConnection) FireAndForget(req payload.Payload)

FireAndForget starts a request of FireAndForget.

func (*DuplexConnection) GetError added in v0.6.0

func (dc *DuplexConnection) GetError() (err error)

GetError returns the error set for the current socket.

func (*DuplexConnection) LoopWrite added in v0.6.0

func (dc *DuplexConnection) LoopWrite(ctx context.Context) error

LoopWrite starts the write loop.

func (*DuplexConnection) MetadataPush added in v0.6.0

func (dc *DuplexConnection) MetadataPush(payload payload.Payload)

MetadataPush starts a request of MetadataPush.

func (*DuplexConnection) RequestChannel added in v0.6.0

func (dc *DuplexConnection) RequestChannel(sending flux.Flux) (ret flux.Flux)

RequestChannel starts a request of RequestChannel.

func (*DuplexConnection) RequestResponse added in v0.6.0

func (dc *DuplexConnection) RequestResponse(req payload.Payload) (res mono.Mono)

RequestResponse starts a request of RequestResponse.

func (*DuplexConnection) RequestStream added in v0.6.0

func (dc *DuplexConnection) RequestStream(sending payload.Payload) (ret flux.Flux)

RequestStream starts a request of RequestStream.

func (*DuplexConnection) SetError added in v0.6.0

func (dc *DuplexConnection) SetError(err error)

SetError sets the error for the current socket.

func (*DuplexConnection) SetResponder added in v0.6.0

func (dc *DuplexConnection) SetResponder(responder Responder)

SetResponder sets a responder for the current socket.

func (*DuplexConnection) SetTransport added in v0.6.0

func (dc *DuplexConnection) SetTransport(tp *transport.Transport) (ok bool)

SetTransport sets a transport for the current socket.

type Keepaliver added in v0.6.0

type Keepaliver struct {
	// contains filtered or unexported fields
}

Keepaliver controls connection keepalive.

func NewKeepaliver added in v0.6.0

func NewKeepaliver(interval time.Duration) *Keepaliver

NewKeepaliver creates a new keepaliver.

func (Keepaliver) C added in v0.6.0

func (p Keepaliver) C() <-chan time.Time

C returns ticker.C.

func (Keepaliver) Done added in v0.6.0

func (p Keepaliver) Done() <-chan struct{}

Done returns done chan.

func (Keepaliver) Stop added in v0.6.0

func (p Keepaliver) Stop()

Stop stops keepaliver.

type Responder

type Responder interface {
	// FireAndForget is a single one-way message.
	FireAndForget(message payload.Payload)
	// MetadataPush sends asynchronous Metadata frame.
	MetadataPush(message payload.Payload)
	// RequestResponse requests a single response.
	RequestResponse(message payload.Payload) mono.Mono
	// RequestStream requests a completable stream.
	RequestStream(message payload.Payload) flux.Flux
	// RequestChannel requests a completable stream in both directions.
	RequestChannel(messages flux.Flux) flux.Flux
}

Responder is a contract providing different interaction models for RSocket protocol.

type ServerSocket

type ServerSocket interface {
	Closeable
	Responder
	// SetResponder sets a responder for current socket.
	SetResponder(responder Responder)
	// SetTransport sets a transport for current socket.
	SetTransport(tp *transport.Transport)
	// Pause pauses the current socket.
	Pause() bool
	// Start starts current socket.
	Start(ctx context.Context) error
	// Token returns token of socket.
	Token() (token []byte, ok bool)
	// SetAddr sets address info.
	SetAddr(addr string)
}

ServerSocket represents a server-side socket.

func NewResumableServerSocket added in v0.6.0

func NewResumableServerSocket(socket *DuplexConnection, token []byte) ServerSocket

NewResumableServerSocket creates a new server-side socket with resume support.

func NewSimpleServerSocket added in v0.6.0

func NewSimpleServerSocket(socket *DuplexConnection) ServerSocket

NewSimpleServerSocket creates a new server-side socket.

type SetupInfo

type SetupInfo struct {
	Lease             bool
	Version           core.Version
	KeepaliveInterval time.Duration
	KeepaliveLifetime time.Duration
	Token             []byte
	DataMimeType      []byte
	Data              []byte
	MetadataMimeType  []byte
	Metadata          []byte
}

SetupInfo represents basic info of setup.

type StreamID added in v0.6.0

type StreamID interface {
	// Next returns next stream id.
	Next() (id uint32, firstLoop bool)
}

StreamID can be used to generate stream ids.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL