broker

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broadcaster

// Broadcaster is responsible for fanning out messages to the stream clients
// and other nodes.
type Broadcaster interface {
	// Broadcast accepts a stream message to be fanned out.
	Broadcast(msg *common.StreamMessage)
	// BroadcastCommand accepts a remote command message to be fanned out.
	BroadcastCommand(msg *common.RemoteCommandMessage)
	// Subscribe takes a stream name.
	// NOTE(review): presumably starts delivery for that stream — confirm against implementations.
	Subscribe(stream string)
	// Unsubscribe takes a stream name.
	// NOTE(review): presumably the inverse of Subscribe — confirm against implementations.
	Unsubscribe(stream string)
}

Broadcaster is responsible for fanning out messages to the stream clients and other nodes.

type Broker

// Broker is responsible for:
//   - Managing streams history.
//   - Keeping client states for recovery.
//   - Distributing broadcasts across nodes.
type Broker interface {
	// Start takes an error channel and returns an error.
	// NOTE(review): how the done channel is used is not visible here — confirm against implementations.
	Start(done chan (error)) error
	// Shutdown takes a context and returns an error.
	Shutdown(ctx context.Context) error

	// Announce returns a string.
	// NOTE(review): presumably a human-readable description of the broker — confirm.
	Announce() string

	// HandleBroadcast accepts a stream message.
	HandleBroadcast(msg *common.StreamMessage)
	// HandleCommand accepts a remote command message.
	HandleCommand(msg *common.RemoteCommandMessage)

	// Registers the stream and returns its (short) unique identifier
	Subscribe(stream string) string
	// (Maybe) unregisters the stream and returns its unique identifier
	Unsubscribe(stream string) string
	// Retrieves stream messages from history from the specified offset within the specified epoch
	HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
	// Retrieves stream messages from history from the specified timestamp
	// NOTE(review): the unit of ts (seconds vs. milliseconds) is not visible here — confirm.
	HistorySince(stream string, ts int64) ([]common.StreamMessage, error)

	// Saves session's state in cache
	CommitSession(sid string, session Cacheable) error
	// Fetches session's state from cache (by session id)
	RestoreSession(from string) ([]byte, error)
	// Marks session as finished (for cache expiration)
	FinishSession(sid string) error
}

Broker is responsible for:
- Managing streams history.
- Keeping client states for recovery.
- Distributing broadcasts across nodes.

type Cacheable

// Cacheable is an interface which a session object must implement to be
// stored in cache. An interface is used, rather than a pre-built cache entry,
// to avoid unnecessary dumping when the broker doesn't support storing sessions.
type Cacheable interface {
	// ToCacheEntry returns the cache entry as bytes, or an error.
	ToCacheEntry() ([]byte, error)
}

Cacheable is an interface which a session object must implement to be stored in cache. We use an interface, rather than requiring a string cache entry to be passed, to avoid unnecessary dumping when the broker doesn't support storing sessions.

type Config

// Config holds the broker settings. The struct tags give the TOML key
// for each field.
type Config struct {
	// Adapter name
	Adapter string `toml:"adapter"`
	// For how long to keep history in seconds
	HistoryTTL int64 `toml:"history_ttl"`
	// Max size of messages to keep in the history per stream
	// NOTE(review): presumably the maximum number of messages, not a byte size — confirm.
	HistoryLimit int `toml:"history_limit"`
	// Sessions cache TTL in seconds (after disconnect)
	SessionsTTL int64 `toml:"sessions_ttl"`
}

func NewConfig

func NewConfig() Config

func (Config) ToToml added in v1.5.4

func (c Config) ToToml() string

type LegacyBroker

type LegacyBroker struct {
	// contains filtered or unexported fields
}

LegacyBroker preserves the v1 behaviour while implementing the Broker APIs. Thus, we can use it without breaking the older behaviour

func NewLegacyBroker

func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker

func (LegacyBroker) Announce

func (LegacyBroker) Announce() string

func (LegacyBroker) CommitSession

func (LegacyBroker) CommitSession(sid string, session Cacheable) error

func (LegacyBroker) FinishSession

func (LegacyBroker) FinishSession(sid string) error

func (*LegacyBroker) HandleBroadcast

func (b *LegacyBroker) HandleBroadcast(msg *common.StreamMessage)

func (*LegacyBroker) HandleCommand

func (b *LegacyBroker) HandleCommand(msg *common.RemoteCommandMessage)

func (LegacyBroker) HistoryFrom

func (LegacyBroker) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)

func (LegacyBroker) HistorySince

func (LegacyBroker) HistorySince(stream string, ts int64) ([]common.StreamMessage, error)

func (LegacyBroker) RestoreSession

func (LegacyBroker) RestoreSession(from string) ([]byte, error)

func (LegacyBroker) Shutdown

func (LegacyBroker) Shutdown(ctx context.Context) error

func (LegacyBroker) Start

func (LegacyBroker) Start(done chan (error)) error

func (*LegacyBroker) Subscribe

func (b *LegacyBroker) Subscribe(stream string) string

Registering streams (for granular pub/sub)

func (*LegacyBroker) Unsubscribe

func (b *LegacyBroker) Unsubscribe(stream string) string

type LocalBroker added in v1.4.7

// LocalBroker is a single-node broker that can be used to store streams data
// locally.
type LocalBroker interface {
	// Start takes an error channel and returns an error.
	// NOTE(review): how the done channel is used is not visible here — confirm against implementations.
	Start(done chan (error)) error
	// Shutdown takes a context and returns an error.
	Shutdown(ctx context.Context) error
	// SetEpoch sets the epoch value.
	SetEpoch(epoch string)
	// GetEpoch returns the epoch value.
	GetEpoch() string
	// HistoryFrom takes a stream name, an epoch and an offset, and returns
	// stream messages or an error.
	HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)
	// HistorySince takes a stream name and a timestamp, and returns stream
	// messages or an error.
	// NOTE(review): the unit of ts is not visible here — confirm.
	HistorySince(stream string, ts int64) ([]common.StreamMessage, error)
	// Store takes a stream name, a message payload, a sequence number and a
	// timestamp, and returns a uint64 or an error.
	// NOTE(review): the returned uint64 is presumably the stored offset — confirm.
	Store(stream string, msg []byte, seq uint64, ts time.Time) (uint64, error)
}

LocalBroker is a single-node broker that can be used to store streams data locally.

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

func NewMemoryBroker

func NewMemoryBroker(node Broadcaster, config *Config) *Memory

func (*Memory) Announce

func (b *Memory) Announce() string

func (*Memory) CommitSession

func (b *Memory) CommitSession(sid string, session Cacheable) error

func (*Memory) FinishSession

func (b *Memory) FinishSession(sid string) error

func (*Memory) GetEpoch

func (b *Memory) GetEpoch() string

func (*Memory) HandleBroadcast

func (b *Memory) HandleBroadcast(msg *common.StreamMessage)

func (*Memory) HandleCommand

func (b *Memory) HandleCommand(msg *common.RemoteCommandMessage)

func (*Memory) HistoryFrom

func (b *Memory) HistoryFrom(name string, epoch string, offset uint64) ([]common.StreamMessage, error)

func (*Memory) HistorySince

func (b *Memory) HistorySince(name string, ts int64) ([]common.StreamMessage, error)

func (*Memory) RestoreSession

func (b *Memory) RestoreSession(from string) ([]byte, error)

func (*Memory) SetEpoch

func (b *Memory) SetEpoch(v string)

func (*Memory) Shutdown

func (b *Memory) Shutdown(ctx context.Context) error

func (*Memory) Start

func (b *Memory) Start(done chan (error)) error

func (*Memory) Store added in v1.4.7

func (b *Memory) Store(name string, data []byte, offset uint64, ts time.Time) (uint64, error)

func (*Memory) Subscribe

func (b *Memory) Subscribe(stream string) string

func (*Memory) Unsubscribe

func (b *Memory) Unsubscribe(stream string) string

type NATS added in v1.4.7

type NATS struct {
	// contains filtered or unexported fields
}

func NewNATSBroker added in v1.4.7

func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig, l *slog.Logger, opts ...NATSOption) *NATS

func (*NATS) Announce added in v1.4.7

func (n *NATS) Announce() string

func (*NATS) CommitSession added in v1.4.7

func (n *NATS) CommitSession(sid string, session Cacheable) error

func (*NATS) Epoch added in v1.4.7

func (n *NATS) Epoch() string

func (*NATS) FinishSession added in v1.4.7

func (n *NATS) FinishSession(sid string) error

func (*NATS) HandleBroadcast added in v1.4.7

func (n *NATS) HandleBroadcast(msg *common.StreamMessage)

func (*NATS) HandleCommand added in v1.4.7

func (n *NATS) HandleCommand(msg *common.RemoteCommandMessage)

func (*NATS) HistoryFrom added in v1.4.7

func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error)

func (*NATS) HistorySince added in v1.4.7

func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error)

func (*NATS) Ready added in v1.4.7

func (n *NATS) Ready(timeout ...time.Duration) error

func (*NATS) Reset added in v1.4.7

func (n *NATS) Reset() error

func (*NATS) RestoreSession added in v1.4.7

func (n *NATS) RestoreSession(sid string) ([]byte, error)

func (*NATS) SetEpoch added in v1.4.7

func (n *NATS) SetEpoch(epoch string) error

func (*NATS) Shutdown added in v1.4.7

func (n *NATS) Shutdown(ctx context.Context) error

func (*NATS) Start added in v1.4.7

func (n *NATS) Start(done chan (error)) error

Write Broker implementation here

func (*NATS) Subscribe added in v1.4.7

func (n *NATS) Subscribe(stream string) string

func (*NATS) Unsubscribe added in v1.4.7

func (n *NATS) Unsubscribe(stream string) string

type NATSOption added in v1.4.7

// NATSOption is a function that receives the *NATS broker. NewNATSBroker
// accepts a variadic list of these.
type NATSOption func(*NATS)

func WithNATSLocalBroker added in v1.4.7

func WithNATSLocalBroker(b LocalBroker) NATSOption

type StreamsTracker

type StreamsTracker struct {
	// contains filtered or unexported fields
}

func NewStreamsTracker

func NewStreamsTracker() *StreamsTracker

func (*StreamsTracker) Add

func (s *StreamsTracker) Add(name string) (isNew bool)

func (*StreamsTracker) Has

func (s *StreamsTracker) Has(name string) bool

func (*StreamsTracker) Remove

func (s *StreamsTracker) Remove(name string) (isLast bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL