Documentation ¶
Index ¶
- type FrameCache
- type LocalPublisher
- type ManagedChannel
- type MemoryFrameCache
- func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
- func (c *MemoryFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)
- func (c *MemoryFrameCache) Update(ctx context.Context, orgID int64, channel string, ...) (bool, error)
- type NamespaceStream
- func (s *NamespaceStream) GetHandlerForPath(_ string) (model.ChannelHandler, error)
- func (s *NamespaceStream) OnPublish(_ context.Context, _ *user.SignedInUser, _ model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error)
- func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error)
- func (s *NamespaceStream) Push(ctx context.Context, path string, frame *data.Frame) error
- type RedisFrameCache
- func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
- func (c *RedisFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)
- func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel string, ...) (bool, error)
- type Runner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FrameCache ¶
type FrameCache interface {
	// GetActiveChannels returns active managed stream channels with JSON schema.
	GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
	// GetFrame returns full JSON frame for a channel in org.
	GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)
	// Update updates frame cache and returns true if schema changed.
	Update(ctx context.Context, orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error)
}
FrameCache allows updating frame schema. Update returns true if the schema changed.
type LocalPublisher ¶
type ManagedChannel ¶
type ManagedChannel struct {
	Channel    string          `json:"channel"`
	MinuteRate int64           `json:"minute_rate"`
	Data       json.RawMessage `json:"data"`
}
ManagedChannel represents a managed stream.
type MemoryFrameCache ¶
type MemoryFrameCache struct {
// contains filtered or unexported fields
}
MemoryFrameCache ...
func (*MemoryFrameCache) GetActiveChannels ¶
func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
type NamespaceStream ¶
type NamespaceStream struct {
// contains filtered or unexported fields
}
NamespaceStream holds the state of a managed stream.
func NewNamespaceStream ¶
func NewNamespaceStream(orgID int64, scope string, namespace string, publisher model.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream
NewNamespaceStream creates new NamespaceStream.
func (*NamespaceStream) GetHandlerForPath ¶
func (s *NamespaceStream) GetHandlerForPath(_ string) (model.ChannelHandler, error)
func (*NamespaceStream) OnPublish ¶
func (s *NamespaceStream) OnPublish(_ context.Context, _ *user.SignedInUser, _ model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error)
func (*NamespaceStream) OnSubscribe ¶
func (s *NamespaceStream) OnSubscribe(ctx context.Context, u *user.SignedInUser, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error)
type RedisFrameCache ¶
type RedisFrameCache struct {
// contains filtered or unexported fields
}
RedisFrameCache ...
func NewRedisFrameCache ¶
func NewRedisFrameCache(redisClient *redis.Client) *RedisFrameCache
NewRedisFrameCache ...
func (*RedisFrameCache) GetActiveChannels ¶
func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner keeps NamespaceStream per namespace.
func NewRunner ¶
func NewRunner(publisher model.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner
NewRunner creates new Runner.
func (*Runner) GetManagedChannels ¶
func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error)
func (*Runner) GetOrCreateStream ¶
func (r *Runner) GetOrCreateStream(orgID int64, scope string, namespace string) (*NamespaceStream, error)
GetOrCreateStream -- for now this will create a new manager for each key. Eventually, the stream behavior will need to be configured explicitly.
Click to show internal directories.
Click to hide internal directories.