Documentation ¶
Index ¶
- type Broadcaster
- type Broker
- type Cacheable
- type Config
- type LegacyBroker
- func (LegacyBroker) Announce() string
- func (LegacyBroker) CommitSession(sid string, session Cacheable) error
- func (LegacyBroker) FinishSession(sid string) error
- func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
- func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
- func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
- func (LegacyBroker) RestoreSession(from string) ([]byte, error)
- func (LegacyBroker) Shutdown(ctx context.Context) error
- func (LegacyBroker) Start(done chan (error)) error
- func (b *LegacyBroker) Subscribe(stream string) string
- func (b *LegacyBroker) Unsubscribe(stream string) string
- type LocalBroker
- type Memory
- func (b *Memory) Announce() string
- func (b *Memory) CommitSession(sid string, session Cacheable) error
- func (b *Memory) FinishSession(sid string) error
- func (b *Memory) GetEpoch() string
- func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
- func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
- func (b *Memory) HistoryFrom(name string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (b *Memory) HistorySince(name string, ts int64) ([]common.StreamMessage, error)
- func (b *Memory) RestoreSession(from string) ([]byte, error)
- func (b *Memory) SetEpoch(v string)
- func (b *Memory) Shutdown(ctx context.Context) error
- func (b *Memory) Start(done chan (error)) error
- func (b *Memory) Store(name string, data []byte, offset uint64, ts time.Time) (uint64, error)
- func (b *Memory) Subscribe(stream string) string
- func (b *Memory) Unsubscribe(stream string) string
- type NATS
- func (n *NATS) Announce() string
- func (n *NATS) CommitSession(sid string, session Cacheable) error
- func (n *NATS) Epoch() string
- func (n *NATS) FinishSession(sid string) error
- func (n *NATS) HandleBroadcast(msg *common.StreamMessage)
- func (n *NATS) HandleCommand(msg *common.RemoteCommandMessage)
- func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
- func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error)
- func (n *NATS) Ready(timeout ...time.Duration) error
- func (n *NATS) Reset() error
- func (n *NATS) RestoreSession(sid string) ([]byte, error)
- func (n *NATS) SetEpoch(epoch string) error
- func (n *NATS) Shutdown(ctx context.Context) error
- func (n *NATS) Start(done chan (error)) error
- func (n *NATS) Subscribe(stream string) string
- func (n *NATS) Unsubscribe(stream string) string
- type NATSOption
- type StreamsTracker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface { Broadcast(msg *common.StreamMessage) BroadcastCommand(msg *common.RemoteCommandMessage) Subscribe(stream string) Unsubscribe(stream string) }
Broadcaster is responsible for fanning out messages to the stream clients and other nodes.
type Broker ¶
type Broker interface { Start(done chan (error)) error Shutdown(ctx context.Context) error Announce() string HandleBroadcast(msg *common.StreamMessage) HandleCommand(msg *common.RemoteCommandMessage) // Registers the stream and returns its (short) unique identifier Subscribe(stream string) string // (Maybe) unregisters the stream and return its unique identifier Unsubscribe(stream string) string // Retrieves stream messages from history from the specified offset within the specified epoch HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error) // Retrieves stream messages from history from the specified timestamp HistorySince(stream string, ts int64) ([]common.StreamMessage, error) // Saves session's state in cache CommitSession(sid string, session Cacheable) error // Fetches session's state from cache (by session id) RestoreSession(from string) ([]byte, error) // Marks session as finished (for cache expiration) FinishSession(sid string) error }
Broker is responsible for: - Managing streams history. - Keeping client states for recovery. - Distributing broadcasts across nodes.
type Cacheable ¶
Cacheable is an interface which a session object must implement to be stored in cache. We use an interface, rather than requiring a string cache entry to be passed, to avoid unnecessary dumping when the broker doesn't support storing sessions.
type Config ¶
type Config struct { // Adapter name Adapter string `toml:"adapter"` // For how long to keep history in seconds HistoryTTL int64 `toml:"history_ttl"` // Max size of messages to keep in the history per stream HistoryLimit int `toml:"history_limit"` // Sessions cache TTL in seconds (after disconnect) SessionsTTL int64 `toml:"sessions_ttl"` }
type LegacyBroker ¶
type LegacyBroker struct {
// contains filtered or unexported fields
}
LegacyBroker preserves the v1 behaviour while implementing the Broker APIs. Thus, it can be used without breaking the older behaviour.
func NewLegacyBroker ¶
func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker
func (LegacyBroker) Announce ¶
func (LegacyBroker) Announce() string
func (LegacyBroker) CommitSession ¶
func (LegacyBroker) CommitSession(sid string, session Cacheable) error
func (LegacyBroker) FinishSession ¶
func (LegacyBroker) FinishSession(sid string) error
func (*LegacyBroker) HandleBroadcast ¶
func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)
func (*LegacyBroker) HandleCommand ¶
func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)
func (LegacyBroker) HistoryFrom ¶
func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
func (LegacyBroker) HistorySince ¶
func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
func (LegacyBroker) RestoreSession ¶
func (LegacyBroker) RestoreSession(from string) ([]byte, error)
func (LegacyBroker) Start ¶
func (LegacyBroker) Start(done chan (error)) error
func (*LegacyBroker) Subscribe ¶
func (b *LegacyBroker) Subscribe(stream string) string
Registering streams (for granular pub/sub)
func (*LegacyBroker) Unsubscribe ¶
func (b *LegacyBroker) Unsubscribe(stream string) string
type LocalBroker ¶ added in v1.4.7
type LocalBroker interface { Start(done chan (error)) error Shutdown(ctx context.Context) error SetEpoch(epoch string) GetEpoch() string HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error) HistorySince(stream string, ts int64) ([]common.StreamMessage, error) Store(stream string, msg []byte, seq uint64, ts time.Time) (uint64, error) }
LocalBroker is a single-node broker that can be used to store stream data locally.
type Memory ¶
type Memory struct {
// contains filtered or unexported fields
}
func NewMemoryBroker ¶
func NewMemoryBroker(node Broadcaster, config *Config) *Memory
func (*Memory) FinishSession ¶
func (*Memory) HandleBroadcast ¶
func (b *Memory) HandleBroadcast(msg *common.StreamMessage)
func (*Memory) HandleCommand ¶
func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)
func (*Memory) HistoryFrom ¶
func (*Memory) HistorySince ¶
func (*Memory) Unsubscribe ¶
type NATS ¶ added in v1.4.7
type NATS struct {
// contains filtered or unexported fields
}
func NewNATSBroker ¶ added in v1.4.7
func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig, l *slog.Logger, opts ...NATSOption) *NATS
func (*NATS) CommitSession ¶ added in v1.4.7
func (*NATS) FinishSession ¶ added in v1.4.7
func (*NATS) HandleBroadcast ¶ added in v1.4.7
func (n *NATS) HandleBroadcast(msg *common.StreamMessage)
func (*NATS) HandleCommand ¶ added in v1.4.7
func (n *NATS) HandleCommand(msg *common.RemoteCommandMessage)
func (*NATS) HistoryFrom ¶ added in v1.4.7
func (*NATS) HistorySince ¶ added in v1.4.7
func (*NATS) RestoreSession ¶ added in v1.4.7
func (*NATS) Unsubscribe ¶ added in v1.4.7
type NATSOption ¶ added in v1.4.7
type NATSOption func(*NATS)
func WithNATSLocalBroker ¶ added in v1.4.7
func WithNATSLocalBroker(b LocalBroker) NATSOption
type StreamsTracker ¶
type StreamsTracker struct {
// contains filtered or unexported fields
}
func NewStreamsTracker ¶
func NewStreamsTracker() *StreamsTracker
func (*StreamsTracker) Add ¶
func (s *StreamsTracker) Add(name string) (isNew bool)
func (*StreamsTracker) Has ¶
func (s *StreamsTracker) Has(name string) bool
func (*StreamsTracker) Remove ¶
func (s *StreamsTracker) Remove(name string) (isLast bool)