Documentation ¶
Index ¶
- Constants
- func PackReceiptsIntoEDU(receipts []internal.Receipt) (json.RawMessage, error)
- func UnpackReceiptsFromEDU(roomID string, ephEvent json.RawMessage) (readReceipts, privateReceipts []internal.Receipt, err error)
- type AccountData
- type AccountDataChunker
- type AccountDataTable
- func (t *AccountDataTable) Insert(txn *sqlx.Tx, accDatas []AccountData) ([]AccountData, error)
- func (t *AccountDataTable) Select(txn *sqlx.Tx, userID string, eventTypes []string, roomID string) (datas []AccountData, err error)
- func (t *AccountDataTable) SelectMany(txn *sqlx.Tx, userID string, roomIDs ...string) (datas []AccountData, err error)
- func (t *AccountDataTable) SelectWithType(txn *sqlx.Tx, userID, evType string) (datas []AccountData, err error)
- type AccumulateResult
- type Accumulator
- type DeviceDataRow
- type DeviceDataTable
- type DeviceListChunker
- type DeviceListRow
- type DeviceListTable
- func (t *DeviceListTable) Select(userID, deviceID string, swap bool) (result internal.MapStringInt, err error)
- func (t *DeviceListTable) SelectTx(txn *sqlx.Tx, userID, deviceID string, swap bool) (result internal.MapStringInt, err error)
- func (t *DeviceListTable) Upsert(userID, deviceID string, deviceListChanges map[string]int) (err error)
- func (t *DeviceListTable) UpsertTx(txn *sqlx.Tx, userID, deviceID string, deviceListChanges map[string]int) (err error)
- type Event
- type EventChunker
- type EventTable
- func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int64, error)
- func (t *EventTable) LatestEventInRooms(txn *sqlx.Tx, roomIDs []string, highestNID int64) (events []Event, err error)
- func (t *EventTable) LatestEventNIDInRooms(txn *sqlx.Tx, roomIDs []string, highestNID int64) (roomToNID map[string]int64, err error)
- func (t *EventTable) Redact(txn *sqlx.Tx, roomVer string, redacteeEventIDToRedactEvent map[string]*Event) error
- func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error)
- func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (events []Event, err error)
- func (t *EventTable) SelectClosestPrevBatch(txn *sqlx.Tx, roomID string, eventNID int64) (prevBatch string, err error)
- func (t *EventTable) SelectClosestPrevBatchByID(roomID string, eventID string) (prevBatch string, err error)
- func (t *EventTable) SelectCreateEvent(txn *sqlx.Tx, roomID string) (json.RawMessage, error)
- func (t *EventTable) SelectEventNIDsWithTypeInRoom(txn *sqlx.Tx, eventType string, limit int, targetRoom string, ...) (eventNIDs []int64, err error)
- func (t *EventTable) SelectEventsWithTypeStateKey(eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
- func (t *EventTable) SelectEventsWithTypeStateKeyInRooms(roomIDs []string, eventType, stateKey string, ...) ([]Event, error)
- func (t *EventTable) SelectHighestNID() (highest int64, err error)
- func (t *EventTable) SelectLatestEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error)
- func (t *EventTable) SelectNIDsByIDs(txn *sqlx.Tx, ids []string) (nids map[string]int64, err error)
- func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (StrippedEvents, error)
- func (t *EventTable) SelectStrippedEventsByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (StrippedEvents, error)
- func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error)
- func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error
- type InitialiseResult
- type InvitesTable
- func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error
- func (t *InvitesTable) RemoveInvite(userID, roomID string) error
- func (t *InvitesTable) RemoveSupersededInvites(txn *sqlx.Tx, roomID string, newEvents []Event) error
- func (t *InvitesTable) SelectAllInvitesForUser(userID string) (map[string][]json.RawMessage, error)
- func (t *InvitesTable) SelectInviteState(userID, roomID string) (inviteState []json.RawMessage, err error)
- type LatestEvents
- type ReceiptChunker
- type ReceiptTable
- func (t *ReceiptTable) Insert(roomID string, ephEvent json.RawMessage) (receipts []internal.Receipt, err error)
- func (t *ReceiptTable) SelectReceiptsForEvents(roomID string, eventIDs []string) (receipts []internal.Receipt, err error)
- func (t *ReceiptTable) SelectReceiptsForUser(roomIDs []string, userID string) (receiptsByRoom map[string][]internal.Receipt, err error)
- type RoomInfo
- type RoomsTable
- func (t *RoomsTable) CurrentAfterSnapshotID(txn *sqlx.Tx, roomID string) (snapshotID int64, err error)
- func (t *RoomsTable) LatestNIDs(txn *sqlx.Tx, roomIDs []string) (nids map[string]int64, err error)
- func (t *RoomsTable) SelectRoomInfos(txn *sqlx.Tx) (infos []RoomInfo, err error)
- func (t *RoomsTable) Upsert(txn *sqlx.Tx, info RoomInfo, snapshotID, latestNID int64) (err error)
- type SnapshotRow
- type SnapshotTable
- func (t *SnapshotTable) CurrentSnapshots(txn *sqlx.Tx) (map[string][]int64, error)
- func (s *SnapshotTable) Delete(txn *sqlx.Tx, snapshotIDs []int64) error
- func (s *SnapshotTable) Insert(txn *sqlx.Tx, row *SnapshotRow) error
- func (s *SnapshotTable) Select(txn *sqlx.Tx, snapshotID int64) (row SnapshotRow, err error)
- type SpaceRelation
- type SpaceRelationChunker
- type SpacesTable
- func (t *SpacesTable) BulkDelete(txn *sqlx.Tx, relations []SpaceRelation) error
- func (t *SpacesTable) BulkInsert(txn *sqlx.Tx, relations []SpaceRelation) error
- func (t *SpacesTable) HandleSpaceUpdates(txn *sqlx.Tx, events []Event) error
- func (t *SpacesTable) SelectChildren(txn *sqlx.Tx, spaces []string) (map[string][]SpaceRelation, error)
- type StartupSnapshot
- type Storage
- func (s *Storage) AccountData(userID, roomID string, eventTypes []string) (data []AccountData, err error)
- func (s *Storage) AccountDatas(userID string, roomIDs ...string) (datas []AccountData, err error)
- func (s *Storage) Accumulate(userID, roomID string, timeline sync2.TimelineResponse) (result AccumulateResult, err error)
- func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (joinedMembers map[string][]string, metadata map[string]internal.RoomMetadata, ...)
- func (s *Storage) Cleaner(n time.Duration)
- func (s *Storage) EventNIDs(eventNIDs []int64) ([]json.RawMessage, error)
- func (s *Storage) FetchMemberships(roomID string) (joins, invites, leaves []string, err error)
- func (s *Storage) GetClosestPrevBatch(roomID string, eventNID int64) (prevBatch string)
- func (s *Storage) GlobalSnapshot() (ss StartupSnapshot, err error)
- func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error)
- func (s *Storage) InsertAccountData(userID, roomID string, events []json.RawMessage) (data []AccountData, err error)
- func (s *Storage) JoinedRoomsAfterPosition(userID string, pos int64) (joinTimingByRoomID map[string]internal.EventMetadata, err error)
- func (s *Storage) LatestEventNID() (int64, error)
- func (s *Storage) LatestEventNIDInRooms(roomIDs []string, highestNID int64) (roomToNID map[string]int64, err error)
- func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error)
- func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, tempTableName string, result map[string]internal.RoomMetadata) error
- func (s *Storage) PrepareSnapshot(txn *sqlx.Tx) (tableName string, err error)
- func (s *Storage) RemoveInaccessibleStateSnapshots() error
- func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error
- func (s *Storage) RoomAccountDatasWithType(userID, eventType string) (data []AccountData, err error)
- func (s *Storage) RoomMembershipDelta(roomID string, from, to int64, limit int) (eventJSON []json.RawMessage, upTo int64, err error)
- func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []string, pos int64, ...) (roomToEvents map[string][]Event, err error)
- func (s *Storage) StateSnapshot(snapID int64) (state []json.RawMessage, err error)
- func (s *Storage) Teardown()
- func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][2]int64, error)
- type StrippedEvents
- type ToDeviceRow
- type ToDeviceRowChunker
- type ToDeviceTable
- func (t *ToDeviceTable) DeleteAllMessagesForDevice(userID, deviceID string) error
- func (t *ToDeviceTable) DeleteMessagesUpToAndIncluding(userID, deviceID string, toIncl int64) error
- func (t *ToDeviceTable) InsertMessages(userID, deviceID string, msgs []json.RawMessage) (pos int64, err error)
- func (t *ToDeviceTable) Messages(userID, deviceID string, from, limit int64) (msgs []json.RawMessage, upTo int64, err error)
- func (t *ToDeviceTable) SetUnackedPosition(userID, deviceID string, pos int64) error
- type TransactionsTable
- type TypingTable
- type UnreadTable
- func (t *UnreadTable) SelectAllNonZeroCountsForUser(userID string, ...) error
- func (t *UnreadTable) SelectUnreadCounters(userID, roomID string) (highlightCount, notificationCount int, err error)
- func (t *UnreadTable) UpdateUnreadCounters(userID, roomID string, highlightCount, notificationCount *int) error
Constants ¶
const ( BucketNew = 1 BucketSent = 2 )
const ( EventsStart = -1 EventsEnd = math.MaxInt64 - 1 )
const ( RelationMSpaceParent = 1 RelationMSpaceChild = 2 )
const ( ActionRequest = 1 ActionCancel = 2 )
const AccountDataGlobalRoom = ""
const MaxPostgresParameters = 65535
Max number of parameters in a single SQL command
Variables ¶
This section is empty.
Functions ¶
func PackReceiptsIntoEDU ¶
func PackReceiptsIntoEDU(receipts []internal.Receipt) (json.RawMessage, error)
PackReceiptsIntoEDU bundles all the receipts into a single m.receipt EDU, suitable for sending down client connections.
func UnpackReceiptsFromEDU ¶ added in v0.99.1
Types ¶
type AccountData ¶
type AccountDataChunker ¶
type AccountDataChunker []AccountData
func (AccountDataChunker) Len ¶
func (c AccountDataChunker) Len() int
type AccountDataTable ¶
type AccountDataTable struct{}
AccountDataTable stores the account data for users.
func NewAccountDataTable ¶
func NewAccountDataTable(db *sqlx.DB) *AccountDataTable
func (*AccountDataTable) Insert ¶
func (t *AccountDataTable) Insert(txn *sqlx.Tx, accDatas []AccountData) ([]AccountData, error)
Insert account data.
func (*AccountDataTable) Select ¶
func (t *AccountDataTable) Select(txn *sqlx.Tx, userID string, eventTypes []string, roomID string) (datas []AccountData, err error)
func (*AccountDataTable) SelectMany ¶
func (t *AccountDataTable) SelectMany(txn *sqlx.Tx, userID string, roomIDs ...string) (datas []AccountData, err error)
func (*AccountDataTable) SelectWithType ¶
func (t *AccountDataTable) SelectWithType(txn *sqlx.Tx, userID, evType string) (datas []AccountData, err error)
type AccumulateResult ¶ added in v0.99.11
type AccumulateResult struct { // NumNew is the number of events in timeline NIDs that were not previously known // to the proyx. NumNew int // TimelineNIDs is the list of event nids seen in a sync v2 timeline. Some of these // may already be known to the proxy. TimelineNIDs []int64 // IncludesStateRedaction is set to true when we have accumulated a redaction to a // piece of room state. IncludesStateRedaction bool }
type Accumulator ¶
type Accumulator struct {
// contains filtered or unexported fields
}
Accumulator tracks room state and timelines.
In order for it to remain simple(ish), the accumulator DOES NOT SUPPORT arbitrary timeline gaps. There is an Initialise function for new rooms (with some pre-determined state) and then a constant Accumulate function for timeline events. v2 sync must be called with a large enough timeline.limit for this to work!
func NewAccumulator ¶
func NewAccumulator(db *sqlx.DB) *Accumulator
func (*Accumulator) Accumulate ¶
func (a *Accumulator) Accumulate(txn *sqlx.Tx, userID, roomID string, timeline sync2.TimelineResponse) (AccumulateResult, error)
Accumulate internal state from a user's sync response. The timeline order MUST be in the order received from the server. Returns the number of new events in the timeline, the new timeline event NIDs or an error.
This function does several things:
- It ensures all events are persisted in the database. This is shared amongst users.
- If all events have been stored before, then it short circuits and returns. This is because we must have already processed this part of the timeline in order for the event to exist in the database, and the sync stream is already linearised for us.
- Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
- It adds entries to the membership log for membership events.
func (*Accumulator) Initialise ¶
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error)
type DeviceDataRow ¶
type DeviceDataTable ¶
type DeviceDataTable struct {
// contains filtered or unexported fields
}
func NewDeviceDataTable ¶
func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable
func (*DeviceDataTable) Select ¶
func (t *DeviceDataTable) Select(userID, deviceID string, swap bool) (result *internal.DeviceData, err error)
Atomically select the device data for this user|device and then swap DeviceLists around if set. This should only be called by the v3 HTTP APIs when servicing an E2EE extension request.
func (*DeviceDataTable) Upsert ¶
func (t *DeviceDataTable) Upsert(userID, deviceID string, keys internal.DeviceKeyData, deviceListChanges map[string]int) (err error)
Upsert combines what is in the database for this user|device with the partial entry `dd`
type DeviceListChunker ¶ added in v0.99.18
type DeviceListChunker []DeviceListRow
func (DeviceListChunker) Len ¶ added in v0.99.18
func (c DeviceListChunker) Len() int
type DeviceListRow ¶ added in v0.99.18
type DeviceListTable ¶ added in v0.99.18
type DeviceListTable struct {
// contains filtered or unexported fields
}
func NewDeviceListTable ¶ added in v0.99.18
func NewDeviceListTable(db *sqlx.DB) *DeviceListTable
func (*DeviceListTable) Select ¶ added in v0.99.18
func (t *DeviceListTable) Select(userID, deviceID string, swap bool) (result internal.MapStringInt, err error)
func (*DeviceListTable) SelectTx ¶ added in v0.99.18
func (t *DeviceListTable) SelectTx(txn *sqlx.Tx, userID, deviceID string, swap bool) (result internal.MapStringInt, err error)
Select device list changes for this client. Returns a map of user_id => change enum.
type Event ¶
type Event struct { NID int64 `db:"event_nid"` Type string `db:"event_type"` StateKey string `db:"state_key"` Membership string `db:"membership"` // whether this was part of a v2 state response and hence not part of the timeline IsState bool `db:"is_state"` // This is a snapshot ID which corresponds to some room state BEFORE this event has been applied. BeforeStateSnapshotID int64 `db:"before_state_snapshot_id"` ReplacesNID int64 `db:"event_replaces_nid"` ID string `db:"event_id"` RoomID string `db:"room_id"` // not all events include a prev batch (e.g if it was part of state not timeline, and only the first // event in a timeline has a prev_batch attached), but we'll look for the 'closest' prev batch // when returning these tokens to the caller (closest = next newest, assume clients de-dupe) PrevBatch sql.NullString `db:"prev_batch"` // stripped events will be missing this field JSON []byte `db:"event"` // MissingPrevious is true iff the previous timeline event is not known to the proxy. MissingPrevious bool `db:"missing_previous"` }
type EventChunker ¶
type EventChunker []Event
func (EventChunker) Len ¶
func (c EventChunker) Len() int
type EventTable ¶
type EventTable struct {
// contains filtered or unexported fields
}
EventTable stores events. A unique numeric ID is associated with each event.
func NewEventTable ¶
func NewEventTable(db *sqlx.DB) *EventTable
NewEventTable makes a new EventTable
func (*EventTable) Insert ¶
func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int64, error)
Insert events into the event table. Returns a map of event ID to NID for new events only. The NIDs assigned to new events will respect the order of the given events, e.g. if we insert new events A and B in that order, then NID(A) < NID(B).
func (*EventTable) LatestEventInRooms ¶
func (t *EventTable) LatestEventInRooms(txn *sqlx.Tx, roomIDs []string, highestNID int64) (events []Event, err error)
LatestEventInRooms queries the latest events in each of the room IDs given, using highestNID as the highest event.
The following query does:
- Create a list of the passed in roomIDs (`room_ids` CTE)
- Fetches the highest event_nid before or equal to $2 for each room in room_ids (`max_ev_nid` CTE)
- Fetches the latest events for each room using the data provided from room_ids and max_ev_nid (the `evs` LATERAL)
func (*EventTable) LatestEventNIDInRooms ¶ added in v0.99.3
func (t *EventTable) LatestEventNIDInRooms(txn *sqlx.Tx, roomIDs []string, highestNID int64) (roomToNID map[string]int64, err error)
LatestEventNIDInRooms queries the latest events in each of the room IDs given, using highestNID as the highest event.
The following query does:
- Create a list of the passed in roomIDs (`room_ids` CTE)
- Fetches the latest eventNIDs for each room using the data provided from room_ids (the `evs` LATERAL)
func (*EventTable) SelectByIDs ¶
func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error)
SelectByIDs fetches all events with the given event IDs from the DB as Event structs. If verifyAll is true, the function will check that each event ID has a matching event row in the database. The returned events are ordered by ascending NID; the order of the event IDs is irrelevant.
func (*EventTable) SelectByNIDs ¶
func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (events []Event, err error)
SelectByNIDs fetches events from the events table by their nids. The returned events are ordered by ascending nid; the order of the input nids is ignored. If verifyAll is true, we return an error if the number of events returned doesn't match the number of given nids.
func (*EventTable) SelectClosestPrevBatch ¶
func (t *EventTable) SelectClosestPrevBatch(txn *sqlx.Tx, roomID string, eventNID int64) (prevBatch string, err error)
Select the closest prev batch token for the provided event NID. Returns the empty string if there is no closest.
func (*EventTable) SelectClosestPrevBatchByID ¶
func (t *EventTable) SelectClosestPrevBatchByID(roomID string, eventID string) (prevBatch string, err error)
SelectClosestPrevBatchByID is the same as SelectClosestPrevBatch but works on event IDs not NIDs
func (*EventTable) SelectCreateEvent ¶ added in v0.99.9
func (t *EventTable) SelectCreateEvent(txn *sqlx.Tx, roomID string) (json.RawMessage, error)
func (*EventTable) SelectEventNIDsWithTypeInRoom ¶
func (t *EventTable) SelectEventNIDsWithTypeInRoom(txn *sqlx.Tx, eventType string, limit int, targetRoom string, lowerExclusive, upperInclusive int64) (eventNIDs []int64, err error)
Select all events matching the given event type in a room. Used to implement the room member stream (paginated room lists)
func (*EventTable) SelectEventsWithTypeStateKey ¶
func (t *EventTable) SelectEventsWithTypeStateKey(eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
Select all events between the bounds matching the type, state_key given. Used to work out which rooms the user was joined to at a given point in time.
func (*EventTable) SelectEventsWithTypeStateKeyInRooms ¶
func (t *EventTable) SelectEventsWithTypeStateKeyInRooms(roomIDs []string, eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
Select all events between the bounds matching the type, state_key given, in the rooms specified only. Used to work out which rooms the user was joined to at a given point in time.
func (*EventTable) SelectHighestNID ¶
func (t *EventTable) SelectHighestNID() (highest int64, err error)
func (*EventTable) SelectLatestEventsBetween ¶
func (*EventTable) SelectNIDsByIDs ¶
SelectNIDsByIDs does just that. Returns a map from event ID to nid, with a key-value pair for every event_id that was found in the database.
func (*EventTable) SelectStrippedEventsByIDs ¶
func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (StrippedEvents, error)
func (*EventTable) SelectStrippedEventsByNIDs ¶
func (t *EventTable) SelectStrippedEventsByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (StrippedEvents, error)
func (*EventTable) SelectUnknownEventIDs ¶ added in v0.99.2
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error)
SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB. It MUST be called within a transaction, or else will panic.
func (*EventTable) UpdateBeforeSnapshotID ¶
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error
UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
type InitialiseResult ¶ added in v0.99.2
type InitialiseResult struct { // AddedEvents is true iff this call to Initialise added new state events to the DB. AddedEvents bool // SnapshotID is the ID of the snapshot which incorporates all added events. // It has no meaning if AddedEvents is false. SnapshotID int64 // ReplacedExistingSnapshot is true when we created a new snapshot for the room and // there a pre-existing room snapshot. It has no meaning if AddedEvents is false. ReplacedExistingSnapshot bool }
type InvitesTable ¶
type InvitesTable struct {
// contains filtered or unexported fields
}
InvitesTable stores invites for each user. Originally, invites were stored with the main events in a room. We ignored stripped state and just kept the m.room.member invite event. This had many problems though:
- The room would be initialised by the invite event, causing the room to not populate with state correctly when the user joined the room.
- The user could read room data in the room without being joined to the room e.g could pull `required_state` and `timeline` as they would be authorised by the invite to see this data.
Instead, we now completely split out invites from the normal event flow. This fixes the issues outlined above but introduce more problems:
- How do you sort the invite with rooms?
- How do you calculate the room name when you lack heroes?
For now, we say that invites:
- are treated as a highlightable event for the purposes of sorting by highlight count.
- are given the timestamp of when the invite arrived.
- calculate the room name on a best-effort basis given the lack of heroes (same as element-web).
When an invite is rejected, it appears in the `leave` section which then causes the invite to be removed from this table.
func NewInvitesTable ¶
func NewInvitesTable(db *sqlx.DB) *InvitesTable
func (*InvitesTable) InsertInvite ¶
func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error
func (*InvitesTable) RemoveInvite ¶
func (t *InvitesTable) RemoveInvite(userID, roomID string) error
func (*InvitesTable) RemoveSupersededInvites ¶ added in v0.99.12
func (t *InvitesTable) RemoveSupersededInvites(txn *sqlx.Tx, roomID string, newEvents []Event) error
RemoveSupersededInvites accepts a list of events in the given room. The events should either
- contain at most one membership event per user, or else
- be in timeline order (most recent last)
(corresponding to an Accumulate and an Initialise call, respectively).
The events are scanned in order for membership changes, to determine the "final" memberships. Users who final membership is not "invite" have their outstanding invites to this room deleted.
func (*InvitesTable) SelectAllInvitesForUser ¶
func (t *InvitesTable) SelectAllInvitesForUser(userID string) (map[string][]json.RawMessage, error)
Select all invites for this user. Returns a map of room ID to invite_state (json array).
func (*InvitesTable) SelectInviteState ¶
func (t *InvitesTable) SelectInviteState(userID, roomID string) (inviteState []json.RawMessage, err error)
type LatestEvents ¶ added in v0.99.3
type LatestEvents struct { Timeline []json.RawMessage PrevBatch string LatestNID int64 }
func (*LatestEvents) DiscardIgnoredMessages ¶ added in v0.99.6
func (e *LatestEvents) DiscardIgnoredMessages(shouldIgnore func(sender string) bool)
DiscardIgnoredMessages modifies the struct in-place, replacing the Timeline with a copy that has all ignored events omitted. The order of timelines is preserved.
type ReceiptChunker ¶
func (ReceiptChunker) Len ¶
func (c ReceiptChunker) Len() int
type ReceiptTable ¶
type ReceiptTable struct {
// contains filtered or unexported fields
}
func NewReceiptTable ¶
func NewReceiptTable(db *sqlx.DB) *ReceiptTable
func (*ReceiptTable) Insert ¶
func (t *ReceiptTable) Insert(roomID string, ephEvent json.RawMessage) (receipts []internal.Receipt, err error)
Insert new receipts based on a receipt EDU Returns newly inserted receipts, or nil if there are no new receipts. These newly inserted receipts can then be sent to the API processes for live updates.
func (*ReceiptTable) SelectReceiptsForEvents ¶
func (t *ReceiptTable) SelectReceiptsForEvents(roomID string, eventIDs []string) (receipts []internal.Receipt, err error)
Select all non-private receipts for the event IDs given. Events must be in the room ID given. The parsed receipts are returned so callers can use information in the receipts in further queries e.g to pull out profile information for users read receipts. Call PackReceiptsIntoEDU when sending to clients.
func (*ReceiptTable) SelectReceiptsForUser ¶
func (t *ReceiptTable) SelectReceiptsForUser(roomIDs []string, userID string) (receiptsByRoom map[string][]internal.Receipt, err error)
Select all (including private) receipts for this user in these rooms.
type RoomsTable ¶
type RoomsTable struct{}
RoomsTable stores the current snapshot for a room.
func NewRoomsTable ¶
func NewRoomsTable(db *sqlx.DB) *RoomsTable
func (*RoomsTable) CurrentAfterSnapshotID ¶
func (t *RoomsTable) CurrentAfterSnapshotID(txn *sqlx.Tx, roomID string) (snapshotID int64, err error)
Return the snapshot for this room AFTER the latest event has been applied.
func (*RoomsTable) LatestNIDs ¶
func (*RoomsTable) SelectRoomInfos ¶
func (t *RoomsTable) SelectRoomInfos(txn *sqlx.Tx) (infos []RoomInfo, err error)
type SnapshotRow ¶
type SnapshotRow struct { SnapshotID int64 `db:"snapshot_id"` RoomID string `db:"room_id"` OtherEvents pq.Int64Array `db:"events"` MembershipEvents pq.Int64Array `db:"membership_events"` }
type SnapshotTable ¶
type SnapshotTable struct {
// contains filtered or unexported fields
}
SnapshotTable stores room state snapshots. Each snapshot has a unique numeric ID. Not every event will be associated with a snapshot.
func NewSnapshotsTable ¶
func NewSnapshotsTable(db *sqlx.DB) *SnapshotTable
func (*SnapshotTable) CurrentSnapshots ¶
func (*SnapshotTable) Delete ¶
func (s *SnapshotTable) Delete(txn *sqlx.Tx, snapshotIDs []int64) error
Delete the snapshot IDs given
func (*SnapshotTable) Insert ¶
func (s *SnapshotTable) Insert(txn *sqlx.Tx, row *SnapshotRow) error
Insert the row. Modifies SnapshotID to be the inserted primary key.
func (*SnapshotTable) Select ¶
func (s *SnapshotTable) Select(txn *sqlx.Tx, snapshotID int64) (row SnapshotRow, err error)
Select a row based on its snapshot ID.
type SpaceRelation ¶
type SpaceRelation struct { Parent string `db:"parent"` Child string `db:"child"` Relation int `db:"relation"` Ordering string `db:"ordering"` IsSuggested bool `db:"suggested"` }
func NewSpaceRelationFromEvent ¶
func NewSpaceRelationFromEvent(ev Event) (sr *SpaceRelation, isDeleted bool)
Returns a space relation from a compatible event, else nil.
func (*SpaceRelation) Key ¶
func (sr *SpaceRelation) Key() string
type SpaceRelationChunker ¶
type SpaceRelationChunker []SpaceRelation
func (SpaceRelationChunker) Len ¶
func (c SpaceRelationChunker) Len() int
type SpacesTable ¶
type SpacesTable struct{}
SpacesTable stores the space graph for all users.
func NewSpacesTable ¶
func NewSpacesTable(db *sqlx.DB) *SpacesTable
func (*SpacesTable) BulkDelete ¶
func (t *SpacesTable) BulkDelete(txn *sqlx.Tx, relations []SpaceRelation) error
Delete space relations by (parent, child, relation)
func (*SpacesTable) BulkInsert ¶
func (t *SpacesTable) BulkInsert(txn *sqlx.Tx, relations []SpaceRelation) error
Insert space relations by (parent, child, relation)
func (*SpacesTable) HandleSpaceUpdates ¶
func (t *SpacesTable) HandleSpaceUpdates(txn *sqlx.Tx, events []Event) error
func (*SpacesTable) SelectChildren ¶
func (t *SpacesTable) SelectChildren(txn *sqlx.Tx, spaces []string) (map[string][]SpaceRelation, error)
Select all children for these spaces
type StartupSnapshot ¶
type StartupSnapshot struct { GlobalMetadata map[string]internal.RoomMetadata // room_id -> metadata AllJoinedMembers map[string][]string // room_id -> [user_id] }
StartupSnapshot represents a snapshot of startup data for the sliding sync HTTP API instances
type Storage ¶
type Storage struct { Accumulator *Accumulator EventsTable *EventTable ToDeviceTable *ToDeviceTable UnreadTable *UnreadTable AccountDataTable *AccountDataTable InvitesTable *InvitesTable TransactionsTable *TransactionsTable DeviceDataTable *DeviceDataTable ReceiptTable *ReceiptTable DB *sqlx.DB MaxTimelineLimit int // contains filtered or unexported fields }
func NewStorage ¶
func NewStorageWithDB ¶ added in v0.99.4
func (*Storage) AccountData ¶
func (s *Storage) AccountData(userID, roomID string, eventTypes []string) (data []AccountData, err error)
func (*Storage) AccountDatas ¶
func (s *Storage) AccountDatas(userID string, roomIDs ...string) (datas []AccountData, err error)
Pull out all account data for this user. If roomIDs is empty, global account data is returned. If roomIDs is non-empty, all account data for these rooms are extracted.
func (*Storage) Accumulate ¶
func (s *Storage) Accumulate(userID, roomID string, timeline sync2.TimelineResponse) (result AccumulateResult, err error)
func (*Storage) AllJoinedMembers ¶
func (s *Storage) AllJoinedMembers(txn *sqlx.Tx, tempTableName string) (joinedMembers map[string][]string, metadata map[string]internal.RoomMetadata, err error)
Extract all rooms with joined members, and include the joined user list. Requires a prepared snapshot in order to be called. Populates the join/invite count and heroes for the returned metadata.
func (*Storage) EventNIDs ¶
func (s *Storage) EventNIDs(eventNIDs []int64) ([]json.RawMessage, error)
EventNIDs fetches the raw JSON form of events given a slice of eventNIDs. The events are returned in ascending NID order; the order of eventNIDs is ignored.
func (*Storage) FetchMemberships ¶ added in v0.99.12
FetchMemberships looks up the latest snapshot for the given room and determines the latest membership events in the room. Returns
- the list of joined members,
- the list of invited members, and then
- the list of all other memberships. (This is called "leaves", but includes bans. It also includes knocks, but the proxy doesn't support those.)
Each lists' members are arranged in no particular order.
TODO: there is a very similar query in ResetMetadataState which also selects events events row for memberships. It is a shame to have to do this twice---can we query once and pass the data around?
func (*Storage) GetClosestPrevBatch ¶ added in v0.99.13
func (*Storage) GlobalSnapshot ¶
func (s *Storage) GlobalSnapshot() (ss StartupSnapshot, err error)
GlobalSnapshot snapshots the entire database for the purposes of initialising a sliding sync instance. It will atomically grab metadata for all rooms and all joined members in a single transaction.
func (*Storage) Initialise ¶
func (s *Storage) Initialise(roomID string, state []json.RawMessage) (InitialiseResult, error)
func (*Storage) InsertAccountData ¶
func (s *Storage) InsertAccountData(userID, roomID string, events []json.RawMessage) (data []AccountData, err error)
func (*Storage) JoinedRoomsAfterPosition ¶
func (s *Storage) JoinedRoomsAfterPosition(userID string, pos int64) ( joinTimingByRoomID map[string]internal.EventMetadata, err error, )
Returns a map from joined room IDs to EventMetadata, which is nil iff a non-nil error is returned.
func (*Storage) LatestEventNID ¶
func (*Storage) LatestEventNIDInRooms ¶ added in v0.99.5
func (*Storage) LatestEventsInRooms ¶
func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error)
LatestEventsInRooms returns the most recent events - in the given rooms - that the user has permission to see - with NIDs <= `to`. Up to `limit` events are chosen per room. This limit be itself be limited according to MaxTimelineLimit.
func (*Storage) MetadataForAllRooms ¶
func (s *Storage) MetadataForAllRooms(txn *sqlx.Tx, tempTableName string, result map[string]internal.RoomMetadata) error
Extract hero info for all rooms. Requires a prepared snapshot in order to be called.
func (*Storage) PrepareSnapshot ¶ added in v0.99.3
Prepare a snapshot of the database for calling snapshot functions.
func (*Storage) RemoveInaccessibleStateSnapshots ¶ added in v0.99.16
Remove state snapshots which cannot be accessed by clients. The latest MaxTimelineEvents snapshots must be kept, +1 for the current state. This handles the worst case where all MaxTimelineEvents are state events and hence each event makes a new snapshot. We can safely delete all snapshots older than this, as it's not possible to reach this snapshot as the proxy does not handle historical state (deferring to the homeserver for that).
func (*Storage) ResetMetadataState ¶ added in v0.99.11
func (s *Storage) ResetMetadataState(metadata *internal.RoomMetadata) error
ResetMetadataState updates the given metadata in-place to reflect the current state of the room. This is only safe to call from the subscriber goroutine; it is not safe to call from the connection goroutines. TODO: could have this create a new RoomMetadata and get the caller to assign it.
func (*Storage) RoomAccountDatasWithType ¶
func (s *Storage) RoomAccountDatasWithType(userID, eventType string) (data []AccountData, err error)
func (*Storage) RoomMembershipDelta ¶
func (*Storage) RoomStateAfterEventPosition ¶
func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []string, pos int64, eventTypesToStateKeys map[string][]string) (roomToEvents map[string][]Event, err error)
Look up room state after the given event position and no further. eventTypesToStateKeys is a map of event type to a list of state keys for that event type. If the list of state keys is empty then all events matching that event type will be returned. If the map is empty entirely, then all room state will be returned.
func (*Storage) StateSnapshot ¶
func (s *Storage) StateSnapshot(snapID int64) (state []json.RawMessage, err error)
func (*Storage) VisibleEventNIDsBetween ¶
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][2]int64, error)
Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position, this function returns a map of room ID to a 2-element from|to positions. These positions are all INCLUSIVE, and the client should be informed of these events at some point. For example:
Stream Positions 1 2 3 4 5 6 7 8 9 10 Room A Maj E E E Room B E Maj E Room C E Mal E (a already joined to this room at position 0) E=message event, M=membership event, followed by user letter, followed by 'i' or 'j' or 'l' for invite|join|leave - For Room A: from=1, to=10, returns { RoomA: [ 1,10 ]} (tests events in joined room) - For Room B: from=1, to=10, returns { RoomB: [ 5,10 ]} (tests joining a room starts events) - For Room C: from=1, to=10, returns { RoomC: [ 0,9 ]} (tests leaving a room stops events)
In cases where a user joins/leaves a room multiple times in the nid range, only the last range is returned. This is critical to ensure we don't skip out timeline events due to history visibility (which the proxy defers to the upstream HS for). See https://github.com/matrix-org/sliding-sync/issues/365 for what happens if we returned all ranges.
Stream Positions 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Room D Maj E Mal E Maj E Mal E Room E E Mai E E Maj E E - For Room D: from=1, to=15 returns { RoomD: [ 8,10 ] } (tests multi-join/leave) - For Room E: from=1, to=15 returns { RoomE: [ 13,15 ] } (tests invites)
type StrippedEvents ¶
type StrippedEvents []Event
func (StrippedEvents) NIDs ¶
func (se StrippedEvents) NIDs() (membershipNIDs, otherNIDs []int64)
type ToDeviceRow ¶
type ToDeviceRowChunker ¶
type ToDeviceRowChunker []ToDeviceRow
func (ToDeviceRowChunker) Len ¶
func (c ToDeviceRowChunker) Len() int
type ToDeviceTable ¶
type ToDeviceTable struct {
// contains filtered or unexported fields
}
ToDeviceTable stores to_device messages for devices.
func NewToDeviceTable ¶
func NewToDeviceTable(db *sqlx.DB) *ToDeviceTable
func (*ToDeviceTable) DeleteAllMessagesForDevice ¶ added in v0.99.2
func (t *ToDeviceTable) DeleteAllMessagesForDevice(userID, deviceID string) error
func (*ToDeviceTable) DeleteMessagesUpToAndIncluding ¶
func (t *ToDeviceTable) DeleteMessagesUpToAndIncluding(userID, deviceID string, toIncl int64) error
func (*ToDeviceTable) InsertMessages ¶
func (t *ToDeviceTable) InsertMessages(userID, deviceID string, msgs []json.RawMessage) (pos int64, err error)
func (*ToDeviceTable) Messages ¶
func (t *ToDeviceTable) Messages(userID, deviceID string, from, limit int64) (msgs []json.RawMessage, upTo int64, err error)
Messages fetches up to `limit` to-device messages for this device, starting from and excluding `from`. Returns the fetches messages ordered by ascending position, as well as the position of the last to-device message fetched.
func (*ToDeviceTable) SetUnackedPosition ¶
func (t *ToDeviceTable) SetUnackedPosition(userID, deviceID string, pos int64) error
type TransactionsTable ¶
type TransactionsTable struct {
// contains filtered or unexported fields
}
func NewTransactionsTable ¶
func NewTransactionsTable(db *sqlx.DB) *TransactionsTable
type TypingTable ¶
type TypingTable struct {
// contains filtered or unexported fields
}
TypingTable stores who is currently typing TODO: If 2 users are in the same room and 1 is on a laggy synchotron, we'll flip flop who is typing with live / stale data. Maybe do this per user per room?
func NewTypingTable ¶
func NewTypingTable(db *sqlx.DB) *TypingTable
func (*TypingTable) SelectHighestID ¶
func (t *TypingTable) SelectHighestID() (id int64, err error)
type UnreadTable ¶
type UnreadTable struct {
// contains filtered or unexported fields
}
UnreadTable stores unread counts per-user
func NewUnreadTable ¶
func NewUnreadTable(db *sqlx.DB) *UnreadTable
func (*UnreadTable) SelectAllNonZeroCountsForUser ¶
func (t *UnreadTable) SelectAllNonZeroCountsForUser(userID string, callback func(roomID string, highlightCount, notificationCount int)) error
func (*UnreadTable) SelectUnreadCounters ¶
func (t *UnreadTable) SelectUnreadCounters(userID, roomID string) (highlightCount, notificationCount int, err error)
func (*UnreadTable) UpdateUnreadCounters ¶
func (t *UnreadTable) UpdateUnreadCounters(userID, roomID string, highlightCount, notificationCount *int) error