grav

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2022 License: Apache-2.0 Imports: 11 Imported by: 6

Documentation

Index

Constants

View Source
const (
	MsgTypeDefault string = "grav.default"
)

MsgTypeDefault and other represent message consts

Variables

View Source
var (
	ErrTransportNotConfigured = errors.New("transport plugin not configured")
	ErrTunnelNotEstablished   = errors.New("tunnel cannot be established")
)

ErrTransportNotConfigured represent package-level vars

View Source
var (
	ErrConnectionClosed    = errors.New("connection was closed")
	ErrNodeUUIDMismatch    = errors.New("handshake UUID did not match node UUID")
	ErrBelongsToMismatch   = errors.New("new connection doesn't belongTo the same group or *")
	ErrNotBridgeTransport  = errors.New("transport is not a bridge")
	ErrBridgeOnlyTransport = errors.New("transport only supports bridge connection")
	ErrNodeWithdrawn       = errors.New("node has withdrawn from the mesh")
)

ErrConnectionClosed and others are transport and connection related errors

View Source
var (
	TransportTypeMesh   = TransportType("transport.mesh")
	TransportTypeBridge = TransportType("transport.bridge")
)
View Source
var ErrMsgNotWanted = errors.New("message not wanted")

ErrMsgNotWanted is used by WaitOn to determine if the current message is what's being waited on

View Source
var ErrNoReceipt = errors.New("message receipt is nil")

ErrNoReceipt is returned when a method is called on a nil ticket

View Source
var ErrWaitTimeout = errors.New("waited past timeout")

ErrWaitTimeout is returned if a timeout is exceeded

Functions

This section is empty.

Types

type ConnectFunc added in v0.0.4

type ConnectFunc func(Connection)

ConnectFunc is a function that provides a new Connection

type Connection added in v0.3.0

type Connection interface {
	// Called when the connection handshake is complete and the connection can actively start exchanging messages
	// The Connection is responsible for sending a 'withdraw' message when the provided context is canceled
	Start(recvFunc ReceiveFunc, signaler *WithdrawSignaler)
	// Send a message from the local instance to the connected node
	Send(msg Message) error
	// CanReplace returns true if the connection can be replaced (i.e. is not a persistent connection like a websocket)
	CanReplace() bool
	// Initiate a handshake for an outgoing connection and return the remote Ack
	DoOutgoingHandshake(handshake *TransportHandshake) (*TransportHandshakeAck, error)
	// Wait for an incoming handshake and return the provided Ack to the remote connection
	DoIncomingHandshake(HandshakeCallback) (*TransportHandshake, error)
	// Close requests that the Connection close itself
	Close() error
}

Connection represents a connection to another node

type Discovery added in v0.1.0

type Discovery interface {
	// Start is called to start the Discovery plugin
	Start(*DiscoveryOpts, DiscoveryFunc) error
	Stop() error
}

Discovery represents a discovery plugin

type DiscoveryFunc added in v0.3.0

type DiscoveryFunc func(endpoint string, uuid string)

DiscoveryFunc is a function that allows a plugin to report a newly discovered node

type DiscoveryOpts added in v0.1.0

type DiscoveryOpts struct {
	NodeUUID      string
	TransportPort string
	TransportURI  string
	Logger        *vlog.Logger
	Custom        interface{}
}

DiscoveryOpts is a set of options for transports

type FindFunc added in v0.3.0

type FindFunc func(uuid string) (Connection, bool)

FindFunc allows a Transport to query Grav for an active connection for the given UUID

type Grav

type Grav struct {
	NodeUUID  string
	BelongsTo string
	Interests []string
	// contains filtered or unexported fields
}

Grav represents a Grav message bus instance

func New

func New(opts ...OptionsModifier) *Grav

New creates a new Grav with the provided options

func (*Grav) Connect

func (g *Grav) Connect() *Pod

Connect creates a new connection (pod) to the bus

func (*Grav) ConnectBridgeTopic added in v0.4.0

func (g *Grav) ConnectBridgeTopic(topic string) error

ConnectBridgeTopic connects the Grav instance to a particular topic on the connected bridge

func (*Grav) ConnectEndpoint added in v0.0.4

func (g *Grav) ConnectEndpoint(endpoint string) error

ConnectEndpoint uses the configured transport to connect the bus to an external endpoint

func (*Grav) ConnectWithReplay added in v0.0.11

func (g *Grav) ConnectWithReplay() *Pod

ConnectWithReplay creates a new connection (pod) to the bus and replays recent messages when the pod sets its onFunc

func (*Grav) Stop added in v0.5.0

func (g *Grav) Stop() error

Stop stops Grav's meshing entirely, causing all connections to peers to close. It is reccomended to call `Withdraw` first to give peers notice and stop recieving messages

func (*Grav) Tunnel added in v0.5.0

func (g *Grav) Tunnel(capability string, msg Message) error

Tunnel sends a message to a specific connection that has advertised it has the required capability. This bypasses the main Grav bus, which is why it isn't a method on Pod. Messages are load balanced between the connections that advertise the capability in question.

func (*Grav) Withdraw added in v0.5.0

func (g *Grav) Withdraw() error

Withdraw cancels discovery, sends withdraw messages to all peers, and returns when all peers have acknowledged the withdraw

type HandshakeCallback added in v0.5.0

type HandshakeCallback func(*TransportHandshake) *TransportHandshakeAck

HandshakeCallback allows the hub to determine if a connection should be acceted

type Message added in v0.0.3

type Message interface {
	// Unique ID for this message
	UUID() string
	// ID of the parent event or request, such as HTTP request
	ParentID() string
	// The UUID of the message being replied to, if any
	ReplyTo() string
	// Allow setting a message UUID that this message is a response to
	SetReplyTo(string)
	// Type of message (application-specific)
	Type() string
	// Time the message was sent
	Timestamp() time.Time
	// Raw data of message
	Data() []byte
	// Unmarshal the message's data into a struct
	UnmarshalData(interface{}) error
	// Marshal the message itself to encoded bytes (JSON or otherwise)
	Marshal() ([]byte, error)
	// Unmarshal encoded Message into object
	Unmarshal([]byte) error
}

Message represents a message

func MsgFromBytes added in v0.0.3

func MsgFromBytes(bytes []byte) (Message, error)

MsgFromBytes returns a default _message that has been unmarshalled from bytes. Should only be used if the default _message type is being used.

func MsgFromRequest added in v0.0.3

func MsgFromRequest(r *http.Request) (Message, error)

MsgFromRequest extracts an encoded Message from an HTTP request

func NewMsg added in v0.0.3

func NewMsg(msgType string, data []byte) Message

NewMsg creates a new Message with the built-in `_message` type

func NewMsgReplyTo added in v0.1.0

func NewMsgReplyTo(ticket MsgReceipt, msgType string, data []byte) Message

NewMsgReplyTo creates a new message in response to a previous message

func NewMsgWithParentID added in v0.0.3

func NewMsgWithParentID(msgType, parentID string, data []byte) Message

NewMsgWithParentID returns a new message with the provided parent ID

type MsgBuffer added in v0.3.3

type MsgBuffer struct {
	// contains filtered or unexported fields
}

MsgBuffer is a buffer of messages with a particular size limit. Oldest messages are automatically evicted as new ones are added past said limit. Push(), Next(), and Iter() are thread-safe.

func NewMsgBuffer added in v0.3.3

func NewMsgBuffer(limit int) *MsgBuffer

func (*MsgBuffer) Iter added in v0.3.3

func (m *MsgBuffer) Iter(msgFunc MsgFunc)

Iter calls msgFunc once per message in the buffer

func (*MsgBuffer) Next added in v0.5.0

func (m *MsgBuffer) Next() Message

Next returns the next message in the buffer, continually looping over the buffer (all callers share ordering)

func (*MsgBuffer) Push added in v0.3.3

func (m *MsgBuffer) Push(msg Message)

Push pushes a new message onto the end of the buffer and evicts the oldest, if needed (based on limit)

type MsgChan added in v0.0.3

type MsgChan chan Message

MsgChan is a channel that accepts a message

type MsgFunc added in v0.0.3

type MsgFunc func(Message) error

MsgFunc is a callback function that accepts a message and returns an error

type MsgReceipt added in v0.2.0

type MsgReceipt struct {
	UUID string
	// contains filtered or unexported fields
}

MsgReceipt represents a "ticket" that references a message that was sent with the hopes of getting a response The embedded pod is a pointer to the pod that sent the original message, and therefore any ticket methods used will replace the OnFunc of the pod.

func (*MsgReceipt) OnReply added in v0.2.0

func (m *MsgReceipt) OnReply(mfn MsgFunc) error

OnReply will set the pod's OnFunc to the provided MsgFunc and set it to run asynchronously when a reply is received onFunc errors are discarded.

func (*MsgReceipt) WaitOn added in v0.2.0

func (m *MsgReceipt) WaitOn(onFunc MsgFunc) error

WaitOn will block until a response to the message is recieved and passes it to the provided onFunc. onFunc errors are propogated to the caller.

func (*MsgReceipt) WaitUntil added in v0.2.0

func (m *MsgReceipt) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error

WaitUntil will block until a response to the message is recieved and passes it to the provided onFunc. ErrWaitTimeout is returned if the timeout elapses, onFunc errors are propogated to the caller.

type Options added in v0.1.0

type Options struct {
	Logger    *vlog.Logger
	Transport Transport
	Discovery Discovery
	Port      string
	URI       string
	BelongsTo string
	Interests []string
}

Options represent Grav options

type OptionsModifier added in v0.1.0

type OptionsModifier func(*Options)

OptionsModifier is function that modifies an option

func UseBelongsTo added in v0.5.0

func UseBelongsTo(belongsTo string) OptionsModifier

UseBelongsTo sets the 'BelongsTo' property for the Grav instance

func UseDiscovery added in v0.1.0

func UseDiscovery(discovery Discovery) OptionsModifier

UseDiscovery sets the discovery plugin to be used

func UseEndpoint added in v0.3.0

func UseEndpoint(port, uri string) OptionsModifier

UseEndpoint sets the endpoint settings for the instance to broadcast for discovery Pass empty strings for either if you would like to keep the defaults (8080 and /meta/message)

func UseInterests added in v0.5.0

func UseInterests(interests ...string) OptionsModifier

UseInterests sets the 'Interests' property for the Grav instance

func UseLogger added in v0.1.0

func UseLogger(logger *vlog.Logger) OptionsModifier

UseLogger allows a custom logger to be used

func UseTransport added in v0.1.0

func UseTransport(transport Transport) OptionsModifier

UseTransport sets the transport plugin to be used.

type Pod added in v0.0.3

type Pod struct {
	// contains filtered or unexported fields
}

Pod is a connection to Grav Pods are bi-directional. Messages can be sent to them from the bus, and they can be used to send messages to the bus. Pods are meant to be extremely lightweight with no persistence they are meant to quickly and immediately route a message between its owner and the Bus. The Bus is responsible for any "smarts". Messages coming from the bus are filtered using the pod's messageFilter, which is configurable by the caller.

func (*Pod) Disconnect added in v0.1.1

func (p *Pod) Disconnect()

Disconnect indicates to the bus that this pod is no longer needed and should be disconnected. Sending will immediately become unavailable, and the pod will soon stop recieving messages.

func (Pod) FilterType added in v0.0.3

func (mf Pod) FilterType(msgType string, allow bool)

func (Pod) FilterUUID added in v0.0.3

func (mf Pod) FilterUUID(uuid string, allow bool)

FilterUUID likely should not be used in normal cases, it adds a message UUID to the pod's filter.

func (*Pod) On added in v0.0.3

func (p *Pod) On(onFunc MsgFunc)

On sets the function to be called whenever this pod recieves a message from the bus. If nil is passed, the pod will ignore all messages. Calling On multiple times causes the function to be overwritten. To recieve using two different functions, create two pods. Errors returned from the onFunc are interpreted as problems handling messages. Too many errors will result in the pod being disconnected. Failed messages will be replayed when messages begin to succeed. Returning an error is inadvisable unless there is a real problem handling messages.

func (*Pod) OnType added in v0.0.3

func (p *Pod) OnType(msgType string, onFunc MsgFunc)

OnType sets the function to be called whenever this pod recieves a message and sets the pod's filter to only receive certain message types. The same rules as `On` about error handling apply to OnType.

func (*Pod) ReplyTo added in v0.1.0

func (p *Pod) ReplyTo(inReplyTo Message, msg Message) *MsgReceipt

ReplyTo sends a response to a message. The reply message's ticket is returned.

func (*Pod) Send added in v0.0.3

func (p *Pod) Send(msg Message) *MsgReceipt

Send emits a message to be routed to the bus If the returned ticket is nil, it means the pod was unable to send It is safe to call methods on a nil ticket, they will error with ErrNoTicket This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...)

func (*Pod) WaitOn added in v0.0.3

func (p *Pod) WaitOn(onFunc MsgFunc) error

WaitOn takes a function to be called whenever this pod recieves a message and blocks until that function returns something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message. When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitOn will return and set the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller. WaitOn will block forever if the desired message is never found. Use WaitUntil if a timeout is desired.

func (*Pod) WaitUntil added in v0.2.0

func (p *Pod) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error

WaitUntil takes a function to be called whenever this pod recieves a message and blocks until that function returns something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message. When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitUntil will return and set the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller. A timeout can be provided. If the timeout is non-nil and greater than 0, ErrWaitTimeout is returned if the time is exceeded.

type ReceiveFunc added in v0.3.0

type ReceiveFunc func(msg Message)

ReceiveFunc is a function that allows passing along a received message

type TimeoutFunc added in v0.2.0

type TimeoutFunc func() chan time.Time

TimeoutFunc is a function that takes a value (a number of seconds) and returns a channel that fires after that given amount of time

func TO added in v0.2.0

func TO(seconds int) TimeoutFunc

TO is a shorthand for Timeout

func Timeout added in v0.2.0

func Timeout(seconds int) TimeoutFunc

Timeout returns a function that returns a channel that fires after the provided number of seconds have elapsed if the value passed is less than or equal to 0, the timeout will never fire

type TopicConnection added in v0.4.0

type TopicConnection interface {
	// Called when the connection can actively start exchanging messages
	Start(pod *Pod)
	// Close requests that the Connection close itself
	Close()
}

TopicConnection is a connection to something via a bridge such as a topic

type Transport added in v0.0.4

type Transport interface {
	// Type returns the transport's type (mesh or bridge)
	Type() TransportType
	// Setup is a transport-specific function that allows bootstrapping
	// Setup can block forever if needed; for example if a webserver is bring run
	Setup(opts *TransportOpts, connFunc ConnectFunc, findFunc FindFunc) error
	// CreateConnection connects to an endpoint and returns the Connection
	CreateConnection(endpoint string) (Connection, error)
	// ConnectBridgeTopic connects to a topic and returns a TopicConnection
	ConnectBridgeTopic(topic string) (TopicConnection, error)
}

Transport represents a Grav transport plugin

type TransportHandshake added in v0.1.0

type TransportHandshake struct {
	UUID      string   `json:"uuid"`
	BelongsTo string   `json:"belongsTo"`
	Interests []string `json:"interests"`
}

TransportHandshake represents a handshake sent to a node that you're trying to connect to

type TransportHandshakeAck added in v0.1.0

type TransportHandshakeAck struct {
	Accept    bool     `json:"accept"`
	UUID      string   `json:"uuid"`
	BelongsTo string   `json:"belongsTo"`
	Interests []string `json:"interests"`
}

TransportHandshakeAck represents a handshake response

type TransportOpts added in v0.0.5

type TransportOpts struct {
	NodeUUID string
	Port     string
	URI      string
	Logger   *vlog.Logger
	Custom   interface{}
}

TransportOpts is a set of options for transports

type TransportType added in v0.4.0

type TransportType string

TransportType defines the type of Transport (mesh or bridge)

type TransportWithdraw added in v0.5.0

type TransportWithdraw struct {
	UUID string `json:"uuid"`
}

TransportWithdraw represents a message sent to a peer indicating a withdrawal from the mesh

type WithdrawSignaler added in v0.5.0

type WithdrawSignaler struct {
	Ctx      context.Context
	DoneChan chan bool
}

WithdrawSignaler allows a connection to be notified about a withdraw event and report back to the hub that a withdraw has completed (using the Ctx and DoneChan, respectively)

func NewSignaler added in v0.5.0

func NewSignaler(ctx context.Context) *WithdrawSignaler

NewSignaler creates a new WithdrawSignaler based on the provided context

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL