gomsg

package module
v0.0.0-...-43cd4a5 Latest Latest
Published: Oct 8, 2017 License: BSD-2-Clause Imports: 20 Imported by: 2

README

gomsg

Building blocks for message network solutions in Go.

This is a WORK IN PROGRESS.

This is an exercise to see what I could do in Go regarding networking and also to learn a bit more about Go.

The idea is that one client endpoint connects to a server endpoint, and as soon as the connection is established you can send and receive messages at either endpoint. A server can also act as a router/broker for messages between clients.

This is loosely inspired by ZeroMQ.

Features

  • Publish/Subscribe
  • Push/Pull
  • Request/Reply (only one provider replies)
  • RequestAll/ReplyAll (all providers reply)
  • Message handler functions receiving any kind of data
  • Message Filters
  • Client Groups
  • Temporary sticky endpoint by topic

A simple example

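// my custom data; fields are exported because encoders such as
// encoding/gob and encoding/json skip unexported fields
type Sample struct {
    Instant time.Time
    Voltage float64
}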
// create server
var server = gomsg.NewServer()
// we can handle messages in the server or in the client
server.Handle("TELEMETRY", func(data Sample) {
    fmt.Printf("received %+v\n", data)
})
server.Listen(":7777")

// create a client - the one that connects
var cli = gomsg.NewClient()
cli.Connect("localhost:7777")
// Publish data to the other end
cli.Publish("TELEMETRY", Sample{time.Now(), 234.56})

// give time for things to happen
time.Sleep(time.Millisecond * 10)

The following example shows a simple message bus. Three clients connect to a server, and one of them publishes a message that is received by the other two.

server := gomsg.NewServer()
// route all messages between the clients
server.Route("*", time.Second, nil)
server.Listen(":7777")

cli1 := gomsg.NewClient()
cli1.Handle("HELLO", func(m string) {
	fmt.Println("[1] Hello", m)
})
cli1.Connect("localhost:7777")

cli2 := gomsg.NewClient()
cli2.Handle("HELLO", func(m string) {
	fmt.Println("[2] Hello", m)
})
cli2.Connect("localhost:7777")

cli := gomsg.NewClient()
cli.Connect("localhost:7777")

cli.Publish("HELLO", "World")

// give time for things to happen
time.Sleep(time.Millisecond * 100)

Another nice feature is the possibility of grouping clients.

If a client identifies the group it belongs to, the server sends each message to only one of the clients in that group. This way the server can publish messages and the clients decide whether they behave like Publish/Subscribe (ungrouped) or like Push/Pull (grouped).

server := gomsg.NewServer()
server.Listen(":7777")
server.Route("*", time.Second, nil)

// Group HA subscriber
cli := gomsg.NewClient()
cli.Handle("HELLO", func(m string) {
	fmt.Println("<=== [1] processing:", m)
})
cli.SetGroupId("HA")
cli.Connect("localhost:7777")

// Group HA subscriber
cli2 := gomsg.NewClient()
cli2.Handle("HELLO", func(m string) {
	fmt.Println("<=== [2] processing:", m)
})
cli2.SetGroupId("HA")
cli2.Connect("localhost:7777")

// publisher
cli3 := gomsg.NewClient()
cli3.Connect("localhost:7777")

// Only one element of the group HA will process
// each message, alternately (round robin).
cli3.Publish("HELLO", "Hello World!")
cli3.Publish("HELLO", "Olá Mundo!")
cli3.Publish("HELLO", "YESSSS!")
cli3.Publish("HELLO", "one")
cli3.Publish("HELLO", "two")
cli3.Publish("HELLO", "three")
cli3.Publish("HELLO", "test")

time.Sleep(time.Millisecond * 100)
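
The delivery rule described above (every ungrouped client gets each message, each group gets it once, in round-robin order) can be pictured with a small stand-alone model. This is only a sketch of the rule as stated in this README, not gomsg's implementation; `subscriber`, `dispatcher` and `targets` are hypothetical names and no networking is involved.

```go
package main

import "fmt"

// subscriber is a hypothetical stand-in for a connected client.
type subscriber struct {
	name  string
	group string // empty means ungrouped
}

// dispatcher is a hypothetical model of the delivery rule: every ungrouped
// subscriber receives each message, while each group receives it once,
// on the next member in round-robin order.
type dispatcher struct {
	subs []subscriber
	next map[string]int // per-group round-robin cursor
}

func newDispatcher(subs ...subscriber) *dispatcher {
	return &dispatcher{subs: subs, next: map[string]int{}}
}

// targets returns the names of the subscribers that would receive one message.
func (d *dispatcher) targets() []string {
	var out []string
	groups := map[string][]string{}
	var order []string // keeps the group order deterministic
	for _, s := range d.subs {
		if s.group == "" {
			out = append(out, s.name)
			continue
		}
		if _, seen := groups[s.group]; !seen {
			order = append(order, s.group)
		}
		groups[s.group] = append(groups[s.group], s.name)
	}
	for _, g := range order {
		members := groups[g]
		out = append(out, members[d.next[g]%len(members)])
		d.next[g]++
	}
	return out
}

func main() {
	d := newDispatcher(
		subscriber{"cli1", "HA"},
		subscriber{"cli2", "HA"},
		subscriber{"audit", ""},
	)
	for i := 0; i < 3; i++ {
		fmt.Println(d.targets())
	}
}
```

In this model the ungrouped `audit` subscriber appears in every result, while `cli1` and `cli2` alternate.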

Dependencies

Go 1.1+

Todo

  • More Tests
  • Documentation

Documentation

Index

Constants

const BIN_STRING_END byte = 0
const FILTER_TOKEN = "*"
const TXT_STRING_END byte = '\n'

Variables

var (
	NOCODEC   = errors.New("No codec defined")
	EOR       = errors.New("End Of Multiple Reply")
	NACKERROR = errors.New("Not Acknowledge Error")

	UNKNOWNTOPIC       = "No registered subscriber for %s."
	TIMEOUT            = "Timeout (%s) while waiting for reply of call #%d %s(%s)=%s"
	UNAVAILABLESERVICE = "No service is available for %s."
)


Functions

func CreateRequestHandler

func CreateRequestHandler(codec Codec, fun interface{}, logger log.ILogger) func(*Request)

func CreateResponseHandler

func CreateResponseHandler(codec Codec, fun interface{}) func(Response)

func IP

func IP() (string, error)

func Route

func Route(name string, src Handler, dest Sender, relayTimeout time.Duration, before func(c *Request) bool, after func(c *Response))

Route routes messages between two different binding ports.

Types

type BinStreamFactory

type BinStreamFactory struct {
}

func (BinStreamFactory) Input

func (this BinStreamFactory) Input(r io.Reader) InputStream

func (BinStreamFactory) Output

func (this BinStreamFactory) Output(w io.Writer) OutputStream

type BindListener

type BindListener func(net.Listener)

type Client

type Client struct {
	ClientServer
	*Wire
	// contains filtered or unexported fields
}

func NewClient

func NewClient() *Client

NewClient creates a Client

func (*Client) Active

func (this *Client) Active() bool

Active checks whether this wire is running, i.e., whether it has a connection.

func (*Client) Address

func (this *Client) Address() string

Address returns the Server address

func (*Client) Cancel

func (this *Client) Cancel(name string)

func (*Client) Connect

func (this *Client) Connect(addr string) <-chan error

Connect is a separate step so that OnConnect can be defined before the connection is made.

func (*Client) Destroy

func (this *Client) Destroy()

func (*Client) Disconnect

func (this *Client) Disconnect()

func (*Client) Handle

func (this *Client) Handle(name string, middlewares ...interface{})

name can have a '*' at the end, meaning that it handles messages whose destination name starts with the given name without the '*'. When handling request messages, the handler function can have a return value and/or an error. When handling publish/push messages, any return from the handler function is discarded. When handling Request/RequestAll messages, if a return is not specified, the caller will not receive a reply until you explicitly call gomsg.Request.ReplyAs().
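
The trailing '*' rule can be read as a prefix match. The following is a minimal sketch of that reading, assuming plain prefix comparison; `matchTopic` is a hypothetical helper and not part of gomsg, which may match names differently.

```go
package main

import (
	"fmt"
	"strings"
)

// filterToken mirrors the FILTER_TOKEN constant listed in this package.
const filterToken = "*"

// matchTopic is a hypothetical helper: it reports whether a message name
// is covered by a handler pattern. A pattern ending in "*" matches any
// name that starts with the text before the "*".
func matchTopic(pattern, name string) bool {
	if strings.HasSuffix(pattern, filterToken) {
		return strings.HasPrefix(name, strings.TrimSuffix(pattern, filterToken))
	}
	return pattern == name
}

func main() {
	fmt.Println(matchTopic("TELEMETRY.*", "TELEMETRY.VOLTAGE")) // true
	fmt.Println(matchTopic("TELEMETRY.*", "HELLO"))             // false
	fmt.Println(matchTopic("HELLO", "HELLO"))                   // true
}
```

Under this reading a bare "*" matches every name, which is consistent with the `server.Route("*", ...)` calls in the README examples.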

func (*Client) HandleSerial

func (this *Client) HandleSerial(name string, middlewares ...interface{})

HandleSerial is the same as Handle except that it handles requests in sequence: the next request is handled only after the handler returns from the previous one.

func (*Client) Publish

func (this *Client) Publish(name string, m interface{}) <-chan error

Publish sends a message without any reply

func (*Client) PublishTimeout

func (this *Client) PublishTimeout(name string, m interface{}, timeout time.Duration) <-chan error

If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.

func (*Client) Push

func (this *Client) Push(name string, m interface{}) <-chan error

Push sends a message and receives an acknowledgement.

func (*Client) PushTimeout

func (this *Client) PushTimeout(name string, m interface{}, timeout time.Duration) <-chan error

PushTimeout is the same as Push with a timeout definition

func (*Client) Reconnect

func (this *Client) Reconnect() <-chan error

func (*Client) Request

func (this *Client) Request(name string, payload interface{}, handler interface{}) <-chan error

Request sends a message and waits for the reply

func (*Client) RequestAll

func (this *Client) RequestAll(name string, payload interface{}, handler interface{}) <-chan error

RequestAll sends a request to all connected clients of the same server. A client that is not connected when the request is sent never receives it.

func (*Client) RequestAllTimeout

func (this *Client) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

RequestAllTimeout sends a request to all connected clients of the same server. A client that is not connected when the request is sent never receives it.

func (*Client) RequestTimeout

func (this *Client) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

RequestTimeout is the same as Request with a timeout definition

func (*Client) Send

func (this *Client) Send(kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

When the payload is of type []byte it passes the raw bytes without encoding.

func (*Client) SetCodec

func (this *Client) SetCodec(codec Codec) *Client

func (*Client) SetDefaultTimeout

func (this *Client) SetDefaultTimeout(timeout time.Duration)

func (*Client) SetGroupId

func (this *Client) SetGroupId(groupId string) *Client

Make it belong to a group. Only one element at a time (round-robin) handles the messages.

func (*Client) SetReconnectInterval

func (this *Client) SetReconnectInterval(reconnectInterval time.Duration) *Client

func (*Client) SetReconnectMaxInterval

func (this *Client) SetReconnectMaxInterval(reconnectMaxInterval time.Duration) *Client

func (*Client) SetStreamFactory

func (this *Client) SetStreamFactory(factory StreamFactory)

type ClientServer

type ClientServer struct {
	OnConnect func(w *Wire)
	OnClose   func(c net.Conn)
	// contains filtered or unexported fields
}

func NewClientServer

func NewClientServer() ClientServer

func (*ClientServer) Destroy

func (this *ClientServer) Destroy()

func (*ClientServer) Metadata

func (this *ClientServer) Metadata() map[string]interface{}

type ClientServerConfig

type ClientServerConfig struct {
}

type Codec

type Codec interface {
	Encode(interface{}) ([]byte, error)
	Decode([]byte, interface{}) error
}
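
As an illustration of what satisfying this interface involves, here is a minimal sketch backed by encoding/json. The interface is repeated locally so the example compiles on its own; `jsonCodec`, `Sample` and `roundTrip` are hypothetical names, and nothing here is taken from gomsg's own JsonCodec.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec repeats the interface listed above so the sketch compiles on its own.
type Codec interface {
	Encode(interface{}) ([]byte, error)
	Decode([]byte, interface{}) error
}

// jsonCodec is a hypothetical implementation backed by encoding/json.
type jsonCodec struct{}

func (jsonCodec) Encode(v interface{}) ([]byte, error) { return json.Marshal(v) }

func (jsonCodec) Decode(b []byte, v interface{}) error { return json.Unmarshal(b, v) }

// Sample uses exported fields; encoding/json skips unexported ones.
type Sample struct {
	Label   string
	Voltage float64
}

// roundTrip encodes a Sample and decodes it back through the codec.
func roundTrip(c Codec, in Sample) (Sample, error) {
	var out Sample
	raw, err := c.Encode(in)
	if err != nil {
		return out, err
	}
	err = c.Decode(raw, &out)
	return out, err
}

func main() {
	out, err := roundTrip(jsonCodec{}, Sample{"bus-1", 234.56})
	fmt.Println(out, err)
}
```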

type Comparer

type Comparer interface {
	// Compare returns 0 if equal, -1 if lesser and 1 if greater
	Compare(Comparer) int
}
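
A small sketch of how a load value could satisfy Comparer, following the -1/0/1 convention in the comment above. `intLoad` and `lightest` are hypothetical; how gomsg represents loads internally is not documented here.

```go
package main

import "fmt"

// Comparer repeats the interface listed above so the sketch compiles on its own.
type Comparer interface {
	// Compare returns 0 if equal, -1 if lesser and 1 if greater
	Compare(Comparer) int
}

// intLoad is a hypothetical Comparer that models a load as a plain counter.
type intLoad int

// Compare orders two intLoad values; any other Comparer is treated as equal.
func (a intLoad) Compare(other Comparer) int {
	b, ok := other.(intLoad)
	switch {
	case !ok || a == b:
		return 0
	case a < b:
		return -1
	default:
		return 1
	}
}

// lightest returns the index of the smallest load, or -1 for an empty slice.
func lightest(loads []Comparer) int {
	best := -1
	for i, l := range loads {
		if best == -1 || l.Compare(loads[best]) < 0 {
			best = i
		}
	}
	return best
}

func main() {
	loads := []Comparer{intLoad(7), intLoad(2), intLoad(5)}
	fmt.Println(lightest(loads)) // 1
}
```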

type Context

type Context struct {
	Kind EKind
	Name string
	// contains filtered or unexported fields
}

Context is used to pass all the data to the replying function, if we so wish. payload contains the data of the incoming message, reply contains the reply data, and err is used in replies (REP) in case something went wrong.

func (*Context) Connection

func (this *Context) Connection() net.Conn

Connection getter

type EKind

type EKind uint8
const (
	SUB EKind // indicates that wants to receive messages for a topic
	UNSUB
	REQ
	REQALL
	PUSH
	PUB
	REP         // terminates the REQALL
	ERR         // terminates the REQALL
	REP_PARTIAL // partial reply from a request all
	ERR_PARTIAL // partial error from a request all
	ACK         // successful delivery of a PUSH message, or an End Of Replies mark
	NACK
)

func (EKind) String

func (this EKind) String() string

type Envelope

type Envelope struct {
	Kind     EKind
	Sequence uint32
	Name     string
	Payload  []byte
	// contains filtered or unexported fields
}

func (Envelope) Reply

func (this Envelope) Reply(r Response)

func (Envelope) String

func (this Envelope) String() string

type GobCodec

type GobCodec struct {
}

func (GobCodec) Decode

func (this GobCodec) Decode(payload []byte, p interface{}) error

func (GobCodec) Encode

func (this GobCodec) Encode(data interface{}) ([]byte, error)

type Handler

type Handler interface {
	Handle(string, ...interface{})
}

type HysteresisPolicy

type HysteresisPolicy struct {
	sync.RWMutex

	MaxFailures  int
	MinSuccesses int
	Quarantine   time.Duration
	// contains filtered or unexported fields
}

HysteresisPolicy is the policy that controls the load of the connections. For each topic name there is a load, which is the sum of the time that the connection is borrowed. Each load is initialized with a starting value, usually the minimum load of all wires for that topic. The strategy used in this policy is that of a Circuit Breaker. After the circuit opens, a quarantine time is observed. After that quarantine, the topic is tested by letting ONE connection pass. Until that connection is returned, no more connections are borrowed. Only when the connection is returned do we test whether the circuit should be closed.
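
The circuit-breaker cycle described above (open after failures, quarantine, a single probe, then close or re-open) can be sketched as a tiny state machine. This is a simplified illustration, not gomsg's HysteresisPolicy: loads are left out, one successful probe closes the circuit (MinSuccesses is ignored), and `breaker`, `allow` and `done` are hypothetical names. Time is passed in explicitly so the run is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errFailed and epoch are fixtures for the example run below.
var (
	errFailed = errors.New("call failed")
	epoch     = time.Unix(0, 0)
)

// breaker is a hypothetical single-topic circuit breaker.
type breaker struct {
	maxFailures int
	quarantine  time.Duration
	failures    int
	open        bool
	openedAt    time.Time
	probing     bool // true while the single test call is in flight
}

func newBreaker(maxFailures int, quarantine time.Duration) *breaker {
	return &breaker{maxFailures: maxFailures, quarantine: quarantine}
}

// allow reports whether a call may be attempted at time now.
func (b *breaker) allow(now time.Time) bool {
	if !b.open {
		return true
	}
	if b.probing || now.Sub(b.openedAt) < b.quarantine {
		return false
	}
	b.probing = true // quarantine is over: let exactly one call through
	return true
}

// done records the outcome of a call that allow let through.
func (b *breaker) done(now time.Time, err error) {
	if b.open {
		b.probing = false
		if err == nil {
			b.open, b.failures = false, 0 // probe succeeded: close the circuit
		} else {
			b.openedAt = now // probe failed: restart the quarantine
		}
		return
	}
	if err == nil {
		b.failures = 0
		return
	}
	b.failures++
	if b.failures >= b.maxFailures {
		b.open, b.openedAt = true, now
	}
}

func main() {
	b := newBreaker(2, time.Second)
	b.done(epoch, errFailed)
	b.done(epoch, errFailed) // second failure opens the circuit
	fmt.Println(b.allow(epoch)) // false: in quarantine
	fmt.Println(b.allow(epoch.Add(2 * time.Second))) // true: the single probe
	fmt.Println(b.allow(epoch.Add(2 * time.Second))) // false: probe still in flight
	b.done(epoch.Add(3*time.Second), nil)
	fmt.Println(b.allow(epoch.Add(3 * time.Second))) // true: circuit closed again
}
```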

func NewHysteresisPolicy

func NewHysteresisPolicy() *HysteresisPolicy

func (*HysteresisPolicy) Borrow

func (this *HysteresisPolicy) Borrow(name string, initialize func(name string) Comparer) Comparer

func (*HysteresisPolicy) Load

func (this *HysteresisPolicy) Load(name string) Comparer

func (*HysteresisPolicy) Quarantined

func (this *HysteresisPolicy) Quarantined(name string) bool

func (*HysteresisPolicy) Return

func (this *HysteresisPolicy) Return(name string, loaded Comparer, err error)

func (*HysteresisPolicy) SetMaxFailures

func (this *HysteresisPolicy) SetMaxFailures(maxFailures int) *HysteresisPolicy

func (*HysteresisPolicy) SetMinSuccesses

func (this *HysteresisPolicy) SetMinSuccesses(minSuccesses int) *HysteresisPolicy

func (*HysteresisPolicy) SetQuarantine

func (this *HysteresisPolicy) SetQuarantine(quarantine time.Duration) *HysteresisPolicy

type HysteresisPolicyData

type HysteresisPolicyData struct {
	// contains filtered or unexported fields
}

type InputBinStream

type InputBinStream struct {
	// contains filtered or unexported fields
}

func NewInputBinStream

func NewInputBinStream(reader io.Reader) *InputBinStream

func (*InputBinStream) Read

func (this *InputBinStream) Read(p []byte) (n int, err error)

func (*InputBinStream) ReadBytes

func (this *InputBinStream) ReadBytes() ([]byte, error)

func (*InputBinStream) ReadNBytes

func (this *InputBinStream) ReadNBytes(size int) ([]byte, error)

func (*InputBinStream) ReadString

func (this *InputBinStream) ReadString() (string, error)

func (*InputBinStream) ReadUI16

func (this *InputBinStream) ReadUI16() (uint16, error)

func (*InputBinStream) ReadUI32

func (this *InputBinStream) ReadUI32() (uint32, error)

func (*InputBinStream) ReadUI64

func (this *InputBinStream) ReadUI64() (uint64, error)

func (*InputBinStream) ReadUI8

func (this *InputBinStream) ReadUI8() (uint8, error)

type InputStream

type InputStream interface {
	Read([]byte) (int, error)
	ReadNBytes(int) ([]byte, error)
	ReadUI8() (uint8, error)
	ReadUI16() (uint16, error)
	ReadUI32() (uint32, error)
	ReadUI64() (uint64, error)
	ReadString() (string, error)
	ReadBytes() ([]byte, error)
}

type InputTxtStream

type InputTxtStream struct {
	// contains filtered or unexported fields
}

InputTxtStream will read data one item per line
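
To make "one item per line" concrete, here is a small sketch that writes two items separated by the TXT_STRING_END byte and reads them back. The decimal text form for numbers is an assumption of this sketch, and `encodeItems`, `readItem` and `decodeItems` are hypothetical helpers; the actual gomsg text format may differ.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strconv"
)

// lineEnd mirrors the TXT_STRING_END constant listed in this package.
const lineEnd byte = '\n'

// encodeItems writes a number and a string, one item per line.
// Writing the number as decimal text is an assumption of this sketch.
func encodeItems(n uint16, s string) []byte {
	var buf bytes.Buffer
	buf.WriteString(strconv.FormatUint(uint64(n), 10))
	buf.WriteByte(lineEnd)
	buf.WriteString(s)
	buf.WriteByte(lineEnd)
	return buf.Bytes()
}

// readItem returns the next line without its terminator.
func readItem(r *bufio.Reader) (string, error) {
	line, err := r.ReadString(lineEnd)
	if err != nil {
		return "", err
	}
	return line[:len(line)-1], nil
}

// decodeItems reads back what encodeItems wrote.
func decodeItems(data []byte) (uint16, string, error) {
	r := bufio.NewReader(bytes.NewReader(data))
	first, err := readItem(r)
	if err != nil {
		return 0, "", err
	}
	n, err := strconv.ParseUint(first, 10, 16)
	if err != nil {
		return 0, "", err
	}
	s, err := readItem(r)
	if err != nil {
		return 0, "", err
	}
	return uint16(n), s, nil
}

func main() {
	raw := encodeItems(42, "HELLO")
	fmt.Printf("%q\n", raw)
	fmt.Println(decodeItems(raw))
}
```

A line-based layout like this cannot carry a string that itself contains a newline unless some escaping is added.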

func NewInputTxtStream

func NewInputTxtStream(r io.Reader) *InputTxtStream

func (*InputTxtStream) Read

func (this *InputTxtStream) Read(p []byte) (n int, err error)

func (*InputTxtStream) ReadBytes

func (this *InputTxtStream) ReadBytes() ([]byte, error)

func (*InputTxtStream) ReadNBytes

func (this *InputTxtStream) ReadNBytes(size int) ([]byte, error)

func (*InputTxtStream) ReadString

func (this *InputTxtStream) ReadString() (string, error)

func (*InputTxtStream) ReadUI16

func (this *InputTxtStream) ReadUI16() (uint16, error)

func (*InputTxtStream) ReadUI32

func (this *InputTxtStream) ReadUI32() (uint32, error)

func (*InputTxtStream) ReadUI64

func (this *InputTxtStream) ReadUI64() (uint64, error)

func (*InputTxtStream) ReadUI8

func (this *InputTxtStream) ReadUI8() (uint8, error)

type JsonCodec

type JsonCodec struct {
}

func (JsonCodec) Decode

func (this JsonCodec) Decode(payload []byte, p interface{}) error

func (JsonCodec) Encode

func (this JsonCodec) Encode(data interface{}) ([]byte, error)

type KeyValue

type KeyValue struct {
	Items []*KeyValueItem
}

func NewKeyValue

func NewKeyValue() *KeyValue

func (*KeyValue) Delete

func (kv *KeyValue) Delete(key interface{}) interface{}

func (*KeyValue) Find

func (kv *KeyValue) Find(fn func(item *KeyValueItem) bool) *KeyValueItem

func (*KeyValue) Get

func (kv *KeyValue) Get(key interface{}) interface{}

func (*KeyValue) Put

func (kv *KeyValue) Put(key interface{}, value interface{}) interface{}

type KeyValueItem

type KeyValueItem struct {
	Key   interface{}
	Value interface{}
}

type LBPolicy

type LBPolicy interface {
	// Borrow returns
	Borrow(string, func(string) Comparer) Comparer
	Return(string, Comparer, error)

	// Load is the current load for a service
	Load(string) Comparer
	// Quarantined returns if it is in quarantine
	Quarantined(string) bool
}

type LoadBalancer

type LoadBalancer interface {
	SetPolicyFactory(func() LBPolicy)
	// Add is called when a new wire is created
	Add(*Wire)
	// Remove is called when the wire is killed
	Remove(*Wire)
	// PickOne is called before the message is sent
	PickOne(Envelope, []*Wire) (*Wire, error)
	// PickAll is called before the message is sent
	PickAll(Envelope, []*Wire) ([]*Wire, error)
	// Use is called before the message is sent
	Use(*Wire, Envelope) Wirer
	// Done is called when we are done with one wire
	Done(Wirer, Envelope, error)
	// AllDone is called when ALL wires have been processed
	AllDone(Envelope, error) error
}

type Log

type Log struct{}

func (Log) CallerAt

func (lg Log) CallerAt(depth int) l.ILogger

func (Log) Debugf

func (lg Log) Debugf(format string, args ...interface{})

func (Log) Errorf

func (lg Log) Errorf(format string, args ...interface{})

func (Log) Fatalf

func (lg Log) Fatalf(format string, args ...interface{})

func (Log) Infof

func (lg Log) Infof(format string, args ...interface{})

func (Log) IsActive

func (lg Log) IsActive(level l.LogLevel) bool

func (Log) Tracef

func (lg Log) Tracef(format string, args ...interface{})

func (Log) Warnf

func (lg Log) Warnf(format string, args ...interface{})

type Looper

type Looper struct {
	// contains filtered or unexported fields
}

func NewLooper

func NewLooper(start int, max int) *Looper

func (*Looper) HasNext

func (this *Looper) HasNext() bool

func (*Looper) Last

func (this *Looper) Last() int

func (*Looper) Next

func (this *Looper) Next() int

type Middleware

type Middleware func(*Request)

type Msg

type Msg struct {
	OutputStream
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(factory StreamFactory) *Msg

type OutputBinStream

type OutputBinStream struct {
	// contains filtered or unexported fields
}

func NewOutputBinStream

func NewOutputBinStream(writer io.Writer) *OutputBinStream

func (*OutputBinStream) Write

func (this *OutputBinStream) Write(p []byte) (n int, err error)

func (*OutputBinStream) WriteBytes

func (this *OutputBinStream) WriteBytes(data []byte) error

func (*OutputBinStream) WriteString

func (this *OutputBinStream) WriteString(s string) error

func (*OutputBinStream) WriteUI16

func (this *OutputBinStream) WriteUI16(data uint16) error

func (*OutputBinStream) WriteUI32

func (this *OutputBinStream) WriteUI32(data uint32) error

func (*OutputBinStream) WriteUI64

func (this *OutputBinStream) WriteUI64(data uint64) error

func (*OutputBinStream) WriteUI8

func (this *OutputBinStream) WriteUI8(data uint8) error

type OutputStream

type OutputStream interface {
	Write([]byte) (n int, err error)
	WriteUI8(uint8) error
	WriteUI16(uint16) error
	WriteUI32(uint32) error
	WriteUI64(uint64) error
	WriteString(string) error
	WriteBytes([]byte) error
}

type OutputTxtStream

type OutputTxtStream struct {
	// contains filtered or unexported fields
}

func NewOutputTxtStream

func NewOutputTxtStream(writer io.Writer) *OutputTxtStream

func (*OutputTxtStream) Write

func (this *OutputTxtStream) Write(p []byte) (n int, err error)

func (*OutputTxtStream) WriteBytes

func (this *OutputTxtStream) WriteBytes(data []byte) error

func (*OutputTxtStream) WriteString

func (this *OutputTxtStream) WriteString(s string) error

func (*OutputTxtStream) WriteUI16

func (this *OutputTxtStream) WriteUI16(data uint16) error

func (*OutputTxtStream) WriteUI32

func (this *OutputTxtStream) WriteUI32(data uint32) error

func (*OutputTxtStream) WriteUI64

func (this *OutputTxtStream) WriteUI64(data uint64) error

func (*OutputTxtStream) WriteUI8

func (this *OutputTxtStream) WriteUI8(data uint8) error

type Request

type Request struct {
	Response
	// contains filtered or unexported fields
}

func NewRequest

func NewRequest(wire *Wire, c net.Conn, msg Envelope) *Request

func (*Request) DeferReply

func (this *Request) DeferReply()

DeferReply indicates that the reply won't be sent immediately. The reply will eventually be sent by calling gomsg.Request.ReplyAs().

func (*Request) Next

func (this *Request) Next()

Next calls the next handler
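
A minimal sketch of how a chain of middleware functions can hand control to each other through a Next method. The `request` type here is a hypothetical stand-in that only carries the chain and a trace; it is not gomsg.Request and says nothing about how gomsg stores its handlers.

```go
package main

import "fmt"

// request is a hypothetical stand-in for gomsg.Request: it only carries the
// handler chain and a trace of what ran.
type request struct {
	chain []func(*request)
	pos   int
	trace []string
}

// Next calls the next handler in the chain, if there is one.
func (r *request) Next() {
	if r.pos < len(r.chain) {
		h := r.chain[r.pos]
		r.pos++
		h(r)
	}
}

// run executes the handlers in order and returns the trace they produced.
func run(chain ...func(*request)) []string {
	r := &request{chain: chain}
	r.Next()
	return r.trace
}

func main() {
	logging := func(r *request) {
		r.trace = append(r.trace, "log:before")
		r.Next() // hand over to the following handler
		r.trace = append(r.trace, "log:after")
	}
	handler := func(r *request) { r.trace = append(r.trace, "handle") }
	fmt.Println(run(logging, handler)) // [log:before handle log:after]
}
```

In this model a middleware that returns without calling Next stops the chain, which is one way to reject a message early.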

func (*Request) Payload

func (this *Request) Payload() []byte

func (*Request) Reply

func (this *Request) Reply() []byte

func (*Request) ReplyAs

func (this *Request) ReplyAs(kind EKind, reply []byte)

func (*Request) SendFault

func (this *Request) SendFault(err error)

SendFault sets a multi-reply error (ERR_PARTIAL).

func (*Request) SendReply

func (this *Request) SendReply(reply []byte)

SendReply sets a multi-reply (REP_PARTIAL).

func (*Request) SetFault

func (this *Request) SetFault(err error)

SetFault sets a single reply error (ERR).

func (*Request) SetReply

func (this *Request) SetReply(payload []byte)

SetReply sets a single reply (REP).

func (*Request) Terminate

func (this *Request) Terminate()

Terminate terminates a series of replies by sending an ACK message to the caller. It can also be used to reject a request, since this terminates the request without sending a reply payload.

func (*Request) Terminated

func (this *Request) Terminated() bool

func (*Request) Writer

func (this *Request) Writer() OutputStream

type Response

type Response struct {
	*Context
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse(wire *Wire, c net.Conn, kind EKind, seq uint32, payload []byte) Response

func (Response) EndMark

func (this Response) EndMark() bool

EndMark reports whether this is the end mark.

func (Response) Fault

func (this Response) Fault() error

func (Response) Last

func (this Response) Last() bool

Last reports whether this is the last reply.

func (Response) Reader

func (this Response) Reader() InputStream

func (Response) Reply

func (this Response) Reply() []byte

type RoundRobinPolicy

type RoundRobinPolicy struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*RoundRobinPolicy) Borrow

func (this *RoundRobinPolicy) Borrow(topic string, initializer func(topic string) Comparer) Comparer

func (*RoundRobinPolicy) Load

func (this *RoundRobinPolicy) Load(topic string) Comparer

Load is the current load for a service

func (*RoundRobinPolicy) Quarantined

func (this *RoundRobinPolicy) Quarantined(topic string) bool

Quarantined returns if it is in quarantine

func (*RoundRobinPolicy) Return

func (this *RoundRobinPolicy) Return(topic string, comp Comparer, err error)

type SendEvent

type SendEvent struct {
	Kind    EKind
	Name    string
	Payload interface{}
	Handler interface{}
}

type SendListener

type SendListener func(event SendEvent)

type Sender

type Sender interface {
	Send(kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error
}

type Server

type Server struct {
	ClientServer
	*Wires
	// contains filtered or unexported fields
}

func NewServer

func NewServer() *Server

func (*Server) AddBindListeners

func (this *Server) AddBindListeners(listener BindListener) uint64

func (*Server) BindAddress

func (this *Server) BindAddress() net.Addr

BindAddress returns the listener address

func (*Server) BindPort

func (this *Server) BindPort() int

BindPort returns the listener port

func (*Server) Cancel

func (this *Server) Cancel(name string)

func (*Server) Destroy

func (this *Server) Destroy()

Destroy closes all connections and the listener

func (*Server) Handle

func (this *Server) Handle(name string, middlewares ...interface{})

Handle defines the function that will handle messages for a topic. name can have a '*' at the end, meaning that it handles messages whose destination name starts with the given name (without the '*'). When handling request messages, the handler function can have a return value and/or an error. When handling publish/push messages, any return from the handler function is discarded. When handling Request/RequestAll messages, if a return is not specified, the caller will not receive a reply until you explicitly call gomsg.Request.ReplyAs().

func (*Server) HandleSerial

func (this *Server) HandleSerial(name string, middlewares ...interface{})

func (*Server) Listen

func (this *Server) Listen(service string) <-chan error

func (*Server) Listener

func (this *Server) Listener() net.Listener

func (*Server) Port

func (this *Server) Port() int

func (*Server) RemoveBindListener

func (this *Server) RemoveBindListener(idx uint64)

RemoveBindListener removes a previously added bind listener.

func (*Server) Route

func (this *Server) Route(name string, timeout time.Duration, before func(x *Request) bool, after func(x *Response))

Messages received from one client are delivered to other clients. The sending client does not receive its own message. The handler execution is canceled. Replies arriving from endpoints are piped to the requesting wire.

func (*Server) SetBufferSize

func (this *Server) SetBufferSize(size int)

func (*Server) SetRateLimiterFactory

func (this *Server) SetRateLimiterFactory(factory func() tk.Rate)

type ServiceUnavailableError

type ServiceUnavailableError error

type SimpleLB

type SimpleLB struct {
	sync.RWMutex
	Stickies
	// contains filtered or unexported fields
}

func NewSimpleLB

func NewSimpleLB() *SimpleLB

func (*SimpleLB) Add

func (lb *SimpleLB) Add(w *Wire)

Add adds wire to load balancer

func (*SimpleLB) AllDone

func (lb *SimpleLB) AllDone(msg Envelope, err error) error

func (*SimpleLB) Done

func (lb *SimpleLB) Done(wirer Wirer, msg Envelope, err error)

func (*SimpleLB) PickAll

func (lb *SimpleLB) PickAll(msg Envelope, wires []*Wire) ([]*Wire, error)

func (*SimpleLB) PickOne

func (lb *SimpleLB) PickOne(msg Envelope, wires []*Wire) (*Wire, error)

func (*SimpleLB) Remove

func (lb *SimpleLB) Remove(w *Wire)

Remove removes wire from load balancer

func (*SimpleLB) SetPolicyFactory

func (lb *SimpleLB) SetPolicyFactory(factory func() LBPolicy)

func (*SimpleLB) Use

func (lb *SimpleLB) Use(wire *Wire, msg Envelope) Wirer

type Stickies

type Stickies map[string]*Sticky

func (Stickies) IsSticky

func (st Stickies) IsSticky(name string, wires []*Wire) (*Wire, *Sticky)

func (Stickies) Stick

func (st Stickies) Stick(name string, duration time.Duration)

Stick forces the messages to go to the same wire (in a multi-wire scenario) if the time between messages is smaller than the duration argument.
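
The sticky rule can be sketched as: keep the previous wire while messages arrive closer together than the duration, otherwise choose again. The model below is an illustration under that reading only; `sticky`, `pick` and `roundRobin` are hypothetical, wires are plain strings, and time is passed in explicitly.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed start time for the example run below.
var epoch = time.Unix(0, 0)

// sticky is a hypothetical model of one sticky topic.
type sticky struct {
	duration time.Duration
	wire     string // wire used by the previous message, "" if none
	lastUsed time.Time
}

func newSticky(d time.Duration) *sticky { return &sticky{duration: d} }

// pick keeps the previous wire when the gap since the last message is
// smaller than duration; otherwise it asks choose for a wire.
func (s *sticky) pick(now time.Time, choose func() string) string {
	if s.wire == "" || now.Sub(s.lastUsed) >= s.duration {
		s.wire = choose()
	}
	s.lastUsed = now
	return s.wire
}

// roundRobin returns a chooser that cycles through the given wires.
func roundRobin(wires ...string) func() string {
	i := 0
	return func() string {
		w := wires[i%len(wires)]
		i++
		return w
	}
}

func main() {
	s := newSticky(time.Second)
	choose := roundRobin("wire-A", "wire-B")
	fmt.Println(s.pick(epoch, choose)) // wire-A
	fmt.Println(s.pick(epoch.Add(500*time.Millisecond), choose)) // wire-A: still sticky
	fmt.Println(s.pick(epoch.Add(5*time.Second), choose)) // wire-B: the gap was too long
}
```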

func (Stickies) Unstick

func (st Stickies) Unstick(w *Wire)

type Sticky

type Sticky struct {
	// contains filtered or unexported fields
}

type StreamFactory

type StreamFactory interface {
	Input(io.Reader) InputStream
	Output(io.Writer) OutputStream
}

type SystemError

type SystemError error

type Timeout

type Timeout struct {
	// contains filtered or unexported fields
}

Timeout is a timer over a generic element, that will call a function when a specified timeout occurs. It is possible to delay the timeout.

func NewTimeout

func NewTimeout(tick time.Duration, duration time.Duration, expired func(o interface{})) *Timeout

NewTimeout create a timeout

func (*Timeout) Delay

func (timeout *Timeout) Delay(o interface{})

Delay delays the timeout occurrence. If this is the first time it is called, the timeout is counted only from now.
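
One way to picture a delayable timeout over arbitrary elements is a table of deadlines that a periodic tick sweeps. The sketch below is an assumption about the general technique, not gomsg's Timeout; `watchdog`, `delay` and `tick` are hypothetical, elements must be usable as map keys, and the clock is passed in so the run is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed start time for the example run below.
var epoch = time.Unix(0, 0)

// watchdog is a hypothetical, tick-driven model of a delayable timeout.
type watchdog struct {
	duration  time.Duration
	deadlines map[interface{}]time.Time
	expired   func(o interface{})
}

func newWatchdog(d time.Duration, expired func(o interface{})) *watchdog {
	return &watchdog{
		duration:  d,
		deadlines: map[interface{}]time.Time{},
		expired:   expired,
	}
}

// delay (re)starts the countdown for o, counting from now.
func (w *watchdog) delay(o interface{}, now time.Time) {
	w.deadlines[o] = now.Add(w.duration)
}

// tick is what a periodic ticker would call: it fires expired for every
// element whose deadline has passed and forgets that element.
func (w *watchdog) tick(now time.Time) {
	for o, deadline := range w.deadlines {
		if !now.Before(deadline) {
			delete(w.deadlines, o)
			w.expired(o)
		}
	}
}

func main() {
	w := newWatchdog(10*time.Second, func(o interface{}) {
		fmt.Println("expired:", o)
	})
	w.delay("session-1", epoch)
	w.delay("session-2", epoch)
	w.delay("session-1", epoch.Add(8*time.Second)) // activity pushes the deadline back
	w.tick(epoch.Add(12 * time.Second)) // only session-2 expires
	w.tick(epoch.Add(20 * time.Second)) // now session-1 expires
}
```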

type TimeoutError

type TimeoutError error

type TopicEvent

type TopicEvent struct {
	Wire *Wire
	// Name is the topic name
	Name string
}

func (TopicEvent) String

func (e TopicEvent) String() string

type TopicListener

type TopicListener func(event TopicEvent)

type TxtStreamFactory

type TxtStreamFactory struct {
}

func (TxtStreamFactory) Input

func (this TxtStreamFactory) Input(r io.Reader) InputStream

func (TxtStreamFactory) Output

func (this TxtStreamFactory) Output(w io.Writer) OutputStream

type UnknownTopic

type UnknownTopic error

Specific error types are defined so that we can use type assertion, if needed.
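
A point worth checking before relying on type assertion here: a type declared as `type X error` is itself an interface type, so in Go an assertion to it holds for any non-nil error value. The sketch below demonstrates that language behaviour and, as one possible alternative, a struct-based error matched with errors.As. The two declarations are repeated from this index; `topicError`, `matchesUnknownTopic` and `isTopicError` are hypothetical and not part of gomsg.

```go
package main

import (
	"errors"
	"fmt"
)

// UnknownTopic and TimeoutError repeat the declarations listed in this index.
type UnknownTopic error
type TimeoutError error

// errPlain is an ordinary error used as a fixture.
var errPlain = errors.New("plain error")

// topicError is a hypothetical struct-based error type.
type topicError struct{ topic string }

func (e *topicError) Error() string { return "no registered subscriber for " + e.topic }

// matchesUnknownTopic asserts err to the interface-based UnknownTopic type.
func matchesUnknownTopic(err error) bool {
	_, ok := err.(UnknownTopic)
	return ok
}

// isTopicError looks for a *topicError anywhere in err's wrap chain.
func isTopicError(err error) bool {
	var te *topicError
	return errors.As(err, &te)
}

func main() {
	timeout := TimeoutError(errPlain)
	fmt.Println(matchesUnknownTopic(timeout)) // true: any error satisfies the interface
	fmt.Println(isTopicError(timeout))        // false
	wrapped := fmt.Errorf("publish failed: %w", &topicError{"HELLO"})
	fmt.Println(isTopicError(wrapped)) // true
}
```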

type Wire

type Wire struct {

	// load balancer failure policy
	Policy LBPolicy
	// contains filtered or unexported fields
}

func NewWire

func NewWire(
	codec Codec,
	l log.ILogger,
	factory StreamFactory,
) *Wire

func (*Wire) AddDropTopicListener

func (this *Wire) AddDropTopicListener(listener TopicListener) uint64

func (*Wire) AddNewTopicListener

func (this *Wire) AddNewTopicListener(listener TopicListener) uint64

func (*Wire) AddSendListener

func (this *Wire) AddSendListener(listener SendListener) uint64

AddSendListener adds a listener on send messages (Publish/Push/RequestAll/Request).

func (*Wire) Conn

func (this *Wire) Conn() net.Conn

Conn gets the connection

func (*Wire) Destroy

func (this *Wire) Destroy()

func (*Wire) HasRemoteTopic

func (this *Wire) HasRemoteTopic(name string) bool

func (*Wire) Logger

func (this *Wire) Logger() log.ILogger

func (*Wire) RemoteMetadata

func (this *Wire) RemoteMetadata() map[string]interface{}

func (*Wire) RemoteUuid

func (this *Wire) RemoteUuid() tk.UUID

func (*Wire) RemoveDropTopicListener

func (this *Wire) RemoveDropTopicListener(idx uint64)

RemoveDropTopicListener removes a previously added drop-topic listener.

func (*Wire) RemoveNewTopicListener

func (this *Wire) RemoveNewTopicListener(idx uint64)

RemoveNewTopicListener removes a previously added new-topic listener.

func (*Wire) RemoveSendListener

func (this *Wire) RemoveSendListener(idx uint64)

RemoveSendListener removes a previously added listener on send messages

func (*Wire) Send

func (this *Wire) Send(msg Envelope) <-chan error

func (*Wire) SetBufferSize

func (this *Wire) SetBufferSize(size int)

func (*Wire) SetConn

func (this *Wire) SetConn(c net.Conn)

func (*Wire) SetLogger

func (this *Wire) SetLogger(l log.ILogger)

func (*Wire) SetRateLimiter

func (this *Wire) SetRateLimiter(limiter tk.Rate)

func (*Wire) SetTimeout

func (this *Wire) SetTimeout(timeout time.Duration)

type Wired

type Wired struct {
	// contains filtered or unexported fields
}

func (*Wired) Wire

func (this *Wired) Wire() *Wire

type Wirer

type Wirer interface {
	Wire() *Wire
}

type Wires

type Wires struct {
	// contains filtered or unexported fields
}

Wires manages a collection of connections as if they were one. Connections are grouped according to their group id. A wire with an empty group id means all nodes are different.

Wires with the same non-empty id are treated as mirrors of each other. This means that we only need to call one of them. The other nodes function as High Availability and load balancing nodes.

func NewWires

func NewWires(codec Codec, l log.ILogger, factory StreamFactory) *Wires

NewWires creates a Wires structure

func (*Wires) Add

func (this *Wires) Add(wire *Wire)

func (*Wires) AddDropTopicListener

func (this *Wires) AddDropTopicListener(listener TopicListener) uint64

func (*Wires) AddNewTopicListener

func (this *Wires) AddNewTopicListener(listener TopicListener) uint64

func (*Wires) AddSendListener

func (this *Wires) AddSendListener(listener SendListener) uint64

AddSendListener adds a listener on send messages (Publish/Push/RequestAll/Request).

func (*Wires) Codec

func (this *Wires) Codec() Codec

func (*Wires) Destroy

func (this *Wires) Destroy()

func (*Wires) Find

func (this *Wires) Find(fn func(w *Wire) bool) *Wire

func (*Wires) Get

func (this *Wires) Get(conn net.Conn) *Wire

func (*Wires) GetAll

func (this *Wires) GetAll() []*Wire

func (*Wires) Kill

func (this *Wires) Kill(conn net.Conn)

func (*Wires) LoadBalancer

func (this *Wires) LoadBalancer() LoadBalancer

func (*Wires) Logger

func (this *Wires) Logger() log.ILogger

func (*Wires) Publish

func (this *Wires) Publish(name string, payload interface{}) <-chan error

Publish sends a message without any reply. If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.

func (*Wires) PublishTimeout

func (this *Wires) PublishTimeout(name string, payload interface{}, timeout time.Duration) <-chan error

func (*Wires) Push

func (this *Wires) Push(name string, payload interface{}) <-chan error

Push sends a message and receives an acknowledgement. If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload.

func (*Wires) PushTimeout

func (this *Wires) PushTimeout(name string, payload interface{}, timeout time.Duration) <-chan error

func (*Wires) RemoveDropTopicListener

func (this *Wires) RemoveDropTopicListener(idx uint64)

RemoveDropTopicListener removes a previously added drop-topic listener.

func (*Wires) RemoveNewTopicListener

func (this *Wires) RemoveNewTopicListener(idx uint64)

RemoveNewTopicListener removes a previously added new-topic listener.

func (*Wires) RemoveSendListener

func (this *Wires) RemoveSendListener(idx uint64)

RemoveSendListener removes a previously added listener on send messages

func (*Wires) Request

func (this *Wires) Request(name string, payload interface{}, handler interface{}) <-chan error

Request sends a message and waits for the reply. If the type of the payload is *mybus.Msg it will ignore encoding and use the internal bytes as the payload. This is useful if we want to implement a broker.

func (*Wires) RequestAll

func (this *Wires) RequestAll(name string, payload interface{}, handler interface{}) <-chan error

RequestAll sends a request to all connected clients. A client that is not connected when the request is sent never receives it.

func (*Wires) RequestAllTimeout

func (this *Wires) RequestAllTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

RequestAllTimeout sends a request to all connected clients. A client that is not connected when the request is sent never receives it.

func (*Wires) RequestTimeout

func (this *Wires) RequestTimeout(name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

RequestTimeout sends a message and waits for the reply

func (*Wires) Send

func (this *Wires) Send(kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

Send is the generic function to send messages. When the payload is of type []byte it passes the raw bytes without encoding. When the payload is of type mybus.Msg it passes the FRAMED raw bytes without encoding.

func (*Wires) SendSkip

func (this *Wires) SendSkip(skipWire *Wire, kind EKind, name string, payload interface{}, handler interface{}, timeout time.Duration) <-chan error

SendSkip is the generic function to send messages with the possibility of ignoring the sender

func (*Wires) SetBufferSize

func (this *Wires) SetBufferSize(size int)

func (*Wires) SetCodec

func (this *Wires) SetCodec(codec Codec) *Wires

func (*Wires) SetDefaultTimeout

func (this *Wires) SetDefaultTimeout(timeout time.Duration)

func (*Wires) SetLoadBalancer

func (this *Wires) SetLoadBalancer(loadBalancer LoadBalancer)

func (*Wires) SetLogger

func (this *Wires) SetLogger(l log.ILogger)

func (*Wires) SetRateLimiterFactory

func (this *Wires) SetRateLimiterFactory(factory func() tk.Rate)

func (*Wires) SetStreamFactory

func (this *Wires) SetStreamFactory(factory StreamFactory)

func (*Wires) Size

func (this *Wires) Size() int

func (*Wires) TopicCount

func (this *Wires) TopicCount(name string) int

TopicCount returns the number of clients providing the topic
