Documentation ¶
Overview ¶
Package goczmq is a golang binding for CZMQ 3. CZMQ is a high level binding for ZeroMQ. Along with ZeroMQ socket support, CZMQ provides "actor" based services for authentication, service discovery, and creating proxies. GoCZMQ provides direct bindings to CZMQ along with higher level go abstractions such as channels and io.ReadWriter interface support.
"Tell them I was a writer.
A maker of software. A humanist. A father. And many things. But above all, a writer. Thank You. :)" - Pieter Hintjens
Index ¶
- Constants
- Variables
- func Affinity(s *Sock) int
- func Backlog(s *Sock) int
- func ConnectTimeout(s *Sock) int
- func CurvePublickey(s *Sock) string
- func CurveSecretkey(s *Sock) string
- func CurveServer(s *Sock) int
- func CurveServerkey(s *Sock) string
- func Events(s *Sock) int
- func Fd(s *Sock) int
- func GssapiPlaintext(s *Sock) int
- func GssapiPrincipal(s *Sock) string
- func GssapiServer(s *Sock) int
- func GssapiServicePrincipal(s *Sock) string
- func HandshakeIvl(s *Sock) int
- func HeartbeatIvl(s *Sock) int
- func HeartbeatTimeout(s *Sock) int
- func HeartbeatTtl(s *Sock) int
- func Identity(s *Sock) string
- func Immediate(s *Sock) int
- func InvertMatching(s *Sock) int
- func Ipv4only(s *Sock) int
- func Ipv6(s *Sock) int
- func LastEndpoint(s *Sock) string
- func Linger(s *Sock) int
- func Maxmsgsize(s *Sock) int
- func Mechanism(s *Sock) int
- func MulticastHops(s *Sock) int
- func MulticastMaxtpdu(s *Sock) int
- func PlainPassword(s *Sock) string
- func PlainServer(s *Sock) int
- func PlainUsername(s *Sock) string
- func Rate(s *Sock) int
- func Rcvbuf(s *Sock) int
- func Rcvhwm(s *Sock) int
- func Rcvmore(s *Sock) int
- func Rcvtimeo(s *Sock) int
- func ReconnectIvl(s *Sock) int
- func ReconnectIvlMax(s *Sock) int
- func RecoveryIvl(s *Sock) int
- func Shutdown()
- func Sndbuf(s *Sock) int
- func Sndhwm(s *Sock) int
- func Sndtimeo(s *Sock) int
- func SocksProxy(s *Sock) string
- func TcpAcceptFilter(s *Sock) string
- func TcpKeepalive(s *Sock) int
- func TcpKeepaliveCnt(s *Sock) int
- func TcpKeepaliveIdle(s *Sock) int
- func TcpKeepaliveIntvl(s *Sock) int
- func TcpMaxrt(s *Sock) int
- func ThreadSafe(s *Sock) int
- func Tos(s *Sock) int
- func Type(s *Sock) int
- func UseFd(s *Sock) int
- func VmciBufferMaxSize(s *Sock) int
- func VmciBufferMinSize(s *Sock) int
- func VmciBufferSize(s *Sock) int
- func VmciConnectTimeout(s *Sock) int
- func ZapDomain(s *Sock) string
- type Auth
- type Beacon
- func (b *Beacon) Configure(port int) (string, error)
- func (b *Beacon) Destroy()
- func (b *Beacon) Publish(announcement string, interval int) error
- func (b *Beacon) PublishBytes(announcement []byte, interval int) error
- func (b *Beacon) Recv(timeout int) [][]byte
- func (b *Beacon) Subscribe(filter string) error
- func (b *Beacon) Verbose() error
- type Cert
- func (c *Cert) Apply(s *Sock)
- func (c *Cert) Destroy()
- func (c *Cert) Dup() *Cert
- func (c *Cert) Equal(compare *Cert) bool
- func (c *Cert) Meta(key string) string
- func (c *Cert) Print()
- func (c *Cert) PublicText() string
- func (c *Cert) Save(filename string) error
- func (c *Cert) SavePublic(filename string) error
- func (c *Cert) SaveSecret(filename string) error
- func (c *Cert) SetMeta(key string, value string)
- type CertStore
- type Channeler
- func NewDealerChanneler(endpoints string, options ...SockOption) *Channeler
- func NewPairChanneler(endpoints string, options ...SockOption) *Channeler
- func NewPubChanneler(endpoints string, options ...SockOption) *Channeler
- func NewPullChanneler(endpoints string, options ...SockOption) *Channeler
- func NewPushChanneler(endpoints string, options ...SockOption) *Channeler
- func NewRepChanneler(endpoints string, options ...SockOption) *Channeler
- func NewReqChanneler(endpoints string, options ...SockOption) *Channeler
- func NewRouterChanneler(endpoints string, options ...SockOption) *Channeler
- func NewStreamChanneler(endpoints string, options ...SockOption) *Channeler
- func NewSubChanneler(endpoints string, varargs ...interface{}) *Channeler
- func NewXPubChanneler(endpoints string, options ...SockOption) *Channeler
- func NewXSubChanneler(endpoints string, options ...SockOption) *Channeler
- type Monitor
- type Poller
- type Proxy
- func (p *Proxy) Destroy()
- func (p *Proxy) Pause() error
- func (p *Proxy) Resume() error
- func (p *Proxy) SetBackend(sockType int, endpoint string) error
- func (p *Proxy) SetBackendCurve(publicKey string, secretKey string) error
- func (p *Proxy) SetBackendDomain(domain string) error
- func (p *Proxy) SetCapture(endpoint string) error
- func (p *Proxy) SetFrontend(sockType int, endpoint string) error
- func (p *Proxy) SetFrontendCurve(publicKey string, secretKey string) error
- func (p *Proxy) SetFrontendDomain(domain string) error
- func (p *Proxy) Verbose() error
- type ReadWriter
- type Sock
- func NewDealer(endpoints string, options ...SockOption) (*Sock, error)
- func NewPair(endpoints string, options ...SockOption) (*Sock, error)
- func NewPub(endpoints string, options ...SockOption) (*Sock, error)
- func NewPull(endpoints string, options ...SockOption) (*Sock, error)
- func NewPush(endpoints string, options ...SockOption) (*Sock, error)
- func NewRep(endpoints string, options ...SockOption) (*Sock, error)
- func NewReq(endpoints string, options ...SockOption) (*Sock, error)
- func NewRouter(endpoints string, options ...SockOption) (*Sock, error)
- func NewSock(t int, options ...SockOption) *Sock
- func NewStream(endpoints string, options ...SockOption) (*Sock, error)
- func NewSub(endpoints string, subscribe string, options ...SockOption) (*Sock, error)
- func NewXPub(endpoints string, options ...SockOption) (*Sock, error)
- func NewXSub(endpoints string, options ...SockOption) (*Sock, error)
- func (s *Sock) Attach(endpoints string, serverish bool) error
- func (s *Sock) Bind(endpoint string) (int, error)
- func (s *Sock) Connect(endpoint string) error
- func (s *Sock) Destroy()
- func (s *Sock) Disconnect(endpoint string) error
- func (s *Sock) GetLastClientID() []byte
- func (s *Sock) GetType() int
- func (s *Sock) Pollin() bool
- func (s *Sock) Pollout() bool
- func (s *Sock) Read(p []byte) (int, error)
- func (s *Sock) RecvFrame() ([]byte, int, error)
- func (s *Sock) RecvFrameNoWait() ([]byte, int, error)
- func (s *Sock) RecvMessage() ([][]byte, error)
- func (s *Sock) RecvMessageNoWait() ([][]byte, error)
- func (s *Sock) SendFrame(data []byte, flags int) error
- func (s *Sock) SendMessage(parts [][]byte) error
- func (s *Sock) SetLastClientID(id []byte)
- func (s *Sock) SetOption(o SockOption)
- func (s *Sock) Unbind(endpoint string) error
- func (s *Sock) Write(p []byte) (int, error)
- type SockOption
- func SockSetAffinity(v int) SockOption
- func SockSetBacklog(v int) SockOption
- func SockSetConflate(v int) SockOption
- func SockSetConnectRid(v string) SockOption
- func SockSetConnectTimeout(v int) SockOption
- func SockSetCurvePublickey(v string) SockOption
- func SockSetCurveSecretkey(v string) SockOption
- func SockSetCurveServer(v int) SockOption
- func SockSetCurveServerkey(v string) SockOption
- func SockSetDelayAttachOnConnect(v int) SockOption
- func SockSetGssapiPlaintext(v int) SockOption
- func SockSetGssapiPrincipal(v string) SockOption
- func SockSetGssapiServer(v int) SockOption
- func SockSetGssapiServicePrincipal(v string) SockOption
- func SockSetHandshakeIvl(v int) SockOption
- func SockSetHeartbeatIvl(v int) SockOption
- func SockSetHeartbeatTimeout(v int) SockOption
- func SockSetHeartbeatTtl(v int) SockOption
- func SockSetIdentity(v string) SockOption
- func SockSetImmediate(v int) SockOption
- func SockSetInvertMatching(v int) SockOption
- func SockSetIpv4only(v int) SockOption
- func SockSetIpv6(v int) SockOption
- func SockSetLinger(v int) SockOption
- func SockSetMaxmsgsize(v int) SockOption
- func SockSetMulticastHops(v int) SockOption
- func SockSetMulticastMaxtpdu(v int) SockOption
- func SockSetPlainPassword(v string) SockOption
- func SockSetPlainServer(v int) SockOption
- func SockSetPlainUsername(v string) SockOption
- func SockSetProbeRouter(v int) SockOption
- func SockSetRate(v int) SockOption
- func SockSetRcvbuf(v int) SockOption
- func SockSetRcvhwm(v int) SockOption
- func SockSetRcvtimeo(v int) SockOption
- func SockSetReconnectIvl(v int) SockOption
- func SockSetReconnectIvlMax(v int) SockOption
- func SockSetRecoveryIvl(v int) SockOption
- func SockSetReqCorrelate(v int) SockOption
- func SockSetReqRelaxed(v int) SockOption
- func SockSetRouterHandover(v int) SockOption
- func SockSetRouterMandatory(v int) SockOption
- func SockSetRouterNotify(v int) SockOption
- func SockSetRouterRaw(v int) SockOption
- func SockSetSndbuf(v int) SockOption
- func SockSetSndhwm(v int) SockOption
- func SockSetSndtimeo(v int) SockOption
- func SockSetSocksProxy(v string) SockOption
- func SockSetStreamNotify(v int) SockOption
- func SockSetSubscribe(v string) SockOption
- func SockSetTcpAcceptFilter(v string) SockOption
- func SockSetTcpKeepalive(v int) SockOption
- func SockSetTcpKeepaliveCnt(v int) SockOption
- func SockSetTcpKeepaliveIdle(v int) SockOption
- func SockSetTcpKeepaliveIntvl(v int) SockOption
- func SockSetTcpMaxrt(v int) SockOption
- func SockSetTos(v int) SockOption
- func SockSetUnsubscribe(v string) SockOption
- func SockSetUseFd(v int) SockOption
- func SockSetVmciBufferMaxSize(v int) SockOption
- func SockSetVmciBufferMinSize(v int) SockOption
- func SockSetVmciBufferSize(v int) SockOption
- func SockSetVmciConnectTimeout(v int) SockOption
- func SockSetXPubManual(v int) SockOption
- func SockSetXPubNodrop(v int) SockOption
- func SockSetXPubVerbose(v int) SockOption
- func SockSetXPubVerboser(v int) SockOption
- func SockSetXPubWelcomeMsg(v string) SockOption
- func SockSetZapDomain(v string) SockOption
Examples ¶
Constants ¶
const ( // Req is a ZMQ_REQ socket type Req = int(C.ZMQ_REQ) // Rep is a ZMQ_REP socket type Rep = int(C.ZMQ_REP) // Dealer is a ZMQ_DEALER socket type Dealer = int(C.ZMQ_DEALER) // Router is a ZMQ_ROUTER socket type Router = int(C.ZMQ_ROUTER) // Pub is a ZMQ_PUB socket type Pub = int(C.ZMQ_PUB) // Sub is a ZMQ_SUB socket type Sub = int(C.ZMQ_SUB) // XPub is a ZMQ_XPUB socket type XPub = int(C.ZMQ_XPUB) // XSub is a ZMQ_XSUB socket type XSub = int(C.ZMQ_XSUB) // Push is a ZMQ_PUSH socket type Push = int(C.ZMQ_PUSH) // Pull is a ZMQ_PULL socket type Pull = int(C.ZMQ_PULL) // Pair is a ZMQ_PAIR socket type Pair = int(C.ZMQ_PAIR) // Stream is a ZMQ_STREAM socket type Stream = int(C.ZMQ_STREAM) // Pollin is the ZMQ_POLLIN constant Pollin = int(C.ZMQ_POLLIN) // Pollout is the ZMQ_POLLOUT constant Pollout = int(C.ZMQ_POLLOUT) // FlagMore is the ZFRAME_MORE flag FlagMore = int(C.ZFRAME_MORE) // FlagReuse is the ZFRAME_REUSE flag FlagReuse = int(C.ZFRAME_REUSE) //FlagDontWait is the ZFRAME_DONTWAIT flag FlagDontWait = int(C.ZFRAME_DONTWAIT) //FlagNone means there are no flags FlagNone = 0 // CurveAllowAny is a semantic convenience for allowing // any Curve clients CurveAllowAny = "*" //ZMQVersionMajor is the major version of the underlying ZeroMQ library ZMQVersionMajor = int(C.ZMQ_VERSION_MAJOR) //ZMQVersionMinor is the minor version of the underlying ZeroMQ library ZMQVersionMinor = int(C.ZMQ_VERSION_MINOR) //CZMQVersionMajor is the major version of the underlying CZMQ library CZMQVersionMajor = int(C.CZMQ_VERSION_MAJOR) // CZMQVersionMinor is the minor version of the underlying CZMQ library CZMQVersionMinor = int(C.CZMQ_VERSION_MINOR) )
Variables ¶
var ( // ErrActorCmd is returned when there is an error sending // a command to an actor ErrActorCmd = errors.New("error sending actor command") // ErrSockAttach is returned when an attach call to a socket fails ErrSockAttach = errors.New("error attaching zsock") // ErrSockAttachEmptyEndpoints is returned when the endpoints value is empty ErrSockAttachEmptyEndpoints = errors.New("Endpoints cannot be empty") // ErrInvalidSockType is returned when a function is called // against a socket type that is not applicable for that socket type ErrInvalidSockType = errors.New("invalid socket type") // ErrSliceFull is returned if a []byte passed to Read was not // large enough to hold the contents of a message ErrSliceFull = errors.New("slice full") // ErrConnect is returned if Connect on a socket fails ErrConnect = errors.New("connect error") // ErrDisconnect is returned if Disconnect on a socket fails ErrDisconnect = errors.New("disconnect error") // ErrBind is returned if Bind on a socket fails ErrBind = errors.New("bind error") // ErrUnbind is returned if Unbind on a socket fails ErrUnbind = errors.New("unbind error") // ErrSendFrame is returned if SendFrame on a socket fails ErrSendFrame = errors.New("send frame error") // ErrRecvFrame is returned if RecvFrame on a socket fails ErrRecvFrame = errors.New("recv frame error") // ErrRecvFrameAfterDestroy is returned if RecvFrame is called // on a socket after it has been destroyed. 
ErrRecvFrameAfterDestroy = errors.New("RecvFrame() is invalid on socket after Detroy() has been called.") // ErrRecvMessage is returned if RecvMessage on a socket fails ErrRecvMessage = errors.New("recv message error") // ErrWaitAfterDestroy is returned by a Poller if there is an error // accessing the underlying socket pointer when Wait is called ErrWaitAfterDestroy = errors.New("Wait() is invalid on Poller after Destroy() is called.") // ErrMultiPartUnsupported is returned when a function that does // not support multi-part messages encounters a multi-part message ErrMultiPartUnsupported = errors.New("function does not support multi part messages") // ErrTimeout is returned when a function that supports timeouts times out ErrTimeout = errors.New("function timed out") // ErrCertNotFound is returned when NewCertFromFile tries to // load a file that does not exist. ErrCertNotFound = errors.New("file not found") )
Functions ¶
func ConnectTimeout ¶
ConnectTimeout returns the current value of the socket's connect_timeout option
func CurvePublickey ¶
CurvePublickey returns the current value of the socket's curve_publickey option
func CurveSecretkey ¶
CurveSecretkey returns the current value of the socket's curve_secretkey option
func CurveServer ¶
CurveServer returns the current value of the socket's curve_server option
func CurveServerkey ¶
CurveServerkey returns the current value of the socket's curve_serverkey option
func GssapiPlaintext ¶
GssapiPlaintext returns the current value of the socket's gssapi_plaintext option
func GssapiPrincipal ¶
GssapiPrincipal returns the current value of the socket's gssapi_principal option
func GssapiServer ¶
GssapiServer returns the current value of the socket's gssapi_server option
func GssapiServicePrincipal ¶
GssapiServicePrincipal returns the current value of the socket's gssapi_service_principal option
func HandshakeIvl ¶
HandshakeIvl returns the current value of the socket's handshake_ivl option
func HeartbeatIvl ¶
HeartbeatIvl returns the current value of the socket's heartbeat_ivl option
func HeartbeatTimeout ¶
HeartbeatTimeout returns the current value of the socket's heartbeat_timeout option
func HeartbeatTtl ¶
HeartbeatTtl returns the current value of the socket's heartbeat_ttl option
func InvertMatching ¶
InvertMatching returns the current value of the socket's invert_matching option
func LastEndpoint ¶
LastEndpoint returns the current value of the socket's last_endpoint option
func Maxmsgsize ¶
Maxmsgsize returns the current value of the socket's maxmsgsize option
func MulticastHops ¶
MulticastHops returns the current value of the socket's multicast_hops option
func MulticastMaxtpdu ¶
MulticastMaxtpdu returns the current value of the socket's multicast_maxtpdu option
func PlainPassword ¶
PlainPassword returns the current value of the socket's plain_password option
func PlainServer ¶
PlainServer returns the current value of the socket's plain_server option
func PlainUsername ¶
PlainUsername returns the current value of the socket's plain_username option
func ReconnectIvl ¶
ReconnectIvl returns the current value of the socket's reconnect_ivl option
func ReconnectIvlMax ¶
ReconnectIvlMax returns the current value of the socket's reconnect_ivl_max option
func RecoveryIvl ¶
RecoveryIvl returns the current value of the socket's recovery_ivl option
func Shutdown ¶
func Shutdown()
Shutdown shuts down the CZMQ zsys layer. The CZMQ zsys layer normally shuts down on process termination through the use of an atexit cleanup function. Calling this allows the zsys layer to be shut down manually.
This is beneficial when CZMQ will no longer be used but the process will not be terminating. Any potential resources allocated by the zsys layer can be freed as they will no longer be needed.
func SocksProxy ¶
SocksProxy returns the current value of the socket's socks_proxy option
func TcpAcceptFilter ¶
TcpAcceptFilter returns the current value of the socket's tcp_accept_filter option
func TcpKeepalive ¶
TcpKeepalive returns the current value of the socket's tcp_keepalive option
func TcpKeepaliveCnt ¶
TcpKeepaliveCnt returns the current value of the socket's tcp_keepalive_cnt option
func TcpKeepaliveIdle ¶
TcpKeepaliveIdle returns the current value of the socket's tcp_keepalive_idle option
func TcpKeepaliveIntvl ¶
TcpKeepaliveIntvl returns the current value of the socket's tcp_keepalive_intvl option
func ThreadSafe ¶
ThreadSafe returns the current value of the socket's thread_safe option
func VmciBufferMaxSize ¶
VmciBufferMaxSize returns the current value of the socket's vmci_buffer_max_size option
func VmciBufferMinSize ¶
VmciBufferMinSize returns the current value of the socket's vmci_buffer_min_size option
func VmciBufferSize ¶
VmciBufferSize returns the current value of the socket's vmci_buffer_size option
func VmciConnectTimeout ¶
VmciConnectTimeout returns the current value of the socket's vmci_connect_timeout option
Types ¶
type Auth ¶
type Auth struct {
// contains filtered or unexported fields
}
Auth wraps the CZMQ zauth actor. It handles authentication for all incoming connections. It allows whitelisting and blacklisting peers based on IP address and supports PLAIN and CURVE authentication policies.
Example ¶
// create a server certificate serverCert := NewCert() defer serverCert.Destroy() // create a client certificate and save it clientCert := NewCert() defer clientCert.Destroy() err := clientCert.SavePublic("client_cert") if err != nil { panic(err) } defer func() { err := os.Remove("client_cert") if err != nil { if err != nil { panic(err) } } }() // create an auth service auth := NewAuth() defer auth.Destroy() // tell the auth service the client cert is allowed err = auth.Curve("client_cert") if err != nil { panic(err) } // create a server socket and set it to // use the "global" auth domain server := NewSock(Push, SockSetZapDomain("global")) defer server.Destroy() // set the server cert as the server cert // for the socket we created and set it // to be a curve server serverCert.Apply(server) server.SetOption(SockSetCurveServer(1)) // bind our server to an endpoint _, err = server.Bind("tcp://*:9898") if err != nil { panic(err) } // create a client socket client := NewSock(Pull) defer client.Destroy() // assign the client cert we made to the client clientCert.Apply(client) // set the server cert as the server cert // for the client. for the client to be // allowed to connect, it needs to know // the servers public cert. client.SetOption(SockSetCurveServerkey(serverCert.PublicText())) // connect err = client.Connect("tcp://127.0.0.1:9898") if err != nil { panic(err) }
Output:
type Beacon ¶
type Beacon struct {
// contains filtered or unexported fields
}
Beacon wraps the CZMQ beacon actor. It implements a peer-to-peer discovery service for local networks. Beacons can broadcast and receive UDPv4 service broadcasts.
Example ¶
beacon := NewBeacon() defer beacon.Destroy() address, err := beacon.Configure(9999) if err != nil { panic(err) } fmt.Printf("started beacon on: %s", address) err = beacon.Publish("HI", 100) if err != nil { panic(err) }
Output:
func (*Beacon) Configure ¶
Configure accepts a port number and configures the beacon, returning an address
func (*Beacon) PublishBytes ¶
PublishBytes publishes an announcement byte slice at an interval
type Cert ¶
type Cert struct {
// contains filtered or unexported fields
}
Cert wraps the CZMQ zcert class. It provides tools for creating and working with ZMQ CURVE security certs. The certs can be used as a temporary object in memory or persisted to disk. Certs are made up of a public and secret keypair + metadata.
Example ¶
cert := NewCert() defer cert.Destroy() cert.SetMeta("email", "taotetek@gmail.com") cert.SetMeta("name", "Brian Knox") cert.SetMeta("organization", "ZeroMQ")
Output:
func NewCertFromFile ¶
NewCertFromFile loads a Cert from files
func NewCertFromKeys ¶
NewCertFromKeys creates a new Cert from a public and private key
func (*Cert) PublicText ¶
PublicText returns the public key as a string
func (*Cert) SavePublic ¶
SavePublic saves the public key to a file
func (*Cert) SaveSecret ¶
SaveSecret saves the secret key to a file
type CertStore ¶
type CertStore struct {
// contains filtered or unexported fields
}
func NewCertStore ¶
NewCertStore creates a new certificate store from a disk directory, loading and indexing all certificates.
func NewCertStoreInMemory ¶
func NewCertStoreInMemory() *CertStore
NewCertStoreInMemory creates a new certificate store in memory.
func (*CertStore) Insert ¶
Insert inserts a certificate into the store in memory. Call Save directly on the cert if you wish to save it to disk.
type Channeler ¶
type Channeler struct { SendChan chan<- [][]byte RecvChan <-chan [][]byte ErrChan <-chan error // contains filtered or unexported fields }
Channeler serializes all access to a socket through a send and receive channel. It starts two threads, one is used for receiving from the zeromq socket. The other is used to listen to the receive channel, and send everything back to the socket thread for sending using an additional inproc socket.
Example (Output) ¶
// create a dealer channeler dealer := NewDealerChanneler("inproc://channelerdealerrouter") defer dealer.Destroy() // create a router channeler router := NewRouterChanneler("inproc://channelerdealerrouter") defer router.Destroy() // send a hello message dealer.SendChan <- [][]byte{[]byte("Hello")} // receive the hello message request := <-router.RecvChan // first frame is identity of client - let's append 'World' // to the message and route it back request = append(request, []byte("World")) // send the reply router.SendChan <- request // receive the reply reply := <-dealer.RecvChan fmt.Printf("%s %s", string(reply[0]), string(reply[1]))
Output: Hello World
func NewDealerChanneler ¶
func NewDealerChanneler(endpoints string, options ...SockOption) *Channeler
NewDealerChanneler creates a new Channeler wrapping a Dealer socket. The socket will connect by default.
func NewPairChanneler ¶
func NewPairChanneler(endpoints string, options ...SockOption) *Channeler
NewPairChanneler creates a new Channeler wrapping a Pair socket. The socket will connect by default.
func NewPubChanneler ¶
func NewPubChanneler(endpoints string, options ...SockOption) *Channeler
NewPubChanneler creates a new Channeler wrapping a Pub socket. The socket will bind by default.
func NewPullChanneler ¶
func NewPullChanneler(endpoints string, options ...SockOption) *Channeler
NewPullChanneler creates a new Channeler wrapping a Pull socket. The socket will bind by default.
func NewPushChanneler ¶
func NewPushChanneler(endpoints string, options ...SockOption) *Channeler
NewPushChanneler creates a new Channeler wrapping a Push socket. The socket will connect by default.
func NewRepChanneler ¶
func NewRepChanneler(endpoints string, options ...SockOption) *Channeler
NewRepChanneler creates a new Channeler wrapping a Rep socket. The socket will bind by default.
func NewReqChanneler ¶
func NewReqChanneler(endpoints string, options ...SockOption) *Channeler
NewReqChanneler creates a new Channeler wrapping a Req socket. The socket will connect by default.
func NewRouterChanneler ¶
func NewRouterChanneler(endpoints string, options ...SockOption) *Channeler
NewRouterChanneler creates a new Channeler wrapping a Router socket. The socket will Bind by default.
func NewStreamChanneler ¶
func NewStreamChanneler(endpoints string, options ...SockOption) *Channeler
NewStreamChanneler creates a new Channeler wrapping a Stream socket. The socket will connect by default.
func NewSubChanneler ¶
NewSubChanneler creates a new Channeler wrapping a Sub socket. Along with an endpoint list it accepts a list of topics and/or socket options (discriminated by type). The socket will connect by default.
func NewXPubChanneler ¶
func NewXPubChanneler(endpoints string, options ...SockOption) *Channeler
NewXPubChanneler creates a new Channeler wrapping an XPub socket. The socket will Bind by default.
func NewXSubChanneler ¶
func NewXSubChanneler(endpoints string, options ...SockOption) *Channeler
NewXSubChanneler creates a new Channeler wrapping an XSub socket. The socket will connect by default.
func (*Channeler) Destroy ¶
func (c *Channeler) Destroy()
Destroy sends a message to the Channeler to shut it down and clean it up.
func (*Channeler) Unsubscribe ¶
Unsubscribe from a Topic
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
Monitor provides an API for obtaining socket events
func (*Monitor) Socket ¶
Socket returns the actor as a Sock instance, useful and necessary for being able to receive messages
type Poller ¶
type Poller struct {
// contains filtered or unexported fields
}
Poller provides a simple wrapper to ZeroMQ's zmq_poll API, for the common case of reading from a number of sockets. Sockets can be added and removed from the running poller.
Example ¶
sock1, err := NewRouter("inproc://poller_example_1") if err != nil { panic(err) } defer sock1.Destroy() poller, err := NewPoller(sock1) if err != nil { panic(err) } sock2, err := NewRouter("inproc://poller_example_2") if err != nil { panic(err) } defer sock2.Destroy() err = poller.Add(sock2) if err != nil { panic(err) } // Poller.Wait(millis) returns first socket that has a waiting message poller.Wait(1)
Output:
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
Proxy wraps the CZMQ zproxy actor. A proxy actor switches messages between a frontend and backend socket, and also provides an optional capture socket messages can be mirrored to. The proxy can be paused and resumed.
Example ¶
proxy := NewProxy() defer proxy.Destroy() // set front end address and socket type err := proxy.SetFrontend(Pull, "inproc://frontend") if err != nil { panic(err) } // set back end address and socket type err = proxy.SetBackend(Push, "inproc://backend") if err != nil { panic(err) } // set address for "tee"ing proxy traffic to err = proxy.SetCapture("inproc://capture") if err != nil { panic(err) } // we can pause the proxy err = proxy.Pause() if err != nil { panic(err) } // and we can resume it err = proxy.Resume() if err != nil { panic(err) } proxy.Destroy()
Output:
func (*Proxy) SetBackend ¶
SetBackend accepts a socket type and endpoint, and sends a message to the zactor thread telling it to set up a socket bound to the endpoint.
func (*Proxy) SetBackendCurve ¶
SetBackendCurve accepts Z85 encoded public and secret keys and sends a message to the zactor thread telling it to set up CURVE authentication for the socket.
func (*Proxy) SetBackendDomain ¶
SetBackendDomain accepts a domain, and sends a message to the zactor thread telling it to set up ZAP authentication domain for the socket.
func (*Proxy) SetCapture ¶
SetCapture accepts a socket endpoint and sets up a Push socket bound to that endpoint, that sends a copy of all messages passing through the proxy.
func (*Proxy) SetFrontend ¶
SetFrontend accepts a socket type and endpoint, and sends a message to the zactor thread telling it to set up a socket bound to the endpoint.
func (*Proxy) SetFrontendCurve ¶
SetFrontendCurve accepts Z85 encoded public and secret keys and sends a message to the zactor thread telling it to set up CURVE authentication for the socket.
func (*Proxy) SetFrontendDomain ¶
SetFrontendDomain accepts a domain, and sends a message to the zactor thread telling it to set up ZAP authentication domain for the socket.
type ReadWriter ¶
type ReadWriter struct {
// contains filtered or unexported fields
}
ReadWriter provides an io.ReadWriter compatible interface for goczmq.Sock
func NewReadWriter ¶
func NewReadWriter(sock *Sock) (*ReadWriter, error)
NewReadWriter accepts a sock and returns a goczmq.ReadWriter. The io.ReadWriter should now be considered responsible for this Sock.
func (*ReadWriter) Destroy ¶
func (r *ReadWriter) Destroy()
Destroy destroys both the ReadWriter and the underlying Sock
func (*ReadWriter) GetLastClientID ¶
func (r *ReadWriter) GetLastClientID() []byte
GetLastClientID returns the id of the last client you received a message from if the underlying socket is a Router socket
func (*ReadWriter) SetLastClientID ¶
func (r *ReadWriter) SetLastClientID(id []byte)
SetLastClientID lets you manually set the id of the client you last received a message from if the underlying socket is a Router socket
func (*ReadWriter) SetTimeout ¶
func (r *ReadWriter) SetTimeout(ms int)
SetTimeout sets the timeout on Read in milliseconds. If no new data is received within the timeout period, Read will return ErrTimeout.
type Sock ¶
type Sock struct {
// contains filtered or unexported fields
}
Sock wraps the CZMQ zsock class.
Example (Output) ¶
// create dealer socket dealer, err := NewDealer("inproc://example") if err != nil { panic(err) } defer dealer.Destroy() // create router socket router, err := NewRouter("inproc://example") if err != nil { panic(err) } defer router.Destroy() // send hello message err = dealer.SendFrame([]byte("Hello"), FlagNone) if err != nil { panic(err) } // receive hello message request, err := router.RecvMessage() if err != nil { panic(err) } // first frame is identity of client - let's append 'World' // to the message and route it back. request = append(request, []byte("World")) // send reply err = router.SendMessage(request) if err != nil { panic(err) } // receive reply reply, err := dealer.RecvMessage() if err != nil { panic(err) } fmt.Printf("%s %s", string(reply[0]), string(reply[1]))
Output: Hello World
func NewDealer ¶
func NewDealer(endpoints string, options ...SockOption) (*Sock, error)
NewDealer creates a Dealer socket and calls Attach. The socket will Connect by default.
func NewPair ¶
func NewPair(endpoints string, options ...SockOption) (*Sock, error)
NewPair creates a Pair socket and calls Attach. The socket will Connect by default.
func NewPub ¶
func NewPub(endpoints string, options ...SockOption) (*Sock, error)
NewPub creates a Pub socket and calls Attach. The socket will Bind by default.
func NewPull ¶
func NewPull(endpoints string, options ...SockOption) (*Sock, error)
NewPull creates a Pull socket and calls Attach. The socket will Bind by default.
func NewPush ¶
func NewPush(endpoints string, options ...SockOption) (*Sock, error)
NewPush creates a Push socket and calls Attach. The socket will Connect by default.
func NewRep ¶
func NewRep(endpoints string, options ...SockOption) (*Sock, error)
NewRep creates a Rep socket and calls Attach. The socket will Bind by default.
func NewReq ¶
func NewReq(endpoints string, options ...SockOption) (*Sock, error)
NewReq creates a Req socket and calls Attach. The socket will Connect by default.
func NewRouter ¶
func NewRouter(endpoints string, options ...SockOption) (*Sock, error)
NewRouter creates a Router socket and calls Attach. The socket will Bind by default.
func NewSock ¶
func NewSock(t int, options ...SockOption) *Sock
NewSock creates a new socket. The caller source and line number are passed so CZMQ can report socket leaks intelligently.
func NewStream ¶
func NewStream(endpoints string, options ...SockOption) (*Sock, error)
NewStream creates a Stream socket and calls Attach. The socket will Connect by default.
func NewSub ¶
func NewSub(endpoints string, subscribe string, options ...SockOption) (*Sock, error)
NewSub creates a Sub socket and calls Attach. 'subscribe' is a comma delimited list of topics to subscribe to. The socket will Connect by default.
func NewXPub ¶
func NewXPub(endpoints string, options ...SockOption) (*Sock, error)
NewXPub creates an XPub socket and calls Attach. The socket will Bind by default.
func NewXSub ¶
func NewXSub(endpoints string, options ...SockOption) (*Sock, error)
NewXSub creates an XSub socket and calls Attach. The socket will Connect by default.
func (*Sock) Attach ¶
Attach attaches a socket to zero or more endpoints. If endpoints is not null, parses as list of ZeroMQ endpoints, separated by commas, and prefixed by '@' (to bind the socket) or '>' (to connect the socket). If the endpoint does not start with '@' or '>', the serverish argument determines whether it is used to bind (serverish = true) or connect (serverish = false).
func (*Sock) Bind ¶
Bind binds a socket to an endpoint. On success returns the port number used for tcp transports, or 0 for other transports. On failure returns a -1 for port, and an error.
func (*Sock) Connect ¶
Connect connects a socket to an endpoint. It returns an error if the connect failed.
func (*Sock) Disconnect ¶
Disconnect disconnects a socket from an endpoint. It returns an error if the endpoint was not found.
func (*Sock) GetLastClientID ¶
GetLastClientID returns the id of the last client you received a message from if the underlying socket is a Router socket. DEPRECATED: See goczmq.ReadWriter.
func (*Sock) Read ¶
Read provides an io.Reader interface to a zeromq socket. DEPRECATED: See goczmq.ReadWriter.
func (*Sock) RecvFrame ¶
RecvFrame reads a frame from the socket and returns it as a byte array, along with a more flag and an error (if there is an error).
func (*Sock) RecvFrameNoWait ¶
RecvFrameNoWait receives a frame from the socket and returns it as a byte array if one is waiting. Returns an empty frame, a 0 more flag and an error if one is not immediately available
func (*Sock) RecvMessage ¶
RecvMessage receives a full message from the socket and returns it as an array of byte arrays.
func (*Sock) RecvMessageNoWait ¶
RecvMessageNoWait receives a full message from the socket and returns it as an array of byte arrays if one is waiting. Returns an empty message and an error if one is not immediately available
func (*Sock) SendFrame ¶
SendFrame sends a byte array via the socket. For the flags value, use FlagNone (0) for a single message, or FlagMore if it is a multi-part message
func (*Sock) SendMessage ¶
SendMessage accepts an array of byte arrays and sends it as a multi-part message.
func (*Sock) SetLastClientID ¶
SetLastClientID lets you manually set the id of the client you last received a message from if the underlying socket is a Router socket. DEPRECATED: See goczmq.ReadWriter.
func (*Sock) SetOption ¶
func (s *Sock) SetOption(o SockOption)
SetOption accepts a SockOption and uses it to set an option on the underlying ZeroMQ socket
type SockOption ¶
type SockOption func(*Sock)
SockOption is a type for setting options on the underlying ZeroMQ socket
func SockSetAffinity ¶
func SockSetAffinity(v int) SockOption
SockSetAffinity sets the affinity option for the socket
func SockSetBacklog ¶
func SockSetBacklog(v int) SockOption
SockSetBacklog sets the backlog option for the socket
func SockSetConflate ¶
func SockSetConflate(v int) SockOption
SockSetConflate sets the conflate option for the socket
func SockSetConnectRid ¶
func SockSetConnectRid(v string) SockOption
SockSetConnectRid sets the connect_rid option for the socket
func SockSetConnectTimeout ¶
func SockSetConnectTimeout(v int) SockOption
SockSetConnectTimeout sets the connect_timeout option for the socket
func SockSetCurvePublickey ¶
func SockSetCurvePublickey(v string) SockOption
SockSetCurvePublickey sets the curve_publickey option for the socket
func SockSetCurveSecretkey ¶
func SockSetCurveSecretkey(v string) SockOption
SockSetCurveSecretkey sets the curve_secretkey option for the socket
func SockSetCurveServer ¶
func SockSetCurveServer(v int) SockOption
SockSetCurveServer sets the curve_server option for the socket
func SockSetCurveServerkey ¶
func SockSetCurveServerkey(v string) SockOption
SockSetCurveServerkey sets the curve_serverkey option for the socket
func SockSetDelayAttachOnConnect ¶
func SockSetDelayAttachOnConnect(v int) SockOption
SockSetDelayAttachOnConnect sets the delay_attach_on_connect option for the socket
func SockSetGssapiPlaintext ¶
func SockSetGssapiPlaintext(v int) SockOption
SockSetGssapiPlaintext sets the gssapi_plaintext option for the socket
func SockSetGssapiPrincipal ¶
func SockSetGssapiPrincipal(v string) SockOption
SockSetGssapiPrincipal sets the gssapi_principal option for the socket
func SockSetGssapiServer ¶
func SockSetGssapiServer(v int) SockOption
SockSetGssapiServer sets the gssapi_server option for the socket
func SockSetGssapiServicePrincipal ¶
func SockSetGssapiServicePrincipal(v string) SockOption
SockSetGssapiServicePrincipal sets the gssapi_service_principal option for the socket
func SockSetHandshakeIvl ¶
func SockSetHandshakeIvl(v int) SockOption
SockSetHandshakeIvl sets the handshake_ivl option for the socket
func SockSetHeartbeatIvl ¶
func SockSetHeartbeatIvl(v int) SockOption
SockSetHeartbeatIvl sets the heartbeat_ivl option for the socket
func SockSetHeartbeatTimeout ¶
func SockSetHeartbeatTimeout(v int) SockOption
SockSetHeartbeatTimeout sets the heartbeat_timeout option for the socket
func SockSetHeartbeatTtl ¶
func SockSetHeartbeatTtl(v int) SockOption
SockSetHeartbeatTtl sets the heartbeat_ttl option for the socket
func SockSetIdentity ¶
func SockSetIdentity(v string) SockOption
SockSetIdentity sets the identity option for the socket
func SockSetImmediate ¶
func SockSetImmediate(v int) SockOption
SockSetImmediate sets the immediate option for the socket
func SockSetInvertMatching ¶
func SockSetInvertMatching(v int) SockOption
SockSetInvertMatching sets the invert_matching option for the socket
func SockSetIpv4only ¶
func SockSetIpv4only(v int) SockOption
SockSetIpv4only sets the ipv4only option for the socket
func SockSetIpv6 ¶
func SockSetIpv6(v int) SockOption
SockSetIpv6 sets the ipv6 option for the socket
func SockSetLinger ¶
func SockSetLinger(v int) SockOption
SockSetLinger sets the linger option for the socket
func SockSetMaxmsgsize ¶
func SockSetMaxmsgsize(v int) SockOption
SockSetMaxmsgsize sets the maxmsgsize option for the socket
func SockSetMulticastHops ¶
func SockSetMulticastHops(v int) SockOption
SockSetMulticastHops sets the multicast_hops option for the socket
func SockSetMulticastMaxtpdu ¶
func SockSetMulticastMaxtpdu(v int) SockOption
SockSetMulticastMaxtpdu sets the multicast_maxtpdu option for the socket
func SockSetPlainPassword ¶
func SockSetPlainPassword(v string) SockOption
SockSetPlainPassword sets the plain_password option for the socket
func SockSetPlainServer ¶
func SockSetPlainServer(v int) SockOption
SockSetPlainServer sets the plain_server option for the socket
func SockSetPlainUsername ¶
func SockSetPlainUsername(v string) SockOption
SockSetPlainUsername sets the plain_username option for the socket
func SockSetProbeRouter ¶
func SockSetProbeRouter(v int) SockOption
SockSetProbeRouter sets the probe_router option for the socket
func SockSetRate ¶
func SockSetRate(v int) SockOption
SockSetRate sets the rate option for the socket
func SockSetRcvbuf ¶
func SockSetRcvbuf(v int) SockOption
SockSetRcvbuf sets the rcvbuf option for the socket
func SockSetRcvhwm ¶
func SockSetRcvhwm(v int) SockOption
SockSetRcvhwm sets the rcvhwm option for the socket
func SockSetRcvtimeo ¶
func SockSetRcvtimeo(v int) SockOption
SockSetRcvtimeo sets the rcvtimeo option for the socket
func SockSetReconnectIvl ¶
func SockSetReconnectIvl(v int) SockOption
SockSetReconnectIvl sets the reconnect_ivl option for the socket
func SockSetReconnectIvlMax ¶
func SockSetReconnectIvlMax(v int) SockOption
SockSetReconnectIvlMax sets the reconnect_ivl_max option for the socket
func SockSetRecoveryIvl ¶
func SockSetRecoveryIvl(v int) SockOption
SockSetRecoveryIvl sets the recovery_ivl option for the socket
func SockSetReqCorrelate ¶
func SockSetReqCorrelate(v int) SockOption
SockSetReqCorrelate sets the req_correlate option for the socket
func SockSetReqRelaxed ¶
func SockSetReqRelaxed(v int) SockOption
SockSetReqRelaxed sets the req_relaxed option for the socket
func SockSetRouterHandover ¶
func SockSetRouterHandover(v int) SockOption
SockSetRouterHandover sets the router_handover option for the socket
func SockSetRouterMandatory ¶
func SockSetRouterMandatory(v int) SockOption
SockSetRouterMandatory sets the router_mandatory option for the socket
func SockSetRouterNotify ¶
func SockSetRouterNotify(v int) SockOption
SockSetRouterNotify sets the router_notify option for the socket
func SockSetRouterRaw ¶
func SockSetRouterRaw(v int) SockOption
SockSetRouterRaw sets the router_raw option for the socket
func SockSetSndbuf ¶
func SockSetSndbuf(v int) SockOption
SockSetSndbuf sets the sndbuf option for the socket
func SockSetSndhwm ¶
func SockSetSndhwm(v int) SockOption
SockSetSndhwm sets the sndhwm option for the socket
func SockSetSndtimeo ¶
func SockSetSndtimeo(v int) SockOption
SockSetSndtimeo sets the sndtimeo option for the socket
func SockSetSocksProxy ¶
func SockSetSocksProxy(v string) SockOption
SockSetSocksProxy sets the socks_proxy option for the socket
func SockSetStreamNotify ¶
func SockSetStreamNotify(v int) SockOption
SockSetStreamNotify sets the stream_notify option for the socket
func SockSetSubscribe ¶
func SockSetSubscribe(v string) SockOption
SockSetSubscribe sets the subscribe option for the socket
func SockSetTcpAcceptFilter ¶
func SockSetTcpAcceptFilter(v string) SockOption
SockSetTcpAcceptFilter sets the tcp_accept_filter option for the socket
func SockSetTcpKeepalive ¶
func SockSetTcpKeepalive(v int) SockOption
SockSetTcpKeepalive sets the tcp_keepalive option for the socket
func SockSetTcpKeepaliveCnt ¶
func SockSetTcpKeepaliveCnt(v int) SockOption
SockSetTcpKeepaliveCnt sets the tcp_keepalive_cnt option for the socket
func SockSetTcpKeepaliveIdle ¶
func SockSetTcpKeepaliveIdle(v int) SockOption
SockSetTcpKeepaliveIdle sets the tcp_keepalive_idle option for the socket
func SockSetTcpKeepaliveIntvl ¶
func SockSetTcpKeepaliveIntvl(v int) SockOption
SockSetTcpKeepaliveIntvl sets the tcp_keepalive_intvl option for the socket
func SockSetTcpMaxrt ¶
func SockSetTcpMaxrt(v int) SockOption
SockSetTcpMaxrt sets the tcp_maxrt option for the socket
func SockSetUnsubscribe ¶
func SockSetUnsubscribe(v string) SockOption
SockSetUnsubscribe sets the unsubscribe option for the socket
func SockSetUseFd ¶
func SockSetUseFd(v int) SockOption
SockSetUseFd sets the use_fd option for the socket
func SockSetVmciBufferMaxSize ¶
func SockSetVmciBufferMaxSize(v int) SockOption
SockSetVmciBufferMaxSize sets the vmci_buffer_max_size option for the socket
func SockSetVmciBufferMinSize ¶
func SockSetVmciBufferMinSize(v int) SockOption
SockSetVmciBufferMinSize sets the vmci_buffer_min_size option for the socket
func SockSetVmciBufferSize ¶
func SockSetVmciBufferSize(v int) SockOption
SockSetVmciBufferSize sets the vmci_buffer_size option for the socket
func SockSetVmciConnectTimeout ¶
func SockSetVmciConnectTimeout(v int) SockOption
SockSetVmciConnectTimeout sets the vmci_connect_timeout option for the socket
func SockSetXPubManual ¶
func SockSetXPubManual(v int) SockOption
SockSetXPubManual sets the xpub_manual option for the socket
func SockSetXPubNodrop ¶
func SockSetXPubNodrop(v int) SockOption
SockSetXPubNodrop sets the xpub_nodrop option for the socket
func SockSetXPubVerbose ¶
func SockSetXPubVerbose(v int) SockOption
SockSetXPubVerbose sets the xpub_verbose option for the socket
func SockSetXPubVerboser ¶
func SockSetXPubVerboser(v int) SockOption
SockSetXPubVerboser sets the xpub_verboser option for the socket
func SockSetXPubWelcomeMsg ¶
func SockSetXPubWelcomeMsg(v string) SockOption
SockSetXPubWelcomeMsg sets the xpub_welcome_msg option for the socket
func SockSetZapDomain ¶
func SockSetZapDomain(v string) SockOption
SockSetZapDomain sets the zap_domain option for the socket