message

package
v0.0.4 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 27, 2019 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package message. // TODO: summary statement

  • Server manages multiple connections.
  • Client manages a single connection.
Example

This example demonstrates a client broker communicating with a server broker. A Message will be sent by the client to the server with its Word field set to "Hello". The server will convert "Hello" to "HELLO" using the Upper pipe and our handler will append " World" before sending the message back to the client. The server's Broker instance is configured to manipulate messages by extending it with instances of message.Piped. Two types of pipes are defined: Trace & Upper. Two Trace instances and one Upper instance are created. The two Trace instances surround the Upper instance, so messages flow from the client through Trace0, Upper, and Trace1 handlers, and outbound from the server through Trace1, Upper, and Trace0 senders. The expected output will be the message at each stage of the workflow. Every handler and sender will write the message to stdout with information about which processing step handled or sent the message. The order of output demonstrates message flow.

package main

import (
	"context"
	"fmt"
	"net"
	"net/http/httptest"
	"strings"
	"sync"
	"time"

	"github.com/go-goo/net/message"
)

// Message defines the message this example will send from the client to the
// server and back again.
type Message struct {
	Word string
}

// NewMessage is a message.New function. It is used to register Message as a
// type managed by Broker.
func NewMessage() message.M {
	return &Message{""}
}

// MessageCode returns the unique identifier for this type of message.
func (a *Message) MessageCode() uint32 {
	return 10
}

// Trace implements message.Piped by embedding message.Pipe and being a
// message.Handler and message.Sender. It writes received messages to stdout
// when both handling and sending messages.
type Trace struct {
	message.Pipe        // embed Pipe to implement part of message.Piped
	Name         string // identify this trace instance when writing to stdout
}

// Handle implements message.Handler; required to implement message.Piped.
// After writing to stdout, it calls the next handler in the pipeline.
func (a *Trace) Handle(c *message.Conn, m interface{}) error {
	fmt.Printf("%s Trace.Handle: %#v\n", a.Name, m)
	return a.Pipe.Handle(c, m)
}

// Send implements message.Sender; required to implement message.Piped.
// After writing to stdout, it calls the next sender in the pipeline.
func (a *Trace) Send(ctx context.Context, m interface{}, conns ...*message.Conn) error {
	fmt.Printf("%s   Trace.Send: %#v\n", a.Name, m)
	return a.Pipe.Send(ctx, m, conns...)
}

// Upper implements message.Piped by embedding message.Pipe and being a
// message.Handler and message.Sender. It writes received messages to stdout
// when both handling and sending messages, and converts the message's Word
// field value to uppercase when the handled message is a *Message.
type Upper struct {
	message.Pipe        // embed Pipe to implement part of message.Piped
	Name         string // identify this Upper instance when writing to stdout
}

// Handle implements message.Handler; required to implement message.Piped.
// After writing to stdout & converting Message.Word to uppercase, it calls
// the next handler in the pipeline.
func (a *Upper) Handle(c *message.Conn, m interface{}) error {
	fmt.Printf("%s Upper.Handle: %#v\n", a.Name, m)
	if w, ok := m.(*Message); ok {
		w.Word = strings.ToUpper(w.Word)
		return a.Pipe.Handle(c, w)
	}
	return a.Pipe.Handle(c, m)
}

// Send implements message.Sender; required to implement message.Piped.
// After writing to stdout, it calls the next sender in the pipeline.
func (a *Upper) Send(ctx context.Context, m interface{}, conns ...*message.Conn) error {
	fmt.Printf("%s   Upper.Send: %#v\n", a.Name, m)
	return a.Pipe.Send(ctx, m, conns...)
}

// This example demonstrates a client broker communicating with a server broker.
// A Message will be sent by the client to the server with its Word field set to
// "Hello". The server will convert "Hello" to "HELLO" using the Upper pipe
// and our handler will append " World" before sending the message back to the
// client. The server's Broker instance is configured to manipulate messages by
// extending it with instances of message.Piped. Two types of pipes are defined:
// Trace & Upper. Two Trace instances and one Upper instance are created. The
// two Trace instances surround the Upper instance, so messages flow from the
// client through Trace0, Upper, and Trace1 handlers, and outbound from the
// server through Trace1, Upper, and Trace0 senders. The expected output will
// be the message at each stage of the workflow. Every handler and sender will
// write the message to stdout with information about which processing step
// handled or sent the message. The order of output demonstrates message flow.
func main() {
	// use wg to track when the server and client receive the one message we send.
	// not doing this would cause the example to exit before the message is
	// returned back to the client.
	var wg sync.WaitGroup
	wg.Add(2)

	// client broker will send a Message and handle the Message returned by the server.
	client := message.Broker{
		Messages: []message.New{NewMessage},
		Handler: message.HandlerFunc(func(_ *message.Conn, m interface{}) error {
			fmt.Printf("Client % 20s: %#v\n", "Handle", m)
			wg.Done()
			return nil
		}),
	}
	defer client.Close()

	// server broker will handle the Message sent by the client and send a
	// manipulated message back to the client. Messages will pass through three
	// pipes on their way in and out of the server. The final handler, defined in
	// the broker's Handler field, will append " World" to the message before
	// sending it back through the three pipes to the client.
	server := message.Broker{
		Messages: []message.New{NewMessage},
		Pipes: []message.Piped{
			&Trace{Name: "Server Pipe[0]"},
			&Upper{Name: "Server Pipe[1]"},
			&Trace{Name: "Server Pipe[2]"},
		},
		Handler: message.HandlerFunc(func(c *message.Conn, m interface{}) (err error) {
			fmt.Printf("Server % 20s: %#v\n", "Handle", m)
			if m, _ := m.(*Message); m != nil {
				m.Word += " World"
				fmt.Printf("Server % 20s: %#v\n", "Send", m)
				// send using the Broker instance so the message goes through all
				// pipes. we could use c.Send to bypass pipes and send directly
				// to the client on the other end of Conn.
				if err = c.Broker().Send(context.TODO(), m); err != nil {
					fmt.Println(err)
				}
			}
			wg.Done()
			return
		}),
	}
	defer server.Close()

	// brokers implement http.Handler. start the server.
	httpServer := httptest.NewServer(&server)
	defer httpServer.Close()

	// create connection to the server for the client. then start the client
	// broker, which sends an Upgrade HTTP request to the server at path "/".
	// The client includes "application/json" in the request's Accept header
	// which instructs the server to encode and decode messages using JSON.
	if conn, err := net.DialTimeout("tcp", httpServer.Listener.Addr().String(), time.Second); err != nil {
		fmt.Println(err)
		return
	} else if _, err = client.Start(conn, "/", "application/json"); err != nil {
		fmt.Println("Client Start:", err)
		return
	}

	// create hello message and send to the server.
	hello := &Message{"Hello"}
	fmt.Printf("Client % 20s: %#v\n", "Send", hello)
	if err := client.Send(context.TODO(), hello); err != nil {
		fmt.Println(err)
		return
	}

	// wait for the message to flow through the server and back to the client.
	wg.Wait()

}
Output:

Client                 Send: &message_test.Message{Word:"Hello"}
Server Pipe[0] Trace.Handle: &message_test.Message{Word:"Hello"}
Server Pipe[1] Upper.Handle: &message_test.Message{Word:"Hello"}
Server Pipe[2] Trace.Handle: &message_test.Message{Word:"HELLO"}
Server               Handle: &message_test.Message{Word:"HELLO"}
Server                 Send: &message_test.Message{Word:"HELLO World"}
Server Pipe[2]   Trace.Send: &message_test.Message{Word:"HELLO World"}
Server Pipe[1]   Upper.Send: &message_test.Message{Word:"HELLO World"}
Server Pipe[0]   Trace.Send: &message_test.Message{Word:"HELLO World"}
Client               Handle: &message_test.Message{Word:"HELLO World"}

Constants

const (
	// SendBufferSize is the default size of bytes.Buffer instances used
	// when constructing messages before sending. Override this value by
	// setting Broker.SendBufferSize.
	SendBufferSize = 1024

	// MaxMessageSize is the default for the largest message the broker will
	// handle or send. Connections are closed when this size is exceeded.
	// Override this value by setting Broker.MaxMessageSize.
	MaxMessageSize = 1e6

	// MaxConnections defines the maximum number of connections the broker
	// will accept from remote clients. This value is not evaluated for
	// broker clients when calling Start. Override this value by setting
	// Broker.MaxConnections.
	MaxConnections = 100
)
const Protocol = "GHMS/0.1"

Protocol defines this message protocol. GHMS stands for go-goo HTTP message service. It is passed by clients to servers in the Upgrade HTTP request header. Server brokers will not handle other protocols.

Variables

var (
	// ConnectionClosed is returned by send methods when the broker is already closed.
	ConnectionClosed = errors.New("Connection Closed")

	// DuplicateMessageCode occurs when multiple messages with the same code
	// are defined in Broker.Messages. The error is set when the broker is
	// initializing, which is when the first connection is assigned for management
	// by the broker. The error prevents the broker from starting.
	DuplicateMessageCode = errors.New("Duplicate Message Code")

	// InvalidConfig occurs when the broker has not been configured correctly.
	// The error is set when the broker is initializing, which is when the first
	// connection is assigned for management by the broker. The error prevents
	// the broker from starting.
	InvalidConfig = errors.New("Invalid Configuration")

	// InvalidEncoding occurs when the broker starts managing a client connection
	// and the encoding passed as a parameter to Start is not supported by
	// the media package. Typically this means the required encoding package
	// was not properly imported/registered.
	InvalidEncoding = errors.New("Invalid Encoding")

	// InvalidMessageCode occurs when the broker is asked to handle or send a
	// message that either isn't an M or whose code isn't registered with the
	// broker (not defined in Broker.Messages).
	InvalidMessageCode = errors.New("Invalid Message Code")

	// NoConnections is returned by Send when there are no registered connections
	// to send a message to.
	NoConnections = errors.New("No Connections")
)
var HeartbeatMessageCode uint32 = 1

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// Handler receives messages from remote brokers. It is always the last
	// handler when the broker is configured to use pipes. Handlers run in
	// the connection's read goroutine, so long running handlers will block reads.
	// Performance is maintained when handlers do little work.
	Handler Handler

	// MaxConnections defines the maximum number of connections the broker
	// will accept from remote clients. Only server-side connections are
	// limited (http server). This value is not evaluated when calling
	// Start (clients).
	// See constant with the same name for the default value.
	MaxConnections int

	// MaxMessageSize defines largest message this broker will handle or send.
	// Connections are closed when this size is exceeded.
	// See constant with the same name for the default value.
	MaxMessageSize uint32

	// Messages defines the set of messages this broker will send and receive.
	// Both client and server brokers must define the same set of messages
	// in order to communicate using them. Messages are defined using constructor
	// functions that return a new M instance. This allows the broker to
	// encode and decode messages on behalf of users, restricting user
	// implementations to simple type switches.
	Messages []New

	// Pattern defines the message distribution pattern to use. Publish is used
	// by default, which means all connections managed or those passed to the
	// send method will receive the message.
	Pattern Pattern

	// Pipes define the pipeline of message handlers and senders. Piped
	// implementations are the broker's middleware. The order of the piped
	// slice defines the order in which messages are passed through handlers
	// and senders. The zero-indexed pipe handles messages first and the
	// nth-indexed pipe handles messages last in the pipeline. The message
	// is passed to Broker.Handler after the nth-indexed pipe. Sending messages
	// works in the opposite direction, from nth to zero.
	//
	// E.g. Pipes: []Piped{A, B, C} means handling message flows through A:
	// A->B->C->Broker.Handler; and sending flows through C: C->B->A.
	//
	// Simple message passing implementations don't need to define pipes.
	// They can simply define a broker.Handler. See the topic subpackage
	// for an example protocol implementation using pipes.
	Pipes []Piped

	// Timeout defines the minimum amount of time this broker will wait
	// before closing the connection when reading or writing messages.
	// See constant with the same name for the default value.
	Timeout time.Duration

	// SendBufferSize defines the size of the bytes.Buffer used when
	// constructing messages before sending. Buffers grow automatically,
	// so this simply defines the allocation at initialization.
	// See constant with the same name for the default value.
	SendBufferSize int
	// contains filtered or unexported fields
}

Broker manages connections and abstracts sending and receiving messages to remote brokers. Configure the broker by setting exported fields before calling any methods or using it as an HTTP handler, because while connection management and methods are concurrency safe, fields are not.

func (*Broker) Close

func (a *Broker) Close() error

Close all connections. Return an error if any of the connections error on close. The returned error contains non-nil errors from all connections.

func (*Broker) Flush added in v0.0.4

func (a *Broker) Flush() error

Flush buffered messages in all connections.

func (*Broker) Len

func (a *Broker) Len() int

Len returns the number of open connections.

func (*Broker) Send

func (a *Broker) Send(ctx context.Context, m interface{}, conns ...*Conn) error

Send message m to connections without encoding the same format twice. Connections are passed through Broker.Pattern before sending. All connections are passed through the pattern when len(conns)==0; otherwise only conns are passed through it. NoConnections is returned when no conns are passed and the broker has no registered connections to send the message to.

func (*Broker) ServeHTTP

func (a *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler. It accepts connection upgrade requests and starts new server-side connections. Upgrade requests are sent by clients by calling broker.Start. ServeHTTP receives the request, validates, hijacks the connection, and begins managing the connection by starting a new goroutine to read from the connection.

func (*Broker) Start

func (a *Broker) Start(conn net.Conn, path string, encoding string) (*Conn, error)

Start managing a new connection. This is primarily used by client-side brokers. An HTTP request is sent to the remote broker requesting to upgrade the connection. The path parameter defines the request's HTTP path, and the encoding parameter defines the encoding the client and server use when sending and receiving messages. Both the client and server must have the encoding registered in the media package. An error is returned for configuration errors and HTTP request/response errors. Upon success, a new goroutine is created to read from the connection.

func (*Broker) Stats added in v0.0.2

func (a *Broker) Stats() (sent, received uint64)

Stats returns the total number of messages sent and received on all connections. The counts will wrap around. Safe for concurrent access.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn is a single connection managed by a broker. It is passed to handlers allowing them to collect and send messages to specific groups. Passing a Conn to broker.Send means the connection will be passed through the configured pipes and pattern, meaning it may or may not be sent a message depending on how the pipes and pattern are configured. Users can send directly to a single connection by using Conn.Send directly.

func (*Conn) Broker

func (a *Conn) Broker() *Broker

Broker returns the broker instance managing this connection.

func (*Conn) Close

func (a *Conn) Close() error

Close connection. Any error from closing the underlying net.Conn is returned. Nil is returned when there were no errors or when the error is the expected io.EOF.

func (*Conn) Context

func (a *Conn) Context() context.Context

Context returns a context that is canceled when this connection closes.

func (*Conn) Flush added in v0.0.4

func (a *Conn) Flush() error

func (*Conn) Send

func (a *Conn) Send(ctx context.Context, message interface{}) error

Send message to connection. The message is encoded and buffered before sending. The write is flushed after writing the buffer to the connection. ConnectionClosed is returned when the connection has already been closed. An error is also returned when the write fails or the encoded message exceeds the MaxMessageSize configured on the broker (excluding the internal header).

A write deadline is set before writing the message. Its value is the context deadline if one is set, or the broker's Timeout otherwise.

func (*Conn) Stats added in v0.0.2

func (a *Conn) Stats() (sent, received uint64)

Stats returns the number of messages sent and received. The counts will wrap around. Safe for concurrent access.

type Encoding

type Encoding interface {
	// MessageEncoding returns the encoding media type to use when encoding
	// and decoding this message. It overrides the client's encoding choice and
	// is therefore meant for use in pipe implementations, but other uses may
	// exist.
	MessageEncoding() (encoding string)
}

Encoding defines a message managed by user-defined pipes. Implementing it instructs the broker to use the encoding returned by MessageEncoding rather than the encoding defined for the connection by the client.

Pipe messages are typically only known to the implementing pipe and are not sent beyond the pipe's send and handler methods. This design means pipes do not have to implement all the unknown encodings users may need. Instead, this interface allows pipes to communicate between remote and client independently of user-defined encodings. An example of this is in the topic package. Its subscription message is encoded using go-goo/encoding/binary. Topic imports go-goo/encoding/media/binary so it is registered in the go-goo/encoding media package, which is then used by the broker when encoding/decoding.
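An optional interface like this is usually detected with a type assertion. The sketch below shows that idea with local stand-ins: encoding mirrors the documented interface, while subscribe, chat, encodingFor, and the "application/octet-stream" media type are hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// encoding mirrors the documented Encoding interface.
type encoding interface {
	MessageEncoding() string
}

// subscribe is a hypothetical pipe-internal message that pins its own encoding.
type subscribe struct{ Topic string }

// MessageEncoding returns a placeholder media type for this illustration.
func (subscribe) MessageEncoding() string { return "application/octet-stream" }

// chat is a hypothetical user message without an encoding override.
type chat struct{ Text string }

// encodingFor returns the message's own encoding when it implements the
// interface and the connection's encoding otherwise.
func encodingFor(m interface{}, connEncoding string) string {
	if e, ok := m.(encoding); ok {
		return e.MessageEncoding()
	}
	return connEncoding
}

func main() {
	fmt.Println(encodingFor(subscribe{"news"}, "application/json"))
	fmt.Println(encodingFor(chat{"hi"}, "application/json"))
}
```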

type Handler

type Handler interface {
	Handle(*Conn, interface{}) error
}

Handler is a type implementing a Handle method. Handlers process messages read from the connection. The passed Conn is the connection where the message was read. Handlers run in the connection's read goroutine.

type HandlerFunc

type HandlerFunc func(*Conn, interface{}) error

HandlerFunc implements Handler for a handle function.

func (HandlerFunc) Handle

func (a HandlerFunc) Handle(c *Conn, m interface{}) error

Handle calls HandlerFunc a.
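The function adapter follows the same idea as http.HandlerFunc. A self-contained sketch with local stand-ins (conn, handler, handlerFunc, and errUnexpected are hypothetical and only mirror the documented shapes) might look like this:

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for message.Conn.
type conn struct{}

// handler mirrors the documented Handler interface.
type handler interface {
	Handle(*conn, interface{}) error
}

// handlerFunc mirrors the documented HandlerFunc type.
type handlerFunc func(*conn, interface{}) error

// Handle calls f, which lets a plain function satisfy handler.
func (f handlerFunc) Handle(c *conn, m interface{}) error { return f(c, m) }

// errUnexpected is a hypothetical error for unknown message types.
var errUnexpected = errors.New("unexpected message")

func main() {
	var h handler = handlerFunc(func(_ *conn, m interface{}) error {
		// A type switch is typically all a handler needs, because the
		// broker has already decoded the message into a concrete type.
		switch v := m.(type) {
		case string:
			fmt.Println("got:", v)
			return nil
		default:
			return errUnexpected
		}
	})
	fmt.Println(h.Handle(&conn{}, "hello"))
	fmt.Println(h.Handle(&conn{}, 42))
}
```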

type M

type M interface {
	MessageCode() uint32
}

M is a message. All messages sent by a broker must implement M. Each implementing type's MessageCode method must return a unique code when registered with a broker.

type New

type New func() M

New is a function returning an allocated type implementing M. Each message implementing M must also define a New function to be assigned to the broker's Messages field. The broker calls this function when decoding received messages and passes the decoded M through the configured handlers. New functions are also an opportunity for the user to implement message pooling.
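To make the role of constructor functions concrete, the sketch below indexes them by message code and rejects duplicates, loosely following the DuplicateMessageCode description. Everything here (msg, newFunc, ping, pong, register, errDuplicate) is a hypothetical stand-in, and the registry is an assumption about how such a lookup could work, not the broker's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// msg and newFunc mirror the documented M and New types.
type msg interface{ MessageCode() uint32 }
type newFunc func() msg

// ping and pong are hypothetical message types with distinct codes.
type ping struct{}

func (*ping) MessageCode() uint32 { return 10 }

type pong struct{}

func (*pong) MessageCode() uint32 { return 11 }

// errDuplicate stands in for message.DuplicateMessageCode.
var errDuplicate = errors.New("Duplicate Message Code")

// register indexes constructors by message code and rejects duplicates.
func register(news ...newFunc) (map[uint32]newFunc, error) {
	byCode := make(map[uint32]newFunc, len(news))
	for _, n := range news {
		code := n().MessageCode()
		if _, ok := byCode[code]; ok {
			return nil, errDuplicate
		}
		byCode[code] = n
	}
	return byCode, nil
}

func main() {
	newPing := func() msg { return &ping{} }
	newPong := func() msg { return &pong{} }

	reg, err := register(newPing, newPong)
	fmt.Println(len(reg), err)
	fmt.Printf("%T\n", reg[11]()) // allocate a value to decode into

	_, err = register(newPing, newPing)
	fmt.Println(err)
}
```

A pooling variant could have the constructor draw from a sync.Pool instead of allocating, at the cost of the handler having to return values to the pool.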

type Pattern

type Pattern func([]*Conn) []*Conn

Pattern is a message distribution pattern. Implementing patterns receive a set of all connections managed by the broker and return a set of connections to send the message to.

func Publish

func Publish() Pattern

Publish defines a pubsub messaging pattern, which is essentially sending a single message to all subscriptions/connections in a broadcast pattern.

func Queue

func Queue() Pattern

Queue defines a message worker queue pattern. Messages are sent to a single connection in a round-robin, only-once pattern.
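The two patterns can be approximated in a few lines. This is a sketch under assumptions: conn and pattern are local stand-ins, and publish and queue below only imitate the documented behavior (broadcast versus round-robin) rather than reproduce the package's implementations.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical stand-in for message.Conn.
type conn struct{ id int }

// pattern mirrors the documented Pattern signature using the stand-in type.
type pattern func([]*conn) []*conn

// publish returns every connection it is given (broadcast).
func publish() pattern {
	return func(cs []*conn) []*conn { return cs }
}

// queue returns exactly one connection per call, rotating round-robin.
func queue() pattern {
	var next uint64
	return func(cs []*conn) []*conn {
		if len(cs) == 0 {
			return nil
		}
		i := (atomic.AddUint64(&next, 1) - 1) % uint64(len(cs))
		return cs[i : i+1 : i+1] // cap the slice so callers cannot append into cs
	}
}

func main() {
	cs := []*conn{{1}, {2}, {3}}
	fmt.Println("publish targets:", len(publish()(cs)))

	q := queue()
	for i := 0; i < 4; i++ {
		fmt.Println("queue target:", q(cs)[0].id)
	}
}
```

Because a custom Pattern is just a function over the connection slice, other strategies (for example random selection) could be written the same way.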

type Pipe

type Pipe struct {
	Handler
	Sender
}

Pipe should be embedded in structs implementing Piped. Once started, the broker will set the Handler to the next pipe defined in the broker's Pipes field, and Sender to the previous pipe. When handling messages, pipes should pass values to the next/previous pipe assigned in the struct.

type Piped

type Piped interface {
	Handler
	Sender
	// contains filtered or unexported methods
}

Piped defines an interface whose implementing types embed Pipe and also implement Handler and Sender. Pipe defines an unexported identify method that this interface requires, so simply embedding Pipe fulfils the Pipe requirement for this interface.

When the broker receives messages, it passes the message to the first pipe defined in the broker's Pipes field. The pipe handles the message and optionally calls the next pipe in the pipeline by calling its embedded Pipe.Handle method. Most pipes always pass the message to the next pipe. Once all pipes have handled the message, the last pipe is configured to call the handler defined in the broker's Handler field.

Sending messages works in the opposite direction. The broker's Send method first calls the last pipe defined in the broker's Pipes field. Each pipe's Pipe.Send method is the Send of the previous pipe defined in Pipes. The first pipe's Pipe.Send method is an internal method on the broker responsible for encoding the message and writing it to the appropriate connections. Pipes are not used when sending directly to a connection using Conn.Send.

type Sender

type Sender interface {
	Send(context.Context, interface{}, ...*Conn) error
}

Sender defines a type with a Send method. Broker, Conn, and Piped types all implement the Sender interface.

type SenderFunc

type SenderFunc func(context.Context, interface{}, ...*Conn) error

SenderFunc is a convenience type implementing the Sender interface. The assigned func is called by the SenderFunc's Send method.

func (SenderFunc) Send

func (a SenderFunc) Send(ctx context.Context, m interface{}, conns ...*Conn) error

Send calls SenderFunc a.
