hub

package
v0.0.0-...-b226945 Latest
Published: Feb 10, 2024 License: BSD-2-Clause Imports: 9 Imported by: 4

Documentation

Overview

Package hub provides a transport agnostic connection hub.

Constants

const (
	Signon  = "_signon"
	Signoff = "_signoff"
)

Variables

This section is empty.

Functions

func NextID

func NextID() int64

NextID returns a new unused normal connection id.
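One plausible way to hand out unused positive ids is a process-wide atomic counter. The sketch below is an assumption for illustration only: `nextID` and `lastID` are hypothetical names, and the package's actual implementation may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lastID is a hypothetical process-wide counter (not taken from the package).
var lastID int64

// nextID returns a fresh positive id and is safe for concurrent callers.
func nextID() int64 { return atomic.AddInt64(&lastID, 1) }

func main() {
	a, b := nextID(), nextID()
	fmt.Println(a > 0, b > a)
}
```

Starting at 1 keeps the ids clear of 0, which is reserved for the hub, and of the negative range used by transient connections.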

func Send

func Send(c Conn, m *Msg) bool

Send sends a message to a connection that might have signed off and reports whether the send succeeded.
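A send that tolerates a departed receiver can be sketched with a context check around the channel send. How the real Send detects a signed-off connection is not documented here, so `trySend`, the local `msg` type, and the use of the connection context are assumptions of this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a local stand-in for the package's Msg type.
type msg struct{ Subj string }

// trySend delivers m to ch unless ctx is done; it reports whether m was sent.
func trySend(ctx context.Context, ch chan<- *msg, m *msg) bool {
	// Check first, so a cancelled context always wins over a free buffer slot.
	select {
	case <-ctx.Done():
		return false
	default:
	}
	select {
	case ch <- m:
		return true
	case <-ctx.Done():
		return false
	}
}

// sendAroundCancel sends once before and once after cancelling the context.
func sendAroundCancel() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan *msg, 1)
	before = trySend(ctx, ch, &msg{Subj: "ping"})
	cancel()
	after = trySend(ctx, ch, &msg{Subj: "ping"})
	return before, after
}

func main() {
	fmt.Println(sendAroundCancel())
}
```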

Types

type ChanConn

type ChanConn struct {
	// contains filtered or unexported fields
}

ChanConn is a channel based connection used for simple in-process hub participants.

func NewChanConn

func NewChanConn(ctx context.Context, id int64, user string, c chan *Msg) *ChanConn

NewChanConn returns a new channel connection with the given id and channel.

func (*ChanConn) Chan

func (c *ChanConn) Chan() chan<- *Msg

func (*ChanConn) Ctx

func (c *ChanConn) Ctx() context.Context

func (*ChanConn) ID

func (c *ChanConn) ID() int64

func (*ChanConn) User

func (c *ChanConn) User() string

type Conn

type Conn interface {
	// Ctx returns the connection context.
	Ctx() context.Context
	// ID is an internal connection identifier, the hub has id 0, transient connections have a
	// negative and normal connections positive ids.
	ID() int64
	// User is an external user identifier
	User() string
	// Chan returns an unchanging receiver channel. The hub sends a nil message to this
	// channel after a signoff message from this conn was received.
	Chan() chan<- *Msg
}

Conn is a connection abstraction providing an ID, a user field, and a channel to send messages. Connections can represent one-off calls, connected clients of any kind, or the hub itself.
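To make the interface and its id convention concrete, here is a minimal self-contained sketch. It redeclares `Conn` and a reduced `Msg` locally instead of importing the package; `chanConn`, `newConn`, and `kind` are hypothetical helpers, not package API.

```go
package main

import (
	"context"
	"fmt"
)

// Msg is a reduced local stand-in for the package's message type.
type Msg struct{ Subj string }

// Conn mirrors the interface documented above.
type Conn interface {
	Ctx() context.Context
	ID() int64
	User() string
	Chan() chan<- *Msg
}

// chanConn is a hypothetical channel-backed participant.
type chanConn struct {
	ctx  context.Context
	id   int64
	user string
	ch   chan *Msg
}

func (c *chanConn) Ctx() context.Context { return c.ctx }
func (c *chanConn) ID() int64            { return c.id }
func (c *chanConn) User() string         { return c.user }
func (c *chanConn) Chan() chan<- *Msg    { return c.ch }

// newConn builds a connection with a background context and a small buffer.
func newConn(id int64) Conn {
	return &chanConn{ctx: context.Background(), id: id, user: "demo", ch: make(chan *Msg, 1)}
}

// kind classifies a connection by the documented id convention.
func kind(c Conn) string {
	switch id := c.ID(); {
	case id == 0:
		return "hub"
	case id < 0:
		return "transient"
	default:
		return "normal"
	}
}

func main() {
	fmt.Println(kind(newConn(0)), kind(newConn(-1)), kind(newConn(7)))
}
```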

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the central message server that manages connections. The hub itself represents conn id 0. Connection creators are also responsible for sending the signon message and validating messages. One-off connections used for simple request-response round trips can be used without signon and must use id -1. These connections can only be responded to directly and must not be stored.
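The signon and signoff life cycle can be pictured with a toy hub loop. This is a sketch under stated assumptions, not the package's Run implementation: `conn`, `run`, and the `route` callback are hypothetical, and whether the real hub also forwards signon and signoff messages to routers is not specified here. The nil message sent after signoff follows the Conn documentation.

```go
package main

import "fmt"

const (
	Signon  = "_signon"
	Signoff = "_signoff"
)

// conn is a reduced stand-in for a hub connection.
type conn struct {
	id int64
	ch chan *Msg
}

// Msg is a reduced stand-in for the package's message type.
type Msg struct {
	From *conn
	Subj string
}

// run tracks connections by id and hands every other message to route.
// It returns the connections still signed on when in is closed.
func run(in <-chan *Msg, route func(*Msg)) map[int64]*conn {
	conns := make(map[int64]*conn)
	for m := range in {
		switch m.Subj {
		case Signon:
			conns[m.From.id] = m.From
		case Signoff:
			delete(conns, m.From.id)
			m.From.ch <- nil // nil tells the connection that nothing more follows
		default:
			route(m)
		}
	}
	return conns
}

func main() {
	c := &conn{id: 1, ch: make(chan *Msg, 1)}
	in := make(chan *Msg, 3)
	in <- &Msg{From: c, Subj: Signon}
	in <- &Msg{From: c, Subj: "chat"}
	in <- &Msg{From: c, Subj: Signoff}
	close(in)
	left := run(in, func(m *Msg) { fmt.Println("routed", m.Subj) })
	fmt.Println(len(left), <-c.ch == nil)
}
```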

func NewHub

func NewHub(ctx context.Context) *Hub

NewHub creates and returns a new hub.

func (*Hub) Chan

func (h *Hub) Chan() chan<- *Msg

func (*Hub) Ctx

func (h *Hub) Ctx() context.Context

func (*Hub) ID

func (h *Hub) ID() int64

func (*Hub) Run

func (h *Hub) Run(rs ...Router)

Run starts routing received messages with the given routers. It is usually run as a goroutine.

func (*Hub) User

func (h *Hub) User() string

type MatchFilter

type MatchFilter struct {
	Router
	Match []string
}

MatchFilter only routes messages that match one of a list of subjects.

type Msg

type Msg struct {
	// From is the origin connection of this message or nil for server internal messages.
	From Conn
	// Subj is the required message header used for routing and determining the data type.
	Subj string
	// Tok is a client token that is used in replies, so they can be matched to a request.
	Tok string
	// Raw is the message body as bytes usually encoded as JSON.
	Raw []byte
	// Data is the typed body data and can be used to avoid serialization of internal messages.
	Data interface{}
}

Msg is the central structure passed between connections. The optional body is represented by raw bytes or typed data. If raw bytes are required but not set, non-nil data is encoded as JSON.
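The raw-or-typed body rule can be sketched as a lazy encoding step. The `Msg` below is a local copy without the From field, and `body` is a hypothetical helper; caching the encoded bytes in Raw is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Msg is a local copy of the documented struct without the From field.
type Msg struct {
	Subj string
	Tok  string
	Raw  []byte
	Data interface{}
}

// body returns the raw body, encoding Data as JSON when Raw is not set.
func body(m *Msg) ([]byte, error) {
	if m.Raw == nil && m.Data != nil {
		raw, err := json.Marshal(m.Data)
		if err != nil {
			return nil, err
		}
		m.Raw = raw // assumption: keep the encoded bytes for later use
	}
	return m.Raw, nil
}

func main() {
	m := &Msg{Subj: "chat", Data: map[string]int{"n": 1}}
	raw, err := body(m)
	fmt.Println(string(raw), err)
}
```

Internal messages can then carry only Data and skip serialization entirely, while transports that need bytes call the helper at the edge.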

func Parse

func Parse(str string) (*Msg, error)

Parse parses str and returns a message or an error.

func RawMsg

func RawMsg(subj string, v interface{}) (m *Msg, err error)

RawMsg returns a new message with subj and v encoded as JSON, or an error. This is helpful for encoding the message body immediately, so that ownership of v is not handed off.

func Read

func Read(input []byte) (*Msg, error)

Read parses input bytes and returns a message or an error. The byte slice is then owned by the message and cannot be reused.

func Req

func Req(hub chan<- *Msg, user string, req *Msg, timeout time.Duration) (*Msg, error)

Req sends req to the hub from a newly created transient connection and returns the first response or an error if the timeout was reached.
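The request pattern described above can be sketched with a reply channel and a timer. This is an illustration under assumptions: the local `Msg` uses a plain channel in place of the From Conn, and `request`, `echoHub`, `roundTrip`, `timesOut`, and `errTimeout` are hypothetical names rather than package API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Msg uses a plain channel as a simplified stand-in for the From Conn.
type Msg struct {
	From chan<- *Msg
	Subj string
	Tok  string
}

var errTimeout = errors.New("request timed out")

// request sends req with a fresh reply channel and waits for the first
// response, or fails once timeout has elapsed.
func request(hub chan<- *Msg, req *Msg, timeout time.Duration) (*Msg, error) {
	reply := make(chan *Msg, 1) // buffered, so a late reply never blocks the hub
	req.From = reply
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case hub <- req:
	case <-timer.C:
		return nil, errTimeout
	}
	select {
	case res := <-reply:
		return res, nil
	case <-timer.C:
		return nil, errTimeout
	}
}

// echoHub is a toy hub that answers every message on its sender's channel.
func echoHub() chan<- *Msg {
	in := make(chan *Msg)
	go func() {
		for m := range in {
			m.From <- &Msg{Subj: m.Subj, Tok: m.Tok}
		}
	}()
	return in
}

// roundTrip sends one request to a toy hub and returns the reply subject.
func roundTrip(subj string) (string, error) {
	res, err := request(echoHub(), &Msg{Subj: subj, Tok: "1"}, time.Second)
	if err != nil {
		return "", err
	}
	return res.Subj, nil
}

// timesOut reports whether a request to an idle hub fails with errTimeout.
func timesOut() bool {
	idle := make(chan *Msg) // nobody reads from this hub
	_, err := request(idle, &Msg{Subj: "ping"}, 10*time.Millisecond)
	return errors.Is(err, errTimeout)
}

func main() {
	fmt.Println(roundTrip("ping"))
	fmt.Println(timesOut())
}
```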

func (*Msg) Reply

func (m *Msg) Reply(data interface{}) *Msg

Reply returns a new message with the same subj and tok and new data. The message body is encoded directly and the Data field is not set.

func (*Msg) ReplyErr

func (m *Msg) ReplyErr(err error) *Msg

ReplyErr returns a new reply with the error encoded as the single JSON object field err.

func (*Msg) ReplyRes

func (m *Msg) ReplyRes(result interface{}) *Msg

ReplyRes returns a new reply with result encoded as the single JSON object field res.
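The three reply helpers can be pictured as thin wrappers around one encoding step. The sketch below uses a local `Msg` and hypothetical lower-case functions. Encoding the error as its message string and falling back to an error body on a marshal failure are both assumptions, not documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Msg is a reduced local stand-in for the package's message type.
type Msg struct {
	Subj string
	Tok  string
	Raw  []byte
}

// errNotFound is a sample error used by the demo below.
var errNotFound = errors.New("not found")

// reply copies subj and tok from m and encodes data as the new raw body.
func reply(m *Msg, data interface{}) *Msg {
	raw, err := json.Marshal(data)
	if err != nil {
		// Assumption: report the encoding failure in the body instead.
		raw, _ = json.Marshal(map[string]string{"err": err.Error()})
	}
	return &Msg{Subj: m.Subj, Tok: m.Tok, Raw: raw}
}

// replyErr wraps the error message in the single object field err.
func replyErr(m *Msg, err error) *Msg {
	return reply(m, map[string]string{"err": err.Error()})
}

// replyRes wraps the result in the single object field res.
func replyRes(m *Msg, res interface{}) *Msg {
	return reply(m, map[string]interface{}{"res": res})
}

func main() {
	req := &Msg{Subj: "sum", Tok: "7"}
	fmt.Println(string(replyRes(req, 3).Raw))
	fmt.Println(string(replyErr(req, errNotFound).Raw))
}
```

Because the token is copied into every reply, a client can match the response to its pending request.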

func (*Msg) String

func (m *Msg) String() string

String returns the default string format of this message.

func (*Msg) Unmarshal

func (m *Msg) Unmarshal(v interface{}) error

Unmarshal reads a JSON message body into v and sets it as message data or returns an error.

type Router

type Router interface{ Route(*Msg) }

Router routes a received message to connections.

type RouterFunc

type RouterFunc func(*Msg)

RouterFunc implements Router for simple route functions.
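RouterFunc follows the same adapter idea as `http.HandlerFunc` in the standard library: a function type gains a method that calls itself. The sketch below redeclares the documented types locally; the body of `Route` is the obvious implementation but still an assumption, and `collect` is a hypothetical helper.

```go
package main

import "fmt"

// Msg is a reduced local stand-in for the package's message type.
type Msg struct{ Subj string }

// Router and RouterFunc mirror the declarations documented above.
type Router interface{ Route(*Msg) }

type RouterFunc func(*Msg)

// Route calls the function itself (assumed behavior).
func (r RouterFunc) Route(m *Msg) { r(m) }

// collect returns a Router that appends each routed subject to seen.
func collect(seen *[]string) Router {
	return RouterFunc(func(m *Msg) { *seen = append(*seen, m.Subj) })
}

func main() {
	var seen []string
	r := collect(&seen)
	r.Route(&Msg{Subj: "chat.join"})
	fmt.Println(seen)
}
```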

func NewMatchFilter

func NewMatchFilter(r Router, match ...string) RouterFunc

NewMatchFilter returns a new router that matches message subject and calls r.

func NewPrefixFilter

func NewPrefixFilter(r Router, match ...string) RouterFunc

NewPrefixFilter returns a new router that matches message subject prefixes and calls r.

func NewRegexpFilter

func NewRegexpFilter(r Router, pat *regexp.Regexp) RouterFunc

NewRegexpFilter returns a new router that matches message subjects with regexp and calls r.
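The three filter constructors share one shape: wrap a router and forward a message only when its subject passes a test. The following self-contained sketch shows that shape with local types; `matchFilter`, `prefixFilter`, `regexpFilter`, `counter`, and `chatPat` are hypothetical names, and the exact matching rules of the real filters are assumed.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

type Msg struct{ Subj string }

type Router interface{ Route(*Msg) }

type RouterFunc func(*Msg)

func (r RouterFunc) Route(m *Msg) { r(m) }

// counter is a Router that counts the messages it receives.
type counter struct{ n int }

func (c *counter) Route(*Msg) { c.n++ }

// chatPat is a sample pattern for the regexp filter.
var chatPat = regexp.MustCompile(`^chat\.(join|leave)$`)

// matchFilter forwards m to r when its subject equals one of match.
func matchFilter(r Router, match ...string) RouterFunc {
	return func(m *Msg) {
		for _, s := range match {
			if m.Subj == s {
				r.Route(m)
				return
			}
		}
	}
}

// prefixFilter forwards m to r when its subject starts with one of match.
func prefixFilter(r Router, match ...string) RouterFunc {
	return func(m *Msg) {
		for _, s := range match {
			if strings.HasPrefix(m.Subj, s) {
				r.Route(m)
				return
			}
		}
	}
}

// regexpFilter forwards m to r when its subject matches pat.
func regexpFilter(r Router, pat *regexp.Regexp) RouterFunc {
	return func(m *Msg) {
		if pat.MatchString(m.Subj) {
			r.Route(m)
		}
	}
}

func main() {
	c := &counter{}
	f := prefixFilter(c, "chat.")
	f.Route(&Msg{Subj: "chat.join"})
	f.Route(&Msg{Subj: "admin.kick"})
	fmt.Println(c.n)
}
```

Returning after the first hit ensures a message is forwarded at most once, even if several entries match.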

func (RouterFunc) Route

func (r RouterFunc) Route(m *Msg)

type Routers

type Routers []Router

Routers is a slice of routers, all of them are called with incoming messages.
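A fan-out over a slice of routers can be sketched in a few lines. The types are local copies, and calling the routers sequentially in slice order is an assumption of this sketch.

```go
package main

import "fmt"

type Msg struct{ Subj string }

type Router interface{ Route(*Msg) }

// Routers mirrors the documented slice type.
type Routers []Router

// Route passes m to every router in slice order (assumed behavior).
func (rs Routers) Route(m *Msg) {
	for _, r := range rs {
		r.Route(m)
	}
}

// counter is a Router that counts the messages it receives.
type counter struct{ n int }

func (c *counter) Route(*Msg) { c.n++ }

func main() {
	a, b := &counter{}, &counter{}
	rs := Routers{a, b}
	rs.Route(&Msg{Subj: "chat.join"})
	fmt.Println(a.n, b.n)
}
```

Since `Routers` itself satisfies `Router`, a group can be nested inside a filter or another group.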

func (Routers) Route

func (rs Routers) Route(m *Msg)

type Service

type Service interface {
	// Serve handles the message and returns the response or nil, or an error.
	Serve(*Msg) (*Msg, error)
}

Service is a common interface for the last message processor in line. It is usually used by wrappers that handle request parsing and delegation.

type Services

type Services map[string]Service

Services maps message subjects to services.

func (Services) Handle

func (s Services) Handle(m *Msg) bool

Handle calls the service registered for m's subject. If the service returns data and the sender is not nil, a reply is sent to the sender.
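Subject-based dispatch can be sketched as a map lookup followed by a call to the service. The code below is illustrative only: the `reply` callback stands in for sending to the message's sender, the `upper` service is made up, and the meaning chosen for the bool result (false when no service is registered for the subject) is an assumption that the documentation above does not confirm.

```go
package main

import (
	"fmt"
	"strings"
)

// Msg is a reduced local stand-in for the package's message type.
type Msg struct {
	Subj string
	Data interface{}
}

// Service mirrors the documented interface.
type Service interface {
	Serve(*Msg) (*Msg, error)
}

// Services mirrors the documented map type.
type Services map[string]Service

// upper is a toy service that upper-cases a string body.
type upper struct{}

func (upper) Serve(m *Msg) (*Msg, error) {
	s, ok := m.Data.(string)
	if !ok {
		return nil, fmt.Errorf("%s: want string body", m.Subj)
	}
	return &Msg{Subj: m.Subj, Data: strings.ToUpper(s)}, nil
}

// handle looks up the service for m.Subj and passes any response to reply.
// Returning false for an unknown subject is an assumption of this sketch.
func (s Services) handle(m *Msg, reply func(*Msg)) bool {
	svc, ok := s[m.Subj]
	if !ok {
		return false
	}
	res, err := svc.Serve(m)
	if err != nil {
		res = &Msg{Subj: m.Subj, Data: err.Error()}
	}
	if res != nil && reply != nil {
		reply(res)
	}
	return true
}

func main() {
	svcs := Services{"upper": upper{}}
	ok := svcs.handle(&Msg{Subj: "upper", Data: "hi"}, func(m *Msg) {
		fmt.Println(m.Subj, m.Data)
	})
	fmt.Println(ok, svcs.handle(&Msg{Subj: "unknown"}, nil))
}
```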

func (Services) Merge

func (s Services) Merge(o Services) Services

type TokMap

type TokMap struct {
	// contains filtered or unexported fields
}

func (*TokMap) Add

func (r *TokMap) Add(m *Msg) string

func (*TokMap) Respond

func (r *TokMap) Respond(m *Msg) error

Directories

Path Synopsis
Package wshub provides a websocket server and client using gorilla/websocket for package hub.
