Documentation
¶
Index ¶
- Constants
- Variables
- func CreateRequestHandler(codec Codec, fun interface{}, logger log.ILogger) func(*Request)
- func CreateResponseHandler(codec Codec, fun interface{}) func(Response)
- func IP() (string, error)
- func Route(name string, src Handler, dest Sender, relayTimeout time.Duration, ...)
- type BinStreamFactory
- type BindListener
- type Client
- func (this *Client) Active() bool
- func (this *Client) Address() string
- func (this *Client) Cancel(name string)
- func (this *Client) Connect(addr string) <-chan error
- func (this *Client) Destroy()
- func (this *Client) Disconnect()
- func (this *Client) Handle(name string, middlewares ...interface{})
- func (this *Client) HandleSerial(name string, middlewares ...interface{})
- func (this *Client) Publish(name string, m interface{}) <-chan error
- func (this *Client) PublishTimeout(name string, m interface{}, timeout time.Duration) <-chan error
- func (this *Client) Push(name string, m interface{}) <-chan error
- func (this *Client) PushTimeout(name string, m interface{}, timeout time.Duration) <-chan error
- func (this *Client) Reconnect() <-chan error
- func (this *Client) Request(name string, payload interface{}, handler interface{}) <-chan error
- func (this *Client) RequestAll(name string, payload interface{}, handler interface{}) <-chan error
- func (this *Client) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
- func (this *Client) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
- func (this *Client) Send(kind EKind, name string, payload interface{}, handler interface{}, ...) <-chan error
- func (this *Client) SetCodec(codec Codec) *Client
- func (this *Client) SetDefaultTimeout(timeout time.Duration)
- func (this *Client) SetGroupId(groupId string) *Client
- func (this *Client) SetReconnectInterval(reconnectInterval time.Duration) *Client
- func (this *Client) SetReconnectMaxInterval(reconnectMaxInterval time.Duration) *Client
- func (this *Client) SetStreamFactory(factory StreamFactory)
- type ClientServer
- type ClientServerConfig
- type Codec
- type Comparer
- type Context
- type EKind
- type Envelope
- type GobCodec
- type Handler
- type HysteresisPolicy
- func (this *HysteresisPolicy) Borrow(name string, initialize func(name string) Comparer) Comparer
- func (this *HysteresisPolicy) Load(name string) Comparer
- func (this *HysteresisPolicy) Quarantined(name string) bool
- func (this *HysteresisPolicy) Return(name string, loaded Comparer, err error)
- func (this *HysteresisPolicy) SetMaxFailures(maxFailures int) *HysteresisPolicy
- func (this *HysteresisPolicy) SetMinSuccesses(minSuccesses int) *HysteresisPolicy
- func (this *HysteresisPolicy) SetQuarantine(quarantine time.Duration) *HysteresisPolicy
- type HysteresisPolicyData
- type InputBinStream
- func (this *InputBinStream) Read(p []byte) (n int, err error)
- func (this *InputBinStream) ReadBytes() ([]byte, error)
- func (this *InputBinStream) ReadNBytes(size int) ([]byte, error)
- func (this *InputBinStream) ReadString() (string, error)
- func (this *InputBinStream) ReadUI16() (uint16, error)
- func (this *InputBinStream) ReadUI32() (uint32, error)
- func (this *InputBinStream) ReadUI64() (uint64, error)
- func (this *InputBinStream) ReadUI8() (uint8, error)
- type InputStream
- type InputTxtStream
- func (this *InputTxtStream) Read(p []byte) (n int, err error)
- func (this *InputTxtStream) ReadBytes() ([]byte, error)
- func (this *InputTxtStream) ReadNBytes(size int) ([]byte, error)
- func (this *InputTxtStream) ReadString() (string, error)
- func (this *InputTxtStream) ReadUI16() (uint16, error)
- func (this *InputTxtStream) ReadUI32() (uint32, error)
- func (this *InputTxtStream) ReadUI64() (uint64, error)
- func (this *InputTxtStream) ReadUI8() (uint8, error)
- type JsonCodec
- type KeyValue
- type KeyValueItem
- type LBPolicy
- type LoadBalancer
- type Log
- func (lg Log) CallerAt(depth int) l.ILogger
- func (lg Log) Debugf(format string, args ...interface{})
- func (lg Log) Errorf(format string, args ...interface{})
- func (lg Log) Fatalf(format string, args ...interface{})
- func (lg Log) Infof(format string, args ...interface{})
- func (lg Log) IsActive(level l.LogLevel) bool
- func (lg Log) Tracef(format string, args ...interface{})
- func (lg Log) Warnf(format string, args ...interface{})
- type Looper
- type Middleware
- type Msg
- type OutputBinStream
- func (this *OutputBinStream) Write(p []byte) (n int, err error)
- func (this *OutputBinStream) WriteBytes(data []byte) error
- func (this *OutputBinStream) WriteString(s string) error
- func (this *OutputBinStream) WriteUI16(data uint16) error
- func (this *OutputBinStream) WriteUI32(data uint32) error
- func (this *OutputBinStream) WriteUI64(data uint64) error
- func (this *OutputBinStream) WriteUI8(data uint8) error
- type OutputStream
- type OutputTxtStream
- func (this *OutputTxtStream) Write(p []byte) (n int, err error)
- func (this *OutputTxtStream) WriteBytes(data []byte) error
- func (this *OutputTxtStream) WriteString(s string) error
- func (this *OutputTxtStream) WriteUI16(data uint16) error
- func (this *OutputTxtStream) WriteUI32(data uint32) error
- func (this *OutputTxtStream) WriteUI64(data uint64) error
- func (this *OutputTxtStream) WriteUI8(data uint8) error
- type Request
- func (this *Request) DeferReply()
- func (this *Request) Next()
- func (this *Request) Payload() []byte
- func (this *Request) Reply() []byte
- func (this *Request) ReplyAs(kind EKind, reply []byte)
- func (this *Request) SendFault(err error)
- func (this *Request) SendReply(reply []byte)
- func (this *Request) SetFault(err error)
- func (this *Request) SetReply(payload []byte)
- func (this *Request) Terminate()
- func (this *Request) Terminated() bool
- func (this *Request) Writer() OutputStream
- type Response
- type RoundRobinPolicy
- type SendEvent
- type SendListener
- type Sender
- type Server
- func (this *Server) AddBindListeners(listener BindListener) uint64
- func (this *Server) BindAddress() net.Addr
- func (this *Server) BindPort() int
- func (this *Server) Cancel(name string)
- func (this *Server) Destroy()
- func (this *Server) Handle(name string, middlewares ...interface{})
- func (this *Server) HandleSerial(name string, middlewares ...interface{})
- func (this *Server) Listen(service string) <-chan error
- func (this *Server) Listener() net.Listener
- func (this *Server) Port() int
- func (this *Server) RemoveBindListener(idx uint64)
- func (this *Server) Route(name string, timeout time.Duration, before func(x *Request) bool, ...)
- func (this *Server) SetBufferSize(size int)
- func (this *Server) SetRateLimiterFactory(factory func() tk.Rate)
- type ServiceUnavailableError
- type SimpleLB
- func (lb *SimpleLB) Add(w *Wire)
- func (lb *SimpleLB) AllDone(msg Envelope, err error) error
- func (lb *SimpleLB) Done(wirer Wirer, msg Envelope, err error)
- func (lb *SimpleLB) PickAll(msg Envelope, wires []*Wire) ([]*Wire, error)
- func (lb *SimpleLB) PickOne(msg Envelope, wires []*Wire) (*Wire, error)
- func (lb *SimpleLB) Remove(w *Wire)
- func (lb *SimpleLB) SetPolicyFactory(factory func() LBPolicy)
- func (lb *SimpleLB) Use(wire *Wire, msg Envelope) Wirer
- type Stickies
- type Sticky
- type StreamFactory
- type SystemError
- type Timeout
- type TimeoutError
- type TopicEvent
- type TopicListener
- type TxtStreamFactory
- type UnknownTopic
- type Wire
- func (this *Wire) AddDropTopicListener(listener TopicListener) uint64
- func (this *Wire) AddNewTopicListener(listener TopicListener) uint64
- func (this *Wire) AddSendListener(listener SendListener) uint64
- func (this *Wire) Conn() net.Conn
- func (this *Wire) Destroy()
- func (this *Wire) HasRemoteTopic(name string) bool
- func (this *Wire) Logger() log.ILogger
- func (this *Wire) RemoteMetadata() map[string]interface{}
- func (this *Wire) RemoteUuid() tk.UUID
- func (this *Wire) RemoveDropTopicListener(idx uint64)
- func (this *Wire) RemoveNewTopicListener(idx uint64)
- func (this *Wire) RemoveSendListener(idx uint64)
- func (this *Wire) Send(msg Envelope) <-chan error
- func (this *Wire) SetBufferSize(size int)
- func (this *Wire) SetConn(c net.Conn)
- func (this *Wire) SetLogger(l log.ILogger)
- func (this *Wire) SetRateLimiter(limiter tk.Rate)
- func (this *Wire) SetTimeout(timeout time.Duration)
- type Wired
- type Wirer
- type Wires
- func (this *Wires) Add(wire *Wire)
- func (this *Wires) AddDropTopicListener(listener TopicListener) uint64
- func (this *Wires) AddNewTopicListener(listener TopicListener) uint64
- func (this *Wires) AddSendListener(listener SendListener) uint64
- func (this *Wires) Codec() Codec
- func (this *Wires) Destroy()
- func (this *Wires) Find(fn func(w *Wire) bool) *Wire
- func (this *Wires) Get(conn net.Conn) *Wire
- func (this *Wires) GetAll() []*Wire
- func (this *Wires) Kill(conn net.Conn)
- func (this *Wires) LoadBalancer() LoadBalancer
- func (this *Wires) Logger() log.ILogger
- func (this *Wires) Publish(name string, payload interface{}) <-chan error
- func (this *Wires) PublishTimeout(name string, payload interface{}, timeout time.Duration) <-chan error
- func (this *Wires) Push(name string, payload interface{}) <-chan error
- func (this *Wires) PushTimeout(name string, payload interface{}, timeout time.Duration) <-chan error
- func (this *Wires) RemoveDropTopicListener(idx uint64)
- func (this *Wires) RemoveNewTopicListener(idx uint64)
- func (this *Wires) RemoveSendListener(idx uint64)
- func (this *Wires) Request(name string, payload interface{}, handler interface{}) <-chan error
- func (this *Wires) RequestAll(name string, payload interface{}, handler interface{}) <-chan error
- func (this *Wires) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
- func (this *Wires) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
- func (this *Wires) Send(kind EKind, name string, payload interface{}, handler interface{}, ...) <-chan error
- func (this *Wires) SendSkip(skipWire *Wire, kind EKind, name string, payload interface{}, ...) <-chan error
- func (this *Wires) SetBufferSize(size int)
- func (this *Wires) SetCodec(codec Codec) *Wires
- func (this *Wires) SetDefaultTimeout(timeout time.Duration)
- func (this *Wires) SetLoadBalancer(loadBalancer LoadBalancer)
- func (this *Wires) SetLogger(l log.ILogger)
- func (this *Wires) SetRateLimiterFactory(factory func() tk.Rate)
- func (this *Wires) SetStreamFactory(factory StreamFactory)
- func (this *Wires) Size() int
- func (this *Wires) TopicCount(name string) int
Constants ¶
const BIN_STRING_END byte = 0
const FILTER_TOKEN = "*"
const TXT_STRING_END byte = '\n'
Variables ¶
var ( NOCODEC = errors.New("No codec defined") EOR = errors.New("End Of Multiple Reply") NACKERROR = errors.New("Not Acknowledge Error") UNKNOWNTOPIC = "No registered subscriber for %s." TIMEOUT = "Timeout (%s) while waiting for reply of call #%d %s(%s)=%s" UNAVAILABLESERVICE = "No service is available for %s." )
errors
Functions ¶
func CreateRequestHandler ¶
func CreateResponseHandler ¶
Types ¶
type BinStreamFactory ¶
type BinStreamFactory struct { }
func (BinStreamFactory) Input ¶
func (this BinStreamFactory) Input(r io.Reader) InputStream
func (BinStreamFactory) Output ¶
func (this BinStreamFactory) Output(w io.Writer) OutputStream
type BindListener ¶
type Client ¶
type Client struct { ClientServer *Wire // contains filtered or unexported fields }
func (*Client) Disconnect ¶
func (this *Client) Disconnect()
func (*Client) Handle ¶
name can have an '*' at the end, meaning that it will handle messages with the destiny name starting with the reply name whitout the '*'. When handling request messages, the function handler can have a return value and/or an error. When handling publish/push messages, any return from the function handler is discarded. When handling Request/RequestAll messages, if a return is not specified, the caller will not receive a reply until you explicitly call gomsg.Request.ReplyAs()
func (*Client) HandleSerial ¶
HandleSerial is the same as Handle except that it handles requests in sequence. It next request is handled after it returns from the previous.
func (*Client) PublishTimeout ¶
If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.
func (*Client) PushTimeout ¶
PushTimeout is the same as Push with a timeout definition
func (*Client) RequestAll ¶
RequestAll requests messages to all connected clients of the same server. If a client is not connected it is forever lost.
func (*Client) RequestAllTimeout ¶
func (this *Client) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
RequestAllTimeout requests messages to all connected clients of the same server. If a client is not connected it is forever lost.
func (*Client) RequestTimeout ¶
func (this *Client) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
RequestTimeout is the same as Request with a timeout definition
func (*Client) Send ¶
func (this *Client) Send(kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
When the payload is of type []byte it passes the raw bytes without encoding.
func (*Client) SetDefaultTimeout ¶
func (*Client) SetGroupId ¶
Make it belong to a group. Only one element at a time (round-robin) handles the messages.
func (*Client) SetReconnectInterval ¶
func (*Client) SetReconnectMaxInterval ¶
func (*Client) SetStreamFactory ¶
func (this *Client) SetStreamFactory(factory StreamFactory)
type ClientServer ¶
type ClientServer struct { OnConnect func(w *Wire) OnClose func(c net.Conn) // contains filtered or unexported fields }
func NewClientServer ¶
func NewClientServer() ClientServer
func (*ClientServer) Destroy ¶
func (this *ClientServer) Destroy()
func (*ClientServer) Metadata ¶
func (this *ClientServer) Metadata() map[string]interface{}
type ClientServerConfig ¶
type ClientServerConfig struct { }
type Context ¶
Context is used to pass all the data to replying function if we so wish. payload contains the data for incoming messages and reply contains the reply data err is used in REPlies in case something went wrong
type EKind ¶
type EKind uint8
const ( SUB EKind // indicates that wants to receive messages for a topic UNSUB REQ REQALL PUSH PUB REP // terminates the REQALL ERR // terminates the REQALL REP_PARTIAL // partial reply from a request all ERR_PARTIAL // partial error from a request all ACK // Successful delivery of a PUSH message or it is a End Of Replies NACK )
type Envelope ¶
type HysteresisPolicy ¶
type HysteresisPolicy struct { sync.RWMutex MaxFailures int MinSuccesses int Quarantine time.Duration // contains filtered or unexported fields }
HysteresisPolicy is the policy to control the connections load. For each topic name there will be a load that is the sum time that the connection is borrowed. Each load is initiated with a load, usually the minimum load for all wires for that topic. The strategy used in this policy is of a Circuit Breaker. After the circuit is open, a quarantine time is observed. After that quarantine, the topic is tested letting ONE connection pass. Until that connection is returned, no more connections are borrowed. After the connection is returned is when we test if we should close the circuit.
func NewHysteresisPolicy ¶
func NewHysteresisPolicy() *HysteresisPolicy
func (*HysteresisPolicy) Borrow ¶
func (this *HysteresisPolicy) Borrow(name string, initialize func(name string) Comparer) Comparer
func (*HysteresisPolicy) Load ¶
func (this *HysteresisPolicy) Load(name string) Comparer
func (*HysteresisPolicy) Quarantined ¶
func (this *HysteresisPolicy) Quarantined(name string) bool
func (*HysteresisPolicy) Return ¶
func (this *HysteresisPolicy) Return(name string, loaded Comparer, err error)
func (*HysteresisPolicy) SetMaxFailures ¶
func (this *HysteresisPolicy) SetMaxFailures(maxFailures int) *HysteresisPolicy
func (*HysteresisPolicy) SetMinSuccesses ¶
func (this *HysteresisPolicy) SetMinSuccesses(minSuccesses int) *HysteresisPolicy
func (*HysteresisPolicy) SetQuarantine ¶
func (this *HysteresisPolicy) SetQuarantine(quarantine time.Duration) *HysteresisPolicy
type HysteresisPolicyData ¶
type HysteresisPolicyData struct {
// contains filtered or unexported fields
}
type InputBinStream ¶
type InputBinStream struct {
// contains filtered or unexported fields
}
func NewInputBinStream ¶
func NewInputBinStream(reader io.Reader) *InputBinStream
func (*InputBinStream) ReadBytes ¶
func (this *InputBinStream) ReadBytes() ([]byte, error)
func (*InputBinStream) ReadNBytes ¶
func (this *InputBinStream) ReadNBytes(size int) ([]byte, error)
func (*InputBinStream) ReadString ¶
func (this *InputBinStream) ReadString() (string, error)
func (*InputBinStream) ReadUI16 ¶
func (this *InputBinStream) ReadUI16() (uint16, error)
func (*InputBinStream) ReadUI32 ¶
func (this *InputBinStream) ReadUI32() (uint32, error)
func (*InputBinStream) ReadUI64 ¶
func (this *InputBinStream) ReadUI64() (uint64, error)
func (*InputBinStream) ReadUI8 ¶
func (this *InputBinStream) ReadUI8() (uint8, error)
type InputStream ¶
type InputTxtStream ¶
type InputTxtStream struct {
// contains filtered or unexported fields
}
InputTxtStream will read data one item per line
func NewInputTxtStream ¶
func NewInputTxtStream(r io.Reader) *InputTxtStream
func (*InputTxtStream) ReadBytes ¶
func (this *InputTxtStream) ReadBytes() ([]byte, error)
func (*InputTxtStream) ReadNBytes ¶
func (this *InputTxtStream) ReadNBytes(size int) ([]byte, error)
func (*InputTxtStream) ReadString ¶
func (this *InputTxtStream) ReadString() (string, error)
func (*InputTxtStream) ReadUI16 ¶
func (this *InputTxtStream) ReadUI16() (uint16, error)
func (*InputTxtStream) ReadUI32 ¶
func (this *InputTxtStream) ReadUI32() (uint32, error)
func (*InputTxtStream) ReadUI64 ¶
func (this *InputTxtStream) ReadUI64() (uint64, error)
func (*InputTxtStream) ReadUI8 ¶
func (this *InputTxtStream) ReadUI8() (uint8, error)
type KeyValue ¶
type KeyValue struct {
Items []*KeyValueItem
}
func NewKeyValue ¶
func NewKeyValue() *KeyValue
func (*KeyValue) Find ¶
func (kv *KeyValue) Find(fn func(item *KeyValueItem) bool) *KeyValueItem
type KeyValueItem ¶
type KeyValueItem struct { Key interface{} Value interface{} }
type LoadBalancer ¶
type LoadBalancer interface { SetPolicyFactory(func() LBPolicy) // Add is called when a new wire is created Add(*Wire) // Remove is called when the wire is killed Remove(*Wire) // PickOne is called before the message is sent PickOne(Envelope, []*Wire) (*Wire, error) // PickAll is called before the message is sent PickAll(Envelope, []*Wire) ([]*Wire, error) // Use is called before the message is sent Use(*Wire, Envelope) Wirer // Done is called when we are done with one wire Done(Wirer, Envelope, error) // AllDone is called when ALL wires have been processed AllDone(Envelope, error) error }
type Middleware ¶
type Middleware func(*Request)
type Msg ¶
type Msg struct { OutputStream // contains filtered or unexported fields }
func NewMsg ¶
func NewMsg(factory StreamFactory) *Msg
type OutputBinStream ¶
type OutputBinStream struct {
// contains filtered or unexported fields
}
func NewOutputBinStream ¶
func NewOutputBinStream(writer io.Writer) *OutputBinStream
func (*OutputBinStream) WriteBytes ¶
func (this *OutputBinStream) WriteBytes(data []byte) error
func (*OutputBinStream) WriteString ¶
func (this *OutputBinStream) WriteString(s string) error
func (*OutputBinStream) WriteUI16 ¶
func (this *OutputBinStream) WriteUI16(data uint16) error
func (*OutputBinStream) WriteUI32 ¶
func (this *OutputBinStream) WriteUI32(data uint32) error
func (*OutputBinStream) WriteUI64 ¶
func (this *OutputBinStream) WriteUI64(data uint64) error
func (*OutputBinStream) WriteUI8 ¶
func (this *OutputBinStream) WriteUI8(data uint8) error
type OutputStream ¶
type OutputTxtStream ¶
type OutputTxtStream struct {
// contains filtered or unexported fields
}
func NewOutputTxtStream ¶
func NewOutputTxtStream(writer io.Writer) *OutputTxtStream
func (*OutputTxtStream) WriteBytes ¶
func (this *OutputTxtStream) WriteBytes(data []byte) error
func (*OutputTxtStream) WriteString ¶
func (this *OutputTxtStream) WriteString(s string) error
func (*OutputTxtStream) WriteUI16 ¶
func (this *OutputTxtStream) WriteUI16(data uint16) error
func (*OutputTxtStream) WriteUI32 ¶
func (this *OutputTxtStream) WriteUI32(data uint32) error
func (*OutputTxtStream) WriteUI64 ¶
func (this *OutputTxtStream) WriteUI64(data uint64) error
func (*OutputTxtStream) WriteUI8 ¶
func (this *OutputTxtStream) WriteUI8(data uint8) error
type Request ¶
type Request struct { Response // contains filtered or unexported fields }
func (*Request) DeferReply ¶
func (this *Request) DeferReply()
DeferReply indicates that the reply won't be sent immediatly. The reply will eventually be sent by calling with gomsg.Request.ReplyAs().
func (*Request) Terminate ¶
func (this *Request) Terminate()
Terminate terminates a series of replies by sending an ACK message to the caller. It can also be used to reject a request, since this terminates the request without sending a reply payload.
func (*Request) Terminated ¶
func (*Request) Writer ¶
func (this *Request) Writer() OutputStream
type Response ¶
type Response struct { *Context // contains filtered or unexported fields }
func NewResponse ¶
func (Response) Reader ¶
func (this Response) Reader() InputStream
type RoundRobinPolicy ¶
func (*RoundRobinPolicy) Borrow ¶
func (this *RoundRobinPolicy) Borrow(topic string, initializer func(topic string) Comparer) Comparer
func (*RoundRobinPolicy) Load ¶
func (this *RoundRobinPolicy) Load(topic string) Comparer
Load is the current load for a service
func (*RoundRobinPolicy) Quarantined ¶
func (this *RoundRobinPolicy) Quarantined(topic string) bool
Quarantined returns if it is in quarantine
type SendListener ¶
type SendListener func(event SendEvent)
type Server ¶
type Server struct { ClientServer *Wires // contains filtered or unexported fields }
func (*Server) AddBindListeners ¶
func (this *Server) AddBindListeners(listener BindListener) uint64
func (*Server) BindAddress ¶
BindAddress returns the listener address
func (*Server) Destroy ¶
func (this *Server) Destroy()
Destroy closes all connections and the listener
func (*Server) Handle ¶
Handle defines the function that will handle messages for a topic. name can have an '*' at the end, meaning that it will handle messages with the destiny name starting with the reply name (whitout the '*'). When handling request messages, the function handler can have a return value and/or an error. When handling publish/push messages, any return from the function handler is discarded. When handling Request/RequestAll messages, if a return is not specified, the caller will not receive a reply until you explicitly call gomsg.Request.ReplyAs()
func (*Server) HandleSerial ¶
func (*Server) RemoveBindListener ¶
RemoveBindListener removes a previously added listener on send messages
func (*Server) Route ¶
func (this *Server) Route(name string, timeout time.Duration, before func(x *Request) bool, after func(x *Response))
Messages are from one client and delivered to another client. The sender client does not receive his message. The handler execution is canceled. Arriving replies from endpoints are piped to the requesting wire
func (*Server) SetBufferSize ¶
func (*Server) SetRateLimiterFactory ¶
type ServiceUnavailableError ¶
type ServiceUnavailableError error
type SimpleLB ¶
func NewSimpleLB ¶
func NewSimpleLB() *SimpleLB
func (*SimpleLB) SetPolicyFactory ¶
type Stickies ¶
type StreamFactory ¶
type StreamFactory interface { Input(io.Reader) InputStream Output(io.Writer) OutputStream }
type SystemError ¶
type SystemError error
type Timeout ¶
type Timeout struct {
// contains filtered or unexported fields
}
Timeout is a timer over a generic element, that will call a function when a specified timeout occurs. It is possible to delay the timeout.
func NewTimeout ¶
NewTimeout create a timeout
type TimeoutError ¶
type TimeoutError error
type TopicEvent ¶
func (TopicEvent) String ¶
func (e TopicEvent) String() string
type TopicListener ¶
type TopicListener func(event TopicEvent)
type TxtStreamFactory ¶
type TxtStreamFactory struct { }
func (TxtStreamFactory) Input ¶
func (this TxtStreamFactory) Input(r io.Reader) InputStream
func (TxtStreamFactory) Output ¶
func (this TxtStreamFactory) Output(w io.Writer) OutputStream
type UnknownTopic ¶
type UnknownTopic error
Specific error types are define so that we can use type assertion, if needed
type Wire ¶
type Wire struct { // load balancer failure policy Policy LBPolicy // contains filtered or unexported fields }
func (*Wire) AddDropTopicListener ¶
func (this *Wire) AddDropTopicListener(listener TopicListener) uint64
func (*Wire) AddNewTopicListener ¶
func (this *Wire) AddNewTopicListener(listener TopicListener) uint64
func (*Wire) AddSendListener ¶
func (this *Wire) AddSendListener(listener SendListener) uint64
AddSendListener adds a listener on send messages (Publis/Push/RequestAll/Request)
func (*Wire) HasRemoteTopic ¶
func (*Wire) RemoteMetadata ¶
func (*Wire) RemoteUuid ¶
func (*Wire) RemoveDropTopicListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wire) RemoveNewTopicListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wire) RemoveSendListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wire) SetBufferSize ¶
func (*Wire) SetRateLimiter ¶
func (*Wire) SetTimeout ¶
type Wires ¶
type Wires struct {
// contains filtered or unexported fields
}
Wires manages a collection of connections as if they were one. Connections are grouped accordingly to its group id. A wire with an empty group id means all nodes are different.
Wires with the same non empty id are treated as mirrors of each other. This means that we only need to call one of them. The other nodes function as High Availability and load balancing nodes
func NewWires ¶
func NewWires(codec Codec, l log.ILogger, factory StreamFactory) *Wires
NewWires creates a Wires structure
func (*Wires) AddDropTopicListener ¶
func (this *Wires) AddDropTopicListener(listener TopicListener) uint64
func (*Wires) AddNewTopicListener ¶
func (this *Wires) AddNewTopicListener(listener TopicListener) uint64
func (*Wires) AddSendListener ¶
func (this *Wires) AddSendListener(listener SendListener) uint64
AddSendListener adds a listener on send messages (Publis/Push/RequestAll/Request)
func (*Wires) LoadBalancer ¶
func (this *Wires) LoadBalancer() LoadBalancer
func (*Wires) Publish ¶
Publish sends a message without any reply If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.
func (*Wires) PublishTimeout ¶
func (*Wires) Push ¶
Push sends a message and receive an acknowledge If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.
func (*Wires) PushTimeout ¶
func (*Wires) RemoveDropTopicListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wires) RemoveNewTopicListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wires) RemoveSendListener ¶
RemoveSendListener removes a previously added listener on send messages
func (*Wires) Request ¶
Request sends a message and waits for the reply If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload. This is useful if we want to implement a broker.
func (*Wires) RequestAll ¶
RequestAll requests messages to all connected clients. If a client is not connected it is forever lost.
func (*Wires) RequestAllTimeout ¶
func (this *Wires) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
RequestAll requests messages to all connected clients. If a client is not connected it is forever lost.
func (*Wires) RequestTimeout ¶
func (this *Wires) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
RequestTimeout sends a message and waits for the reply
func (*Wires) Send ¶
func (this *Wires) Send(kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
Send is the generic function to send messages When the payload is of type []byte it passes the raw bytes without encoding. When the payload is of type mybus.Msg it passes the FRAMED raw bytes without encoding.
func (*Wires) SendSkip ¶
func (this *Wires) SendSkip(skipWire *Wire, kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
SendSkip is the generic function to send messages with the possibility of ignoring the sender
func (*Wires) SetBufferSize ¶
func (*Wires) SetDefaultTimeout ¶
func (*Wires) SetLoadBalancer ¶
func (this *Wires) SetLoadBalancer(loadBalancer LoadBalancer)
func (*Wires) SetRateLimiterFactory ¶
func (*Wires) SetStreamFactory ¶
func (this *Wires) SetStreamFactory(factory StreamFactory)
func (*Wires) TopicCount ¶
TopicCount returns the number of clients providing the topic