broker

package
v0.0.0-...-53883ab Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2020 License: MIT Imports: 37 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Globals groups the package-level pointers to the cluster, the connection
// cache and the service. Every field is a pointer and is therefore nil until
// something assigns it.
// NOTE(review): the code that assigns these fields is not visible here —
// confirm initialization order before dereferencing them.
var Globals struct {
	// Cluster is the package-wide *Cluster.
	Cluster   *Cluster
	// ConnCache is the package-wide connection cache.
	ConnCache *ConnCache
	// Service is the package-wide *Service.
	Service   *Service
}

Functions

func ClusterInit

func ClusterInit(configString json.RawMessage, self *string) int

ClusterInit returns the worker ID.

func ResponseHandler

func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte)

ResponseHandler handles responses for monitoring routes.

Types

type Cluster

// Cluster is the representation of the cluster.
//
// Its exported methods Master, Ping, Proxy and Vote each take a request
// pointer and a reply pointer and return an error.
// NOTE(review): that shape matches what net/rpc requires of exported methods,
// so these are presumably invoked by remote nodes over RPC — confirm.
type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is the representation of the cluster.

func (*Cluster) Master

func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error

Master runs on a topic's master node and receives C2S messages from the topic's proxy nodes. Each message is treated as if it came from a session: a session is found or created locally, and the message is dispatched to it as if it had arrived over a normal ws/lp connection. Called by a remote node.

func (*Cluster) Ping

func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error

Ping is called by the leader node to assert leadership and check status of the followers.

func (Cluster) Proxy

func (Cluster) Proxy(resp *ClusterResp, unused *bool) error

Proxy receives messages from the master node addressed to a specific local connection.

func (*Cluster) Start

func (c *Cluster) Start()

Start accepting connections.

func (*Cluster) Vote

func (c *Cluster) Vote(vreq *ClusterVoteRequest, response *ClusterVoteResponse) error

Vote processes a request for a vote from a candidate.

type ClusterNode

// ClusterNode is a client's connection to another node.
// It exposes no exported fields.
type ClusterNode struct {
	// contains filtered or unexported fields
}

ClusterNode is a client's connection to another node.

type ClusterPing

// ClusterPing is the content of a leader node's ping to a follower node.
// It is the request argument of Cluster.Ping.
type ClusterPing struct {
	// Name of the leader node
	Leader string
	// Election term
	Term int
	// Ring hash signature that represents the cluster
	Signature string
	// Names of nodes currently active in the cluster
	Nodes []string
}

ClusterPing is the content of a leader node's ping to a follower node.

type ClusterReq

// ClusterReq is a Proxy to Master request message.
// It is the request argument of Cluster.Master.
type ClusterReq struct {
	// Name of the node sending this request
	Node string

	// Ring hash signature of the node sending this request
	// Signature must match the signature of the receiver, otherwise the
	// Cluster is desynchronized.
	Signature string

	// Payload fields: subscribe, publish and unsubscribe packets, the topic,
	// a type tag and a message.
	// NOTE(review): presumably Type selects which of the Msg* pointers is
	// populated and the others stay nil — confirm against the sender, and
	// document the valid Type values.
	MsgSub   *lp.Subscribe
	MsgPub   *lp.Publish
	MsgUnsub *lp.Unsubscribe
	Topic    *security.Topic
	Type     uint8
	Message  *message.Message

	// Originating session
	Conn *ClusterSess
	// True if the original session has disconnected
	ConnGone bool
}

ClusterReq is a Proxy to Master request message.

type ClusterResp

// ClusterResp is a Master to Proxy response message.
// It is the request argument of Cluster.Proxy.
type ClusterResp struct {
	// Payload fields, mirroring those of ClusterReq with an additional raw
	// byte slice.
	// NOTE(review): presumably Type selects which payload field is populated
	// — confirm against the sender, and document the valid Type values.
	Type     uint8
	MsgSub   *lp.Subscribe
	MsgPub   *lp.Publish
	MsgUnsub *lp.Unsubscribe
	Msg      []byte
	Topic    *security.Topic
	Message  *message.Message
	// Connection ID to forward message to, if any.
	// NOTE(review): the field is named "From" but is described as the
	// destination connection — confirm which direction is intended.
	FromConnID uid.LID
}

ClusterResp is a Master to Proxy response message.

type ClusterSess

// ClusterSess holds basic information about the remote session where a
// message was created. ClusterReq carries it in its Conn field.
type ClusterSess struct {
	// IP address of the client. For long polling this is the IP of the last poll
	RemoteAddr string

	// Connection ID
	ConnID uid.LID

	// Client ID
	ClientID uid.ID
}

ClusterSess holds basic information about the remote session where the message was created.

type ClusterVote

// ClusterVote is a vote request and a response in leader election.
// It exposes no exported fields.
// NOTE(review): presumably it pairs a ClusterVoteRequest with its
// ClusterVoteResponse — confirm against the unexported fields.
type ClusterVote struct {
	// contains filtered or unexported fields
}

ClusterVote is a vote request and a response in leader election.

type ClusterVoteRequest

// ClusterVoteRequest is a request from a leader candidate to a node to vote
// for the candidate. It is the request argument of Cluster.Vote.
type ClusterVoteRequest struct {
	// Candidate node which issued this request
	Node string
	// Election term
	Term int
}

ClusterVoteRequest is a request from a leader candidate to a node to vote for the candidate.

type ClusterVoteResponse

// ClusterVoteResponse is a vote from a node. It is the reply argument of
// Cluster.Vote.
type ClusterVoteResponse struct {
	// Actual vote
	Result bool
	// Node's term after the vote
	Term int
}

ClusterVoteResponse is a vote from a node.

type Conn

// NOTE(review): Conn has no doc comment of its own. Judging by its methods
// (ID, Type, SendMessage, SendRawBytes) it presumably represents a single
// client connection that acts as a message subscriber — confirm and replace
// this note with a proper "Conn is ..." sentence.
type Conn struct {
	// Embedded mutex: Lock and Unlock are promoted to Conn. A Conn must not
	// be copied after first use; pass *Conn instead.
	sync.Mutex

	message.MessageIds // local identifier of messages
	// contains filtered or unexported fields
}

func (*Conn) ID

func (c *Conn) ID() string

ID returns the unique identifier of the subscriber.

func (*Conn) SendMessage

func (c *Conn) SendMessage(msg *message.Message) bool

SendMessage forwards the message to the underlying client.

func (*Conn) SendRawBytes

func (c *Conn) SendRawBytes(buf []byte) bool

SendRawBytes forwards raw bytes to the underlying client.

func (*Conn) Type

func (c *Conn) Type() message.SubscriberType

Type returns the type of the subscriber.

type ConnCache

// ConnCache is a cache of *Conn values looked up by connection ID (uid.LID);
// see Add, Get and Delete. Create one with NewConnCache.
type ConnCache struct {
	// Embedded read/write mutex: Lock, Unlock, RLock and RUnlock are promoted
	// to ConnCache. A ConnCache must not be copied after first use.
	// NOTE(review): whether Add, Get and Delete take this lock themselves is
	// not visible here — confirm before locking around them.
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConnCache

func NewConnCache() *ConnCache

func (*ConnCache) Add

func (cc *ConnCache) Add(conn *Conn)

func (*ConnCache) Delete

func (cc *ConnCache) Delete(connid uid.LID)

func (*ConnCache) Get

func (cc *ConnCache) Get(connid uid.LID) *Conn

Get fetches a connection from cache by connection ID.

type Meter

// Meter groups the metrics instruments of the package: a metrics registry
// handle, one time series and six counters. Create one with NewMeter and
// release it with UnregisterAll.
// NOTE(review): the counter names match the Connections, Subscriptions,
// InMsgs, OutMsgs, InBytes and OutBytes fields of Varz, so they presumably
// feed that report — confirm.
type Meter struct {
	Metrics        metrics.Metrics
	ConnTimeSeries metrics.TimeSeries
	Connections    metrics.Counter
	Subscriptions  metrics.Counter
	InMsgs         metrics.Counter
	OutMsgs        metrics.Counter
	InBytes        metrics.Counter
	OutBytes       metrics.Counter
}

func NewMeter

func NewMeter() *Meter

func (*Meter) UnregisterAll

func (m *Meter) UnregisterAll()

type Service

// Service is the main struct of the package. Create one with NewService,
// start it with Listen and shut it down with Close.
type Service struct {
	PID uint32      // The process ID, a unique identifier for the application.
	MAC *crypto.MAC // The MAC to use for decoding and encoding keys.
	// contains filtered or unexported fields
}

Service is the main struct of the package.

func NewService

func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)

func (*Service) Close

func (s *Service) Close()

func (*Service) HandleVarz

func (m *Service) HandleVarz(w http.ResponseWriter, r *http.Request)

HandleVarz will process HTTP requests for conn stats information.

func (*Service) Listen

func (s *Service) Listen() (err error)

Listen starts the service.

func (*Service) Varz

func (s *Service) Varz() (*Varz, error)

Varz returns a Varz struct containing the server information.

type Varz

// Varz holds the server information that is output on the monitoring port at
// /varz. Service.Varz returns it, and every field carries a JSON tag naming
// its key in the encoded output.
// NOTE(review): the unit of the event-duration fields (HMean through StdDev)
// is not stated anywhere visible — confirm and document it.
type Varz struct {
	Start         time.Time `json:"start"`
	Now           time.Time `json:"now"`
	Uptime        string    `json:"uptime"`
	Connections   int64     `json:"connections"`
	InMsgs        int64     `json:"in_msgs"`
	OutMsgs       int64     `json:"out_msgs"`
	InBytes       int64     `json:"in_bytes"`
	OutBytes      int64     `json:"out_bytes"`
	Subscriptions int64     `json:"subscriptions"`
	HMean         float64   `json:"hmean"` // Event duration harmonic mean.
	P50           float64   `json:"p50"`   // Event duration nth percentiles (P50 through P999).
	P75           float64   `json:"p75"`
	P95           float64   `json:"p95"`
	P99           float64   `json:"p99"`
	P999          float64   `json:"p999"`
	Long5p        float64   `json:"long_5p"`  // Average of the longest 5% event durations.
	Short5p       float64   `json:"short_5p"` // Average of the shortest 5% event durations.
	Max           float64   `json:"max"`      // Highest event duration.
	Min           float64   `json:"min"`      // Lowest event duration.
	StdDev        float64   `json:"stddev"`   // Standard deviation.

}

Varz holds the server information that is output on the monitoring port at /varz.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL