Documentation ¶
Index ¶
- Constants
- type AccountDataStreamProvider
- func (p *AccountDataStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *AccountDataStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *AccountDataStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type DefaultStreamProvider
- type DeviceListStreamProvider
- type InviteStreamProvider
- func (p *InviteStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *InviteStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *InviteStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type NotificationDataStreamProvider
- func (p *NotificationDataStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *NotificationDataStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *NotificationDataStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type PDUStreamProvider
- func (p *PDUStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *PDUStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) (newPos types.StreamPosition)
- func (p *PDUStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type PresenceStreamProvider
- func (p *PresenceStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *PresenceStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *PresenceStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type ReceiptMRead
- type ReceiptStreamProvider
- func (p *ReceiptStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *ReceiptStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *ReceiptStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type ReceiptTS
- type SendToDeviceStreamProvider
- func (p *SendToDeviceStreamProvider) CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *SendToDeviceStreamProvider) IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, ...) types.StreamPosition
- func (p *SendToDeviceStreamProvider) Setup(ctx context.Context, snapshot storage.DatabaseTransaction)
- type StreamProvider
- type Streams
- type TypingStreamProvider
Constants ¶
View Source
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
The maximum number of tasks that can be queued in total before backpressure builds up and the remaining tasks start to block.
View Source
const PDU_STREAM_WORKERS = 256
The maximum number of per-room goroutines to have running. Too high and this will consume lots of CPU; too low and complete sync responses will take longer to process.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccountDataStreamProvider ¶
type AccountDataStreamProvider struct { DefaultStreamProvider // contains filtered or unexported fields }
func (*AccountDataStreamProvider) CompleteSync ¶
func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*AccountDataStreamProvider) IncrementalSync ¶
func (p *AccountDataStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*AccountDataStreamProvider) Setup ¶
func (p *AccountDataStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type DefaultStreamProvider ¶
func (*DefaultStreamProvider) Advance ¶
func (p *DefaultStreamProvider) Advance( latest types.StreamPosition, )
func (*DefaultStreamProvider) LatestPosition ¶
func (p *DefaultStreamProvider) LatestPosition( ctx context.Context, ) types.StreamPosition
func (*DefaultStreamProvider) Setup ¶
func (p *DefaultStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type DeviceListStreamProvider ¶
type DeviceListStreamProvider struct { DefaultStreamProvider // contains filtered or unexported fields }
func (*DeviceListStreamProvider) CompleteSync ¶
func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*DeviceListStreamProvider) IncrementalSync ¶
func (p *DeviceListStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
type InviteStreamProvider ¶
type InviteStreamProvider struct { DefaultStreamProvider // contains filtered or unexported fields }
func (*InviteStreamProvider) CompleteSync ¶
func (p *InviteStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*InviteStreamProvider) IncrementalSync ¶
func (p *InviteStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*InviteStreamProvider) Setup ¶
func (p *InviteStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type NotificationDataStreamProvider ¶
type NotificationDataStreamProvider struct {
DefaultStreamProvider
}
func (*NotificationDataStreamProvider) CompleteSync ¶
func (p *NotificationDataStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*NotificationDataStreamProvider) IncrementalSync ¶
func (p *NotificationDataStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, _ types.StreamPosition, ) types.StreamPosition
func (*NotificationDataStreamProvider) Setup ¶
func (p *NotificationDataStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type PDUStreamProvider ¶
type PDUStreamProvider struct { DefaultStreamProvider // contains filtered or unexported fields }
func (*PDUStreamProvider) CompleteSync ¶
func (p *PDUStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*PDUStreamProvider) IncrementalSync ¶
func (p *PDUStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) (newPos types.StreamPosition)
func (*PDUStreamProvider) Setup ¶
func (p *PDUStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type PresenceStreamProvider ¶
type PresenceStreamProvider struct { DefaultStreamProvider // contains filtered or unexported fields }
func (*PresenceStreamProvider) CompleteSync ¶
func (p *PresenceStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*PresenceStreamProvider) IncrementalSync ¶
func (p *PresenceStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*PresenceStreamProvider) Setup ¶
func (p *PresenceStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type ReceiptMRead ¶
type ReceiptStreamProvider ¶
type ReceiptStreamProvider struct {
DefaultStreamProvider
}
func (*ReceiptStreamProvider) CompleteSync ¶
func (p *ReceiptStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*ReceiptStreamProvider) IncrementalSync ¶
func (p *ReceiptStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*ReceiptStreamProvider) Setup ¶
func (p *ReceiptStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type SendToDeviceStreamProvider ¶
type SendToDeviceStreamProvider struct {
DefaultStreamProvider
}
func (*SendToDeviceStreamProvider) CompleteSync ¶
func (p *SendToDeviceStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*SendToDeviceStreamProvider) IncrementalSync ¶
func (p *SendToDeviceStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*SendToDeviceStreamProvider) Setup ¶
func (p *SendToDeviceStreamProvider) Setup( ctx context.Context, snapshot storage.DatabaseTransaction, )
type StreamProvider ¶
type StreamProvider interface { Setup(ctx context.Context, snapshot storage.DatabaseTransaction) // Advance will update the latest position of the stream based on // an update and will wake callers waiting on StreamNotifyAfter. Advance(latest types.StreamPosition) // CompleteSync will update the response to include all updates as needed // for a complete sync. It will always return immediately. CompleteSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest) types.StreamPosition // IncrementalSync will update the response to include all updates between // the from and to sync positions. It will always return immediately, // making no changes if the range contains no updates. IncrementalSync(ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition // LatestPosition returns the latest stream position for this stream. LatestPosition(ctx context.Context) types.StreamPosition }
type Streams ¶
type Streams struct { PDUStreamProvider StreamProvider TypingStreamProvider StreamProvider ReceiptStreamProvider StreamProvider InviteStreamProvider StreamProvider SendToDeviceStreamProvider StreamProvider AccountDataStreamProvider StreamProvider DeviceListStreamProvider StreamProvider NotificationDataStreamProvider StreamProvider PresenceStreamProvider StreamProvider }
func NewSyncStreamProviders ¶
func NewSyncStreamProviders( d storage.Database, userAPI userapi.SyncUserAPI, rsAPI rsapi.SyncRoomserverAPI, eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams
type TypingStreamProvider ¶
type TypingStreamProvider struct { DefaultStreamProvider EDUCache *caching.EDUCache }
func (*TypingStreamProvider) CompleteSync ¶
func (p *TypingStreamProvider) CompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition
func (*TypingStreamProvider) IncrementalSync ¶
func (p *TypingStreamProvider) IncrementalSync( ctx context.Context, snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
Click to show internal directories.
Click to hide internal directories.