streams

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8

The maximum number of tasks that can be queued in total before backpressure will build up and the rests will start to block.

View Source
const PDU_STREAM_WORKERS = 256

The max number of per-room goroutines to have running. Too high and this will consume lots of CPU, too low and complete sync responses will take longer to process.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccountDataStreamProvider

type AccountDataStreamProvider struct {
	DefaultStreamProvider
	// contains filtered or unexported fields
}

func (*AccountDataStreamProvider) CompleteSync

func (*AccountDataStreamProvider) IncrementalSync

func (*AccountDataStreamProvider) Setup

type DefaultStreamProvider

type DefaultStreamProvider struct {
	DB storage.Database
	// contains filtered or unexported fields
}

func (*DefaultStreamProvider) Advance

func (p *DefaultStreamProvider) Advance(
	latest types.StreamPosition,
)

func (*DefaultStreamProvider) LatestPosition

func (p *DefaultStreamProvider) LatestPosition(
	ctx context.Context,
) types.StreamPosition

func (*DefaultStreamProvider) Setup

type DeviceListStreamProvider

type DeviceListStreamProvider struct {
	DefaultStreamProvider
	// contains filtered or unexported fields
}

func (*DeviceListStreamProvider) CompleteSync

func (*DeviceListStreamProvider) IncrementalSync

type InviteStreamProvider

type InviteStreamProvider struct {
	DefaultStreamProvider
	// contains filtered or unexported fields
}

func (*InviteStreamProvider) CompleteSync

func (*InviteStreamProvider) IncrementalSync

func (*InviteStreamProvider) Setup

func (p *InviteStreamProvider) Setup(
	ctx context.Context, snapshot storage.DatabaseTransaction,
)

type NotificationDataStreamProvider

type NotificationDataStreamProvider struct {
	DefaultStreamProvider
}

func (*NotificationDataStreamProvider) CompleteSync

func (*NotificationDataStreamProvider) IncrementalSync

func (*NotificationDataStreamProvider) Setup

type PDUStreamProvider

type PDUStreamProvider struct {
	DefaultStreamProvider
	// contains filtered or unexported fields
}

func (*PDUStreamProvider) CompleteSync

func (*PDUStreamProvider) IncrementalSync

func (p *PDUStreamProvider) IncrementalSync(
	ctx context.Context,
	snapshot storage.DatabaseTransaction,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) (newPos types.StreamPosition)

func (*PDUStreamProvider) Setup

func (p *PDUStreamProvider) Setup(
	ctx context.Context, snapshot storage.DatabaseTransaction,
)

type PresenceStreamProvider

type PresenceStreamProvider struct {
	DefaultStreamProvider
	// contains filtered or unexported fields
}

func (*PresenceStreamProvider) CompleteSync

func (*PresenceStreamProvider) IncrementalSync

func (*PresenceStreamProvider) Setup

type ReceiptMRead

type ReceiptMRead struct {
	User map[string]ReceiptTS `json:"m.read"`
}

type ReceiptStreamProvider

type ReceiptStreamProvider struct {
	DefaultStreamProvider
}

func (*ReceiptStreamProvider) CompleteSync

func (*ReceiptStreamProvider) IncrementalSync

func (*ReceiptStreamProvider) Setup

type ReceiptTS

type ReceiptTS struct {
	TS spec.Timestamp `json:"ts"`
}

type SendToDeviceStreamProvider

type SendToDeviceStreamProvider struct {
	DefaultStreamProvider
}

func (*SendToDeviceStreamProvider) CompleteSync

func (*SendToDeviceStreamProvider) IncrementalSync

func (*SendToDeviceStreamProvider) Setup

type StreamProvider

type StreamProvider interface {
	Setup(ctx context.Context, snapshot storage.DatabaseTransaction)

	// Advance will update the latest position of the stream based on
	// an update and will wake callers waiting on StreamNotifyAfter.
	Advance(latest types.StreamPosition)

	// CompleteSync will update the response to include all updates as needed
	// for a complete sync. It will always return immediately.
	CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest) types.StreamPosition

	// IncrementalSync will update the response to include all updates between
	// the from and to sync positions. It will always return immediately,
	// making no changes if the range contains no updates.
	IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition

	// LatestPosition returns the latest stream position for this stream.
	LatestPosition(ctx context.Context) types.StreamPosition
}

type Streams

type Streams struct {
	PDUStreamProvider              StreamProvider
	TypingStreamProvider           StreamProvider
	ReceiptStreamProvider          StreamProvider
	InviteStreamProvider           StreamProvider
	SendToDeviceStreamProvider     StreamProvider
	AccountDataStreamProvider      StreamProvider
	DeviceListStreamProvider       StreamProvider
	NotificationDataStreamProvider StreamProvider
	PresenceStreamProvider         StreamProvider
}

func NewSyncStreamProviders

func NewSyncStreamProviders(
	d storage.Database, userAPI userapi.SyncUserAPI,
	rsAPI rsapi.SyncRoomserverAPI,
	eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams

func (*Streams) Latest

func (s *Streams) Latest(ctx context.Context) types.StreamingToken

type TypingStreamProvider

type TypingStreamProvider struct {
	DefaultStreamProvider
	EDUCache *caching.EDUCache
}

func (*TypingStreamProvider) CompleteSync

func (*TypingStreamProvider) IncrementalSync

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL