Documentation ¶
Index ¶
- type Broadcaster
- type Broker
- type Cacheable
- type Config
- type LegacyBroker
- func (LegacyBroker) Announce() string
- func (LegacyBroker) CommitSession(sid string, session Cacheable) error
- func (LegacyBroker) FinishSession(sid string) error
- func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
- func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
- func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
- func (LegacyBroker) RestoreSession(from string) ([]byte, error)
- func (LegacyBroker) Shutdown(ctx context.Context) error
- func (LegacyBroker) Start() error
- func (b *LegacyBroker) Subscribe(stream string) string
- func (b *LegacyBroker) Unsubscribe(stream string) string
- type Memory
- func (b *Memory) Announce() string
- func (b *Memory) CommitSession(sid string, session Cacheable) error
- func (b *Memory) FinishSession(sid string) error
- func (b *Memory) GetEpoch() string
- func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
- func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
- func (b *Memory) HistoryFrom(name string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (b *Memory) HistorySince(name string, ts int64) ([]common.StreamMessage, error)
- func (b *Memory) RestoreSession(from string) ([]byte, error)
- func (b *Memory) SetEpoch(v string)
- func (b *Memory) Shutdown(ctx context.Context) error
- func (b *Memory) Start() error
- func (b *Memory) Subscribe(stream string) string
- func (b *Memory) Unsubscribe(stream string) string
- type StreamsTracker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface {
	Broadcast(msg *common.StreamMessage)
	BroadcastCommand(msg *common.RemoteCommandMessage)
	Subscribe(stream string)
	Unsubscribe(stream string)
}
Broadcaster is responsible for fanning out messages to the stream clients and other nodes.
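As an illustration of the interface shape, here is a minimal sketch of a recording Broadcaster that could serve as a test double. The `streamMessage` and `remoteCommand` types are local stand-ins for `common.StreamMessage` and `common.RemoteCommandMessage`; their fields, and the `recorder` type itself, are assumptions made for this example and not part of the package.

```go
package main

import "fmt"

// streamMessage and remoteCommand are hypothetical stand-ins for the
// common.StreamMessage and common.RemoteCommandMessage types.
type streamMessage struct{ Stream, Data string }
type remoteCommand struct{ Command string }

// broadcaster mirrors the method set of Broadcaster using the stand-in types.
type broadcaster interface {
	Broadcast(msg *streamMessage)
	BroadcastCommand(msg *remoteCommand)
	Subscribe(stream string)
	Unsubscribe(stream string)
}

// recorder remembers everything it was asked to fan out.
type recorder struct {
	sent []string
	subs map[string]bool
}

func newRecorder() *recorder { return &recorder{subs: make(map[string]bool)} }

func (r *recorder) Broadcast(msg *streamMessage) {
	r.sent = append(r.sent, msg.Stream+":"+msg.Data)
}

func (r *recorder) BroadcastCommand(msg *remoteCommand) {
	r.sent = append(r.sent, "cmd:"+msg.Command)
}

func (r *recorder) Subscribe(stream string)   { r.subs[stream] = true }
func (r *recorder) Unsubscribe(stream string) { delete(r.subs, stream) }

// Compile-time check that recorder satisfies the interface.
var _ broadcaster = (*recorder)(nil)

func main() {
	r := newRecorder()
	r.Subscribe("chat_1")
	r.Broadcast(&streamMessage{Stream: "chat_1", Data: "hello"})
	r.BroadcastCommand(&remoteCommand{Command: "disconnect"})
	fmt.Println(r.sent, r.subs["chat_1"])
}
```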
type Broker ¶
type Broker interface {
	Start() error
	Shutdown(ctx context.Context) error
	Announce() string
	HandleBroadcast(msg *common.StreamMessage)
	HandleCommand(msg *common.RemoteCommandMessage)
	// Registers the stream and returns its (short) unique identifier
	Subscribe(stream string) string
	// (Maybe) unregisters the stream and returns its unique identifier
	Unsubscribe(stream string) string
	// Retrieves stream messages from history from the specified offset within the specified epoch
	HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
	// Retrieves stream messages from history from the specified timestamp
	HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
	// Saves session's state in cache
	CommitSession(sid string, session Cacheable) error
	// Fetches session's state from cache (by session id)
	RestoreSession(from string) ([]byte, error)
	// Marks session as finished (for cache expiration)
	FinishSession(sid string) error
}
Broker is responsible for:
- Managing streams history.
- Keeping client states for recovery.
- Distributing broadcasts across nodes.
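To make the epoch and offset parameters of HistoryFrom more concrete, the following is a rough sketch of one way such a lookup could behave. It is not the package's implementation: the `history` and `message` types are hypothetical, and two behaviours are assumed here, namely that an epoch mismatch is reported as an error and that "from the specified offset" means messages strictly after that offset.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for common.StreamMessage.
type message struct {
	Offset uint64
	Data   string
}

// history is a hypothetical in-memory store; offsets are only meaningful
// within the epoch in which they were issued.
type history struct {
	epoch string
	next  uint64
	msgs  map[string][]message
}

func newHistory(epoch string) *history {
	return &history{epoch: epoch, msgs: make(map[string][]message)}
}

// add appends a message to the stream and returns its offset.
func (h *history) add(stream, data string) uint64 {
	h.next++
	h.msgs[stream] = append(h.msgs[stream], message{Offset: h.next, Data: data})
	return h.next
}

// from returns messages with an offset greater than the given one,
// or an error if the caller's epoch does not match the store's epoch.
func (h *history) from(stream, epoch string, offset uint64) ([]message, error) {
	if epoch != h.epoch {
		return nil, errors.New("epoch mismatch")
	}
	var out []message
	for _, m := range h.msgs[stream] {
		if m.Offset > offset {
			out = append(out, m)
		}
	}
	return out, nil
}

func main() {
	h := newHistory("2024")
	first := h.add("chat_1", "a")
	h.add("chat_1", "b")

	missed, err := h.from("chat_1", "2024", first)
	fmt.Println(missed, err)

	_, err = h.from("chat_1", "stale", first)
	fmt.Println(err)
}
```

Under these assumptions, a client that reconnects with a stale epoch cannot resume from its offset and would need another recovery path, such as a timestamp-based lookup like HistorySince.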
type Cacheable ¶
Cacheable is an interface that a session object must implement to be stored in the cache. The broker accepts an interface rather than a pre-serialized string entry so that the session is not dumped unnecessarily when the broker doesn't support storing sessions.
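The benefit of passing an interface can be sketched as follows. The method set of Cacheable is not shown on this page, so the `ToCacheEntry` method below is an assumption, and `session`, `noopStore` and `mapStore` are hypothetical types written only for this example. The point is that a broker without session storage never triggers serialization.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Cacheable is a stand-in for the package's interface; the method name
// ToCacheEntry is an assumption made for this sketch.
type Cacheable interface {
	ToCacheEntry() ([]byte, error)
}

// session is a hypothetical session object that counts how often it is dumped.
type session struct {
	ID    string `json:"id"`
	dumps int
}

func (s *session) ToCacheEntry() ([]byte, error) {
	s.dumps++
	return json.Marshal(s)
}

// noopStore mimics a broker without session storage: it never serializes.
type noopStore struct{}

func (noopStore) CommitSession(sid string, s Cacheable) error { return nil }

// mapStore mimics a broker that caches sessions: it serializes on commit.
type mapStore struct{ entries map[string][]byte }

func (m *mapStore) CommitSession(sid string, s Cacheable) error {
	data, err := s.ToCacheEntry()
	if err != nil {
		return err
	}
	m.entries[sid] = data
	return nil
}

func main() {
	s := &session{ID: "abc"}

	_ = noopStore{}.CommitSession(s.ID, s)
	fmt.Println("dumps after no-op commit:", s.dumps)

	store := &mapStore{entries: map[string][]byte{}}
	_ = store.CommitSession(s.ID, s)
	fmt.Println("dumps after caching commit:", s.dumps)
}
```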
type Config ¶
type LegacyBroker ¶
type LegacyBroker struct {
// contains filtered or unexported fields
}
LegacyBroker preserves the v1 behaviour while implementing the Broker API, so it can be used without breaking the older behaviour.
func NewLegacyBroker ¶
func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker
func (LegacyBroker) Announce ¶
func (LegacyBroker) Announce() string
func (LegacyBroker) CommitSession ¶
func (LegacyBroker) CommitSession(sid string, session Cacheable) error
func (LegacyBroker) FinishSession ¶
func (LegacyBroker) FinishSession(sid string) error
func (*LegacyBroker) HandleBroadcast ¶
func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
func (*LegacyBroker) HandleCommand ¶
func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
func (LegacyBroker) HistoryFrom ¶
func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
func (LegacyBroker) HistorySince ¶
func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
func (LegacyBroker) RestoreSession ¶
func (LegacyBroker) RestoreSession(from string) ([]byte, error)
func (LegacyBroker) Start ¶
func (LegacyBroker) Start() error
func (*LegacyBroker) Subscribe ¶
func (b *LegacyBroker) Subscribe(stream string) string
Subscribe registers the stream (for granular pub/sub).
func (*LegacyBroker) Unsubscribe ¶
func (b *LegacyBroker) Unsubscribe(stream string) string
type Memory ¶
type Memory struct {
// contains filtered or unexported fields
}
func NewMemoryBroker ¶
func NewMemoryBroker(node Broadcaster, config *Config) *Memory
func (*Memory) FinishSession ¶
func (b *Memory) FinishSession(sid string) error
func (*Memory) HandleBroadcast ¶
func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
func (*Memory) HandleCommand ¶
func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
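func (*Memory) HistoryFrom ¶
func (b *Memory) HistoryFrom(name string, epoch string, offset uint64) ([]common.StreamMessage, error)
func (*Memory) HistorySince ¶
func (b *Memory) HistorySince(name string, ts int64) ([]common.StreamMessage, error)
func (*Memory) Unsubscribe ¶
func (b *Memory) Unsubscribe(stream string) string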
type StreamsTracker ¶
type StreamsTracker struct {
// contains filtered or unexported fields
}
func NewStreamsTracker ¶
func NewStreamsTracker() *StreamsTracker
func (*StreamsTracker) Add ¶
func (s *StreamsTracker) Add(name string) (isNew bool)
func (*StreamsTracker) Has ¶
func (s *StreamsTracker) Has(name string) bool
func (*StreamsTracker) Remove ¶
func (s *StreamsTracker) Remove(name string) (isLast bool)
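The `isNew` and `isLast` results suggest reference-counting semantics: Add reports the first registration of a name and Remove reports the removal of the final one. The fields of StreamsTracker are unexported, so the sketch below is only a guess at that behaviour; `refTracker` and its internals are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// refTracker is a hypothetical reference-counting tracker that mirrors the
// Add/Has/Remove signatures of StreamsTracker.
type refTracker struct {
	mu     sync.Mutex
	counts map[string]int
}

func newRefTracker() *refTracker {
	return &refTracker{counts: make(map[string]int)}
}

// Add increments the counter and reports whether the name was not tracked before.
func (t *refTracker) Add(name string) (isNew bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.counts[name]++
	return t.counts[name] == 1
}

// Has reports whether the name has at least one active registration.
func (t *refTracker) Has(name string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.counts[name] > 0
}

// Remove decrements the counter and reports whether this was the last registration.
// Removing an unknown name is a no-op that returns false.
func (t *refTracker) Remove(name string) (isLast bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	n, ok := t.counts[name]
	if !ok {
		return false
	}
	if n == 1 {
		delete(t.counts, name)
		return true
	}
	t.counts[name] = n - 1
	return false
}

func main() {
	t := newRefTracker()
	fmt.Println(t.Add("chat_1"))    // first subscriber
	fmt.Println(t.Add("chat_1"))    // second subscriber
	fmt.Println(t.Remove("chat_1")) // one subscriber remains
	fmt.Println(t.Remove("chat_1")) // last subscriber left
	fmt.Println(t.Has("chat_1"))
}
```

With this design, a broker could subscribe to the underlying pub/sub channel only when Add returns true and unsubscribe only when Remove returns true, which would fit the "(Maybe) unregisters the stream" comment on Broker.Unsubscribe.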