zmq

Version: v0.0.0-...-425c333
Published: Sep 16, 2013 License: MIT Imports: 7 Imported by: 12

README

go-zmq

Go (golang) bindings for ZeroMQ (zmq, 0mq), currently built for ZeroMQ 3.2.x. Pull requests welcome.

View the API docs here.

Basic Usage

Import go-zmq:

import "github.com/vaughan0/go-zmq"

Create a new ZeroMQ Context and Socket:

ctx, err := zmq.NewContext()
if err != nil {
  panic(err)
}
defer ctx.Close()

Bind to a local endpoint:

sock, err := ctx.Socket(zmq.Rep)
if err != nil {
  panic(err)
}
defer sock.Close()

if err = sock.Bind("tcp://*:5555"); err != nil {
  panic(err)
}

Receive and send messages:

for {
  parts, err := sock.Recv()
  if err != nil {
    panic(err)
  }
  response := fmt.Sprintf("Received %d message parts", len(parts))
  if err = sock.Send([][]byte{
    []byte(response),
  }); err != nil {
    panic(err)
  }
}
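
The reply built inside the loop above can be pulled into a plain function, which makes the message shape explicit: a message is a slice of parts, and each part is a byte slice. The sketch below is illustrative only; buildReply is a hypothetical name, not part of go-zmq, and no socket is involved.

```go
package main

import "fmt"

// buildReply is a hypothetical helper (not part of go-zmq). It produces the
// same single-part reply as the loop above for a received multi-part message.
func buildReply(parts [][]byte) [][]byte {
	response := fmt.Sprintf("Received %d message parts", len(parts))
	return [][]byte{[]byte(response)}
}

func main() {
	// A two-part request, shaped like the value returned by sock.Recv().
	request := [][]byte{[]byte("hello"), []byte("world")}
	reply := buildReply(request)
	fmt.Printf("%s\n", reply[0]) // prints "Received 2 message parts"
}
```

With a real socket, the loop body would reduce to passing the result of sock.Recv() through buildReply and handing the result to sock.Send().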

Using Channels

ZeroMQ sockets are not thread-safe, which would make any ambitious ZeroMQ-using gopher sad. Luckily go-zmq provides a (thread-safe) way to use sockets with native Go channels. This also allows one to use the select construct with ZeroMQ sockets.

Start by using the Channels() method of a socket:

chans := sock.Channels()
defer chans.Close()

Now you can send and receive messages using the channels returned by chans.Out() and chans.In(), respectively. Don't forget to also check chans.Errors() to see if any errors occur.

for {
  select {
  case msg := <-chans.In():
    go func() {
      resp := doSomething(msg)
      chans.Out() <- resp
    }()
  case err := <-chans.Errors():
    panic(err)
  }
}
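
A variant of the loop above that returns errors to the caller instead of panicking might look like the following sketch. It uses plain Go channels as stand-ins for chans.In(), chans.Out() and chans.Errors(), so it runs without ZeroMQ. The names serve and echo are hypothetical, and treating a closed input channel as a stop signal is an assumption, not documented go-zmq behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// serve is a hypothetical variant of the loop above. The three channel
// parameters stand in for chans.In(), chans.Out() and chans.Errors().
func serve(in <-chan [][]byte, out chan<- [][]byte, errs <-chan error,
	handle func([][]byte) [][]byte) error {
	for {
		select {
		case msg, ok := <-in:
			if !ok {
				return nil // assumption: a closed input channel means "stop"
			}
			out <- handle(msg)
		case err := <-errs:
			return err // hand the error to the caller instead of panicking
		}
	}
}

// echo returns the message unchanged.
func echo(msg [][]byte) [][]byte { return msg }

func main() {
	in := make(chan [][]byte) // stays empty in this demo
	out := make(chan [][]byte, 1)
	errs := make(chan error, 1)

	errs <- errors.New("demo failure")
	fmt.Println(serve(in, out, errs, echo)) // prints "demo failure"
}
```

Unlike the loop above, this sketch handles each message synchronously, which keeps replies in request order.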

Documentation

Overview

Package zmq provides ZeroMQ bindings for Go.

Constants

const (
	// An input event. Corresponds to receiving on sockets and reading from files.
	In EventSet = C.ZMQ_POLLIN
	// An output event. Corresponds to sending on sockets and writing to files.
	Out = C.ZMQ_POLLOUT
	// An error event. Corresponds to errors on files. The Error event does not apply to sockets.
	Error = C.ZMQ_POLLERR
	// No events
	None = 0
)
const (
	Req    SocketType = C.ZMQ_REQ
	Rep               = C.ZMQ_REP
	Dealer            = C.ZMQ_DEALER
	Router            = C.ZMQ_ROUTER
	Pub               = C.ZMQ_PUB
	Sub               = C.ZMQ_SUB
	XPub              = C.ZMQ_XPUB
	XSub              = C.ZMQ_XSUB
	Push              = C.ZMQ_PUSH
	Pull              = C.ZMQ_PULL
	Pair              = C.ZMQ_PAIR
)
const (
	Queue     DeviceType = C.ZMQ_QUEUE
	Forwarder            = C.ZMQ_FORWARDER
	Streamer             = C.ZMQ_STREAMER
)

Variables

var (
	// ErrTerminated is returned when a socket's context has been closed.
	ErrTerminated = errors.New("zmq context has been terminated")
	// ErrTimeout is returned when an operation times out or a non-blocking operation cannot run immediately.
	ErrTimeout     = errors.New("zmq timeout")
	ErrInterrupted = errors.New("system call interrupted")
)
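
One way callers might act on these sentinel errors is to retry after a timeout or an interrupted system call and to stop once the context has been terminated. The sketch below shows that policy with local stand-in errors so it runs without ZeroMQ. shouldRetry is a hypothetical helper, and the retry policy itself is an assumption rather than a recommendation from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for zmq.ErrTerminated, zmq.ErrTimeout and zmq.ErrInterrupted.
var (
	errTerminated  = errors.New("zmq context has been terminated")
	errTimeout     = errors.New("zmq timeout")
	errInterrupted = errors.New("system call interrupted")
)

// shouldRetry is a hypothetical policy: timeouts and interrupted system calls
// are treated as transient, everything else (including termination) is not.
func shouldRetry(err error) bool {
	return errors.Is(err, errTimeout) || errors.Is(err, errInterrupted)
}

func main() {
	fmt.Println(shouldRetry(errTimeout))    // prints "true"
	fmt.Println(shouldRetry(errTerminated)) // prints "false"
}
```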
var PairPrefix = "socket-pair-"

Prefix used for socket-pair endpoint names in MakePair.

Functions

func Device

func Device(deviceType DeviceType, frontend, backend *Socket)

Creates and runs a ZeroMQ Device. See zmq_device(3) for more details.

func Version

func Version() (major, minor, patch int)

Version reports the 0MQ library version.

Types

type Channels

type Channels struct {
	// contains filtered or unexported fields
}

Channels provides a method for using Go channels to send and receive messages on a Socket. This is useful not only because it allows one to use select for sockets, but also because Sockets by themselves are not thread-safe (i.e., one should not Send and Recv on the same socket from different threads).

func (*Channels) Close

func (c *Channels) Close()

Closes the Channels object. This will ensure that a number of internal sockets are closed, and that worker goroutines are stopped cleanly.

func (*Channels) Errors

func (c *Channels) Errors() <-chan error

func (*Channels) In

func (c *Channels) In() <-chan [][]byte

func (*Channels) Out

func (c *Channels) Out() chan<- [][]byte

type Context

type Context struct {
	// contains filtered or unexported fields
}

A Context manages multiple Sockets. Contexts are thread-safe.

func DefaultContext

func DefaultContext() *Context

Returns the default Context. Note that the context will not be created until the first call to DefaultContext.

func NewContext

func NewContext() (*Context, error)

Creates a new Context with the default number of IO threads (one).

func NewContextThreads

func NewContextThreads(nthreads int) (ctx *Context, err error)

Creates a new Context with the given number of dedicated IO threads.

func (*Context) Close

func (c *Context) Close()

Closes the Context. Close will block until all related Sockets are closed, and all pending messages are either physically transferred to the network or the socket's linger period expires.
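
Because Close blocks until all related Sockets are closed, the order of cleanup matters. Deferred calls in Go run in last-in, first-out order, so deferring ctx.Close() before sock.Close(), as the README does, closes the socket first. The sketch below demonstrates the ordering with a hypothetical closer type that only records calls; it does not use go-zmq.

```go
package main

import "fmt"

// closer is a hypothetical stand-in for a Socket or Context. It only records
// the order in which Close is called.
type closer struct {
	name string
	log  *[]string
}

func (c closer) Close() { *c.log = append(*c.log, c.name) }

// run defers Close in the same order as the README: context first, socket second.
func run() []string {
	var order []string
	func() {
		ctx := closer{"context", &order}
		defer ctx.Close()
		sock := closer{"socket", &order}
		defer sock.Close()
	}()
	return order
}

func main() {
	fmt.Println(run()) // prints "[socket context]"
}
```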

func (*Context) MakePair

func (c *Context) MakePair() (a *Socket, b *Socket)

Creates a pair of connected inproc sockets that can be used for safe inter-thread communication. Returns both sockets.

func (*Context) Socket

func (c *Context) Socket(socktype SocketType) (sock *Socket, err error)

Creates a new Socket of the specified type.

type DeviceType

type DeviceType int

type EventSet

type EventSet int

An EventSet is a bitmask of IO events.

func (EventSet) CanRecv

func (e EventSet) CanRecv() bool

Returns true if the associated socket can receive immediately, or if the associated file can be read from.

func (EventSet) CanSend

func (e EventSet) CanSend() bool

Returns true if the associated socket can send immediately, or if the associated file can be written to.

func (EventSet) HasError

func (e EventSet) HasError() bool

Returns true if the associated file has an error condition. HasError will always return false for sockets.
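
Since an EventSet is a bitmask, events combine with bitwise OR and are tested with bitwise AND. The sketch below uses a local eventSet type with the values of ZMQ_POLLIN, ZMQ_POLLOUT and ZMQ_POLLERR from zmq.h (1, 2 and 4). The lowercase method bodies are an assumption about how such checks can be written, not the package's actual source.

```go
package main

import "fmt"

// eventSet is a local stand-in for zmq.EventSet.
type eventSet int

// Values of ZMQ_POLLIN, ZMQ_POLLOUT and ZMQ_POLLERR as defined in zmq.h.
const (
	in       eventSet = 1
	out      eventSet = 2
	errEvent eventSet = 4
	none     eventSet = 0
)

// The checks below are assumed equivalents of CanRecv, CanSend and HasError.
func (e eventSet) canRecv() bool  { return e&in != 0 }
func (e eventSet) canSend() bool  { return e&out != 0 }
func (e eventSet) hasError() bool { return e&errEvent != 0 }

func main() {
	monitored := in | out // watch for both readability and writability
	fmt.Println(monitored.canRecv(), monitored.canSend(), monitored.hasError())
	// prints "true true false"
	fmt.Println(none.canRecv()) // prints "false"
}
```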

type PollSet

type PollSet struct {
	// contains filtered or unexported fields
}

A PollSet represents a set of sockets and/or file descriptors, along with monitored and triggered EventSets. The zero PollSet is valid (and empty).

func (*PollSet) Events

func (p *PollSet) Events(index int) EventSet

Returns the triggered events for the socket/file of the given index.

func (*PollSet) Fd

func (p *PollSet) Fd(fd uintptr, events EventSet) (index int)

Adds a file descriptor to the PollSet along with a set of events to monitor. Returns the index in the PollSet of the added file.

func (*PollSet) File

func (p *PollSet) File(f *os.File, events EventSet) (index int)

Shortcut that calls Fd() with the file's file descriptor.

func (*PollSet) Monitored

func (p *PollSet) Monitored(index int) (events EventSet)

Returns the monitored events for the socket/file of the given index.

func (*PollSet) Poll

func (p *PollSet) Poll(timeout time.Duration) (n int, err error)

Poll waits for activity on the monitored set of sockets and/or files. If the timeout is zero, Poll will return immediately. If it is negative, Poll will wait forever until an event is triggered. Poll returns the number of sockets/files for which events were triggered, or a non-nil error.
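
The timeout rules above line up with zmq_poll in ZeroMQ 3.x, which takes a timeout in milliseconds where 0 returns immediately and -1 blocks indefinitely. The following sketch shows one way a time.Duration could be mapped onto that argument. pollTimeoutMillis is a hypothetical helper, and rounding sub-millisecond waits up to 1 ms is a design choice made here, not confirmed behavior of go-zmq.

```go
package main

import (
	"fmt"
	"time"
)

// pollTimeoutMillis is a hypothetical helper that maps a Go duration onto the
// millisecond timeout argument of zmq_poll.
func pollTimeoutMillis(d time.Duration) int64 {
	switch {
	case d < 0:
		return -1 // zmq_poll: block until an event is triggered
	case d == 0:
		return 0 // zmq_poll: return immediately
	}
	ms := int64(d / time.Millisecond)
	if ms == 0 {
		ms = 1 // round sub-millisecond waits up so they do not become non-blocking
	}
	return ms
}

func main() {
	fmt.Println(pollTimeoutMillis(-time.Second))            // prints "-1"
	fmt.Println(pollTimeoutMillis(0))                       // prints "0"
	fmt.Println(pollTimeoutMillis(1500 * time.Millisecond)) // prints "1500"
}
```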

func (*PollSet) Socket

func (p *PollSet) Socket(sock *Socket, events EventSet) (index int)

Adds a Socket to the PollSet along with a set of events to monitor. Returns the index in the PollSet of the added socket.

func (*PollSet) Update

func (p *PollSet) Update(index int, events EventSet)

Updates the set of monitored events for the socket/file of the given index.

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

A ZeroMQ Socket.

func NewSocket

func NewSocket(socktype SocketType) (*Socket, error)

Creates a new socket using the default context (see DefaultContext).

func (*Socket) Bind

func (s *Socket) Bind(endpoint string) (err error)

Binds the socket to the specified local endpoint address.

func (*Socket) Channels

func (s *Socket) Channels() *Channels

Creates a new Channels object with the default channel buffer size (zero).

func (*Socket) ChannelsBuffer

func (s *Socket) ChannelsBuffer(chanbuf int) (c *Channels)

Creates a new Channels object with the given channel buffer size.

func (*Socket) Close

func (s *Socket) Close()

Closes the socket.

func (*Socket) Connect

func (s *Socket) Connect(endpoint string) (err error)

Connects the socket to the specified remote endpoint.

func (*Socket) Disconnect

func (s *Socket) Disconnect(endpoint string) (err error)

Disconnects the socket from the specified remote endpoint.

func (*Socket) GetAffinity

func (s *Socket) GetAffinity() uint64

func (*Socket) GetBacklog

func (s *Socket) GetBacklog() int

func (*Socket) GetDelayAttachOnConnect

func (s *Socket) GetDelayAttachOnConnect() bool

func (*Socket) GetEvents

func (s *Socket) GetEvents() EventSet

func (*Socket) GetFD

func (s *Socket) GetFD() uintptr

func (*Socket) GetIPv4Only

func (s *Socket) GetIPv4Only() bool

func (*Socket) GetIdentity

func (s *Socket) GetIdentity() []byte

func (*Socket) GetLastEndpoint

func (s *Socket) GetLastEndpoint() string

func (*Socket) GetLinger

func (s *Socket) GetLinger() time.Duration

func (*Socket) GetMaxMsgSize

func (s *Socket) GetMaxMsgSize() int64

func (*Socket) GetMulticastHops

func (s *Socket) GetMulticastHops() int

func (*Socket) GetRate

func (s *Socket) GetRate() (kbits int)

func (*Socket) GetReconnectIVL

func (s *Socket) GetReconnectIVL() time.Duration

func (*Socket) GetReconnectIVLMax

func (s *Socket) GetReconnectIVLMax() time.Duration

func (*Socket) GetRecoveryIVL

func (s *Socket) GetRecoveryIVL() time.Duration

func (*Socket) GetRecvBuffer

func (s *Socket) GetRecvBuffer() (bytes int)

func (*Socket) GetRecvHWM

func (s *Socket) GetRecvHWM() uint64

func (*Socket) GetRecvTimeout

func (s *Socket) GetRecvTimeout() time.Duration

func (*Socket) GetSendBuffer

func (s *Socket) GetSendBuffer() (bytes int)

func (*Socket) GetSendHWM

func (s *Socket) GetSendHWM() uint64

func (*Socket) GetSendTimeout

func (s *Socket) GetSendTimeout() time.Duration

func (*Socket) GetTCPKeepAlive

func (s *Socket) GetTCPKeepAlive() int

func (*Socket) GetTCPKeepAliveCount

func (s *Socket) GetTCPKeepAliveCount() int

func (*Socket) GetTCPKeepAliveIdle

func (s *Socket) GetTCPKeepAliveIdle() int

func (*Socket) GetTCPKeepAliveInterval

func (s *Socket) GetTCPKeepAliveInterval() int

func (*Socket) GetType

func (s *Socket) GetType() SocketType

func (*Socket) Recv

func (s *Socket) Recv() (parts [][]byte, err error)

Receives a multi-part message.

func (*Socket) RecvPart

func (s *Socket) RecvPart() (part []byte, more bool, err error)

Receives a single part along with a boolean flag (more) indicating whether more parts of the same message follow (true), or this is the last part of the message (false). As with Send/SendPart, this is fairly low-level and Recv should generally be used instead.
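
To illustrate the more flag, the sketch below collects parts until more is false, which is conceptually what a multi-part receive has to do. It is not necessarily how Recv is implemented. partReceiver mirrors the documented RecvPart signature, while recvAll and fakeSocket are hypothetical names used so the example runs without ZeroMQ.

```go
package main

import "fmt"

// partReceiver captures the documented RecvPart signature.
type partReceiver interface {
	RecvPart() (part []byte, more bool, err error)
}

// recvAll is a hypothetical helper that reads parts until more is false.
func recvAll(r partReceiver) ([][]byte, error) {
	var parts [][]byte
	for {
		part, more, err := r.RecvPart()
		if err != nil {
			return nil, err
		}
		parts = append(parts, part)
		if !more {
			return parts, nil // last part of the message
		}
	}
}

// fakeSocket is a hypothetical in-memory stand-in that yields queued parts.
// It assumes the queue is non-empty when RecvPart is called.
type fakeSocket struct{ queue [][]byte }

func (f *fakeSocket) RecvPart() ([]byte, bool, error) {
	part := f.queue[0]
	f.queue = f.queue[1:]
	return part, len(f.queue) > 0, nil
}

func main() {
	sock := &fakeSocket{queue: [][]byte{[]byte("id"), []byte(""), []byte("body")}}
	parts, err := recvAll(sock)
	fmt.Println(len(parts), err) // prints "3 <nil>"
}
```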

func (*Socket) Send

func (s *Socket) Send(parts [][]byte) (err error)

Sends a message containing a number of parts.

func (*Socket) SendPart

func (s *Socket) SendPart(part []byte, more bool) (err error)

Sends a single message part. The `more` flag is used to specify whether this is the last part of the message (false), or if there are more parts to follow (true). SendPart is fairly low-level, and usually Send will be the preferred method to use.
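
The relationship between Send and SendPart can be sketched as follows: every part except the last is sent with more set to true. This is an illustration of the flag's meaning, not necessarily the package's implementation. partSender mirrors the documented SendPart signature; sendAll and recorder are hypothetical names used so the example runs without ZeroMQ.

```go
package main

import "fmt"

// partSender captures the documented SendPart signature.
type partSender interface {
	SendPart(part []byte, more bool) error
}

// sendAll is a hypothetical helper that sends each part, flagging all but the
// last one with more=true.
func sendAll(s partSender, parts [][]byte) error {
	for i, part := range parts {
		more := i < len(parts)-1
		if err := s.SendPart(part, more); err != nil {
			return err
		}
	}
	return nil
}

// recorder is a hypothetical stand-in that records the more flag of each call.
type recorder struct{ flags []bool }

func (r *recorder) SendPart(part []byte, more bool) error {
	r.flags = append(r.flags, more)
	return nil
}

func main() {
	rec := &recorder{}
	err := sendAll(rec, [][]byte{[]byte("a"), []byte("b"), []byte("c")})
	fmt.Println(rec.flags, err) // prints "[true true false] <nil>"
}
```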

func (*Socket) SetAffinity

func (s *Socket) SetAffinity(affinity uint64)

func (*Socket) SetBacklog

func (s *Socket) SetBacklog(backlog int)

func (*Socket) SetDelayAttachOnConnect

func (s *Socket) SetDelayAttachOnConnect(delay bool)

func (*Socket) SetIPv4Only

func (s *Socket) SetIPv4Only(ipv4only bool)

func (*Socket) SetIdentitiy

func (s *Socket) SetIdentitiy(ident []byte)

func (*Socket) SetLinger

func (s *Socket) SetLinger(linger time.Duration)

func (*Socket) SetMaxMsgSize

func (s *Socket) SetMaxMsgSize(bytes int64)

func (*Socket) SetMulticastHops

func (s *Socket) SetMulticastHops(ttl int)

func (*Socket) SetRate

func (s *Socket) SetRate(kbits int)

func (*Socket) SetReconnectIVL

func (s *Socket) SetReconnectIVL(ivl time.Duration)

func (*Socket) SetReconnectIVLMax

func (s *Socket) SetReconnectIVLMax(max time.Duration)

func (*Socket) SetRecoveryIVL

func (s *Socket) SetRecoveryIVL(ivl time.Duration)

func (*Socket) SetRecvBuffer

func (s *Socket) SetRecvBuffer(bytes int)

func (*Socket) SetRecvHWM

func (s *Socket) SetRecvHWM(hwm uint64)

func (*Socket) SetRecvTimeout

func (s *Socket) SetRecvTimeout(timeo time.Duration)

func (*Socket) SetRouterMandatory

func (s *Socket) SetRouterMandatory(errorUnroutable bool)

func (*Socket) SetSendBuffer

func (s *Socket) SetSendBuffer(bytes int)

func (*Socket) SetSendHWM

func (s *Socket) SetSendHWM(hwm uint64)

func (*Socket) SetSendTimeout

func (s *Socket) SetSendTimeout(timeo time.Duration)

func (*Socket) SetTCPAcceptFilter

func (s *Socket) SetTCPAcceptFilter(filter string)

func (*Socket) SetTCPKeepAlive

func (s *Socket) SetTCPKeepAlive(keepalive int)

func (*Socket) SetTCPKeepAliveCount

func (s *Socket) SetTCPKeepAliveCount(count int)

func (*Socket) SetTCPKeepAliveIdle

func (s *Socket) SetTCPKeepAliveIdle(idle int)

func (*Socket) SetTCPKeepAliveInterval

func (s *Socket) SetTCPKeepAliveInterval(interval int)

func (*Socket) SetXPubVerbose

func (s *Socket) SetXPubVerbose(verbose bool)

func (*Socket) Subscribe

func (s *Socket) Subscribe(filter []byte)

Subscribe sets up a filter for incoming messages on Sub sockets.
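
In ZeroMQ, a subscription filter is matched as a byte prefix of the incoming message (the first part, for multi-part messages), and an empty filter matches everything. The matching itself happens inside ZeroMQ; the sketch below only reproduces the rule locally for illustration, and matchesFilter is a hypothetical name.

```go
package main

import (
	"bytes"
	"fmt"
)

// matchesFilter is a hypothetical local illustration of ZeroMQ's subscription
// rule: a message matches when it begins with the filter bytes.
func matchesFilter(filter, msg []byte) bool {
	return bytes.HasPrefix(msg, filter)
}

func main() {
	msg := []byte("weather.london 12C")
	fmt.Println(matchesFilter([]byte("weather."), msg)) // prints "true"
	fmt.Println(matchesFilter([]byte("sports."), msg))  // prints "false"
	fmt.Println(matchesFilter(nil, msg))                // prints "true"
}
```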

func (*Socket) Unbind

func (s *Socket) Unbind(endpoint string) (err error)

Unbinds the socket from the specified local endpoint address.

func (*Socket) Unsubscribe

func (s *Socket) Unsubscribe(filter []byte)

Unsubscribes from a filter on a Sub socket.

type SocketType

type SocketType int

Directories

Path Synopsis
Hello World client in Go: connects a REQ socket to tcp://localhost:5555, sends "Hello" to the server, and expects "World" back. Hello World server in Go: binds a REP socket to tcp://*:5555, expects "Hello" from the client, and replies with "World". Demonstrates identities as used by the request-reply pattern.
