Documentation ¶
Overview ¶
Package goczmq is a Go binding for CZMQ 3. CZMQ is a high-level binding for ZeroMQ. Along with ZeroMQ socket support, CZMQ provides "actor"-based services for authentication, service discovery, and creating proxies. GoCZMQ provides direct bindings to CZMQ along with higher-level Go abstractions such as channels and io.ReadWriter interface support.
"Tell them I was a writer. A maker of software. A humanist. A father. And many things. But above all, a writer. Thank You. :)" - Pieter Hintjens
Index ¶
- Constants
- Variables
- func Shutdown()
- type Auth
- type Beacon
- func (b *Beacon) Configure(port int) (string, error)
- func (b *Beacon) Destroy()
- func (b *Beacon) Publish(announcement string, interval int) error
- func (b *Beacon) PublishBytes(announcement []byte, interval int) error
- func (b *Beacon) Recv(timeout int) [][]byte
- func (b *Beacon) Subscribe(filter string) error
- func (b *Beacon) Verbose() error
- type Cert
- func (c *Cert) Apply(s *Sock)
- func (c *Cert) Destroy()
- func (c *Cert) Dup() *Cert
- func (c *Cert) Equal(compare *Cert) bool
- func (c *Cert) Meta(key string) string
- func (c *Cert) Print()
- func (c *Cert) PublicText() string
- func (c *Cert) Save(filename string) error
- func (c *Cert) SavePublic(filename string) error
- func (c *Cert) SaveSecret(filename string) error
- func (c *Cert) SetMeta(key string, value string)
- type CertStore
- type Channeler
- func NewDealerChanneler(endpoints string) *Channeler
- func NewPairChanneler(endpoints string) *Channeler
- func NewPubChanneler(endpoints string) *Channeler
- func NewPullChanneler(endpoints string) *Channeler
- func NewPushChanneler(endpoints string) *Channeler
- func NewRepChanneler(endpoints string) *Channeler
- func NewReqChanneler(endpoints string) *Channeler
- func NewRouterChanneler(endpoints string) *Channeler
- func NewStreamChanneler(endpoints string) *Channeler
- func NewSubChanneler(endpoints string, subscribe ...string) *Channeler
- func NewXPubChanneler(endpoints string) *Channeler
- func NewXSubChanneler(endpoints string) *Channeler
- type Poller
- type Proxy
- type ReadWriter
- type Sock
- func NewDealer(endpoints string) (*Sock, error)
- func NewPair(endpoints string) (*Sock, error)
- func NewPub(endpoints string) (*Sock, error)
- func NewPull(endpoints string) (*Sock, error)
- func NewPush(endpoints string) (*Sock, error)
- func NewRep(endpoints string) (*Sock, error)
- func NewReq(endpoints string) (*Sock, error)
- func NewRouter(endpoints string) (*Sock, error)
- func NewSock(t int) *Sock
- func NewStream(endpoints string) (*Sock, error)
- func NewSub(endpoints string, subscribe string) (*Sock, error)
- func NewXPub(endpoints string) (*Sock, error)
- func NewXSub(endpoints string) (*Sock, error)
- func (s *Sock) Affinity() int
- func (s *Sock) Attach(endpoints string, serverish bool) error
- func (s *Sock) Backlog() int
- func (s *Sock) Bind(endpoint string) (int, error)
- func (s *Sock) Connect(endpoint string) error
- func (s *Sock) ConnectTimeout() int
- func (s *Sock) CurvePublickey() string
- func (s *Sock) CurveSecretkey() string
- func (s *Sock) CurveServer() int
- func (s *Sock) CurveServerkey() string
- func (s *Sock) Destroy()
- func (s *Sock) Disconnect(endpoint string) error
- func (s *Sock) Events() int
- func (s *Sock) Fd() int
- func (s *Sock) GetLastClientID() []byte
- func (s *Sock) GetType() int
- func (s *Sock) GssapiPlaintext() int
- func (s *Sock) GssapiPrincipal() string
- func (s *Sock) GssapiServer() int
- func (s *Sock) GssapiServicePrincipal() string
- func (s *Sock) HandshakeIvl() int
- func (s *Sock) HeartbeatIvl() int
- func (s *Sock) HeartbeatTimeout() int
- func (s *Sock) HeartbeatTtl() int
- func (s *Sock) Identity() string
- func (s *Sock) Immediate() int
- func (s *Sock) InvertMatching() int
- func (s *Sock) Ipv4only() int
- func (s *Sock) Ipv6() int
- func (s *Sock) LastEndpoint() string
- func (s *Sock) Linger() int
- func (s *Sock) Maxmsgsize() int
- func (s *Sock) Mechanism() int
- func (s *Sock) MulticastHops() int
- func (s *Sock) MulticastMaxtpdu() int
- func (s *Sock) PlainPassword() string
- func (s *Sock) PlainServer() int
- func (s *Sock) PlainUsername() string
- func (s *Sock) Pollin() bool
- func (s *Sock) Pollout() bool
- func (s *Sock) Rate() int
- func (s *Sock) Rcvbuf() int
- func (s *Sock) Rcvhwm() int
- func (s *Sock) Rcvmore() int
- func (s *Sock) Rcvtimeo() int
- func (s *Sock) Read(p []byte) (int, error)
- func (s *Sock) ReconnectIvl() int
- func (s *Sock) ReconnectIvlMax() int
- func (s *Sock) RecoveryIvl() int
- func (s *Sock) RecvFrame() ([]byte, int, error)
- func (s *Sock) RecvFrameNoWait() ([]byte, int, error)
- func (s *Sock) RecvMessage() ([][]byte, error)
- func (s *Sock) RecvMessageNoWait() ([][]byte, error)
- func (s *Sock) SendFrame(data []byte, flags int) error
- func (s *Sock) SendMessage(parts [][]byte) error
- func (s *Sock) SetAffinity(val int)
- func (s *Sock) SetBacklog(val int)
- func (s *Sock) SetConflate(val int)
- func (s *Sock) SetConnectRid(val string)
- func (s *Sock) SetConnectTimeout(val int)
- func (s *Sock) SetCurvePublickey(val string)
- func (s *Sock) SetCurveSecretkey(val string)
- func (s *Sock) SetCurveServer(val int)
- func (s *Sock) SetCurveServerkey(val string)
- func (s *Sock) SetDelayAttachOnConnect(val int)
- func (s *Sock) SetGssapiPlaintext(val int)
- func (s *Sock) SetGssapiPrincipal(val string)
- func (s *Sock) SetGssapiServer(val int)
- func (s *Sock) SetGssapiServicePrincipal(val string)
- func (s *Sock) SetHandshakeIvl(val int)
- func (s *Sock) SetHeartbeatIvl(val int)
- func (s *Sock) SetHeartbeatTimeout(val int)
- func (s *Sock) SetHeartbeatTtl(val int)
- func (s *Sock) SetIdentity(val string)
- func (s *Sock) SetImmediate(val int)
- func (s *Sock) SetInvertMatching(val int)
- func (s *Sock) SetIpv4only(val int)
- func (s *Sock) SetIpv6(val int)
- func (s *Sock) SetLastClientID(id []byte)
- func (s *Sock) SetLinger(val int)
- func (s *Sock) SetMaxmsgsize(val int)
- func (s *Sock) SetMulticastHops(val int)
- func (s *Sock) SetMulticastMaxtpdu(val int)
- func (s *Sock) SetPlainPassword(val string)
- func (s *Sock) SetPlainServer(val int)
- func (s *Sock) SetPlainUsername(val string)
- func (s *Sock) SetProbeRouter(val int)
- func (s *Sock) SetRate(val int)
- func (s *Sock) SetRcvbuf(val int)
- func (s *Sock) SetRcvhwm(val int)
- func (s *Sock) SetRcvtimeo(val int)
- func (s *Sock) SetReconnectIvl(val int)
- func (s *Sock) SetReconnectIvlMax(val int)
- func (s *Sock) SetRecoveryIvl(val int)
- func (s *Sock) SetReqCorrelate(val int)
- func (s *Sock) SetReqRelaxed(val int)
- func (s *Sock) SetRouterHandover(val int)
- func (s *Sock) SetRouterMandatory(val int)
- func (s *Sock) SetRouterRaw(val int)
- func (s *Sock) SetSndbuf(val int)
- func (s *Sock) SetSndhwm(val int)
- func (s *Sock) SetSndtimeo(val int)
- func (s *Sock) SetSocksProxy(val string)
- func (s *Sock) SetStreamNotify(val int)
- func (s *Sock) SetSubscribe(val string)
- func (s *Sock) SetTcpAcceptFilter(val string)
- func (s *Sock) SetTcpKeepalive(val int)
- func (s *Sock) SetTcpKeepaliveCnt(val int)
- func (s *Sock) SetTcpKeepaliveIdle(val int)
- func (s *Sock) SetTcpKeepaliveIntvl(val int)
- func (s *Sock) SetTcpMaxrt(val int)
- func (s *Sock) SetTos(val int)
- func (s *Sock) SetUnsubscribe(val string)
- func (s *Sock) SetUseFd(val int)
- func (s *Sock) SetVmciBufferMaxSize(val int)
- func (s *Sock) SetVmciBufferMinSize(val int)
- func (s *Sock) SetVmciBufferSize(val int)
- func (s *Sock) SetVmciConnectTimeout(val int)
- func (s *Sock) SetXPubManual(val int)
- func (s *Sock) SetXPubNodrop(val int)
- func (s *Sock) SetXPubVerbose(val int)
- func (s *Sock) SetXPubVerboser(val int)
- func (s *Sock) SetXPubWelcomeMsg(val string)
- func (s *Sock) SetZapDomain(val string)
- func (s *Sock) Sndbuf() int
- func (s *Sock) Sndhwm() int
- func (s *Sock) Sndtimeo() int
- func (s *Sock) SocksProxy() string
- func (s *Sock) TcpAcceptFilter() string
- func (s *Sock) TcpKeepalive() int
- func (s *Sock) TcpKeepaliveCnt() int
- func (s *Sock) TcpKeepaliveIdle() int
- func (s *Sock) TcpKeepaliveIntvl() int
- func (s *Sock) TcpMaxrt() int
- func (s *Sock) ThreadSafe() int
- func (s *Sock) Tos() int
- func (s *Sock) Type() int
- func (s *Sock) Unbind(endpoint string) error
- func (s *Sock) UseFd() int
- func (s *Sock) VmciBufferMaxSize() int
- func (s *Sock) VmciBufferMinSize() int
- func (s *Sock) VmciBufferSize() int
- func (s *Sock) VmciConnectTimeout() int
- func (s *Sock) Write(p []byte) (int, error)
- func (s *Sock) ZapDomain() string
Examples ¶
Constants ¶
const (
	// Req is a ZMQ_REQ socket type
	Req = int(C.ZMQ_REQ)

	// Rep is a ZMQ_REP socket type
	Rep = int(C.ZMQ_REP)

	// Dealer is a ZMQ_DEALER socket type
	Dealer = int(C.ZMQ_DEALER)

	// Router is a ZMQ_ROUTER socket type
	Router = int(C.ZMQ_ROUTER)

	// Pub is a ZMQ_PUB socket type
	Pub = int(C.ZMQ_PUB)

	// Sub is a ZMQ_SUB socket type
	Sub = int(C.ZMQ_SUB)

	// XPub is a ZMQ_XPUB socket type
	XPub = int(C.ZMQ_XPUB)

	// XSub is a ZMQ_XSUB socket type
	XSub = int(C.ZMQ_XSUB)

	// Push is a ZMQ_PUSH socket type
	Push = int(C.ZMQ_PUSH)

	// Pull is a ZMQ_PULL socket type
	Pull = int(C.ZMQ_PULL)

	// Pair is a ZMQ_PAIR socket type
	Pair = int(C.ZMQ_PAIR)

	// Stream is a ZMQ_STREAM socket type
	Stream = int(C.ZMQ_STREAM)

	// Pollin is the ZMQ_POLLIN constant
	Pollin = int(C.ZMQ_POLLIN)

	// Pollout is the ZMQ_POLLOUT constant
	Pollout = int(C.ZMQ_POLLOUT)

	// FlagMore is the ZFRAME_MORE flag
	FlagMore = int(C.ZFRAME_MORE)

	// FlagReuse is the ZFRAME_REUSE flag
	FlagReuse = int(C.ZFRAME_REUSE)

	// FlagDontWait is the ZFRAME_DONTWAIT flag
	FlagDontWait = int(C.ZFRAME_DONTWAIT)

	// FlagNone means there are no flags
	FlagNone = 0

	// CurveAllowAny is a semantic convenience for allowing
	// any Curve clients
	CurveAllowAny = "*"

	// ZMQVersionMajor is the major version of the underlying ZeroMQ library
	ZMQVersionMajor = int(C.ZMQ_VERSION_MAJOR)

	// ZMQVersionMinor is the minor version of the underlying ZeroMQ library
	ZMQVersionMinor = int(C.ZMQ_VERSION_MINOR)

	// CZMQVersionMajor is the major version of the underlying CZMQ library
	CZMQVersionMajor = int(C.CZMQ_VERSION_MAJOR)

	// CZMQVersionMinor is the minor version of the underlying CZMQ library
	CZMQVersionMinor = int(C.CZMQ_VERSION_MINOR)
)
Variables ¶
var (
	// ErrActorCmd is returned when there is an error sending
	// a command to an actor
	ErrActorCmd = errors.New("error sending actor command")

	// ErrSockAttach is returned when an attach call to a socket fails
	ErrSockAttach = errors.New("error attaching zsock")

	// ErrInvalidSockType is returned when a function is called
	// against a socket type that is not applicable for that socket type
	ErrInvalidSockType = errors.New("invalid socket type")

	// ErrSliceFull is returned if a []byte passed to Read was not
	// large enough to hold the contents of a message
	ErrSliceFull = errors.New("slice full")

	// ErrConnect is returned if Connect on a socket fails
	ErrConnect = errors.New("connect error")

	// ErrDisconnect is returned if Disconnect on a socket fails
	ErrDisconnect = errors.New("disconnect error")

	// ErrBind is returned if Bind on a socket fails
	ErrBind = errors.New("bind error")

	// ErrUnbind is returned if Unbind on a socket fails
	ErrUnbind = errors.New("unbind error")

	// ErrSendFrame is returned if SendFrame on a socket fails
	ErrSendFrame = errors.New("send frame error")

	// ErrRecvFrame is returned if RecvFrame on a socket fails
	ErrRecvFrame = errors.New("recv frame error")

	// ErrRecvFrameAfterDestroy is returned if RecvFrame is called
	// on a socket after it has been destroyed.
	ErrRecvFrameAfterDestroy = errors.New("RecvFrame() is invalid on socket after Detroy() has been called.")

	// ErrRecvMessage is returned if RecvMessage on a socket fails
	ErrRecvMessage = errors.New("recv message error")

	// ErrWaitAfterDestroy is returned by a Poller if there is an error
	// accessing the underlying socket pointer when Wait is called
	ErrWaitAfterDestroy = errors.New("Wait() is invalid on Poller after Destroy() is called.")

	// ErrMultiPartUnsupported is returned when a function that does
	// not support multi-part messages encounters a multi-part message
	ErrMultiPartUnsupported = errors.New("function does not support multi part messages")

	// ErrTimeout is returned when a function that supports timeouts times out
	ErrTimeout = errors.New("function timed out")

	// ErrCertNotFound is returned when NewCertFromFile tries to
	// load a file that does not exist.
	ErrCertNotFound = errors.New("file not found")
)
Functions ¶
func Shutdown ¶
func Shutdown()
Shutdown shuts down the CZMQ zsys layer. The CZMQ zsys layer normally shuts down on process termination through the use of an atexit cleanup function. Calling this allows the zsys layer to be shut down manually.
This is beneficial when CZMQ will no longer be used but the process will not be terminating. Any potential resources allocated by the zsys layer can be freed as they will no longer be needed.
Types ¶
type Auth ¶
type Auth struct {
// contains filtered or unexported fields
}
Auth wraps the CZMQ zauth actor. It handles authentication for all incoming connections. It allows whitelisting and blacklisting peers based on IP address and supports PLAIN and CURVE authentication policies.
Example ¶
// create a server certificate
serverCert := NewCert()
defer serverCert.Destroy()

// create a client certificate and save it
clientCert := NewCert()
defer clientCert.Destroy()
clientCert.SavePublic("client_cert")
defer func() { os.Remove("client_cert") }()

// create an auth service
auth := NewAuth()
defer auth.Destroy()

// tell the auth service the client cert is allowed
auth.Curve("client_cert")

// create a server socket and set it to
// use the "global" auth domain
server := NewSock(Push)
defer server.Destroy()
server.SetZapDomain("global")

// set the server cert as the server cert
// for the socket we created and set it
// to be a curve server
serverCert.Apply(server)
server.SetCurveServer(1)

// bind our server to an endpoint
server.Bind("tcp://*:9898")

// create a client socket
client := NewSock(Pull)
defer client.Destroy()

// assign the client cert we made to the client
clientCert.Apply(client)

// set the server cert as the server cert
// for the client. for the client to be
// allowed to connect, it needs to know
// the servers public cert.
client.SetCurveServerkey(serverCert.PublicText())

// connect
client.Connect("tcp://127.0.0.1:9898")
Output:
type Beacon ¶
type Beacon struct {
// contains filtered or unexported fields
}
Beacon wraps the CZMQ beacon actor. It implements a peer-to-peer discovery service for local networks. Beacons can broadcast and receive UDPv4 service broadcasts.
Example ¶
beacon := NewBeacon()
defer beacon.Destroy()

address, err := beacon.Configure(9999)
if err != nil {
	panic(err)
}

fmt.Printf("started beacon on: %s", address)
beacon.Publish("HI", 100)
Output:
func (*Beacon) Configure ¶
Configure accepts a port number and configures the beacon, returning an address
func (*Beacon) PublishBytes ¶
PublishBytes publishes an announcement byte slice at an interval
type Cert ¶
type Cert struct {
// contains filtered or unexported fields
}
Cert wraps the CZMQ zcert class. It provides tools for creating and working with ZMQ CURVE security certs. The certs can be used as temporary objects in memory or persisted to disk. Certs are made up of a public and secret key pair plus metadata.
Example ¶
cert := NewCert()
defer cert.Destroy()

cert.SetMeta("email", "taotetek@gmail.com")
cert.SetMeta("name", "Brian Knox")
cert.SetMeta("organization", "ZeroMQ")
Output:
func NewCertFromFile ¶
NewCertFromFile loads a Cert from files
func NewCertFromKeys ¶
NewCertFromKeys creates a new Cert from a public and private key
func (*Cert) PublicText ¶
PublicText returns the public key as a string
func (*Cert) SavePublic ¶
SavePublic saves the public key to a file
func (*Cert) SaveSecret ¶
SaveSecret saves the secret key to a file
type CertStore ¶
type CertStore struct {
// contains filtered or unexported fields
}
func NewCertStore ¶
NewCertStore creates a new certificate store from a disk directory, loading and indexing all certificates.
func (*CertStore) Insert ¶
Insert inserts a certificate into the store in memory. Call Save directly on the cert if you wish to save it to disk.
type Channeler ¶
type Channeler struct {
	SendChan chan<- [][]byte
	RecvChan <-chan [][]byte
	// contains filtered or unexported fields
}
Channeler serializes all access to a socket through a send and receive channel. It starts two threads. One is used for receiving from the zeromq socket. The other is used to listen to the send channel, and send everything back to the socket thread for sending using an additional inproc socket.
Example (Output) ¶
// create a dealer channeler
dealer := NewDealerChanneler("inproc://channelerdealerrouter")
defer dealer.Destroy()

// create a router channeler
router := NewRouterChanneler("inproc://channelerdealerrouter")
defer router.Destroy()

// send a hello message
dealer.SendChan <- [][]byte{[]byte("Hello")}

// receive the hello message
request := <-router.RecvChan

// first frame is identity of client - let's append 'World'
// to the message and route it back
request = append(request, []byte("World"))

// send the reply
router.SendChan <- request

// receive the reply
reply := <-dealer.RecvChan

fmt.Printf("%s %s", string(reply[0]), string(reply[1]))
Output: Hello World
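The channel-based serialization described for Channeler can be illustrated without ZeroMQ at all. The following is a minimal sketch, not the package's implementation: `loopback` and `newLoopback` are hypothetical names, and the "socket" is replaced by a goroutine that simply echoes each message back. What it shows is the ownership pattern: callers only touch a send-only and a receive-only channel, while a single goroutine owns the underlying resource.

```go
package main

import "fmt"

// loopback is a hypothetical stand-in for a Channeler: callers only ever
// touch the two channels, never the resource behind them.
type loopback struct {
	SendChan chan<- [][]byte
	RecvChan <-chan [][]byte
}

// newLoopback starts one goroutine that owns the "socket" side. In this
// sketch the owner echoes each message back, an assumption made so the
// example runs without ZeroMQ.
func newLoopback() *loopback {
	send := make(chan [][]byte)
	recv := make(chan [][]byte)
	go func() {
		defer close(recv) // tell readers that the owner has stopped
		for msg := range send {
			recv <- msg
		}
	}()
	return &loopback{SendChan: send, RecvChan: recv}
}

func main() {
	lb := newLoopback()
	lb.SendChan <- [][]byte{[]byte("Hello"), []byte("World")}
	reply := <-lb.RecvChan
	fmt.Printf("%s %s\n", reply[0], reply[1])
	close(lb.SendChan) // closing the send side stops the owner goroutine
}
```

Typing the fields as `chan<-` and `<-chan` lets the compiler reject accidental reads from the send side or writes to the receive side.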
func NewDealerChanneler ¶
NewDealerChanneler creates a new Channeler wrapping a Dealer socket. The socket will connect by default.
func NewPairChanneler ¶
NewPairChanneler creates a new Channeler wrapping a Pair socket. The socket will connect by default.
func NewPubChanneler ¶
NewPubChanneler creates a new Channeler wrapping a Pub socket. The socket will bind by default.
func NewPullChanneler ¶
NewPullChanneler creates a new Channeler wrapping a Pull socket. The socket will bind by default.
func NewPushChanneler ¶
NewPushChanneler creates a new Channeler wrapping a Push socket. The socket will connect by default.
func NewRepChanneler ¶
NewRepChanneler creates a new Channeler wrapping a Rep socket. The socket will bind by default.
func NewReqChanneler ¶
NewReqChanneler creates a new Channeler wrapping a Req socket. The socket will connect by default.
func NewRouterChanneler ¶
NewRouterChanneler creates a new Channeler wrapping a Router socket. The socket will Bind by default.
func NewStreamChanneler ¶
NewStreamChanneler creates a new Channeler wrapping a Stream socket. The socket will connect by default.
func NewSubChanneler ¶
NewSubChanneler creates a new Channeler wrapping a Sub socket. Along with an endpoint list it accepts the topics to subscribe to; the signature in the index takes them as variadic string arguments. The socket will connect by default.
func NewXPubChanneler ¶
NewXPubChanneler creates a new Channeler wrapping an XPub socket. The socket will Bind by default.
func NewXSubChanneler ¶
NewXSubChanneler creates a new Channeler wrapping an XSub socket. The socket will connect by default.
func (*Channeler) Destroy ¶
func (c *Channeler) Destroy()
Destroy sends a message to the Channeler to shut it down and clean it up.
func (*Channeler) Unsubscribe ¶
Unsubscribe from a Topic
type Poller ¶
type Poller struct {
// contains filtered or unexported fields
}
Poller provides a simple wrapper to ZeroMQ's zmq_poll API, for the common case of reading from a number of sockets. Sockets can be added to and removed from the running poller.
Example ¶
sock1, err := NewRouter("inproc://poller_example_1")
if err != nil {
	panic(err)
}
defer sock1.Destroy()

poller, err := NewPoller(sock1)
if err != nil {
	panic(err)
}

sock2, err := NewRouter("inproc://poller_example_2")
if err != nil {
	panic(err)
}
defer sock2.Destroy()

err = poller.Add(sock2)
if err != nil {
	panic(err)
}

// Poller.Wait(millis) returns first socket that has a waiting message
_ = poller.Wait(1)
Output:
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
Proxy wraps the CZMQ zproxy actor. A proxy actor switches messages between a frontend and backend socket, and also provides an optional capture socket to which messages can be mirrored. The proxy can be paused and resumed.
Example ¶
proxy := NewProxy()
defer proxy.Destroy()

// set front end address and socket type
err := proxy.SetFrontend(Pull, "inproc://frontend")
if err != nil {
	panic(err)
}

// set back end address and socket type
err = proxy.SetBackend(Push, "inproc://backend")
if err != nil {
	panic(err)
}

// set address for "tee"ing proxy traffic to
err = proxy.SetCapture("inproc://capture")
if err != nil {
	panic(err)
}

// we can pause the proxy
err = proxy.Pause()
if err != nil {
	panic(err)
}

// and we can resume it
err = proxy.Resume()
if err != nil {
	panic(err)
}

proxy.Destroy()
Output:
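The frontend, backend and capture roles of a proxy can be modelled with plain Go channels. This is only a conceptual sketch under stated assumptions: `forward` is a hypothetical function, channels stand in for sockets, and nothing here reflects how the zproxy actor is actually implemented. It shows every message moving from frontend to backend while a copy is mirrored to an optional capture sink.

```go
package main

import "fmt"

// forward is a hypothetical channel-based model of a proxy. It copies every
// message from frontend to backend and mirrors it to capture when capture is
// non-nil. The returned channel is closed once frontend has been closed and
// fully drained.
func forward(frontend <-chan []byte, backend, capture chan<- []byte) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range frontend {
			backend <- msg
			if capture != nil {
				capture <- msg
			}
		}
	}()
	return done
}

func main() {
	// Buffered channels keep the sketch free of extra reader goroutines.
	frontend := make(chan []byte, 2)
	backend := make(chan []byte, 2)
	capture := make(chan []byte, 2)
	done := forward(frontend, backend, capture)

	frontend <- []byte("one")
	frontend <- []byte("two")
	close(frontend)
	<-done

	fmt.Printf("backend=%d capture=%d\n", len(backend), len(capture))
}
```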
func (*Proxy) SetBackend ¶
SetBackend accepts a socket type and endpoint, and sends a message to the zactor thread telling it to set up a socket bound to the endpoint.
func (*Proxy) SetCapture ¶
SetCapture accepts a socket endpoint and sets up a Push socket bound to that endpoint, that sends a copy of all messages passing through the proxy.
func (*Proxy) SetFrontend ¶
SetFrontend accepts a socket type and endpoint, and sends a message to the zactor thread telling it to set up a socket bound to the endpoint.
type ReadWriter ¶
type ReadWriter struct {
// contains filtered or unexported fields
}
ReadWriter provides an io.ReadWriter compatible interface for goczmq.Sock
func NewReadWriter ¶
func NewReadWriter(sock *Sock) (*ReadWriter, error)
NewReadWriter accepts a sock and returns a goczmq.ReadWriter. The io.ReadWriter should now be considered responsible for this Sock.
func (*ReadWriter) Destroy ¶
func (r *ReadWriter) Destroy()
Destroy destroys both the ReadWriter and the underlying Sock
func (*ReadWriter) GetLastClientID ¶
func (r *ReadWriter) GetLastClientID() []byte
GetLastClientID returns the id of the last client you received a message from if the underlying socket is a Router socket
func (*ReadWriter) SetLastClientID ¶
func (r *ReadWriter) SetLastClientID(id []byte)
SetLastClientID lets you manually set the id of the client you last received a message from if the underlying socket is a Router socket
func (*ReadWriter) SetTimeout ¶
func (r *ReadWriter) SetTimeout(ms int)
SetTimeout sets the timeout on Read in milliseconds. If no new data is received within the timeout period, Read will return ErrTimeout.
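The timeout behaviour described for Read can be sketched with the standard library alone. In the sketch below, `timedReader`, `newTimedReader`, `errTimeout` and `errSliceFull` are hypothetical stand-ins (the two errors mirror the messages of ErrTimeout and ErrSliceFull from the Variables section), and a channel plays the role of the socket. It is not goczmq code; it only illustrates how a caller might distinguish "no data within the timeout" from "buffer too small".

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-ins for the package's ErrTimeout and ErrSliceFull sentinels.
var (
	errTimeout   = errors.New("function timed out")
	errSliceFull = errors.New("slice full")
)

// timedReader is a hypothetical io.Reader whose Read gives up after a timeout.
type timedReader struct {
	msgs    <-chan []byte
	timeout time.Duration
}

// newTimedReader takes the timeout in milliseconds, like SetTimeout(ms int).
func newTimedReader(msgs <-chan []byte, ms int) *timedReader {
	return &timedReader{msgs: msgs, timeout: time.Duration(ms) * time.Millisecond}
}

// Read copies one whole message into p, or fails with errTimeout when no
// message arrives in time, or errSliceFull when p cannot hold the message.
func (r *timedReader) Read(p []byte) (int, error) {
	select {
	case m := <-r.msgs:
		n := copy(p, m)
		if n < len(m) {
			return n, errSliceFull
		}
		return n, nil
	case <-time.After(r.timeout):
		return 0, errTimeout
	}
}

func main() {
	msgs := make(chan []byte, 1)
	r := newTimedReader(msgs, 20)

	msgs <- []byte("Hello")
	buf := make([]byte, 16)
	n, err := r.Read(buf)
	fmt.Println(string(buf[:n]), err)

	_, err = r.Read(buf) // nothing queued, so this waits out the timeout
	fmt.Println(errors.Is(err, errTimeout))
}
```

Comparing against a sentinel error lets the caller retry on a timeout while treating other errors as fatal.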
type Sock ¶
type Sock struct {
// contains filtered or unexported fields
}
Sock wraps the CZMQ zsock class.
Example (Output) ¶
// create dealer socket
dealer, err := NewDealer("inproc://example")
if err != nil {
	panic(err)
}
defer dealer.Destroy()

// create router socket
router, err := NewRouter("inproc://example")
if err != nil {
	panic(err)
}
defer router.Destroy()

// send hello message
dealer.SendFrame([]byte("Hello"), FlagNone)

// receive hello message
request, err := router.RecvMessage()
if err != nil {
	panic(err)
}

// first frame is identity of client - let's append 'World'
// to the message and route it back.
request = append(request, []byte("World"))

// send reply
err = router.SendMessage(request)
if err != nil {
	panic(err)
}

// receive reply
reply, err := dealer.RecvMessage()
if err != nil {
	panic(err)
}

fmt.Printf("%s %s", string(reply[0]), string(reply[1]))
Output: Hello World
func NewDealer ¶
NewDealer creates a Dealer socket and calls Attach. The socket will Connect by default.
func NewRouter ¶
NewRouter creates a Router socket and calls Attach. The socket will Bind by default.
func NewSock ¶
NewSock creates a new socket. The caller source and line number are passed so CZMQ can report socket leaks intelligently.
func NewStream ¶
NewStream creates a Stream socket and calls Attach. The socket will Connect by default.
func NewSub ¶
NewSub creates a Sub socket and calls Attach. 'subscribe' is a comma delimited list of topics to subscribe to. The socket will Connect by default.
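To make the comma delimited topic list concrete, here is a small standard-library sketch. `splitTopics` and `matches` are hypothetical helpers, not goczmq functions. The sketch relies on the ZeroMQ rule that a subscription matches any message beginning with the subscribed prefix, so an empty topic matches everything.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// splitTopics turns a comma delimited topic list into individual topics.
// Hypothetical helper for illustration only.
func splitTopics(subscribe string) []string {
	return strings.Split(subscribe, ",")
}

// matches reports whether msg starts with any of the topics, modelling
// prefix-based subscription filtering. An empty topic matches every message.
func matches(topics []string, msg []byte) bool {
	for _, t := range topics {
		if bytes.HasPrefix(msg, []byte(t)) {
			return true
		}
	}
	return false
}

func main() {
	topics := splitTopics("weather,news")
	for _, m := range []string{"weather.nyc sunny", "news headline", "sports score"} {
		fmt.Printf("%q delivered: %v\n", m, matches(topics, []byte(m)))
	}
}
```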
func (*Sock) Attach ¶
Attach attaches a socket to zero or more endpoints. If endpoints is not empty, it is parsed as a list of ZeroMQ endpoints, separated by commas, and prefixed by '@' (to bind the socket) or '>' (to connect the socket). If an endpoint does not start with '@' or '>', the serverish argument determines whether it is used to bind (serverish = true) or connect (serverish = false).
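The endpoint rules for Attach can be expressed as a small parser. The sketch below is not the CZMQ or goczmq implementation: `action` and `planAttach` are hypothetical names, and the function only computes which endpoints would be bound and which connected, without opening any socket.

```go
package main

import (
	"fmt"
	"strings"
)

// action describes what a hypothetical attach step would do with one endpoint.
type action struct {
	Bind     bool
	Endpoint string
}

// planAttach applies the documented rules: '@' forces bind, '>' forces
// connect, and unprefixed endpoints follow the serverish argument.
// Empty entries are skipped, which is a choice made for this sketch.
func planAttach(endpoints string, serverish bool) []action {
	var plan []action
	for _, ep := range strings.Split(endpoints, ",") {
		switch {
		case ep == "":
			continue
		case strings.HasPrefix(ep, "@"):
			plan = append(plan, action{Bind: true, Endpoint: ep[1:]})
		case strings.HasPrefix(ep, ">"):
			plan = append(plan, action{Bind: false, Endpoint: ep[1:]})
		default:
			plan = append(plan, action{Bind: serverish, Endpoint: ep})
		}
	}
	return plan
}

func main() {
	spec := "@tcp://*:5555,>tcp://127.0.0.1:5556,inproc://x"
	for _, a := range planAttach(spec, false) {
		verb := "connect"
		if a.Bind {
			verb = "bind"
		}
		fmt.Println(verb, a.Endpoint)
	}
}
```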
func (*Sock) Bind ¶
Bind binds a socket to an endpoint. On success returns the port number used for tcp transports, or 0 for other transports. On failure returns a -1 for port, and an error.
func (*Sock) Connect ¶
Connect connects a socket to an endpoint and returns an error if the connect failed.
func (*Sock) ConnectTimeout ¶ added in v4.1.0
ConnectTimeout returns the current value of the socket's connect_timeout option
func (*Sock) CurvePublickey ¶
CurvePublickey returns the current value of the socket's curve_publickey option
func (*Sock) CurveSecretkey ¶
CurveSecretkey returns the current value of the socket's curve_secretkey option
func (*Sock) CurveServer ¶
CurveServer returns the current value of the socket's curve_server option
func (*Sock) CurveServerkey ¶
CurveServerkey returns the current value of the socket's curve_serverkey option
func (*Sock) Disconnect ¶
Disconnect disconnects a socket from an endpoint. It returns an error if the endpoint was not found.
func (*Sock) GetLastClientID ¶
GetLastClientID returns the id of the last client you received a message from if the underlying socket is a Router socket. DEPRECATED: see goczmq.ReadWriter.
func (*Sock) GssapiPlaintext ¶
GssapiPlaintext returns the current value of the socket's gssapi_plaintext option
func (*Sock) GssapiPrincipal ¶
GssapiPrincipal returns the current value of the socket's gssapi_principal option
func (*Sock) GssapiServer ¶
GssapiServer returns the current value of the socket's gssapi_server option
func (*Sock) GssapiServicePrincipal ¶
GssapiServicePrincipal returns the current value of the socket's gssapi_service_principal option
func (*Sock) HandshakeIvl ¶ added in v4.1.0
HandshakeIvl returns the current value of the socket's handshake_ivl option
func (*Sock) HeartbeatIvl ¶ added in v4.1.0
HeartbeatIvl returns the current value of the socket's heartbeat_ivl option
func (*Sock) HeartbeatTimeout ¶ added in v4.1.0
HeartbeatTimeout returns the current value of the socket's heartbeat_timeout option
func (*Sock) HeartbeatTtl ¶ added in v4.1.0
HeartbeatTtl returns the current value of the socket's heartbeat_ttl option
func (*Sock) InvertMatching ¶ added in v4.1.0
InvertMatching returns the current value of the socket's invert_matching option
func (*Sock) LastEndpoint ¶
LastEndpoint returns the current value of the socket's last_endpoint option
func (*Sock) Maxmsgsize ¶
Maxmsgsize returns the current value of the socket's maxmsgsize option
func (*Sock) MulticastHops ¶
MulticastHops returns the current value of the socket's multicast_hops option
func (*Sock) MulticastMaxtpdu ¶ added in v4.1.0
MulticastMaxtpdu returns the current value of the socket's multicast_maxtpdu option
func (*Sock) PlainPassword ¶
PlainPassword returns the current value of the socket's plain_password option
func (*Sock) PlainServer ¶
PlainServer returns the current value of the socket's plain_server option
func (*Sock) PlainUsername ¶
PlainUsername returns the current value of the socket's plain_username option
func (*Sock) Read ¶
Read provides an io.Reader interface to a zeromq socket. DEPRECATED: see goczmq.ReadWriter.
func (*Sock) ReconnectIvl ¶
ReconnectIvl returns the current value of the socket's reconnect_ivl option
func (*Sock) ReconnectIvlMax ¶
ReconnectIvlMax returns the current value of the socket's reconnect_ivl_max option
func (*Sock) RecoveryIvl ¶
RecoveryIvl returns the current value of the socket's recovery_ivl option
func (*Sock) RecvFrame ¶
RecvFrame reads a frame from the socket and returns it as a byte array, along with a more flag and an error (if there is an error)
func (*Sock) RecvFrameNoWait ¶
RecvFrameNoWait receives a frame from the socket and returns it as a byte array if one is waiting. Returns an empty frame, a 0 more flag and an error if one is not immediately available
func (*Sock) RecvMessage ¶
RecvMessage receives a full message from the socket and returns it as an array of byte arrays.
func (*Sock) RecvMessageNoWait ¶
RecvMessageNoWait receives a full message from the socket and returns it as an array of byte arrays if one is waiting. Returns an empty message and an error if one is not immediately available
func (*Sock) SendFrame ¶
SendFrame sends a byte array via the socket. For the flags value, use FlagNone (0) for a single-frame message or the final frame, or FlagMore if more frames of a multi-part message follow.
func (*Sock) SendMessage ¶
SendMessage accepts an array of byte arrays and sends it as a multi-part message.
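The relationship between a multi-part message and per-frame flags can be sketched in plain Go. Everything here is illustrative: `frame`, `toFrames` and `collect` are hypothetical, and `flagMore = 1` is an assumed stand-in for the package's FlagMore constant rather than a value taken from this documentation. The sketch shows every frame except the last carrying the "more" flag, and a receiver reassembling frames until it sees one without it.

```go
package main

import "fmt"

const (
	flagNone = 0
	flagMore = 1 // assumed stand-in value for this sketch
)

// frame pairs a payload with the flags it would be sent with.
type frame struct {
	Data  []byte
	Flags int
}

// toFrames marks every part except the last with flagMore.
func toFrames(parts [][]byte) []frame {
	frames := make([]frame, 0, len(parts))
	for i, p := range parts {
		flags := flagMore
		if i == len(parts)-1 {
			flags = flagNone
		}
		frames = append(frames, frame{Data: p, Flags: flags})
	}
	return frames
}

// collect gathers frames up to and including the first one without flagMore.
// It returns the reassembled message and any frames that follow it.
func collect(frames []frame) (msg [][]byte, rest []frame) {
	for i, f := range frames {
		msg = append(msg, f.Data)
		if f.Flags&flagMore == 0 {
			return msg, frames[i+1:]
		}
	}
	return msg, nil
}

func main() {
	frames := toFrames([][]byte{[]byte("id"), []byte("Hello"), []byte("World")})
	for _, f := range frames {
		fmt.Printf("%s more=%v\n", f.Data, f.Flags&flagMore != 0)
	}
	msg, _ := collect(frames)
	fmt.Println(len(msg), "parts reassembled")
}
```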
func (*Sock) SetAffinity ¶
SetAffinity sets the affinity option for the socket
func (*Sock) SetBacklog ¶
SetBacklog sets the backlog option for the socket
func (*Sock) SetConflate ¶
SetConflate sets the conflate option for the socket
func (*Sock) SetConnectRid ¶ added in v4.1.0
SetConnectRid sets the connect_rid option for the socket
func (*Sock) SetConnectTimeout ¶ added in v4.1.0
SetConnectTimeout sets the connect_timeout option for the socket
func (*Sock) SetCurvePublickey ¶
SetCurvePublickey sets the curve_publickey option for the socket
func (*Sock) SetCurveSecretkey ¶
SetCurveSecretkey sets the curve_secretkey option for the socket
func (*Sock) SetCurveServer ¶
SetCurveServer sets the curve_server option for the socket
func (*Sock) SetCurveServerkey ¶
SetCurveServerkey sets the curve_serverkey option for the socket
func (*Sock) SetDelayAttachOnConnect ¶
SetDelayAttachOnConnect sets the delay_attach_on_connect option for the socket
func (*Sock) SetGssapiPlaintext ¶
SetGssapiPlaintext sets the gssapi_plaintext option for the socket
func (*Sock) SetGssapiPrincipal ¶
SetGssapiPrincipal sets the gssapi_principal option for the socket
func (*Sock) SetGssapiServer ¶
SetGssapiServer sets the gssapi_server option for the socket
func (*Sock) SetGssapiServicePrincipal ¶
SetGssapiServicePrincipal sets the gssapi_service_principal option for the socket
func (*Sock) SetHandshakeIvl ¶ added in v4.1.0
SetHandshakeIvl sets the handshake_ivl option for the socket
func (*Sock) SetHeartbeatIvl ¶ added in v4.1.0
SetHeartbeatIvl sets the heartbeat_ivl option for the socket
func (*Sock) SetHeartbeatTimeout ¶ added in v4.1.0
SetHeartbeatTimeout sets the heartbeat_timeout option for the socket
func (*Sock) SetHeartbeatTtl ¶ added in v4.1.0
SetHeartbeatTtl sets the heartbeat_ttl option for the socket
func (*Sock) SetIdentity ¶
SetIdentity sets the identity option for the socket
func (*Sock) SetImmediate ¶
SetImmediate sets the immediate option for the socket
func (*Sock) SetInvertMatching ¶ added in v4.1.0
SetInvertMatching sets the invert_matching option for the socket
func (*Sock) SetIpv4only ¶
SetIpv4only sets the ipv4only option for the socket
func (*Sock) SetLastClientID ¶
SetLastClientID lets you manually set the id of the client you last received a message from if the underlying socket is a Router socket. DEPRECATED: see goczmq.ReadWriter.
func (*Sock) SetMaxmsgsize ¶
SetMaxmsgsize sets the maxmsgsize option for the socket
func (*Sock) SetMulticastHops ¶
SetMulticastHops sets the multicast_hops option for the socket
func (*Sock) SetMulticastMaxtpdu ¶ added in v4.1.0
SetMulticastMaxtpdu sets the multicast_maxtpdu option for the socket
func (*Sock) SetPlainPassword ¶
SetPlainPassword sets the plain_password option for the socket
func (*Sock) SetPlainServer ¶
SetPlainServer sets the plain_server option for the socket
func (*Sock) SetPlainUsername ¶
SetPlainUsername sets the plain_username option for the socket
func (*Sock) SetProbeRouter ¶
SetProbeRouter sets the probe_router option for the socket
func (*Sock) SetRcvtimeo ¶
SetRcvtimeo sets the rcvtimeo option for the socket
func (*Sock) SetReconnectIvl ¶
SetReconnectIvl sets the reconnect_ivl option for the socket
func (*Sock) SetReconnectIvlMax ¶
SetReconnectIvlMax sets the reconnect_ivl_max option for the socket
func (*Sock) SetRecoveryIvl ¶
SetRecoveryIvl sets the recovery_ivl option for the socket
func (*Sock) SetReqCorrelate ¶
SetReqCorrelate sets the req_correlate option for the socket
func (*Sock) SetReqRelaxed ¶
SetReqRelaxed sets the req_relaxed option for the socket
func (*Sock) SetRouterHandover ¶
SetRouterHandover sets the router_handover option for the socket
func (*Sock) SetRouterMandatory ¶
SetRouterMandatory sets the router_mandatory option for the socket
func (*Sock) SetRouterRaw ¶
SetRouterRaw sets the router_raw option for the socket
func (*Sock) SetSndtimeo ¶
SetSndtimeo sets the sndtimeo option for the socket
func (*Sock) SetSocksProxy ¶ added in v4.1.0
SetSocksProxy sets the socks_proxy option for the socket
func (*Sock) SetStreamNotify ¶ added in v4.1.0
SetStreamNotify sets the stream_notify option for the socket
func (*Sock) SetSubscribe ¶
SetSubscribe sets the subscribe option for the socket
func (*Sock) SetTcpAcceptFilter ¶
SetTcpAcceptFilter sets the tcp_accept_filter option for the socket
func (*Sock) SetTcpKeepalive ¶
SetTcpKeepalive sets the tcp_keepalive option for the socket
func (*Sock) SetTcpKeepaliveCnt ¶
SetTcpKeepaliveCnt sets the tcp_keepalive_cnt option for the socket
func (*Sock) SetTcpKeepaliveIdle ¶
SetTcpKeepaliveIdle sets the tcp_keepalive_idle option for the socket
func (*Sock) SetTcpKeepaliveIntvl ¶
SetTcpKeepaliveIntvl sets the tcp_keepalive_intvl option for the socket
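The four tcp_keepalive setters are usually configured together. The following is a minimal sketch of one way to group them, not part of the package itself. It assumes each setter takes a single int and returns nothing (this listing does not show signatures); keepaliveSetter, keepaliveConfig, applyKeepalive, and fakeKeepaliveSock are hypothetical names introduced for illustration. The fake socket stands in for a real socket so the sketch runs without libczmq.

```go
package main

import "fmt"

// keepaliveSetter names the four setters documented above.
// ASSUMPTION: each takes one int; the signatures are not shown in this listing.
type keepaliveSetter interface {
	SetTcpKeepalive(val int)
	SetTcpKeepaliveCnt(val int)
	SetTcpKeepaliveIdle(val int)
	SetTcpKeepaliveIntvl(val int)
}

// keepaliveConfig is a hypothetical grouping of the four option values.
type keepaliveConfig struct {
	Enable int // 1 enables keepalive, 0 disables it, -1 leaves the OS default
	Cnt    int // tcp_keepalive_cnt
	Idle   int // tcp_keepalive_idle
	Intvl  int // tcp_keepalive_intvl
}

// applyKeepalive pushes every field of c to the socket in one call.
func applyKeepalive(s keepaliveSetter, c keepaliveConfig) {
	s.SetTcpKeepalive(c.Enable)
	s.SetTcpKeepaliveCnt(c.Cnt)
	s.SetTcpKeepaliveIdle(c.Idle)
	s.SetTcpKeepaliveIntvl(c.Intvl)
}

// fakeKeepaliveSock records option values in a map instead of touching a real socket.
type fakeKeepaliveSock struct{ opts map[string]int }

func newFakeKeepaliveSock() *fakeKeepaliveSock {
	return &fakeKeepaliveSock{opts: map[string]int{}}
}

func (f *fakeKeepaliveSock) SetTcpKeepalive(val int)      { f.opts["tcp_keepalive"] = val }
func (f *fakeKeepaliveSock) SetTcpKeepaliveCnt(val int)   { f.opts["tcp_keepalive_cnt"] = val }
func (f *fakeKeepaliveSock) SetTcpKeepaliveIdle(val int)  { f.opts["tcp_keepalive_idle"] = val }
func (f *fakeKeepaliveSock) SetTcpKeepaliveIntvl(val int) { f.opts["tcp_keepalive_intvl"] = val }

func main() {
	s := newFakeKeepaliveSock()
	applyKeepalive(s, keepaliveConfig{Enable: 1, Cnt: 5, Idle: 60, Intvl: 10})
	fmt.Println(s.opts)
}
```

If the assumed signatures hold, a *Sock could be passed to applyKeepalive in place of the fake.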
func (*Sock) SetTcpMaxrt ¶ added in v4.1.0
SetTcpMaxrt sets the tcp_maxrt option for the socket
func (*Sock) SetUnsubscribe ¶
SetUnsubscribe sets the unsubscribe option for the socket
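SetSubscribe and SetUnsubscribe can be paired to change the filters of a subscriber socket at runtime. The sketch below shows one possible helper for that. It assumes both setters take a single string and return nothing (not shown in this listing); subscriber, retune, and fakeSub are hypothetical names, and the fake only imitates filter bookkeeping so the example runs without libczmq.

```go
package main

import "fmt"

// subscriber names the two setters documented above.
// ASSUMPTION: each takes one string; the signatures are not shown in this listing.
type subscriber interface {
	SetSubscribe(val string)
	SetUnsubscribe(val string)
}

// retune adds the new filters first and then drops the old ones, so the
// socket is never left without a filter in between.
func retune(s subscriber, drop, add []string) {
	for _, p := range add {
		s.SetSubscribe(p)
	}
	for _, p := range drop {
		s.SetUnsubscribe(p)
	}
}

// fakeSub counts active filters per prefix instead of using a real socket.
type fakeSub struct{ filters map[string]int }

func (f *fakeSub) SetSubscribe(val string) { f.filters[val]++ }

func (f *fakeSub) SetUnsubscribe(val string) {
	if f.filters[val] > 0 {
		f.filters[val]--
	}
	if f.filters[val] == 0 {
		delete(f.filters, val)
	}
}

func main() {
	s := &fakeSub{filters: map[string]int{}}
	s.SetSubscribe("weather.")
	retune(s, []string{"weather."}, []string{"traffic."})
	fmt.Println(s.filters)
}
```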
func (*Sock) SetVmciBufferMaxSize ¶ added in v4.1.0
SetVmciBufferMaxSize sets the vmci_buffer_max_size option for the socket
func (*Sock) SetVmciBufferMinSize ¶ added in v4.1.0
SetVmciBufferMinSize sets the vmci_buffer_min_size option for the socket
func (*Sock) SetVmciBufferSize ¶ added in v4.1.0
SetVmciBufferSize sets the vmci_buffer_size option for the socket
func (*Sock) SetVmciConnectTimeout ¶ added in v4.1.0
SetVmciConnectTimeout sets the vmci_connect_timeout option for the socket
func (*Sock) SetXPubManual ¶ added in v4.1.0
SetXPubManual sets the xpub_manual option for the socket
func (*Sock) SetXPubNodrop ¶ added in v4.1.0
SetXPubNodrop sets the xpub_nodrop option for the socket
func (*Sock) SetXPubVerbose ¶ added in v4.1.0
SetXPubVerbose sets the xpub_verbose option for the socket
func (*Sock) SetXPubVerboser ¶ added in v4.1.0
SetXPubVerboser sets the xpub_verboser option for the socket
func (*Sock) SetXPubWelcomeMsg ¶ added in v4.1.0
SetXPubWelcomeMsg sets the xpub_welcome_msg option for the socket
func (*Sock) SetZapDomain ¶
SetZapDomain sets the zap_domain option for the socket
func (*Sock) SocksProxy ¶ added in v4.1.0
SocksProxy returns the current value of the socket's socks_proxy option
func (*Sock) TcpAcceptFilter ¶
TcpAcceptFilter returns the current value of the socket's tcp_accept_filter option
func (*Sock) TcpKeepalive ¶
TcpKeepalive returns the current value of the socket's tcp_keepalive option
func (*Sock) TcpKeepaliveCnt ¶
TcpKeepaliveCnt returns the current value of the socket's tcp_keepalive_cnt option
func (*Sock) TcpKeepaliveIdle ¶
TcpKeepaliveIdle returns the current value of the socket's tcp_keepalive_idle option
func (*Sock) TcpKeepaliveIntvl ¶
TcpKeepaliveIntvl returns the current value of the socket's tcp_keepalive_intvl option
func (*Sock) TcpMaxrt ¶ added in v4.1.0
TcpMaxrt returns the current value of the socket's tcp_maxrt option
func (*Sock) ThreadSafe ¶ added in v4.1.0
ThreadSafe returns the current value of the socket's thread_safe option
func (*Sock) Unbind ¶
Unbind unbinds a socket from an endpoint. It returns an error if the endpoint was not found
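Because Unbind reports an error for an endpoint that was not found, callers that tear down several endpoints may want to keep going and collect the failures. This is a minimal sketch of that pattern, assuming the signature Unbind(endpoint string) error (not shown in this listing); unbinder, unbindAll, and fakeBinder are hypothetical names, and the fake replaces a real socket so the example runs without libczmq.

```go
package main

import (
	"errors"
	"fmt"
)

// unbinder names the method documented above.
// ASSUMPTION: Unbind(endpoint string) error; the signature is not shown in this listing.
type unbinder interface {
	Unbind(endpoint string) error
}

// unbindAll tries every endpoint and returns one wrapped error per failure.
func unbindAll(s unbinder, endpoints []string) []error {
	var errs []error
	for _, ep := range endpoints {
		if err := s.Unbind(ep); err != nil {
			errs = append(errs, fmt.Errorf("unbind %s: %w", ep, err))
		}
	}
	return errs
}

// fakeBinder tracks bound endpoints in a map instead of using a real socket.
type fakeBinder struct{ bound map[string]bool }

func (f *fakeBinder) Unbind(endpoint string) error {
	if !f.bound[endpoint] {
		return errors.New("endpoint not found")
	}
	delete(f.bound, endpoint)
	return nil
}

func main() {
	s := &fakeBinder{bound: map[string]bool{"inproc://a": true}}
	for _, err := range unbindAll(s, []string{"inproc://a", "inproc://b"}) {
		fmt.Println(err)
	}
}
```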
func (*Sock) VmciBufferMaxSize ¶ added in v4.1.0
VmciBufferMaxSize returns the current value of the socket's vmci_buffer_max_size option
func (*Sock) VmciBufferMinSize ¶ added in v4.1.0
VmciBufferMinSize returns the current value of the socket's vmci_buffer_min_size option
func (*Sock) VmciBufferSize ¶ added in v4.1.0
VmciBufferSize returns the current value of the socket's vmci_buffer_size option
func (*Sock) VmciConnectTimeout ¶ added in v4.1.0
VmciConnectTimeout returns the current value of the socket's vmci_connect_timeout option