managedstream

package
v0.0.0-testrgm3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2023 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FrameCache

type FrameCache interface {
	// GetActiveChannels returns active managed stream channels with JSON schema.
	GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)
	// GetFrame returns full JSON frame for a channel in org.
	GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)
	// Update updates frame cache and returns true if schema changed.
	Update(ctx context.Context, orgID int64, channel string, frameJson data.FrameJSONCache) (bool, error)
}

FrameCache allows updating frame schema. Returns true is schema not changed.

type LocalPublisher

type LocalPublisher interface {
	PublishLocal(channel string, data []byte) error
}

type ManagedChannel

type ManagedChannel struct {
	Channel    string          `json:"channel"`
	MinuteRate int64           `json:"minute_rate"`
	Data       json.RawMessage `json:"data"`
}

ManagedChannel represents a managed stream.

type MemoryFrameCache

type MemoryFrameCache struct {
	// contains filtered or unexported fields
}

MemoryFrameCache ...

func NewMemoryFrameCache

func NewMemoryFrameCache() *MemoryFrameCache

NewMemoryFrameCache ...

func (*MemoryFrameCache) GetActiveChannels

func (c *MemoryFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)

func (*MemoryFrameCache) GetFrame

func (c *MemoryFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)

func (*MemoryFrameCache) Update

func (c *MemoryFrameCache) Update(ctx context.Context, orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error)

type NamespaceStream

type NamespaceStream struct {
	// contains filtered or unexported fields
}

NamespaceStream holds the state of a managed stream.

func NewNamespaceStream

func NewNamespaceStream(orgID int64, scope string, namespace string, publisher model.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream

NewNamespaceStream creates new NamespaceStream.

func (*NamespaceStream) GetHandlerForPath

func (s *NamespaceStream) GetHandlerForPath(_ string) (model.ChannelHandler, error)

func (*NamespaceStream) Push

func (s *NamespaceStream) Push(ctx context.Context, path string, frame *data.Frame) error

Push sends frame to the stream and saves it for later retrieval by subscribers. * Saves the entire frame to cache. * If schema has been changed sends entire frame to channel, otherwise only data.

type RedisFrameCache

type RedisFrameCache struct {
	// contains filtered or unexported fields
}

RedisFrameCache ...

func NewRedisFrameCache

func NewRedisFrameCache(redisClient *redis.Client) *RedisFrameCache

NewRedisFrameCache ...

func (*RedisFrameCache) GetActiveChannels

func (c *RedisFrameCache) GetActiveChannels(orgID int64) (map[string]json.RawMessage, error)

func (*RedisFrameCache) GetFrame

func (c *RedisFrameCache) GetFrame(ctx context.Context, orgID int64, channel string) (json.RawMessage, bool, error)

func (*RedisFrameCache) Update

func (c *RedisFrameCache) Update(ctx context.Context, orgID int64, channel string, jsonFrame data.FrameJSONCache) (bool, error)

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner keeps NamespaceStream per namespace.

func NewRunner

func NewRunner(publisher model.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner

NewRunner creates new Runner.

func (*Runner) GetManagedChannels

func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error)

func (*Runner) GetOrCreateStream

func (r *Runner) GetOrCreateStream(orgID int64, scope string, namespace string) (*NamespaceStream, error)

GetOrCreateStream -- for now this will create new manager for each key. Eventually, the stream behavior will need to be configured explicitly

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL