input

package
v0.8.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package input contains the code processes new room events

Index

Constants

View Source
const MaximumMissingProcessingTime = time.Minute * 2

TODO: Does this value make sense?

Variables

This section is empty.

Functions

This section is empty.

Types

type Inputer

type Inputer struct {
	Cfg                  *config.RoomServer
	ProcessContext       *process.ProcessContext
	DB                   storage.Database
	NATSClient           *nats.Conn
	JetStream            nats.JetStreamContext
	Durable              nats.SubOpt
	ServerName           gomatrixserverlib.ServerName
	FSAPI                fedapi.RoomserverFederationAPI
	KeyRing              gomatrixserverlib.JSONVerifier
	ACLs                 *acls.ServerACLs
	InputRoomEventTopic  string
	OutputRoomEventTopic string

	Queryer *query.Queryer
	// contains filtered or unexported fields
}

Inputer is responsible for consuming from the roomserver input streams and processing the events. All input events are queued into a single NATS stream and the order is preserved strictly. The `room_id` message header will contain the room ID which will be used to assign the pending event to a per-room worker.

The input API maintains an ephemeral headers-only consumer. It will speed through the stream working out which room IDs are pending and create durable consumers for them. The durable consumer will then be used for each room worker goroutine to fetch events one by one and process them. Each room having a durable consumer of its own means there is no head-of-line blocking between rooms. Filtering ensures that each durable consumer only receives events for the room it is interested in.

The ephemeral consumer closely tracks the newest events. The per-room durable consumers will only progress through the stream as events are processed.

A BC *  -> positions of each consumer (* = ephemeral)
⌄ ⌄⌄ ⌄

ABAABCAABCAA -> newest (letter = subject for each message)

In this example, A is still processing an event but has two pending events to process afterwards. Both B and C are caught up, so they will do nothing until a new event comes in for B or C.

func (*Inputer) InputRoomEvents

func (r *Inputer) InputRoomEvents(
	ctx context.Context,
	request *api.InputRoomEventsRequest,
	response *api.InputRoomEventsResponse,
)

InputRoomEvents implements api.RoomserverInternalAPI

func (*Inputer) Start added in v0.6.0

func (r *Inputer) Start() error

Start creates an ephemeral non-durable consumer on the roomserver input topic. It is configured to deliver us headers only because we don't actually care about the contents of the message at this point, we only care about the `room_id` field. Once a message arrives, we will look to see if we have a worker for that room which has its own consumer. If we don't, we'll start one.

func (*Inputer) WriteOutputEvents

func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error

WriteOutputEvents implements OutputRoomEventWriter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL