messenger

Published: Mar 12, 2018 License: Apache-2.0 Imports: 28 Imported by: 0

README

#### Benchmark of the messenger

$ go test -v -run=Benckmark* -bench=. 
PASS
BenchmarkMessengerSendSmallMessage	   50000	     70568 ns/op
BenchmarkMessengerSendMediumMessage	   50000	     70265 ns/op
BenchmarkMessengerSendBigMessage	   50000	     72693 ns/op
BenchmarkMessengerSendLargeMessage	   50000	     72896 ns/op
BenchmarkMessengerSendMixedMessage	   50000	     72631 ns/op
BenchmarkMessengerSendRecvSmallMessage	   20000	     78409 ns/op
BenchmarkMessengerSendRecvMediumMessage	   20000	     80471 ns/op
BenchmarkMessengerSendRecvBigMessage	   20000	     82629 ns/op
BenchmarkMessengerSendRecvLargeMessage	   20000	     85987 ns/op
BenchmarkMessengerSendRecvMixedMessage	   20000	     83678 ns/op
ok  	github.com/mesos/mesos-go/messenger	115.135s

$ go test -v -run=Benckmark* -bench=. -cpu=4 -send-routines=4 2>/dev/null
PASS
BenchmarkMessengerSendSmallMessage-4	   50000	     35529 ns/op
BenchmarkMessengerSendMediumMessage-4	   50000	     35997 ns/op
BenchmarkMessengerSendBigMessage-4	   50000	     36871 ns/op
BenchmarkMessengerSendLargeMessage-4	   50000	     37310 ns/op
BenchmarkMessengerSendMixedMessage-4	   50000	     37419 ns/op
BenchmarkMessengerSendRecvSmallMessage-4	   50000	     39320 ns/op
BenchmarkMessengerSendRecvMediumMessage-4	   50000	     41990 ns/op
BenchmarkMessengerSendRecvBigMessage-4	   50000	     42157 ns/op
BenchmarkMessengerSendRecvLargeMessage-4	   50000	     45472 ns/op
BenchmarkMessengerSendRecvMixedMessage-4	   50000	     47393 ns/op
ok  	github.com/mesos/mesos-go/messenger	105.173s

#### Environment

OS: Linux yifan-laptop 3.13.0-32-generic #57-Ubuntu SMP Tue Jul 15 03:51:08 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
CPU: Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz
MEM: 4G DDR3 1600MHz
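
To put the ns/op figures above into more familiar units, the following minimal sketch converts them into messages per second and computes the speedup between the two runs. The helper names `opsPerSecond` and `speedup` are made up for this illustration and are not part of the package. For the small-message send benchmark this works out to roughly 14,000 messages per second on one CPU and roughly 28,000 with `-cpu=4 -send-routines=4`.

```go
package main

import "fmt"

// opsPerSecond converts a `go test -bench` ns/op figure into operations per second.
func opsPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

// speedup reports how many times faster `after` is than `before` (both in ns/op).
func speedup(before, after float64) float64 {
	return before / after
}

func main() {
	// Figures copied from the two BenchmarkMessengerSendSmallMessage lines above.
	const single, parallel = 70568.0, 35529.0
	fmt.Printf("1 CPU:   %.0f msg/s\n", opsPerSecond(single))
	fmt.Printf("4 CPUs:  %.0f msg/s\n", opsPerSecond(parallel))
	fmt.Printf("speedup: %.2fx\n", speedup(single, parallel))
}
```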

Documentation

Overview

Package messenger includes a messenger and a transporter. The messenger provides interfaces to send a protobuf message through the underlying transporter. It also dispatches messages to installed handlers.

Index

Constants

const (
	DefaultReadTimeout  = 10 * time.Second
	DefaultWriteTimeout = 10 * time.Second
)

Variables

var (
	ReadTimeout  = DefaultReadTimeout
	WriteTimeout = DefaultWriteTimeout

	// HttpClient is used for sending messages to remote processes
	HttpClient = http.Client{
		Timeout: DefaultReadTimeout,
	}
)

Functions

func ClientTLSConfig

func ClientTLSConfig(config *tls.Config, handshakeTimeout time.Duration) httpOpt

func ServerTLSConfig

func ServerTLSConfig(config *tls.Config, nextProto map[string]func(*http.Server, *tls.Conn, http.Handler)) httpOpt

func UPIDBindingAddress

func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error)

UPIDBindingAddress determines the value of UPID.Host that will be used to build a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP address and the UPID.Host is set to that address and the bindingAddress is passed through to the Transport.
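
The decision described above can be sketched with the standard library alone. `upidHost` below is a hypothetical stand-in that mirrors the documented behaviour; it is not the package's implementation, and the empty-hostname check and the choice of the first resolved address are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// upidHost is a hypothetical helper that follows the documented rule:
// prefer an explicit, non-wildcard binding address, else resolve hostname.
func upidHost(hostname string, bindingAddress net.IP) (string, error) {
	// A non-nil, non-wildcard binding address is used as-is.
	if bindingAddress != nil && !bindingAddress.IsUnspecified() {
		return bindingAddress.String(), nil
	}
	// Assumption: an empty hostname cannot be resolved, so reject it early.
	if hostname == "" {
		return "", errors.New("hostname is required when no binding address is given")
	}
	ips, err := net.LookupIP(hostname)
	if err != nil {
		return "", err
	}
	if len(ips) == 0 {
		return "", fmt.Errorf("no addresses found for %q", hostname)
	}
	// Assumption: take the first address the resolver returns.
	return ips[0].String(), nil
}

func main() {
	host, err := upidHost("localhost", net.ParseIP("10.0.0.5"))
	fmt.Println(host, err)
	host, err = upidHost("127.0.0.1", net.IPv4zero)
	fmt.Println(host, err)
}
```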

Types

type Decoder

type Decoder interface {
	Requests() <-chan *Request
	Err() <-chan error
	Cancel(bool)
}

func DecodeHTTP

func DecodeHTTP(w http.ResponseWriter, r *http.Request) Decoder

DecodeHTTP hijacks an HTTP server connection and generates mesos libprocess HTTP requests via the returned chan. Upon generation of an error in the error chan the decoder's internal goroutine will terminate. This func returns immediately. The caller should immediately *stop* using the ResponseWriter and Request that were passed as parameters; the decoder assumes full control of the HTTP transport.
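
For readers unfamiliar with connection hijacking, here is a minimal sketch of the general technique using `http.Hijacker` from the standard library. It does not reproduce DecodeHTTP itself: `takeOver` and `statusViaHijack` are hypothetical names, and the `202 Accepted` reply is only a placeholder response. The point it illustrates is the one made above: once the connection is hijacked, the handler stops using the ResponseWriter and the new owner writes raw bytes to the connection.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// takeOver hijacks the connection behind w and answers on it from a separate
// goroutine. After Hijack succeeds, w must no longer be used by the caller.
func takeOver(w http.ResponseWriter) error {
	hj, ok := w.(http.Hijacker)
	if !ok {
		return fmt.Errorf("connection does not support hijacking")
	}
	conn, rw, err := hj.Hijack()
	if err != nil {
		return err
	}
	go func() {
		defer conn.Close()
		// The goroutine now owns the transport and writes the raw HTTP reply.
		rw.WriteString("HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
		rw.Flush()
	}()
	return nil // returns immediately, like the decoder described above
}

// statusViaHijack runs a throwaway test server and reports the status code
// a client sees when the handler hands the connection to takeOver.
func statusViaHijack() (int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if err := takeOver(w); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	}))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	fmt.Println(statusViaHijack())
}
```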

type HTTPTransporter

type HTTPTransporter struct {
	// contains filtered or unexported fields
}

HTTPTransporter implements the interfaces of the Transporter.

func NewHTTPTransporter

func NewHTTPTransporter(upid upid.UPID, address net.IP, opts ...httpOpt) *HTTPTransporter

NewHTTPTransporter creates a new http transporter with an optional binding address.

func (*HTTPTransporter) Install

func (t *HTTPTransporter) Install(msgName string)

Install installs the request URI according to the message's name.

func (*HTTPTransporter) Recv

func (t *HTTPTransporter) Recv() (*Message, error)

Recv returns the message, one at a time.

func (*HTTPTransporter) Send

func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error)

Send sends the message to its specified upid.

func (*HTTPTransporter) Start

func (t *HTTPTransporter) Start() (upid.UPID, <-chan error)

Start starts the http transporter.

func (*HTTPTransporter) Stop

func (t *HTTPTransporter) Stop(graceful bool) error

Stop stops the http transporter by closing the listener.

func (*HTTPTransporter) UPID

func (t *HTTPTransporter) UPID() upid.UPID

UPID returns the upid of the transporter.

type MesosMessenger

type MesosMessenger struct {
	// contains filtered or unexported fields
}

MesosMessenger is an implementation of the Messenger interface.

func New

func NewHttp

func NewHttp(upid upid.UPID, opts ...httpOpt) *MesosMessenger

NewHttp creates a new mesos messenger.

func NewHttpWithBindingAddress

func NewHttpWithBindingAddress(upid upid.UPID, address net.IP, opts ...httpOpt) *MesosMessenger

func (*MesosMessenger) Install

func (m *MesosMessenger) Install(handler MessageHandler, msg proto.Message) error

Install installs the handler with the given message.

func (*MesosMessenger) Route

func (m *MesosMessenger) Route(ctx context.Context, upid *upid.UPID, msg proto.Message) error

Route puts a message either in the incoming or outgoing queue. This method is useful for (1) routing internal errors to callback handlers and (2) testing components without starting remote servers.

func (*MesosMessenger) Send

func (m *MesosMessenger) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error

Send puts a message into the outgoing queue, waiting to be sent. With buffered channels, this will not block under moderate throughput. When an error is generated, the error can be communicated by placing a message on the incoming queue to be handled upstream.
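
The non-blocking behaviour described above follows from enqueueing onto a buffered channel. The sketch below shows that pattern in isolation; `outbox`, `newOutbox`, `send` and `demo` are hypothetical names, messages are plain strings rather than protobufs, and the buffer size is arbitrary. While the buffer has room, `send` returns at once; when it is full, the call waits until space frees up or the context is done.

```go
package main

import (
	"context"
	"fmt"
)

// outbox is a hypothetical stand-in for a messenger's outgoing queue.
type outbox struct {
	queue chan string
}

func newOutbox(size int) *outbox {
	return &outbox{queue: make(chan string, size)}
}

// send enqueues msg, or gives up with ctx.Err() if ctx is done before
// buffer space becomes available.
func (o *outbox) send(ctx context.Context, msg string) error {
	select {
	case o.queue <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo sends once into an empty one-slot buffer, then once more into the
// now-full buffer using an already-cancelled context.
func demo() (first, second error) {
	o := newOutbox(1)
	first = o.send(context.Background(), "ping") // fits in the buffer

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	second = o.send(ctx, "pong") // buffer is full, so only ctx.Done() can fire
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println("first:", first)
	fmt.Println("second:", second)
}
```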

func (*MesosMessenger) Start

func (m *MesosMessenger) Start() error

Start starts the messenger; expects to be called once and only once.

func (*MesosMessenger) Stop

func (m *MesosMessenger) Stop() (err error)

Stop stops the messenger and cleans up all the goroutines.

func (*MesosMessenger) UPID

func (m *MesosMessenger) UPID() upid.UPID

UPID returns the upid of the messenger.

type Message

type Message struct {
	UPID         *upid.UPID
	Name         string
	ProtoMessage proto.Message
	Bytes        []byte
}

Message defines the type that is passed through the Messenger.

func (*Message) RequestURI

func (m *Message) RequestURI() string

RequestURI returns the request URI of the message.

type MessageHandler

type MessageHandler func(from *upid.UPID, pbMsg proto.Message)

MessageHandler is the callback of the message. When the callback is invoked, the sender's upid and the message are passed to the callback.
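
As a rough illustration of how callbacks of this shape can be dispatched, the sketch below keeps a map from message name to handler. It uses only local stand-in types: `handler`, `dispatcher`, `install` and `dispatch` are hypothetical, senders are plain strings instead of `*upid.UPID`, and payloads are raw bytes instead of `proto.Message`. Rejecting a second handler for the same name is an assumption of this example, not documented behaviour of the package.

```go
package main

import "fmt"

// handler mirrors the shape of MessageHandler with simplified argument types.
type handler func(from string, payload []byte)

// dispatcher routes incoming messages to handlers by message name.
type dispatcher struct {
	handlers map[string]handler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[string]handler)}
}

// install registers h for the given message name.
// Assumption: installing twice for the same name is an error.
func (d *dispatcher) install(name string, h handler) error {
	if _, dup := d.handlers[name]; dup {
		return fmt.Errorf("handler for %q already installed", name)
	}
	d.handlers[name] = h
	return nil
}

// dispatch invokes the handler for name and reports whether one was found.
func (d *dispatcher) dispatch(name, from string, payload []byte) bool {
	h, ok := d.handlers[name]
	if !ok {
		return false
	}
	h(from, payload)
	return true
}

func main() {
	d := newDispatcher()
	d.install("Ping", func(from string, payload []byte) {
		fmt.Printf("got %d bytes from %s\n", len(payload), from)
	})
	fmt.Println(d.dispatch("Ping", "sender@127.0.0.1:5050", []byte("hello")))
	fmt.Println(d.dispatch("Unknown", "sender@127.0.0.1:5050", nil))
}
```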

type Messenger

type Messenger interface {
	Install(handler MessageHandler, msg proto.Message) error
	Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error
	Route(ctx context.Context, from *upid.UPID, msg proto.Message) error
	Start() error
	Stop() error
	UPID() upid.UPID
}

Messenger defines the interfaces that should be implemented.

func ForHostname

func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16, publishedAddress net.IP) (Messenger, error)

ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to determine the binding-address used for both the UPID.Host and Transport binding address.

type Request

type Request struct {
	*http.Request
	// contains filtered or unexported fields
}

type Response

type Response struct {
	// contains filtered or unexported fields
}

type Transporter

type Transporter interface {
	//Send sends message to remote process. Must use context to determine
	//cancelled requests. Will stop sending when transport is stopped.
	Send(ctx context.Context, msg *Message) error

	//Recv receives and delegates message handling to installed handlers.
	//Will stop receiving when transport is stopped.
	Recv() (*Message, error)

	//Install mounts a handler based on incoming message name.
	Install(messageName string)

	//Start starts the transporter and returns immediately. The error chan
	//is never nil.
	Start() (upid.UPID, <-chan error)

	//Stop kills the transporter.
	Stop(graceful bool) error

	//UPID returns the PID for transporter.
	UPID() upid.UPID
}

Transporter defines methods for communicating with remote processes.

Directories

Path Synopsis
Package testmessage is a generated protocol buffer package.
