Documentation ¶
Index ¶
- Constants
- func DecodeEvent(encodingType internal.UserDataEncType, eventType api.EventType, message []byte) (proto.Message, error)
- func DecodeRealtime(encodingType internal.UserDataEncType, message []byte) (*api.RealTimeMessage, error)
- func EncodeAsJSON(event proto.Message) ([]byte, error)
- func EncodeAsMsgPack(data interface{}) ([]byte, error)
- func EncodeEvent(encodingType internal.UserDataEncType, event proto.Message) ([]byte, error)
- func EncodeEventAsMsgPack(event proto.Message) ([]byte, error)
- func EncodeRealtime(encodingType internal.UserDataEncType, msg *api.RealTimeMessage) ([]byte, error)
- func EncodeStreamMD(md *StreamMessageMD) ([]byte, error)
- func JsonByteToMsgPack(data []byte) ([]byte, error)
- func NewEventDataFromMessage(encType internal.UserDataEncType, clientId string, socketId string, ...) (*internal.StreamData, error)
- func NewMessageData(encType internal.UserDataEncType, clientId string, socketId string, ...) (*internal.StreamData, error)
- func NewPresenceData(encType internal.UserDataEncType, clientId string, socketId string, ...) (*internal.StreamData, error)
- func SanitizeUserData(toEnc internal.UserDataEncType, data *internal.StreamData) ([]byte, error)
- func SendReply(conn *websocket.Conn, encType internal.UserDataEncType, ...) error
- type Channel
- func (ch *Channel) Close(ctx context.Context)
- func (ch *Channel) DisconnectWatcher(watcher string)
- func (ch *Channel) GetWatcher(ctx context.Context, watcherName string, resume string) (*ChannelWatcher, error)
- func (ch *Channel) ListWatchers() []string
- func (ch *Channel) Name() string
- func (ch *Channel) PublishMessage(ctx context.Context, data *internal.StreamData) (string, error)
- func (ch *Channel) PublishPresence(ctx context.Context, data *internal.StreamData) (string, error)
- func (ch *Channel) Read(ctx context.Context, pos string) (*cache.StreamMessages, bool, error)
- func (ch *Channel) StopWatcher(watcher string)
- type ChannelFactory
- func (factory *ChannelFactory) CreateChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)
- func (factory *ChannelFactory) DeleteChannel(ctx context.Context, ch *Channel)
- func (factory *ChannelFactory) GetChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)
- func (factory *ChannelFactory) GetOrCreateChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)
- func (factory *ChannelFactory) ListChannels(ctx context.Context, tenantId uint32, projId uint32, prefix string) ([]string, error)
- type ChannelRunner
- func (runner *ChannelRunner) Run(ctx context.Context, tenant *metadata.Tenant) (Response, error)
- func (runner *ChannelRunner) SetChannelReq(req *api.GetRTChannelRequest)
- func (runner *ChannelRunner) SetChannelsReq(req *api.GetRTChannelsRequest)
- func (runner *ChannelRunner) SetListSubscriptionsReq(req *api.ListSubscriptionRequest)
- type ChannelWatcher
- type ConnectionParams
- type DevicePusher
- type HeartbeatFactory
- type HeartbeatTable
- type MessagesRunner
- type RTMRunner
- type RTMRunnerFactory
- type ReadMessagesRunner
- type Response
- type Session
- func (session *Session) Close() error
- func (session *Session) IsActive() bool
- func (session *Session) OnClose(code int, _ string) error
- func (session *Session) OnPing(_ string) error
- func (session *Session) OnPong(_ string) error
- func (session *Session) SendConnSuccess() error
- func (session *Session) Start(ctx context.Context) error
- type Sessions
- func (s *Sessions) AddDevice(ctx context.Context, conn *websocket.Conn, params ConnectionParams) (*Session, error)
- func (s *Sessions) CreateDeviceSession(ctx context.Context, conn *websocket.Conn, params ConnectionParams) (*Session, error)
- func (s *Sessions) ExecuteRunner(ctx context.Context, runner RTMRunner) (Response, error)
- func (s *Sessions) RemoveDevice(_ context.Context, session *Session)
- func (s *Sessions) TrackSessions()
- type StreamMessageMD
- type Streaming
- type Watch
Constants ¶
const ( PresenceChannelData = "presence" MessageChannelData = "message" )
Variables ¶
This section is empty.
Functions ¶
func DecodeEvent ¶
func DecodeRealtime ¶
func DecodeRealtime(encodingType internal.UserDataEncType, message []byte) (*api.RealTimeMessage, error)
func EncodeAsMsgPack ¶
func EncodeEvent ¶
func EncodeRealtime ¶
func EncodeRealtime(encodingType internal.UserDataEncType, msg *api.RealTimeMessage) ([]byte, error)
func EncodeStreamMD ¶
func EncodeStreamMD(md *StreamMessageMD) ([]byte, error)
func JsonByteToMsgPack ¶
func NewEventDataFromMessage ¶
func NewEventDataFromMessage(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.Message) (*internal.StreamData, error)
func NewMessageData ¶
func NewMessageData(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.MessageEvent) (*internal.StreamData, error)
func NewPresenceData ¶
func NewPresenceData(encType internal.UserDataEncType, clientId string, socketId string, eventName string, msg *api.MessageEvent) (*internal.StreamData, error)
func SanitizeUserData ¶
func SanitizeUserData(toEnc internal.UserDataEncType, data *internal.StreamData) ([]byte, error)
SanitizeUserData is an optimization that lets us store the received raw data and return it as-is when the encoding used for writing is the same as the one used for reading. If the encoding differs, this method performs a conversion. A typical case that needs this conversion is when a message is published through a websocket using msgpack encoding and is later read back through the HTTP APIs; in that case we need to convert from msgpack to JSON.
Types ¶
type Channel ¶
func (*Channel) DisconnectWatcher ¶
DisconnectWatcher TODO: call it during leave.
func (*Channel) GetWatcher ¶
func (*Channel) ListWatchers ¶
func (*Channel) PublishMessage ¶
func (*Channel) PublishPresence ¶
func (*Channel) StopWatcher ¶
type ChannelFactory ¶
func NewChannelFactory ¶
func NewChannelFactory(cache cache.Cache, encoder metadata.CacheEncoder, heartbeatF *HeartbeatFactory) *ChannelFactory
func (*ChannelFactory) CreateChannel ¶
func (factory *ChannelFactory) CreateChannel(ctx context.Context, tenantId uint32, projId uint32, channelName string) (*Channel, error)
CreateChannel returns an error if the stream already exists. Use GetOrCreateChannel for create-if-not-exists semantics.
func (*ChannelFactory) DeleteChannel ¶
func (factory *ChannelFactory) DeleteChannel(ctx context.Context, ch *Channel)
func (*ChannelFactory) GetChannel ¶
func (*ChannelFactory) GetOrCreateChannel ¶
func (*ChannelFactory) ListChannels ¶
type ChannelRunner ¶
type ChannelRunner struct {
// contains filtered or unexported fields
}
func (*ChannelRunner) SetChannelReq ¶
func (runner *ChannelRunner) SetChannelReq(req *api.GetRTChannelRequest)
func (*ChannelRunner) SetChannelsReq ¶
func (runner *ChannelRunner) SetChannelsReq(req *api.GetRTChannelsRequest)
func (*ChannelRunner) SetListSubscriptionsReq ¶
func (runner *ChannelRunner) SetListSubscriptionsReq(req *api.ListSubscriptionRequest)
type ChannelWatcher ¶
type ChannelWatcher struct {
// contains filtered or unexported fields
}
ChannelWatcher watches events for a single channel. It accepts a watch that is notified whenever a new event is read from the stream. Because a ChannelWatcher is mapped to a consumer group on a stream, its state is restored from the cache on restart; this means a watcher is created only if it doesn’t already exist, otherwise the existing one is returned.
func CreateWatcher ¶
func (*ChannelWatcher) Disconnect ¶
func (watcher *ChannelWatcher) Disconnect()
func (*ChannelWatcher) StartWatching ¶
func (watcher *ChannelWatcher) StartWatching(watch Watch)
func (*ChannelWatcher) Stop ¶
func (watcher *ChannelWatcher) Stop()
type ConnectionParams ¶
type ConnectionParams struct { ProjectName string SessionId string Position string Encoding string }
ConnectionParams TODO: do we need serial here as well?
func (ConnectionParams) ToEncodingType ¶
func (params ConnectionParams) ToEncodingType() internal.UserDataEncType
type DevicePusher ¶
type DevicePusher struct {
// contains filtered or unexported fields
}
func NewDevicePusher ¶
func NewDevicePusher(session *Session, channel string) *DevicePusher
func (*DevicePusher) Watch ¶
func (pusher *DevicePusher) Watch(events *cache.StreamMessages, err error) ([]string, error)
type HeartbeatFactory ¶
func NewHeartbeatFactory ¶
func NewHeartbeatFactory(cache cache.Cache, encoder metadata.CacheEncoder) *HeartbeatFactory
func (*HeartbeatFactory) GetHeartbeatTable ¶
func (factory *HeartbeatFactory) GetHeartbeatTable(tenantId uint32, projectId uint32) *HeartbeatTable
type HeartbeatTable ¶
type HeartbeatTable struct {
// contains filtered or unexported fields
}
func NewHeartbeat ¶
func NewHeartbeat(cache cache.Cache, tableName string) *HeartbeatTable
func (*HeartbeatTable) GroupsExpired ¶
func (h *HeartbeatTable) GroupsExpired(groupsName []string) bool
func (*HeartbeatTable) Ping ¶
func (h *HeartbeatTable) Ping(sessionId string) error
type MessagesRunner ¶
type MessagesRunner struct {
// contains filtered or unexported fields
}
MessagesRunner publishes messages to a channel.
type RTMRunner ¶
RTMRunner is used to run the realtime HTTP APIs related to channels, such as accessing a channel, subscribing to a channel, etc.
type RTMRunnerFactory ¶
type RTMRunnerFactory struct {
// contains filtered or unexported fields
}
func NewRTMRunnerFactory ¶
func NewRTMRunnerFactory(cache cache.Cache, factory *ChannelFactory) *RTMRunnerFactory
NewRTMRunnerFactory returns an RTMRunnerFactory object.
func (*RTMRunnerFactory) GetChannelRunner ¶
func (f *RTMRunnerFactory) GetChannelRunner() *ChannelRunner
func (*RTMRunnerFactory) GetMessagesRunner ¶
func (f *RTMRunnerFactory) GetMessagesRunner(r *api.MessagesRequest) *MessagesRunner
func (*RTMRunnerFactory) GetReadMessagesRunner ¶
func (f *RTMRunnerFactory) GetReadMessagesRunner(r *api.ReadMessagesRequest, streaming Streaming) *ReadMessagesRunner
type ReadMessagesRunner ¶
type ReadMessagesRunner struct {
// contains filtered or unexported fields
}
type Session ¶
func (*Session) SendConnSuccess ¶
type Sessions ¶
func NewSessionMgr ¶
func NewSessionMgr(cache cache.Cache, tenantMgr *metadata.TenantManager, txMgr *transaction.Manager, heartbeatF *HeartbeatFactory, factory *ChannelFactory) *Sessions
func (*Sessions) CreateDeviceSession ¶
func (*Sessions) ExecuteRunner ¶
func (*Sessions) RemoveDevice ¶
func (*Sessions) TrackSessions ¶
func (s *Sessions) TrackSessions()
type StreamMessageMD ¶
type StreamMessageMD struct { ClientId string SocketId string // DataType is the type of data it is storing for example "presence" or "message" data DataType string // EventName is the named identifier of this message like in case of presence "enter"/"left", etc EventName string }
func DecodeStreamMD ¶
func DecodeStreamMD(enc []byte) (*StreamMessageMD, error)
func NewStreamMessageMD ¶
func NewStreamMessageMD(dataType string, clientId string, socketId string, eventName string) *StreamMessageMD
type Streaming ¶
type Streaming interface { api.Realtime_ReadMessagesServer }