zmq4

Version: v0.99.0
Published: Apr 14, 2024 License: BSD-3-Clause Imports: 20 Imported by: 2

README

zmq4 is a pure-Go implementation of ØMQ (ZeroMQ), version 4.

See zeromq.org for more information.

Development

zmq4 needs a caring maintainer. I (sbinet) no longer have much time to dedicate to this project (as $WORK doesn't need it anymore).

License

zmq4 is released under the BSD-3 license.

Documentation

Documentation for zmq4 is served by GoDoc.

Documentation

Overview

Package zmq4 implements the ØMQ sockets and protocol for ZeroMQ-4.

For more information, see http://zeromq.org.

Constants

const (
	CmdCancel      = "CANCEL"
	CmdError       = "ERROR"
	CmdHello       = "HELLO"
	CmdInitiate    = "INITIATE"
	CmdPing        = "PING"
	CmdPong        = "PONG"
	CmdReady       = "READY"
	CmdSubscribe   = "SUBSCRIBE"
	CmdUnsubscribe = "UNSUBSCRIBE"
	CmdWelcome     = "WELCOME"
)

ZMTP commands as per:

https://rfc.zeromq.org/spec:23/ZMTP/#commands
const (
	OptionSubscribe   = "SUBSCRIBE"
	OptionUnsubscribe = "UNSUBSCRIBE"
	OptionHWM         = "HWM"
)

Variables

var (
	ErrBadCmd   = errors.New("zmq4: invalid command name")
	ErrBadFrame = errors.New("zmq4: invalid frame")
)
var (
	ErrBadProperty = errors.New("zmq4: bad property")
)
var ErrClosedConn = errors.New("zmq4: read/write on closed connection")

Functions

func RegisterTransport

func RegisterTransport(name string, trans transport.Transport) error

RegisterTransport registers a new transport with the zmq4 package.

func Transports

func Transports() []string

Transports returns the sorted list of currently registered transports.
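
As a rough illustration of the registry behaviour described above, the following sketch keeps transport names in a mutex-protected map and returns them sorted. The `registry` type and its methods are hypothetical stand-ins, not the package's internals, and rejecting duplicate names is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry is a hypothetical stand-in for a package-level transport table.
type registry struct {
	mu    sync.RWMutex
	items map[string]struct{}
}

func newRegistry() *registry {
	return &registry{items: make(map[string]struct{})}
}

// register adds name to the table. Rejecting duplicates is an assumption
// made for this sketch.
func (r *registry) register(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.items[name]; dup {
		return fmt.Errorf("duplicate transport %q", name)
	}
	r.items[name] = struct{}{}
	return nil
}

// names returns the registered names in sorted order.
func (r *registry) names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.items))
	for k := range r.items {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

func main() {
	r := newRegistry()
	for _, n := range []string{"tcp", "ipc", "inproc", "tcp"} {
		if err := r.register(n); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(r.names())
}
```

Sorting on read rather than on insert keeps registration cheap, which suits a table that is written a handful of times at start-up.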

Types

type Cmd

type Cmd struct {
	Name string
	Body []byte
}

Cmd is a ZMTP command as per:

https://rfc.zeromq.org/spec:23/ZMTP/#formal-grammar
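
To make the Name/Body split concrete, here is a minimal sketch of the command body layout from the RFC 23 grammar: one length octet, the command name, then the command data. The helpers `encodeCmd` and `decodeCmd` are hypothetical and are not part of this package; frame flags and frame sizes are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// encodeCmd lays out a command body as: 1-byte name length, name, data.
// It is a hypothetical helper written for this sketch.
func encodeCmd(name string, data []byte) ([]byte, error) {
	if len(name) == 0 || len(name) > 255 {
		return nil, errors.New("command name must be 1..255 bytes")
	}
	buf := make([]byte, 0, 1+len(name)+len(data))
	buf = append(buf, byte(len(name)))
	buf = append(buf, name...)
	buf = append(buf, data...)
	return buf, nil
}

// decodeCmd is the inverse of encodeCmd.
func decodeCmd(p []byte) (name string, data []byte, err error) {
	if len(p) < 1 {
		return "", nil, errors.New("empty command")
	}
	n := int(p[0])
	if n == 0 || len(p) < 1+n {
		return "", nil, errors.New("truncated command name")
	}
	return string(p[1 : 1+n]), p[1+n:], nil
}

func main() {
	raw, err := encodeCmd("READY", []byte("payload"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	name, data, err := decodeCmd(raw)
	fmt.Printf("%q %q %v\n", name, data, err)
}
```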

type Conn

type Conn struct {
	Server bool
	Meta   Metadata
	Peer   struct {
		Server bool
		Meta   Metadata
	}
	// contains filtered or unexported fields
}

Conn implements the ZeroMQ Message Transport Protocol as defined in https://rfc.zeromq.org/spec:23/ZMTP/.

func Open

func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity, server bool, onCloseErrorCB func(c *Conn)) (*Conn, error)

Open opens a ZMTP connection over rw with the given security, socket type and identity. An optional onCloseErrorCB can be provided to inform the caller when this Conn is closed. Open performs a complete ZMTP handshake.
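
The handshake starts with a fixed-size greeting. The sketch below builds the 64-byte ZMTP 3.0 greeting as laid out in RFC 23 (signature, version, mechanism, as-server flag, filler). It is based on the specification only; `greeting` is a hypothetical helper and does not reflect how Open is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// greeting builds a 64-byte ZMTP 3.0 greeting following RFC 23:
// 10-byte signature, 2-byte version, 20-byte mechanism,
// 1-byte as-server flag, 31-byte filler.
func greeting(mechanism string, asServer bool) ([]byte, error) {
	if len(mechanism) == 0 || len(mechanism) > 20 {
		return nil, errors.New("mechanism name must be 1..20 bytes")
	}
	g := make([]byte, 64) // zero-filled, so padding and filler are already set
	g[0] = 0xFF           // signature: first octet
	g[9] = 0x7F           // signature: last octet
	g[10] = 3             // version major
	g[11] = 0             // version minor
	copy(g[12:32], mechanism)
	if asServer {
		g[32] = 1
	}
	return g, nil
}

func main() {
	g, err := greeting("NULL", false)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("len=%d signature=%x version=%d.%d mechanism=%q\n",
		len(g), g[:10], g[10], g[11], g[12:16])
}
```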

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Closed

func (conn *Conn) Closed() bool

func (*Conn) Read

func (c *Conn) Read(p []byte) (int, error)

func (*Conn) RecvCmd

func (c *Conn) RecvCmd() (Cmd, error)

func (*Conn) RecvMsg

func (c *Conn) RecvMsg() (Msg, error)

RecvMsg receives a ZMTP message from the wire.

func (*Conn) SendCmd

func (c *Conn) SendCmd(name string, body []byte) error

SendCmd sends a ZMTP command over the wire.

func (*Conn) SendMsg

func (c *Conn) SendMsg(msg Msg) error

SendMsg sends a ZMTP message over the wire.

func (*Conn) SetClosed

func (conn *Conn) SetClosed()

func (*Conn) Write

func (c *Conn) Write(p []byte) (int, error)

type Metadata

type Metadata map[string]string

Metadata describes a Conn's metadata.

func (Metadata) MarshalZMTP

func (md Metadata) MarshalZMTP() ([]byte, error)

MarshalZMTP marshals Metadata to ZMTP-encoded data.

func (*Metadata) UnmarshalZMTP

func (md *Metadata) UnmarshalZMTP(p []byte) error

UnmarshalZMTP unmarshals Metadata from ZMTP-encoded data.
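
For orientation, RFC 23 encodes each metadata property as a 1-byte name length, the name, a 4-byte big-endian value length, and the value. The sketch below round-trips a plain map through that layout. `marshalProps` and `unmarshalProps` are hypothetical helpers, and emitting keys in sorted order is an assumption made only to keep the output deterministic.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
)

var errBadProp = errors.New("bad property")

// marshalProps encodes md as a sequence of RFC 23 properties.
func marshalProps(md map[string]string) ([]byte, error) {
	keys := make([]string, 0, len(md))
	for k := range md {
		keys = append(keys, k)
	}
	sort.Strings(keys) // assumption: sorted for deterministic output

	var buf []byte
	for _, k := range keys {
		if len(k) == 0 || len(k) > 255 {
			return nil, errBadProp
		}
		v := md[k]
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(v)))
		buf = append(buf, byte(len(k)))
		buf = append(buf, k...)
		buf = append(buf, size[:]...)
		buf = append(buf, v...)
	}
	return buf, nil
}

// unmarshalProps decodes a sequence of RFC 23 properties.
func unmarshalProps(p []byte) (map[string]string, error) {
	md := make(map[string]string)
	for len(p) > 0 {
		n := int(p[0])
		p = p[1:]
		if n == 0 || len(p) < n+4 {
			return nil, errBadProp
		}
		name := string(p[:n])
		size := binary.BigEndian.Uint32(p[n : n+4])
		p = p[n+4:]
		if uint64(len(p)) < uint64(size) {
			return nil, errBadProp
		}
		md[name] = string(p[:size])
		p = p[size:]
	}
	return md, nil
}

func main() {
	raw, err := marshalProps(map[string]string{"Socket-Type": "REQ", "Identity": ""})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	md, err := unmarshalProps(raw)
	fmt.Printf("%q\n%v %v\n", raw, md, err)
}
```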

type Msg

type Msg struct {
	Frames [][]byte
	Type   MsgType
	// contains filtered or unexported fields
}

Msg is a ZMTP message, possibly composed of multiple frames.

func NewMsg

func NewMsg(frame []byte) Msg

func NewMsgFrom

func NewMsgFrom(frames ...[]byte) Msg

func NewMsgFromString

func NewMsgFromString(frames []string) Msg

func NewMsgString

func NewMsgString(frame string) Msg

func (Msg) Bytes

func (msg Msg) Bytes() []byte

Bytes returns the concatenated content of all its frames.
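
A small sketch of what "concatenated content of all its frames" means for a multi-frame message. The `message` type below is a hypothetical stand-in for Msg with only a frame list, and copying into a fresh slice is a choice made for this sketch.

```go
package main

import "fmt"

// message is a hypothetical stand-in for Msg: an ordered list of frames.
type message struct {
	frames [][]byte
}

// newMessageFrom builds a message from any number of frames.
func newMessageFrom(frames ...[]byte) message {
	return message{frames: frames}
}

// bytes returns the concatenation of all frames in a fresh slice,
// so callers can modify the result without touching the frames.
func (m message) bytes() []byte {
	n := 0
	for _, f := range m.frames {
		n += len(f)
	}
	out := make([]byte, 0, n)
	for _, f := range m.frames {
		out = append(out, f...)
	}
	return out
}

func main() {
	m := newMessageFrom([]byte("topic"), []byte(" "), []byte("payload"))
	fmt.Printf("%d frames, %q\n", len(m.frames), m.bytes())
}
```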

func (Msg) Clone

func (msg Msg) Clone() Msg

func (Msg) Err

func (msg Msg) Err() error

func (Msg) String

func (msg Msg) String() string

type MsgType

type MsgType byte
const (
	UsrMsg MsgType = 0
	CmdMsg MsgType = 1
)

type Option

type Option func(s *socket)

Option configures some aspect of a ZeroMQ socket (e.g. SocketIdentity, Security, ...).
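
Option follows the functional-options pattern: each option is a function that mutates the socket being built. The sketch below shows the pattern with a hypothetical `sock` type and hypothetical `withID` and `withTimeout` options; none of these names are part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// sock is a hypothetical stand-in for the unexported socket type.
type sock struct {
	id      string
	timeout time.Duration
}

// option configures one aspect of a sock.
type option func(s *sock)

// withID sets the socket identity.
func withID(id string) option {
	return func(s *sock) { s.id = id }
}

// withTimeout sets the timeout used for socket operations.
func withTimeout(d time.Duration) option {
	return func(s *sock) { s.timeout = d }
}

// newSock applies the options in order, so later options win.
func newSock(opts ...option) *sock {
	s := &sock{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSock(withID("client-1"), withTimeout(2*time.Second))
	fmt.Println(s.id, s.timeout)
}
```

The constructor signature stays stable as options are added, which is the main reason to prefer this pattern over a growing parameter list.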

func WithAutomaticReconnect

func WithAutomaticReconnect(automaticReconnect bool) Option

WithAutomaticReconnect configures a socket to automatically reconnect on connection loss.

func WithDialerMaxRetries

func WithDialerMaxRetries(maxRetries int) Option

WithDialerMaxRetries configures the maximum number of retries when dialing an endpoint (-1 means infinite retries).

func WithDialerRetry

func WithDialerRetry(retry time.Duration) Option

WithDialerRetry configures the time to wait between two failed attempts at dialing an endpoint.
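
The two dialer retry options can be read together as a simple loop: try, wait, try again, up to a retry budget. The sketch below models that reading with a hypothetical `dialWithRetry` helper. Counting the first attempt separately from the retries (so a budget of N allows N+1 attempts) is an assumption of this sketch, not a documented guarantee.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errRefused = errors.New("connection refused")

// dialWithRetry calls dial until it succeeds or the retry budget is spent.
// maxRetries == -1 means retry forever. It returns the number of attempts.
func dialWithRetry(dial func() error, retry time.Duration, maxRetries int) (int, error) {
	attempts := 0
	for {
		attempts++
		err := dial()
		if err == nil {
			return attempts, nil
		}
		retriesSoFar := attempts - 1
		if maxRetries != -1 && retriesSoFar >= maxRetries {
			return attempts, fmt.Errorf("dial failed after %d attempts: %w", attempts, err)
		}
		time.Sleep(retry) // wait between two failed attempts
	}
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}
	n, err := dialWithRetry(flaky, 10*time.Millisecond, 5)
	fmt.Println(n, err)
}
```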

func WithDialerTimeout

func WithDialerTimeout(timeout time.Duration) Option

WithDialerTimeout sets the maximum amount of time a dial will wait for a connect to complete.

func WithID

func WithID(id SocketIdentity) Option

WithID configures a ZeroMQ socket identity.

func WithLogger

func WithLogger(msg *log.Logger) Option

WithLogger sets a dedicated log.Logger for the socket.

func WithSecurity

func WithSecurity(sec Security) Option

WithSecurity configures a ZeroMQ socket to use the given security mechanism. If the security mechanism is nil, the NULL mechanism is used.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets the timeout value for socket operations.

type Property

type Property struct {
	K string
	V string
}

Property describes an entry in a Conn's metadata. The on-wire representation of Property is specified by:

https://rfc.zeromq.org/spec:23/ZMTP/

func (Property) Read

func (prop Property) Read(data []byte) (n int, err error)

func (*Property) Write

func (prop *Property) Write(data []byte) (n int, err error)

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

Proxy connects a frontend socket to a backend socket.

func NewProxy

func NewProxy(ctx context.Context, front, back, capture Socket) *Proxy

NewProxy creates a new Proxy value. It proxies messages received on the frontend to the backend (and vice versa). If capture is not nil, messages proxied are also sent on that socket.

Conceptually, data flows from frontend to backend. Depending on the socket types, replies may flow in the opposite direction. The direction is conceptual only; the proxy is fully symmetric and there is no technical difference between frontend and backend.

Before creating a Proxy, users must set any socket options, and Listen or Dial both frontend and backend sockets.
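
The symmetry described above can be modelled with channels: the same forwarding loop runs once per direction, and an optional capture sink receives a copy of every message. This is a conceptual sketch only. The `endpoint` type, `pipe` and `runProxy` are hypothetical and use channels of strings in place of real sockets.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// endpoint is a hypothetical stand-in for a socket: messages arrive on in
// and are delivered on out.
type endpoint struct {
	in  chan string
	out chan string
}

// pipe copies messages from src to dst until src is closed or ctx is done.
// Each forwarded message is also sent to capture when capture is non-nil.
func pipe(ctx context.Context, src <-chan string, dst, capture chan<- string) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-src:
			if !ok {
				return
			}
			dst <- m
			if capture != nil {
				capture <- m
			}
		}
	}
}

// runProxy forwards front.in to back.out and back.in to front.out.
// Both directions run the same code, which is why the roles are symmetric.
func runProxy(ctx context.Context, front, back endpoint, capture chan<- string) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); pipe(ctx, front.in, back.out, capture) }()
	go func() { defer wg.Done(); pipe(ctx, back.in, front.out, capture) }()
	wg.Wait()
}

// demo pushes two requests and one reply through the proxy and reports
// what each side and the capture channel received.
func demo() (toBack, toFront []string, captured int) {
	front := endpoint{in: make(chan string, 4), out: make(chan string, 4)}
	back := endpoint{in: make(chan string, 4), out: make(chan string, 4)}
	capture := make(chan string, 8)

	front.in <- "req-1"
	front.in <- "req-2"
	back.in <- "rep-1"
	close(front.in)
	close(back.in)

	runProxy(context.Background(), front, back, capture)

	close(front.out)
	close(back.out)
	close(capture)
	for m := range back.out {
		toBack = append(toBack, m)
	}
	for m := range front.out {
		toFront = append(toFront, m)
	}
	for range capture {
		captured++
	}
	return toBack, toFront, captured
}

func main() {
	toBack, toFront, captured := demo()
	fmt.Println(toBack, toFront, captured)
}
```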

func (*Proxy) Kill

func (p *Proxy) Kill()

func (*Proxy) Pause

func (p *Proxy) Pause()

func (*Proxy) Resume

func (p *Proxy) Resume()

func (*Proxy) Run

func (p *Proxy) Run() error

Run runs the proxy loop.

func (*Proxy) Stats

func (p *Proxy) Stats()

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue() *Queue

func (*Queue) Init

func (q *Queue) Init()

func (*Queue) Len

func (q *Queue) Len() int

func (*Queue) Peek

func (q *Queue) Peek() (Msg, bool)

func (*Queue) Pop

func (q *Queue) Pop()

func (*Queue) Push

func (q *Queue) Push(val Msg)

type Security

type Security interface {
	// Type returns the security mechanism type.
	Type() SecurityType

	// Handshake implements the ZMTP security handshake according to
	// this security mechanism.
	// see:
	//  https://rfc.zeromq.org/spec:23/ZMTP/
	//  https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
	//  https://rfc.zeromq.org/spec:25/ZMTP-CURVE/
	Handshake(conn *Conn, server bool) error

	// Encrypt writes the encrypted form of data to w.
	Encrypt(w io.Writer, data []byte) (int, error)

	// Decrypt writes the decrypted form of data to w.
	Decrypt(w io.Writer, data []byte) (int, error)
}

Security is an interface for ZMTP security mechanisms.
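
To show the shape of the Encrypt/Decrypt pair, here is a sketch of a pass-through mechanism in the spirit of NULL security, where both methods write the data unchanged. The `mechanism` interface and `passthrough` type are hypothetical stand-ins that cover only these two methods; Type and Handshake are omitted.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// mechanism mirrors the Encrypt/Decrypt half of the Security interface.
type mechanism interface {
	Encrypt(w io.Writer, data []byte) (int, error)
	Decrypt(w io.Writer, data []byte) (int, error)
}

// passthrough is a hypothetical NULL-style mechanism: no transformation.
type passthrough struct{}

// Encrypt writes data to w unchanged.
func (passthrough) Encrypt(w io.Writer, data []byte) (int, error) {
	return w.Write(data)
}

// Decrypt writes data to w unchanged.
func (passthrough) Decrypt(w io.Writer, data []byte) (int, error) {
	return w.Write(data)
}

// roundTrip encrypts msg into a buffer, then decrypts that buffer again.
func roundTrip(m mechanism, msg string) (string, error) {
	var wire, plain bytes.Buffer
	if _, err := m.Encrypt(&wire, []byte(msg)); err != nil {
		return "", err
	}
	if _, err := m.Decrypt(&plain, wire.Bytes()); err != nil {
		return "", err
	}
	return plain.String(), nil
}

func main() {
	out, err := roundTrip(passthrough{}, "hello")
	fmt.Printf("%q %v\n", out, err)
}
```

Writing to an io.Writer rather than returning a slice lets a mechanism stream directly into the connection without an intermediate copy.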

type SecurityType

type SecurityType string

SecurityType denotes types of ZMTP security mechanisms.

const (
	// NullSecurity is an empty security mechanism
	// that performs neither authentication nor encryption.
	NullSecurity SecurityType = "NULL"

	// PlainSecurity is a security mechanism that uses
	// plaintext passwords. It is a reference implementation and
	// should not be used for anything important.
	PlainSecurity SecurityType = "PLAIN"

	// CurveSecurity uses ZMQ_CURVE for authentication
	// and encryption.
	CurveSecurity SecurityType = "CURVE"
)

type Socket

type Socket interface {
	// Close closes the open Socket.
	Close() error

	// Send puts the message on the outbound send queue.
	//
	// Send blocks until the message can be queued or the send deadline expires.
	Send(msg Msg) error

	// SendMulti puts the message on the outbound send queue.
	//
	// SendMulti blocks until the message can be queued or the send deadline
	// expires. The message will be sent as a multipart message.
	SendMulti(msg Msg) error

	// Recv receives a complete message.
	Recv() (Msg, error)

	// Listen connects a local endpoint to the Socket.
	//
	// In ZeroMQ's terminology, it binds.
	Listen(ep string) error

	// Dial connects a remote endpoint to the Socket.
	//
	// In ZeroMQ's terminology, it connects.
	Dial(ep string) error

	// Type returns the type of this Socket (for example PUB, SUB, etc.)
	Type() SocketType

	// Addr returns the listener's address. It returns nil if the socket isn't a
	// listener.
	Addr() net.Addr

	// GetOption retrieves an option for a socket.
	GetOption(name string) (interface{}, error)

	// SetOption sets an option for a socket.
	SetOption(name string, value interface{}) error
}

Socket represents a ZeroMQ socket.

func NewDealer

func NewDealer(ctx context.Context, opts ...Option) Socket

NewDealer returns a new DEALER ZeroMQ socket. The returned socket value is initially unbound.

func NewPair

func NewPair(ctx context.Context, opts ...Option) Socket

NewPair returns a new PAIR ZeroMQ socket. The returned socket value is initially unbound.

func NewPub

func NewPub(ctx context.Context, opts ...Option) Socket

NewPub returns a new PUB ZeroMQ socket. The returned socket value is initially unbound.

func NewPull

func NewPull(ctx context.Context, opts ...Option) Socket

NewPull returns a new PULL ZeroMQ socket. The returned socket value is initially unbound.

func NewPush

func NewPush(ctx context.Context, opts ...Option) Socket

NewPush returns a new PUSH ZeroMQ socket. The returned socket value is initially unbound.

func NewRep

func NewRep(ctx context.Context, opts ...Option) Socket

NewRep returns a new REP ZeroMQ socket. The returned socket value is initially unbound.

func NewReq

func NewReq(ctx context.Context, opts ...Option) Socket

NewReq returns a new REQ ZeroMQ socket. The returned socket value is initially unbound.

func NewRouter

func NewRouter(ctx context.Context, opts ...Option) Socket

NewRouter returns a new ROUTER ZeroMQ socket. The returned socket value is initially unbound.

func NewSub

func NewSub(ctx context.Context, opts ...Option) Socket

NewSub returns a new SUB ZeroMQ socket. The returned socket value is initially unbound.

func NewXPub

func NewXPub(ctx context.Context, opts ...Option) Socket

NewXPub returns a new XPUB ZeroMQ socket. The returned socket value is initially unbound.

func NewXSub

func NewXSub(ctx context.Context, opts ...Option) Socket

NewXSub returns a new XSUB ZeroMQ socket. The returned socket value is initially unbound.

type SocketIdentity

type SocketIdentity []byte

SocketIdentity is the ZMTP metadata socket identity. See:

https://rfc.zeromq.org/spec:23/ZMTP/

func (SocketIdentity) String

func (id SocketIdentity) String() string

type SocketType

type SocketType string

SocketType is a ZeroMQ socket type.

const (
	Pair   SocketType = "PAIR"   // a ZMQ_PAIR socket
	Pub    SocketType = "PUB"    // a ZMQ_PUB socket
	Sub    SocketType = "SUB"    // a ZMQ_SUB socket
	Req    SocketType = "REQ"    // a ZMQ_REQ socket
	Rep    SocketType = "REP"    // a ZMQ_REP socket
	Dealer SocketType = "DEALER" // a ZMQ_DEALER socket
	Router SocketType = "ROUTER" // a ZMQ_ROUTER socket
	Pull   SocketType = "PULL"   // a ZMQ_PULL socket
	Push   SocketType = "PUSH"   // a ZMQ_PUSH socket
	XPub   SocketType = "XPUB"   // a ZMQ_XPUB socket
	XSub   SocketType = "XSUB"   // a ZMQ_XSUB socket
)

func (SocketType) IsCompatible

func (sck SocketType) IsCompatible(peer SocketType) bool

IsCompatible checks whether two sockets are compatible and thus can be connected together. See https://rfc.zeromq.org/spec:23/ZMTP/ for more information.
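
A compatibility check of this kind can be expressed as a lookup table. The sketch below encodes the socket pairings listed in the ZMTP RFC 23 socket-type rules, using plain strings instead of SocketType. The `peers` table and the `compatible` and `symmetric` helpers are hypothetical and may differ from the package's own logic.

```go
package main

import "fmt"

// peers lists, for each socket type, the peer types it may connect to,
// following the pairings in the ZMTP RFC 23 socket-type rules.
var peers = map[string][]string{
	"PAIR":   {"PAIR"},
	"PUB":    {"SUB", "XSUB"},
	"SUB":    {"PUB", "XPUB"},
	"REQ":    {"REP", "ROUTER"},
	"REP":    {"REQ", "DEALER"},
	"DEALER": {"REP", "DEALER", "ROUTER"},
	"ROUTER": {"REQ", "DEALER", "ROUTER"},
	"PULL":   {"PUSH"},
	"PUSH":   {"PULL"},
	"XPUB":   {"SUB", "XSUB"},
	"XSUB":   {"PUB", "XPUB"},
}

// compatible reports whether a socket of type sck may talk to peer.
func compatible(sck, peer string) bool {
	for _, p := range peers[sck] {
		if p == peer {
			return true
		}
	}
	return false
}

// symmetric reports whether the table agrees in both directions
// for every pair of known socket types.
func symmetric() bool {
	for a := range peers {
		for b := range peers {
			if compatible(a, b) != compatible(b, a) {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(compatible("REQ", "REP"), compatible("PUB", "PUB"), symmetric())
}
```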

type Topics

type Topics interface {
	// Topics returns the sorted list of topics a socket is subscribed to.
	Topics() []string
}

Topics is an interface that wraps the basic Topics method.

type UnknownTransportError

type UnknownTransportError struct {
	Name string
}

UnknownTransportError records an error when trying to use an unknown transport.

func (UnknownTransportError) Error

func (ute UnknownTransportError) Error() string
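
Because this is a struct type, callers can recover the offending transport name with errors.As. The sketch below shows that pattern with a hypothetical local `unknownTransportError`. The "transport://address" endpoint form, the `known` table and the error text are all assumptions of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// unknownTransportError is a hypothetical stand-in with the same shape
// as UnknownTransportError. The message text is made up for this sketch.
type unknownTransportError struct {
	Name string
}

func (e unknownTransportError) Error() string {
	return fmt.Sprintf("unknown transport %q", e.Name)
}

// known is a hypothetical set of registered transport names.
var known = map[string]bool{"tcp": true, "ipc": true, "inproc": true}

// splitEndpoint splits "transport://address" and rejects transports
// that are not in known.
func splitEndpoint(ep string) (transport, addr string, err error) {
	i := strings.Index(ep, "://")
	if i < 0 {
		return "", "", fmt.Errorf("invalid endpoint %q", ep)
	}
	transport, addr = ep[:i], ep[i+3:]
	if !known[transport] {
		return "", "", unknownTransportError{Name: transport}
	}
	return transport, addr, nil
}

// unknownName reports the transport name if err is, or wraps,
// an unknownTransportError.
func unknownName(err error) (string, bool) {
	var ute unknownTransportError
	if errors.As(err, &ute) {
		return ute.Name, true
	}
	return "", false
}

func main() {
	_, _, err := splitEndpoint("udp://localhost:5555")
	wrapped := fmt.Errorf("dial: %w", err)
	name, ok := unknownName(wrapped)
	fmt.Println(name, ok)
}
```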

Directories

Path               Synopsis
internal/errgroup  Package errgroup is a bit more advanced than golang.org/x/sync/errgroup.
internal/inproc    Package inproc provides tools to implement an in-process asynchronous pipe of net.Conns.
security/null      Package null provides the ZeroMQ NULL security mechanism.
security/plain     Package plain provides the ZeroMQ PLAIN security mechanism as specified by: https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
transport          Package transport defines the Transport interface and provides a net-based implementation that can be used by zmq4 sockets to exchange messages.
