Documentation
Overview
Package input contains the code that processes new room events.
Constants
const MaximumMissingProcessingTime = time.Minute * 2
TODO: Does this value make sense?
Types
type Inputer
type Inputer struct {
	Cfg                 *config.RoomServer
	Base                *base.BaseDendrite
	ProcessContext      *process.ProcessContext
	DB                  storage.Database
	NATSClient          *nats.Conn
	JetStream           nats.JetStreamContext
	Durable             nats.SubOpt
	ServerName          gomatrixserverlib.ServerName
	FSAPI               fedapi.RoomserverFederationAPI
	KeyRing             gomatrixserverlib.JSONVerifier
	ACLs                *acls.ServerACLs
	InputRoomEventTopic string
	OutputProducer      *producers.RoomEventProducer

	Queryer *query.Queryer
	// contains filtered or unexported fields
}
In this example, A is still processing an event but has two pending events to process afterwards. Both B and C are caught up, so they will do nothing until a new event comes in for B or C.
func (*Inputer) InputRoomEvents
func (r *Inputer) InputRoomEvents(
	ctx context.Context,
	request *api.InputRoomEventsRequest,
	response *api.InputRoomEventsResponse,
)
InputRoomEvents implements api.RoomserverInternalAPI
func (*Inputer) Start
Start creates an ephemeral non-durable consumer on the roomserver input topic. It is configured to deliver headers only, because at this point we don't care about the contents of the message, only about the `room_id` field. Once a message arrives, we look to see whether we already have a worker for that room which has its own consumer. If we don't, we start one.
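The lookup-or-start step described for Start can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the package's actual implementation: the `message`, `dispatcher`, `newDispatcher`, and `dispatch` names are hypothetical, the `start` callback stands in for creating a per-room consumer, and no real NATS connection is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// roomIDHeader is the name of the field the dispatcher inspects.
const roomIDHeader = "room_id"

// message is a hypothetical stand-in for a headers-only message:
// it carries no payload, just headers.
type message struct {
	headers map[string]string
}

// dispatcher is a hypothetical tracker of which rooms already have a worker.
type dispatcher struct {
	mu      sync.Mutex
	workers map[string]bool     // rooms that already have a worker
	start   func(roomID string) // assumed hook that starts a per-room worker
}

func newDispatcher(start func(roomID string)) *dispatcher {
	return &dispatcher{workers: make(map[string]bool), start: start}
}

// dispatch looks up the worker for the message's room and starts one
// if none exists. It reports whether a new worker was started.
func (d *dispatcher) dispatch(msg message) bool {
	roomID := msg.headers[roomIDHeader]
	if roomID == "" {
		return false // nothing to route without a room ID
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.workers[roomID] {
		return false // this room already has a worker
	}
	d.workers[roomID] = true
	d.start(roomID)
	return true
}

func main() {
	var started []string
	d := newDispatcher(func(roomID string) { started = append(started, roomID) })
	for _, room := range []string{"!a:example.org", "!b:example.org", "!a:example.org"} {
		d.dispatch(message{headers: map[string]string{roomIDHeader: room}})
	}
	fmt.Println(started) // prints [!a:example.org !b:example.org]
}
```

In this sketch the second message for `!a:example.org` does not start another worker, which mirrors the idea that each room gets at most one worker and further events for that room are left to it.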