streams

package
v0.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2021 License: Apache-2.0 Imports: 14 Imported by: 8

Documentation

Index

Constants

View Source
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8

The maximum number of tasks that can be queued in total before backpressure will build up and the rests will start to block.

View Source
const PDU_STREAM_WORKERS = 256

The max number of per-room goroutines to have running. Too high and this will consume lots of CPU, too low and complete sync responses will take longer to process.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccountDataStreamProvider

type AccountDataStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*AccountDataStreamProvider) CompleteSync

func (*AccountDataStreamProvider) IncrementalSync

func (*AccountDataStreamProvider) Setup

func (p *AccountDataStreamProvider) Setup()

type DeviceListStreamProvider

type DeviceListStreamProvider struct {
	PartitionedStreamProvider
	// contains filtered or unexported fields
}

func (*DeviceListStreamProvider) CompleteSync

func (*DeviceListStreamProvider) IncrementalSync

func (p *DeviceListStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.LogPosition,
) types.LogPosition

type InviteStreamProvider

type InviteStreamProvider struct {
	StreamProvider
}

func (*InviteStreamProvider) CompleteSync

func (*InviteStreamProvider) IncrementalSync

func (p *InviteStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

func (*InviteStreamProvider) Setup

func (p *InviteStreamProvider) Setup()

type PDUStreamProvider

type PDUStreamProvider struct {
	StreamProvider
	// contains filtered or unexported fields
}

func (*PDUStreamProvider) CompleteSync

func (p *PDUStreamProvider) CompleteSync(
	ctx context.Context,
	req *types.SyncRequest,
) types.StreamPosition

func (*PDUStreamProvider) IncrementalSync

func (p *PDUStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) (newPos types.StreamPosition)

nolint:gocyclo

func (*PDUStreamProvider) Setup

func (p *PDUStreamProvider) Setup()

type PartitionedStreamProvider

type PartitionedStreamProvider struct {
	DB storage.Database
	// contains filtered or unexported fields
}

func (*PartitionedStreamProvider) Advance

func (p *PartitionedStreamProvider) Advance(
	latest types.LogPosition,
)

func (*PartitionedStreamProvider) LatestPosition

func (p *PartitionedStreamProvider) LatestPosition(
	ctx context.Context,
) types.LogPosition

func (*PartitionedStreamProvider) Setup

func (p *PartitionedStreamProvider) Setup()

type ReceiptStreamProvider

type ReceiptStreamProvider struct {
	StreamProvider
}

func (*ReceiptStreamProvider) CompleteSync

func (*ReceiptStreamProvider) IncrementalSync

func (p *ReceiptStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

func (*ReceiptStreamProvider) Setup

func (p *ReceiptStreamProvider) Setup()

type SendToDeviceStreamProvider

type SendToDeviceStreamProvider struct {
	StreamProvider
}

func (*SendToDeviceStreamProvider) CompleteSync

func (*SendToDeviceStreamProvider) IncrementalSync

func (*SendToDeviceStreamProvider) Setup added in v0.3.6

func (p *SendToDeviceStreamProvider) Setup()

type StreamProvider

type StreamProvider struct {
	DB storage.Database
	// contains filtered or unexported fields
}

func (*StreamProvider) Advance

func (p *StreamProvider) Advance(
	latest types.StreamPosition,
)

func (*StreamProvider) LatestPosition

func (p *StreamProvider) LatestPosition(
	ctx context.Context,
) types.StreamPosition

func (*StreamProvider) Setup

func (p *StreamProvider) Setup()

type Streams

type Streams struct {
	PDUStreamProvider          types.StreamProvider
	TypingStreamProvider       types.StreamProvider
	ReceiptStreamProvider      types.StreamProvider
	InviteStreamProvider       types.StreamProvider
	SendToDeviceStreamProvider types.StreamProvider
	AccountDataStreamProvider  types.StreamProvider
	DeviceListStreamProvider   types.PartitionedStreamProvider
}

func NewSyncStreamProviders

func NewSyncStreamProviders(
	d storage.Database, userAPI userapi.UserInternalAPI,
	rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
	eduCache *cache.EDUCache,
) *Streams

func (*Streams) Latest

func (s *Streams) Latest(ctx context.Context) types.StreamingToken

type TypingStreamProvider

type TypingStreamProvider struct {
	StreamProvider
	EDUCache *cache.EDUCache
}

func (*TypingStreamProvider) CompleteSync

func (*TypingStreamProvider) IncrementalSync

func (p *TypingStreamProvider) IncrementalSync(
	ctx context.Context,
	req *types.SyncRequest,
	from, to types.StreamPosition,
) types.StreamPosition

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL