Documentation ¶
Index ¶
- Constants
- type AccountDataStreamProvider
- func (p *AccountDataStreamProvider) CompleteSync(ctx context.Context, req *types.SyncRequest) types.StreamPosition
- func (p *AccountDataStreamProvider) IncrementalSync(ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition
- func (p *AccountDataStreamProvider) Setup()
- type DeviceListStreamProvider
- type InviteStreamProvider
- type PDUStreamProvider
- type PartitionedStreamProvider
- type ReceiptStreamProvider
- type SendToDeviceStreamProvider
- func (p *SendToDeviceStreamProvider) CompleteSync(ctx context.Context, req *types.SyncRequest) types.StreamPosition
- func (p *SendToDeviceStreamProvider) IncrementalSync(ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition) types.StreamPosition
- func (p *SendToDeviceStreamProvider) Setup()
- type StreamProvider
- type Streams
- type TypingStreamProvider
Constants ¶
View Source
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
The maximum number of tasks that can be queued in total before backpressure builds up and the remaining tasks start to block.
View Source
const PDU_STREAM_WORKERS = 256
The maximum number of per-room goroutines to have running. If set too high, this will consume lots of CPU; if set too low, complete sync responses will take longer to process.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccountDataStreamProvider ¶
type AccountDataStreamProvider struct { StreamProvider // contains filtered or unexported fields }
func (*AccountDataStreamProvider) CompleteSync ¶
func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*AccountDataStreamProvider) IncrementalSync ¶
func (p *AccountDataStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*AccountDataStreamProvider) Setup ¶
func (p *AccountDataStreamProvider) Setup()
type DeviceListStreamProvider ¶
type DeviceListStreamProvider struct { PartitionedStreamProvider // contains filtered or unexported fields }
func (*DeviceListStreamProvider) CompleteSync ¶
func (p *DeviceListStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.LogPosition
func (*DeviceListStreamProvider) IncrementalSync ¶
func (p *DeviceListStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.LogPosition, ) types.LogPosition
type InviteStreamProvider ¶
type InviteStreamProvider struct {
StreamProvider
}
func (*InviteStreamProvider) CompleteSync ¶
func (p *InviteStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*InviteStreamProvider) IncrementalSync ¶
func (p *InviteStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*InviteStreamProvider) Setup ¶
func (p *InviteStreamProvider) Setup()
type PDUStreamProvider ¶
type PDUStreamProvider struct { StreamProvider // contains filtered or unexported fields }
func (*PDUStreamProvider) CompleteSync ¶
func (p *PDUStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*PDUStreamProvider) IncrementalSync ¶
func (p *PDUStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) (newPos types.StreamPosition)
nolint:gocyclo
func (*PDUStreamProvider) Setup ¶
func (p *PDUStreamProvider) Setup()
type PartitionedStreamProvider ¶
type PartitionedStreamProvider struct { DB storage.Database // contains filtered or unexported fields }
func (*PartitionedStreamProvider) Advance ¶
func (p *PartitionedStreamProvider) Advance( latest types.LogPosition, )
func (*PartitionedStreamProvider) LatestPosition ¶
func (p *PartitionedStreamProvider) LatestPosition( ctx context.Context, ) types.LogPosition
func (*PartitionedStreamProvider) Setup ¶
func (p *PartitionedStreamProvider) Setup()
type ReceiptStreamProvider ¶
type ReceiptStreamProvider struct {
StreamProvider
}
func (*ReceiptStreamProvider) CompleteSync ¶
func (p *ReceiptStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*ReceiptStreamProvider) IncrementalSync ¶
func (p *ReceiptStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*ReceiptStreamProvider) Setup ¶
func (p *ReceiptStreamProvider) Setup()
type SendToDeviceStreamProvider ¶
type SendToDeviceStreamProvider struct {
StreamProvider
}
func (*SendToDeviceStreamProvider) CompleteSync ¶
func (p *SendToDeviceStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*SendToDeviceStreamProvider) IncrementalSync ¶
func (p *SendToDeviceStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
func (*SendToDeviceStreamProvider) Setup ¶ added in v0.3.6
func (p *SendToDeviceStreamProvider) Setup()
type StreamProvider ¶
func (*StreamProvider) Advance ¶
func (p *StreamProvider) Advance( latest types.StreamPosition, )
func (*StreamProvider) LatestPosition ¶
func (p *StreamProvider) LatestPosition( ctx context.Context, ) types.StreamPosition
func (*StreamProvider) Setup ¶
func (p *StreamProvider) Setup()
type Streams ¶
type Streams struct { PDUStreamProvider types.StreamProvider TypingStreamProvider types.StreamProvider ReceiptStreamProvider types.StreamProvider InviteStreamProvider types.StreamProvider SendToDeviceStreamProvider types.StreamProvider AccountDataStreamProvider types.StreamProvider DeviceListStreamProvider types.PartitionedStreamProvider }
func NewSyncStreamProviders ¶
func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, eduCache *cache.EDUCache, ) *Streams
type TypingStreamProvider ¶
type TypingStreamProvider struct { StreamProvider EDUCache *cache.EDUCache }
func (*TypingStreamProvider) CompleteSync ¶
func (p *TypingStreamProvider) CompleteSync( ctx context.Context, req *types.SyncRequest, ) types.StreamPosition
func (*TypingStreamProvider) IncrementalSync ¶
func (p *TypingStreamProvider) IncrementalSync( ctx context.Context, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition
Click to show internal directories.
Click to hide internal directories.