edge

package
v0.0.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2020 License: BSD-3-Clause Imports: 26 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Cluster

func (*Cluster) AddMember

func (c *Cluster) AddMember(m *ClusterMember)

func (*Cluster) GetByID

func (c *Cluster) GetByID(id string) *ClusterMember

func (*Cluster) LeaderID

func (c *Cluster) LeaderID() string

func (*Cluster) Members

func (c *Cluster) Members() []*ClusterMember

func (*Cluster) RemoveMember

func (c *Cluster) RemoveMember(m *ClusterMember)

type ClusterMember

type ClusterMember struct {
	ServerID    string
	ReplicaSet  uint64
	ShardRange  [2]uint32
	GatewayAddr []string
	ClusterAddr net.IP
	ClusterPort uint16
	RaftPort    int
	RaftState   rony.RaftState
	// contains filtered or unexported fields
}

ClusterMember

type DispatchCtx

type DispatchCtx struct {
	// contains filtered or unexported fields
}

DispatchCtx

func (*DispatchCtx) Conn

func (ctx *DispatchCtx) Conn() gateway.Conn

func (*DispatchCtx) Debug

func (ctx *DispatchCtx) Debug()

func (*DispatchCtx) FillEnvelope

func (ctx *DispatchCtx) FillEnvelope(requestID uint64, constructor int64, payload []byte)

func (*DispatchCtx) Get

func (ctx *DispatchCtx) Get(key string) interface{}

func (*DispatchCtx) GetBool

func (ctx *DispatchCtx) GetBool(key string) bool

func (*DispatchCtx) GetBytes

func (ctx *DispatchCtx) GetBytes(key string, defaultValue []byte) []byte

func (*DispatchCtx) GetInt64

func (ctx *DispatchCtx) GetInt64(key string, defaultValue int64) int64

func (*DispatchCtx) GetString

func (ctx *DispatchCtx) GetString(key string, defaultValue string) string

func (*DispatchCtx) Set

func (ctx *DispatchCtx) Set(key string, v interface{})

func (*DispatchCtx) StreamID

func (ctx *DispatchCtx) StreamID() int64

func (*DispatchCtx) UnmarshalEnvelope

func (ctx *DispatchCtx) UnmarshalEnvelope(data []byte) error

type Dispatcher

type Dispatcher interface {
	// All the input arguments are valid in the function context, if you need to pass 'envelope' to other
	// async functions, make sure to hard copy (clone) it before sending it.
	OnMessage(ctx *DispatchCtx, envelope *rony.MessageEnvelope, kvs ...*rony.KeyValue)
	// All the input arguments are valid in the function context, if you need to pass 'data' or 'envelope' to other
	// async functions, make sure to hard copy (clone) it before sending it. If 'err' is not nil then envelope will be
	// discarded, it is the user's responsibility to send back appropriate message using 'conn'
	// Note that conn IS NOT nil in any circumstances.
	Interceptor(ctx *DispatchCtx, data []byte, kvs ...gateway.KeyValue) (err error)
	// This will be called when the context has been finished, this lets cleaning up, or in case you need to flush the
	// messages and updates in one go.
	Done(ctx *DispatchCtx)
	// This will be called when a new connection has been opened
	OnOpen(conn gateway.Conn)
	// This will be called when a connection is closed
	OnClose(conn gateway.Conn)
}

Dispatcher

type GetConstructorNameFunc

type GetConstructorNameFunc func(constructor int64) string

type Handler

type Handler func(ctx *RequestCtx, in *rony.MessageEnvelope)

type Option

type Option func(edge *Server)

Option

func WithCustomConstructorName

func WithCustomConstructorName(h func(constructor int64) (name string)) Option

WithCustomerConstructorName will be used to give a human readable names to messages

func WithDataPath

func WithDataPath(path string) Option

WithDataPath set where the internal data for raft and gossip are stored.

func WithGossipPort

func WithGossipPort(gossipPort int) Option

WithGossipPort

func WithReplicaSet

func WithReplicaSet(replicaSet uint64, bindPort int, bootstrap bool) Option

WithReplicaSet

func WithTcpGateway

func WithTcpGateway(config tcpGateway.Config) Option

WithTcpGateway set the gateway to tcp which can support http and/or websocket Only one gateway could be set and if you set another gateway it panics on runtime.

func WithTestGateway

func WithTestGateway(config dummyGateway.Config) Option

WithTestGateway set the gateway to a dummy gateway which is useful for writing tests. Only one gateway could be set and if you set another gateway it panics on runtime.

type RequestCtx

type RequestCtx struct {
	// contains filtered or unexported fields
}

RequestCtx

func (*RequestCtx) Conn

func (ctx *RequestCtx) Conn() gateway.Conn

func (*RequestCtx) ConnID

func (ctx *RequestCtx) ConnID() uint64

func (*RequestCtx) Get

func (ctx *RequestCtx) Get(key string) interface{}

func (*RequestCtx) GetBool

func (ctx *RequestCtx) GetBool(key string) bool

func (*RequestCtx) GetBytes

func (ctx *RequestCtx) GetBytes(key string, defaultValue []byte) []byte

func (*RequestCtx) GetInt64

func (ctx *RequestCtx) GetInt64(key string, defaultValue int64) int64

func (*RequestCtx) GetString

func (ctx *RequestCtx) GetString(key string, defaultValue string) string

func (*RequestCtx) PushClusterMessage

func (ctx *RequestCtx) PushClusterMessage(serverID string, requestID uint64, constructor int64, proto proto.Message, kvs ...*rony.KeyValue)

func (*RequestCtx) PushCustomError

func (ctx *RequestCtx) PushCustomError(code, item string, enTxt string, enItems []string, localTxt string, localItems []string)

func (*RequestCtx) PushCustomMessage

func (ctx *RequestCtx) PushCustomMessage(requestID uint64, constructor int64, proto proto.Message, kvs ...*rony.KeyValue)

func (*RequestCtx) PushError

func (ctx *RequestCtx) PushError(code, item string)

func (*RequestCtx) PushMessage

func (ctx *RequestCtx) PushMessage(constructor int64, proto proto.Message)

func (*RequestCtx) PushRedirectLeader

func (ctx *RequestCtx) PushRedirectLeader()

func (*RequestCtx) PushRedirectShard

func (ctx *RequestCtx) PushRedirectShard(shard uint32, wait time.Duration)

func (*RequestCtx) ReqID

func (ctx *RequestCtx) ReqID() uint64

func (*RequestCtx) Return

func (ctx *RequestCtx) Return()

func (*RequestCtx) Set

func (ctx *RequestCtx) Set(key string, v interface{})

func (*RequestCtx) StopExecution

func (ctx *RequestCtx) StopExecution()

func (*RequestCtx) Stopped

func (ctx *RequestCtx) Stopped() bool

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server

func NewServer

func NewServer(serverID string, dispatcher Dispatcher, opts ...Option) *Server

func (*Server) AppendHandlers

func (edge *Server) AppendHandlers(constructor int64, handlers ...Handler)

AppendHandlers appends the handlers for the constructor in order. So handlers[n] will be called before handlers[n+1].

func (*Server) ClusterMembers

func (edge *Server) ClusterMembers() []*ClusterMember

ClusterMembers returns a list of all the discovered nodes in the cluster

func (*Server) ClusterSend

func (edge *Server) ClusterSend(serverID string, envelope *rony.MessageEnvelope, kvs ...*rony.KeyValue) (err error)

ClusterSend sends 'envelope' to the server identified by 'serverID'. It may returns ErrNotFound if the server is not in the list. The message will be send with BEST EFFORT and using UDP

func (*Server) GetGatewayConn added in v0.0.6

func (edge *Server) GetGatewayConn(connID uint64) gateway.Conn

GetGatewayConn return the gateway connection identified by connID or returns nil if not found.

func (*Server) GetServerID

func (edge *Server) GetServerID() string

GetServerID return this server id, which MUST be unique in the cluster otherwise the behaviour is unknown.

func (*Server) JoinCluster

func (edge *Server) JoinCluster(addr ...string) (int, error)

JoinCluster is used to take an existing Cluster and attempt to join a cluster by contacting all the given hosts and performing a state sync. This returns the number of hosts successfully contacted and an error if none could be reached. If an error is returned, the node did not successfully join the cluster.

func (*Server) PrependHandlers

func (edge *Server) PrependHandlers(constructor int64, handlers ...Handler)

PrependHandlers prepends the handlers for the constructor in order.

func (*Server) SetHandlers

func (edge *Server) SetHandlers(constructor int64, leaderOnly bool, handlers ...Handler)

SetHandlers set the handlers for the constructor. 'leaderOnly' is applicable ONLY if the cluster is run with Raft support. If cluster is a Raft enabled cluster, then by setting 'leaderOnly' to TRUE, requests sent to a follower server will return redirect error to the client. For standalone servers 'leaderOnly' does not affect.

func (*Server) SetPostHandlers added in v0.0.5

func (edge *Server) SetPostHandlers(handlers ...Handler)

SetPreHandlers set the handler which will be called after executing any request. These pre handlers are like middlewares which will be automatically triggered for each request. If you want to set post handler for specific request the your must use SetHandlers, PrependHandlers or AppendHandlers

func (*Server) SetPreHandlers added in v0.0.5

func (edge *Server) SetPreHandlers(handlers ...Handler)

SetPreHandlers set the handler which will be called before executing any request. These pre handlers are like middlewares which will be automatically triggered for each request. If you want to set pre handler for specific request the your must use SetHandlers, PrependHandlers or AppendHandlers

func (*Server) Shutdown

func (edge *Server) Shutdown()

Shutdown gracefully shutdown the services

func (*Server) ShutdownWithSignal

func (edge *Server) ShutdownWithSignal(signals ...os.Signal)

Shutdown blocks until any of the signals has been called

func (*Server) StartCluster

func (edge *Server) StartCluster() (err error)

StartCluster is non-blocking function which runs the gossip and raft if it is set

func (*Server) StartGateway

func (edge *Server) StartGateway()

StartGateway is non-blocking function runs the gateway in background so we can accept clients requests

func (*Server) Stats

func (edge *Server) Stats() *Stats

Stats exports some internal metrics data packed in 'Stats' struct

type SimpleDispatcher

type SimpleDispatcher struct {
	OnMessageFunc func(ctx *DispatchCtx, envelope *rony.MessageEnvelope, kvs ...*rony.KeyValue)
}

SimpleDispatcher is a naive implementation of Dispatcher. You only need to set OnMessageFunc with

func (*SimpleDispatcher) Done

func (s *SimpleDispatcher) Done(ctx *DispatchCtx)

func (*SimpleDispatcher) Interceptor

func (s *SimpleDispatcher) Interceptor(ctx *DispatchCtx, data []byte, kvs ...gateway.KeyValue) (err error)

func (*SimpleDispatcher) OnClose

func (s *SimpleDispatcher) OnClose(conn gateway.Conn)

func (*SimpleDispatcher) OnMessage

func (s *SimpleDispatcher) OnMessage(ctx *DispatchCtx, envelope *rony.MessageEnvelope, kvs ...*rony.KeyValue)

func (*SimpleDispatcher) OnOpen

func (s *SimpleDispatcher) OnOpen(conn gateway.Conn)

type Stats

type Stats struct {
	Address         string
	RaftMembers     int
	RaftState       string
	ReplicaSet      uint64
	Members         int
	MembershipScore int
	GatewayProtocol gateway.Protocol
	GatewayAddr     []string
}

Stats exports some internal metrics data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL