pitaya

package
v0.0.0-...-2b6943d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: MIT Imports: 45 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddGRPCInfoToMetadata

func AddGRPCInfoToMetadata(
	metadata map[string]string,
	region string,
	host, port string,
	externalHost, externalPort string,
) map[string]string

AddGRPCInfoToMetadata adds host, external host and port into metadata

func AddMetricTagsToPropagateCtx

func AddMetricTagsToPropagateCtx(
	ctx context.Context,
	tags map[string]string,
) context.Context

AddMetricTagsToPropagateCtx adds a key and metric tags that will be propagated through RPC calls. Use the same tags that are at 'pitaya.metrics.additionalTags' config

func AddRoute

func AddRoute(serverType string, routingFunction router.RoutingFunc) error

func AddSDListener

func AddSDListener(listener cluster.SDListener)

func AddToPropagateCtx

func AddToPropagateCtx(ctx context.Context, key string, val interface{}) context.Context

AddToPropagateCtx adds a key and value that will be propagated through RPC calls

func Configure

func Configure(
	isFrontend bool,
	serverType string,
	serverMode ServerMode,
	serverMetadata map[string]string,
	cfgs ...*viper.Viper,
)

Configure configures the app

func CreatePrometheusReporter

func CreatePrometheusReporter(serverType string, config config.PrometheusConfig, customSpecs models.CustomMetricsSpec) (*metrics.PrometheusReporter, error)

CreatePrometheusReporter create a Prometheus reporter instance

func CreateStatsdReporter

func CreateStatsdReporter(serverType string, config config.StatsdConfig) (*metrics.StatsdReporter, error)

CreateStatsdReporter create a Statsd reporter instance

func Error

func Error(err error, code string, metadata ...map[string]string) *errors.Error

Error creates a new error with a code, message and metadata

func ExtractSpan

func ExtractSpan(ctx context.Context) (opentracing.SpanContext, error)

ExtractSpan retrieves an opentracing span context from the given context The span context can be received directly or via an RPC call

func GetDefaultLoggerFromCtx

func GetDefaultLoggerFromCtx(ctx context.Context) logging.Logger

GetDefaultLoggerFromCtx returns the default logger from the given context

func GetDieChan

func GetDieChan() chan bool

func GetFromPropagateCtx

func GetFromPropagateCtx(ctx context.Context, key string) interface{}

GetFromPropagateCtx adds a key and value that came through RPC calls

func GetMetricsReporters

func GetMetricsReporters() []metrics.Reporter

func GetModule

func GetModule(name string) (interfaces.Module, error)

func GetRouteFromCtx

func GetRouteFromCtx(ctx context.Context) *route.Route

GetRouteFromCtx retrieves a session from a given context

func GetServer

func GetServer() *cluster.Server

func GetServerByID

func GetServerByID(id string) (*cluster.Server, error)

func GetServerID

func GetServerID() string

GetServerUniqueID returns the generated server id

func GetServers

func GetServers() []*cluster.Server

func GetServersByType

func GetServersByType(t string) (map[string]*cluster.Server, error)

func GetSessionFromCtx

func GetSessionFromCtx(ctx context.Context) session.Session

func GetZoneID

func GetZoneID() string

func GroupAddMember

func GroupAddMember(ctx context.Context, groupName, uid string) error

func GroupBroadcast

func GroupBroadcast(ctx context.Context, frontendType, groupName, route string, v interface{}) error

func GroupContainsMember

func GroupContainsMember(ctx context.Context, groupName, uid string) (bool, error)

func GroupCountMembers

func GroupCountMembers(ctx context.Context, groupName string) (int, error)

func GroupCreate

func GroupCreate(ctx context.Context, groupName string) error

func GroupCreateWithTTL

func GroupCreateWithTTL(ctx context.Context, groupName string, ttlTime time.Duration) error

func GroupDelete

func GroupDelete(ctx context.Context, groupName string) error

func GroupMembers

func GroupMembers(ctx context.Context, groupName string) ([]string, error)

func GroupRemoveAll

func GroupRemoveAll(ctx context.Context, groupName string) error

func GroupRemoveMember

func GroupRemoveMember(ctx context.Context, groupName, uid string) error

func GroupRenewTTL

func GroupRenewTTL(ctx context.Context, groupName string) error

func IsRunning

func IsRunning() bool

func NewAfterTimer

func NewAfterTimer(duration time.Duration, fn timer.Func) *timer.Timer

NewAfterTimer returns a new Timer containing a function that will be called after duration that specified by the duration argument. The duration d must be greater than zero; if not, NewAfterTimer will panic. Stop the timer to release associated resources.

func NewCountTimer

func NewCountTimer(interval time.Duration, count int, fn timer.Func) *timer.Timer

NewCountTimer returns a new Timer containing a function that will be called with a period specified by the duration argument. After count times, timer will be stopped automatically, It adjusts the intervals for slow receivers. The duration d must be greater than zero; if not, NewCountTimer will panic. Stop the timer to release associated resources.

func NewTimer

func NewTimer(interval time.Duration, fn timer.Func) *timer.Timer

NewTimer returns a new Timer containing a function that will be called with a period specified by the duration argument. It adjusts the intervals for slow receivers. The duration d must be greater than zero; if not, NewTimer will panic. Stop the timer to release associated resources.

func RPC

func RPC(ctx context.Context, routeStr string, reply proto.Message, arg proto.Message) error

func RPCTo

func RPCTo(ctx context.Context, serverID, routeStr string, reply proto.Message, arg proto.Message) error

func Register

func Register(c component.Component, options ...component.Option)

func RegisterModule

func RegisterModule(module interfaces.Module, name string) error

func RegisterModuleAfter

func RegisterModuleAfter(module interfaces.Module, name string) error

func RegisterModuleBefore

func RegisterModuleBefore(module interfaces.Module, name string) error

func RegisterRPCJob

func RegisterRPCJob(rpcJob worker.RPCJob) error

func ReliableRPC

func ReliableRPC(routeStr string, metadata map[string]interface{}, reply, arg proto.Message) (jid string, err error)

func ReliableRPCWithOptions

func ReliableRPCWithOptions(routeStr string, metadata map[string]interface{}, reply, arg proto.Message, opts *config.EnqueueOpts) (jid string, err error)

func SendKickToUsers

func SendKickToUsers(uids []string, frontendType string) ([]string, error)

func SendPushToUser

func SendPushToUser(route string, v interface{}, uid string, frontId string, frontendType string) error

func SendPushToUsers

func SendPushToUsers(route string, v interface{}, uids []string, frontendType string) ([]string, error)

func SetDebug

func SetDebug(debug bool)

func SetDictionary

func SetDictionary(dict map[string]uint16) error

func SetHeartbeatTime

func SetHeartbeatTime(interval time.Duration)

func SetLogger

func SetLogger(l logging.Logger)

SetLogger logger setter

func SetTimerPrecision

func SetTimerPrecision(precision time.Duration)

SetTimerPrecision set the ticker precision, and time precision can not less than a Millisecond, and can not change after application running. The default precision is time.Second

func Shutdown

func Shutdown()

func Start

func Start()

func StartWorker

func StartWorker()

Types

type App

type App struct {
	// contains filtered or unexported fields
}

App is the base app struct

func NewApp

func NewApp(
	serverMode ServerMode,
	serializer serialize.Serializer,
	acceptors []acceptor.Acceptor,
	dieChan chan bool,
	router *router.Router,
	server *cluster.Server,
	rpcClient cluster.RPCClient,
	rpcServer cluster.RPCServer,
	worker *worker.Worker,
	serviceDiscovery cluster.ServiceDiscovery,
	remoteService *service.RemoteService,
	handlerService *service.HandlerService,
	groups groups.GroupService,
	sessionPool session.SessionPool,
	metricsReporters []metrics.Reporter,
	config config.PitayaConfig,
) *App

NewApp is the base constructor for a pitaya app instance

func (*App) AddPreRouteHook

func (app *App) AddPreRouteHook(f router.PreRouteHookFunc)

func (*App) AddRoute

func (app *App) AddRoute(
	serverType string,
	routingFunction router.RoutingFunc,
) error

AddRoute adds a routing function to a server type

func (*App) AddSDListener

func (app *App) AddSDListener(listener cluster.SDListener)

func (*App) ExtendRemote

func (app *App) ExtendRemote(svc, name string, remote *component.Remote) error

func (*App) GetDieChan

func (app *App) GetDieChan() chan bool

GetDieChan gets the channel that the app sinalizes when its going to die

func (*App) GetMetricsReporters

func (app *App) GetMetricsReporters() []metrics.Reporter

GetMetricsReporters gets registered metrics reporters

func (*App) GetModule

func (app *App) GetModule(name string) (interfaces.Module, error)

GetModule gets a module with a name

func (*App) GetRouteFromCtx

func (app *App) GetRouteFromCtx(ctx context.Context) *route.Route

GetRouteFromCtx retrieves a session from a given context

func (*App) GetServer

func (app *App) GetServer() *cluster.Server

GetServer gets the local server instance

func (*App) GetServerByID

func (app *App) GetServerByID(id string) (*cluster.Server, error)

GetServerByID returns the server with the specified id

func (*App) GetServerID

func (app *App) GetServerID() string

GetServerID returns the generated server id

func (*App) GetServers

func (app *App) GetServers() []*cluster.Server

GetServers get all servers

func (*App) GetServersByType

func (app *App) GetServersByType(t string) (map[string]*cluster.Server, error)

GetServersByType get all servers of type

func (*App) GetSessionFromCtx

func (app *App) GetSessionFromCtx(ctx context.Context) session.Session

GetSessionFromCtx retrieves a session from a given context

func (*App) GetSessionPool

func (app *App) GetSessionPool() session.SessionPool

func (*App) GetZoneID

func (app *App) GetZoneID() string

GetZoneID returns the generated server id

func (*App) GroupAddMember

func (app *App) GroupAddMember(ctx context.Context, groupName, uid string) error

GroupAddMember adds UID to group

func (*App) GroupBroadcast

func (app *App) GroupBroadcast(ctx context.Context, frontendType, groupName, route string, v interface{}) error

GroupBroadcast pushes the message to all members inside group

func (*App) GroupContainsMember

func (app *App) GroupContainsMember(ctx context.Context, groupName, uid string) (bool, error)

GroupContainsMember checks whether an UID is contained in group or not

func (*App) GroupCountMembers

func (app *App) GroupCountMembers(ctx context.Context, groupName string) (int, error)

GroupCountMembers get current member amount in group

func (*App) GroupCreate

func (app *App) GroupCreate(ctx context.Context, groupName string) error

GroupCreate creates a group

func (*App) GroupCreateWithTTL

func (app *App) GroupCreateWithTTL(ctx context.Context, groupName string, ttlTime time.Duration) error

GroupCreateWithTTL creates a group with given TTL

func (*App) GroupDelete

func (app *App) GroupDelete(ctx context.Context, groupName string) error

GroupDelete deletes whole group, including UIDs and base group

func (*App) GroupMembers

func (app *App) GroupMembers(ctx context.Context, groupName string) ([]string, error)

GroupMembers returns all member's UIDs

func (*App) GroupRemoveAll

func (app *App) GroupRemoveAll(ctx context.Context, groupName string) error

GroupRemoveAll clears all UIDs

func (*App) GroupRemoveMember

func (app *App) GroupRemoveMember(ctx context.Context, groupName, uid string) error

GroupRemoveMember removes specified UID from group

func (*App) GroupRenewTTL

func (app *App) GroupRenewTTL(ctx context.Context, groupName string) error

GroupRenewTTL renews group with the initial TTL

func (*App) IsRunning

func (app *App) IsRunning() bool

IsRunning indicates if the Pitaya app has been initialized. Note: This doesn't cover acceptors, only the pitaya internal registration and modules initialization.

func (*App) OnSessionBind

func (app *App) OnSessionBind(f func(session.Session))

func (*App) OnSessionClosed

func (app *App) OnSessionClosed(f func(session.Session))

func (*App) OnSessionHeartbeat

func (app *App) OnSessionHeartbeat(f func(session.Session))

func (*App) RPC

func (app *App) RPC(ctx context.Context, routeStr string, reply proto.Message, arg proto.Message) error

RPC calls a method in a different server

func (*App) RPCTo

func (app *App) RPCTo(ctx context.Context, serverID, routeStr string, reply proto.Message, arg proto.Message) error

RPCTo send a rpc to a specific server

func (*App) Register

func (app *App) Register(c component.Component, options ...component.Option)

Register register a component with options

func (*App) RegisterModule

func (app *App) RegisterModule(module interfaces.Module, name string) error

RegisterModule registers a module, by default it register after registered modules

func (*App) RegisterModuleAfter

func (app *App) RegisterModuleAfter(module interfaces.Module, name string) error

RegisterModuleAfter registers a module after all registered modules

func (*App) RegisterModuleBefore

func (app *App) RegisterModuleBefore(module interfaces.Module, name string) error

RegisterModuleBefore registers a module before all registered modules

func (*App) RegisterRPCJob

func (app *App) RegisterRPCJob(rpcJob worker.RPCJob) error

RegisterRPCJob registers rpc job to execute jobs with retries

func (*App) ReliableRPC

func (app *App) ReliableRPC(
	routeStr string,
	metadata map[string]interface{},
	reply, arg proto.Message,
) (jid string, err error)

ReliableRPC enqueues RPC to worker so it's executed asynchronously Default enqueue options are used

func (*App) ReliableRPCWithOptions

func (app *App) ReliableRPCWithOptions(
	routeStr string,
	metadata map[string]interface{},
	reply, arg proto.Message,
	opts *config.EnqueueOpts,
) (jid string, err error)

ReliableRPCWithOptions enqueues RPC to worker Receive worker options for this specific RPC

func (*App) SendKickToUsers

func (app *App) SendKickToUsers(uids []string, frontendType string) ([]string, error)

SendKickToUsers sends kick to an user array

func (*App) SendPushBytesToUsers

func (app *App) SendPushBytesToUsers(route string, data []byte, uids []string, frontendType string) ([]string, error)

func (*App) SendPushToUser

func (app *App) SendPushToUser(route string, v interface{}, uid string, frontId string, frontType string) error

func (*App) SendPushToUsers

func (app *App) SendPushToUsers(route string, v interface{}, uids []string, frontendType string) ([]string, error)

SendPushToUsers sends a message to the given list of users

func (*App) SetClientVersion

func (app *App) SetClientVersion(v map[int]int)

func (*App) SetDebug

func (app *App) SetDebug(debug bool)

SetDebug toggles debug on/off

func (*App) SetDictionary

func (app *App) SetDictionary(dict map[string]uint16) error

SetDictionary sets routes map

func (*App) SetHeartbeatTime

func (app *App) SetHeartbeatTime(interval time.Duration)

SetHeartbeatTime sets the heartbeat time

func (*App) Shutdown

func (app *App) Shutdown()

Shutdown send a signal to let 'pitaya' shutdown itself.

func (*App) Start

func (app *App) Start()

Start starts the app

func (*App) StartWorker

func (app *App) StartWorker()

StartWorker configures, starts and returns pitaya worker

type Builder

type Builder struct {
	Config           config.BuilderConfig
	DieChan          chan bool
	PacketDecoder    codec.PacketDecoder
	PacketEncoder    codec.PacketEncoder
	MessageEncoder   *message.MessagesEncoder
	Serializer       serialize.Serializer
	Router           *router.Router
	RPCClient        cluster.RPCClient
	RPCServer        cluster.RPCServer
	MetricsReporters []metrics.Reporter
	Server           *cluster.Server
	ServerMode       ServerMode
	ServiceDiscovery cluster.ServiceDiscovery
	Groups           groups.GroupService
	SessionPool      session.SessionPool
	Worker           *worker.Worker
	HandlerHooks     *pipeline.HandlerHooks
	// contains filtered or unexported fields
}

Builder holds dependency instances for a pitaya App

func NewBuilder

func NewBuilder(isFrontend bool,
	serverType string,
	serverMode ServerMode,
	serverMetadata map[string]string,
	config config.BuilderConfig,
	customMetrics models.CustomMetricsSpec,
	prometheusConfig config.PrometheusConfig,
	statsdConfig config.StatsdConfig,
	etcdSDConfig config.EtcdServiceDiscoveryConfig,
	natsRPCServerConfig config.NatsRPCServerConfig,
	natsRPCClientConfig config.NatsRPCClientConfig,
	workerConfig config.WorkerConfig,
	enqueueOpts config.EnqueueOpts,
	groupServiceConfig config.MemoryGroupConfig,
) *Builder

NewBuilder return a builder instance with default dependency instances for a pitaya App, with configs explicitly defined

func NewBuilderWithConfigs

func NewBuilderWithConfigs(
	isFrontend bool,
	serverType string,
	serverMode ServerMode,
	serverMetadata map[string]string,
	conf *config.Config,
) *Builder

NewBuilderWithConfigs return a builder instance with default dependency instances for a pitaya App with configs defined by a config file (config.Config) and default paths (see documentation).

func NewDefaultBuilder

func NewDefaultBuilder(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, builderConfig config.BuilderConfig) *Builder

NewDefaultBuilder return a builder instance with default dependency instances for a pitaya App, with default configs

func (*Builder) AddAcceptor

func (builder *Builder) AddAcceptor(ac acceptor.Acceptor)

AddAcceptor adds a new acceptor to app

func (*Builder) Build

func (builder *Builder) Build() Pitaya

Build returns a valid App instance

type Pitaya

type Pitaya interface {
	GetDieChan() chan bool
	SetDebug(debug bool)
	SetHeartbeatTime(interval time.Duration)
	GetZoneID() string
	GetServerID() string
	GetMetricsReporters() []metrics.Reporter
	GetServer() *cluster.Server
	GetServerByID(id string) (*cluster.Server, error)
	GetServersByType(t string) (map[string]*cluster.Server, error)
	GetServers() []*cluster.Server
	GetSessionFromCtx(ctx context.Context) session.Session
	GetRouteFromCtx(ctx context.Context) *route.Route
	Start()
	SetDictionary(dict map[string]uint16) error
	AddRoute(serverType string, routingFunction router.RoutingFunc) error
	AddPreRouteHook(f router.PreRouteHookFunc)
	Shutdown()
	StartWorker()
	RegisterRPCJob(rpcJob worker.RPCJob) error
	IsRunning() bool
	SetClientVersion(v map[int]int)

	RPC(ctx context.Context, routeStr string, reply proto.Message, arg proto.Message) error
	RPCTo(ctx context.Context, serverID, routeStr string, reply proto.Message, arg proto.Message) error
	ReliableRPC(
		routeStr string,
		metadata map[string]interface{},
		reply, arg proto.Message,
	) (jid string, err error)
	ReliableRPCWithOptions(
		routeStr string,
		metadata map[string]interface{},
		reply, arg proto.Message,
		opts *config.EnqueueOpts,
	) (jid string, err error)

	SendPushToUser(route string, v interface{}, uid string, frontId string, frontType string) error
	SendPushToUsers(route string, v interface{}, uids []string, frontendType string) ([]string, error)
	SendPushBytesToUsers(route string, data []byte, uids []string, frontendType string) ([]string, error)
	SendKickToUsers(uids []string, frontendType string) ([]string, error)

	GroupCreate(ctx context.Context, groupName string) error
	GroupCreateWithTTL(ctx context.Context, groupName string, ttlTime time.Duration) error
	GroupMembers(ctx context.Context, groupName string) ([]string, error)
	GroupBroadcast(ctx context.Context, frontendType, groupName, route string, v interface{}) error
	GroupContainsMember(ctx context.Context, groupName, uid string) (bool, error)
	GroupAddMember(ctx context.Context, groupName, uid string) error
	GroupRemoveMember(ctx context.Context, groupName, uid string) error
	GroupRemoveAll(ctx context.Context, groupName string) error
	GroupCountMembers(ctx context.Context, groupName string) (int, error)
	GroupRenewTTL(ctx context.Context, groupName string) error
	GroupDelete(ctx context.Context, groupName string) error

	Register(c component.Component, options ...component.Option)

	ExtendRemote(svc, name string, remote *component.Remote) error

	RegisterModule(module interfaces.Module, name string) error
	RegisterModuleAfter(module interfaces.Module, name string) error
	RegisterModuleBefore(module interfaces.Module, name string) error
	GetModule(name string) (interfaces.Module, error)

	AddSDListener(listener cluster.SDListener)
	GetSessionPool() session.SessionPool
}

Pitaya App interface

var DefaultApp Pitaya

func NewDefaultApp

func NewDefaultApp(isFrontend bool, serverType string, serverMode ServerMode, serverMetadata map[string]string, config config.BuilderConfig) Pitaya

NewDefaultApp returns a default pitaya app instance

type PitayaBuilder

type PitayaBuilder interface {
	Build() Pitaya
}

PitayaBuilder Builder interface

type ServerMode

type ServerMode byte

ServerMode represents a server mode

const (

	// Cluster represents a server running with connection to other servers
	Cluster ServerMode
	// Standalone represents a server running without connection to other servers
	Standalone
)

Directories

Path Synopsis
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
mocks
Package mock_cluster is a generated GoMock package.
Package mock_cluster is a generated GoMock package.
conn
codec/mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
message/mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
generics
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
mocks
Package mock_protos is a generated GoMock package.
Package mock_protos is a generated GoMock package.
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
mocks
Package mock_session is a generated GoMock package.
Package mock_session is a generated GoMock package.
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL