Documentation ¶
Overview ¶
Package zmq4 implements the ØMQ sockets and protocol for ZeroMQ-4.
For more informations, see http://zeromq.org.
Index ¶
- Constants
- Variables
- func RegisterTransport(name string, trans transport.Transport) error
- func Transports() []string
- type Cmd
- type Conn
- func (c *Conn) Close() error
- func (conn *Conn) Closed() bool
- func (c *Conn) Read(p []byte) (int, error)
- func (c *Conn) RecvCmd() (Cmd, error)
- func (c *Conn) RecvMsg() (Msg, error)
- func (c *Conn) SendCmd(name string, body []byte) error
- func (c *Conn) SendMsg(msg Msg) error
- func (conn *Conn) SetClosed()
- func (c *Conn) Write(p []byte) (int, error)
- type Metadata
- type Msg
- type MsgType
- type Option
- func WithAutomaticReconnect(automaticReconnect bool) Option
- func WithDialerMaxRetries(maxRetries int) Option
- func WithDialerRetry(retry time.Duration) Option
- func WithDialerTimeout(timeout time.Duration) Option
- func WithID(id SocketIdentity) Option
- func WithLogger(msg *log.Logger) Option
- func WithSecurity(sec Security) Option
- func WithTimeout(timeout time.Duration) Option
- type Property
- type Proxy
- type Queue
- type Security
- type SecurityType
- type Socket
- func NewDealer(ctx context.Context, opts ...Option) Socket
- func NewPair(ctx context.Context, opts ...Option) Socket
- func NewPub(ctx context.Context, opts ...Option) Socket
- func NewPull(ctx context.Context, opts ...Option) Socket
- func NewPush(ctx context.Context, opts ...Option) Socket
- func NewRep(ctx context.Context, opts ...Option) Socket
- func NewReq(ctx context.Context, opts ...Option) Socket
- func NewRouter(ctx context.Context, opts ...Option) Socket
- func NewSub(ctx context.Context, opts ...Option) Socket
- func NewXPub(ctx context.Context, opts ...Option) Socket
- func NewXSub(ctx context.Context, opts ...Option) Socket
- type SocketIdentity
- type SocketType
- type Topics
- type UnknownTransportError
Constants ¶
const ( CmdCancel = "CANCEL" CmdError = "ERROR" CmdHello = "HELLO" CmdInitiate = "INITIATE" CmdPing = "PING" CmdPong = "PONG" CmdReady = "READY" CmdSubscribe = "SUBSCRIBE" CmdUnsubscribe = "UNSUBSCRIBE" CmdWelcome = "WELCOME" )
ZMTP commands as per:
https://rfc.zeromq.org/spec:23/ZMTP/#commands
const ( OptionSubscribe = "SUBSCRIBE" OptionUnsubscribe = "UNSUBSCRIBE" OptionHWM = "HWM" )
Variables ¶
var ( ErrBadCmd = errors.New("zmq4: invalid command name") ErrBadFrame = errors.New("zmq4: invalid frame") )
var (
ErrBadProperty = errors.New("zmq4: bad property")
)
var ErrClosedConn = errors.New("zmq4: read/write on closed connection")
Functions ¶
func RegisterTransport ¶
RegisterTransport registers a new transport with the zmq4 package.
func Transports ¶
func Transports() []string
Transports returns the sorted list of currently registered transports.
Types ¶
type Conn ¶
type Conn struct { Server bool Meta Metadata Peer struct { Server bool Meta Metadata } // contains filtered or unexported fields }
Conn implements the ZeroMQ Message Transport Protocol as defined in https://rfc.zeromq.org/spec:23/ZMTP/.
func Open ¶
func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity, server bool, onCloseErrorCB func(c *Conn)) (*Conn, error)
Open opens a ZMTP connection over rw with the given security, socket type and identity. An optional onCloseErrorCB can be provided to inform the caller when this Conn is closed. Open performs a complete ZMTP handshake.
type Metadata ¶
Metadata is describing a Conn's metadata information.
func (Metadata) MarshalZMTP ¶
MarshalZMTP marshals MetaData to ZMTP encoded data.
func (*Metadata) UnmarshalZMTP ¶
UnmarshalZMTP unmarshals MetaData from a ZMTP encoded data.
type Msg ¶
Msg is a ZMTP message, possibly composed of multiple frames.
func NewMsgFrom ¶
func NewMsgFromString ¶
func NewMsgString ¶
type Option ¶
type Option func(s *socket)
Option configures some aspect of a ZeroMQ socket. (e.g. SocketIdentity, Security, ...)
func WithAutomaticReconnect ¶
WithAutomaticReconnect allows to configure a socket to automatically reconnect on connection loss.
func WithDialerMaxRetries ¶
WithDialerMaxRetries configures the maximum number of retries when dialing an endpoint (-1 means infinite retries).
func WithDialerRetry ¶
WithDialerRetry configures the time to wait before two failed attempts at dialing an endpoint.
func WithDialerTimeout ¶
WithDialerTimeout sets the maximum amount of time a dial will wait for a connect to complete.
func WithLogger ¶
WithLogger sets a dedicated log.Logger for the socket.
func WithSecurity ¶
WithSecurity configures a ZeroMQ socket to use the given security mechanism. If the security mechanims is nil, the NULL mechanism is used.
func WithTimeout ¶
WithTimeout sets the timeout value for socket operations
type Property ¶
Property describes a Conn metadata's entry. The on-wire respresentation of Property is specified by:
https://rfc.zeromq.org/spec:23/ZMTP/
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
Proxy connects a frontend socket to a backend socket.
func NewProxy ¶
NewProxy creates a new Proxy value. It proxies messages received on the frontend to the backend (and vice versa) If capture is not nil, messages proxied are also sent on that socket.
Conceptually, data flows from frontend to backend. Depending on the socket types, replies may flow in the opposite direction. The direction is conceptual only; the proxy is fully symmetric and there is no technical difference between frontend and backend.
Before creating a Proxy, users must set any socket options, and Listen or Dial both frontend and backend sockets.
type Security ¶
type Security interface { // Type returns the security mechanism type. Type() SecurityType // Handshake implements the ZMTP security handshake according to // this security mechanism. // see: // https://rfc.zeromq.org/spec:23/ZMTP/ // https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/ // https://rfc.zeromq.org/spec:25/ZMTP-CURVE/ Handshake(conn *Conn, server bool) error // Encrypt writes the encrypted form of data to w. Encrypt(w io.Writer, data []byte) (int, error) // Decrypt writes the decrypted form of data to w. Decrypt(w io.Writer, data []byte) (int, error) }
Security is an interface for ZMTP security mechanisms
type SecurityType ¶
type SecurityType string
SecurityType denotes types of ZMTP security mechanisms
const ( // NullSecurityType is an empty security mechanism // that does no authentication nor encryption. NullSecurity SecurityType = "NULL" // PlainSecurity is a security mechanism that uses // plaintext passwords. It is a reference implementation and // should not be used to anything important. PlainSecurity SecurityType = "PLAIN" // CurveSecurity uses ZMQ_CURVE for authentication // and encryption. CurveSecurity SecurityType = "CURVE" )
type Socket ¶
type Socket interface { // Close closes the open Socket. Close() error // Send puts the message on the outbound send queue. // // Send blocks until the message can be queued or the send deadline expires. Send(msg Msg) error // SendMulti puts the message on the outbound send queue. // // SendMulti blocks until the message can be queued or the send deadline // expires. The message will be sent as a multipart message. SendMulti(msg Msg) error // Recv receives a complete message. Recv() (Msg, error) // Listen connects a local endpoint to the Socket. // // In ZeroMQ's terminology, it binds. Listen(ep string) error // Dial connects a remote endpoint to the Socket. // // In ZeroMQ's terminology, it connects. Dial(ep string) error // Type returns the type of this Socket (for example PUB, SUB, etc.) Type() SocketType // Addr returns the listener's address. It returns nil if the socket isn't a // listener. Addr() net.Addr // GetOption retrieves an option for a socket. GetOption(name string) (interface{}, error) // SetOption sets an option for a socket. SetOption(name string, value interface{}) error }
Socket represents a ZeroMQ socket.
func NewDealer ¶
NewDealer returns a new DEALER ZeroMQ socket. The returned socket value is initially unbound.
func NewPair ¶
NewPair returns a new PAIR ZeroMQ socket. The returned socket value is initially unbound.
func NewPub ¶
NewPub returns a new PUB ZeroMQ socket. The returned socket value is initially unbound.
func NewPull ¶
NewPull returns a new PULL ZeroMQ socket. The returned socket value is initially unbound.
func NewPush ¶
NewPush returns a new PUSH ZeroMQ socket. The returned socket value is initially unbound.
func NewRep ¶
NewRep returns a new REP ZeroMQ socket. The returned socket value is initially unbound.
func NewReq ¶
NewReq returns a new REQ ZeroMQ socket. The returned socket value is initially unbound.
func NewRouter ¶
NewRouter returns a new ROUTER ZeroMQ socket. The returned socket value is initially unbound.
func NewSub ¶
NewSub returns a new SUB ZeroMQ socket. The returned socket value is initially unbound.
type SocketIdentity ¶
type SocketIdentity []byte
SocketIdentity is the ZMTP metadata socket identity. See:
https://rfc.zeromq.org/spec:23/ZMTP/.
func (SocketIdentity) String ¶
func (id SocketIdentity) String() string
type SocketType ¶
type SocketType string
SocketType is a ZeroMQ socket type.
const ( Pair SocketType = "PAIR" // a ZMQ_PAIR socket Pub SocketType = "PUB" // a ZMQ_PUB socket Sub SocketType = "SUB" // a ZMQ_SUB socket Req SocketType = "REQ" // a ZMQ_REQ socket Rep SocketType = "REP" // a ZMQ_REP socket Dealer SocketType = "DEALER" // a ZMQ_DEALER socket Router SocketType = "ROUTER" // a ZMQ_ROUTER socket Pull SocketType = "PULL" // a ZMQ_PULL socket Push SocketType = "PUSH" // a ZMQ_PUSH socket XPub SocketType = "XPUB" // a ZMQ_XPUB socket XSub SocketType = "XSUB" // a ZMQ_XSUB socket )
func (SocketType) IsCompatible ¶
func (sck SocketType) IsCompatible(peer SocketType) bool
IsCompatible checks whether two sockets are compatible and thus can be connected together. See https://rfc.zeromq.org/spec:23/ZMTP/ for more informations.
type Topics ¶
type Topics interface { // Topics returns the sorted list of topics a socket is subscribed to. Topics() []string }
Topics is an interface that wraps the basic Topics method.
type UnknownTransportError ¶
type UnknownTransportError struct {
Name string
}
UnknownTransportError records an error when trying to use an unknown transport.
func (UnknownTransportError) Error ¶
func (ute UnknownTransportError) Error() string
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
internal
|
|
errgroup
Package errgroup is bit more advanced than golang.org/x/sync/errgroup.
|
Package errgroup is bit more advanced than golang.org/x/sync/errgroup. |
inproc
Package inproc provides tools to implement an in-process asynchronous pipe of net.Conns.
|
Package inproc provides tools to implement an in-process asynchronous pipe of net.Conns. |
security
|
|
null
Package null provides the ZeroMQ NULL security mechanism
|
Package null provides the ZeroMQ NULL security mechanism |
plain
Package plain provides the ZeroMQ PLAIN security mechanism as specified by: https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
|
Package plain provides the ZeroMQ PLAIN security mechanism as specified by: https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/ |
Package transport defines the Transport interface and provides a net-based implementation that can be used by zmq4 sockets to exchange messages.
|
Package transport defines the Transport interface and provides a net-based implementation that can be used by zmq4 sockets to exchange messages. |