Documentation ¶
Index ¶
- Constants
- type AccountData
- type AccountDataChunker
- type AccountDataTable
- func (t *AccountDataTable) Insert(txn *sqlx.Tx, accDatas []AccountData) ([]AccountData, error)
- func (t *AccountDataTable) Select(txn *sqlx.Tx, userID, eventType, roomID string) (*AccountData, error)
- func (t *AccountDataTable) SelectMany(txn *sqlx.Tx, userID string, roomIDs ...string) (datas []AccountData, err error)
- type Accumulator
- func (a *Accumulator) Accumulate(roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, latestNID int64, err error)
- func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (eventsJSON []json.RawMessage, latest int64, err error)
- func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, error)
- type Event
- type EventChunker
- type EventTable
- func (t *EventTable) BeforeStateSnapshotIDForEventNID(txn *sqlx.Tx, roomID string, eventNID int64) (lastEventNID, replacesNID, snapID int64, err error)
- func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (int, error)
- func (t *EventTable) SelectByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (events []Event, err error)
- func (t *EventTable) SelectByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (events []Event, err error)
- func (t *EventTable) SelectClosestPrevBatch(roomID string, eventNID int64) (prevBatch string, err error)
- func (t *EventTable) SelectClosestPrevBatchByID(roomID string, eventID string) (prevBatch string, err error)
- func (t *EventTable) SelectEventNIDsWithTypeInRoom(txn *sqlx.Tx, eventType string, limit int, targetRoom string, ...) (eventNIDs []int64, err error)
- func (t *EventTable) SelectEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error)
- func (t *EventTable) SelectEventsWithTypeStateKey(eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
- func (t *EventTable) SelectEventsWithTypeStateKeyInRooms(roomIDs []string, eventType, stateKey string, ...) ([]Event, error)
- func (t *EventTable) SelectHighestNID() (highest int64, err error)
- func (t *EventTable) SelectLatestEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error)
- func (t *EventTable) SelectNIDsByIDs(txn *sqlx.Tx, ids []string) (nids []int64, err error)
- func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (StrippedEvents, error)
- func (t *EventTable) SelectStrippedEventsByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (StrippedEvents, error)
- func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error
- type InvitesTable
- type RoomsTable
- func (t *RoomsTable) CurrentAfterSnapshotID(txn *sqlx.Tx, roomID string) (snapshotID int64, err error)
- func (t *RoomsTable) SelectEncryptedRooms(txn *sqlx.Tx) (encrypted []string, err error)
- func (t *RoomsTable) SelectTombstonedRooms(txn *sqlx.Tx) (tombstones []string, err error)
- func (t *RoomsTable) UpdateCurrentAfterSnapshotID(txn *sqlx.Tx, roomID string, snapshotID int64, isEncrypted, isTombstoned bool) (err error)
- type SnapshotRow
- type SnapshotTable
- func (t *SnapshotTable) CurrentSnapshots() (map[string][]int64, error)
- func (s *SnapshotTable) Delete(txn *sqlx.Tx, snapshotIDs []int64) error
- func (s *SnapshotTable) Insert(txn *sqlx.Tx, row *SnapshotRow) error
- func (s *SnapshotTable) Select(txn *sqlx.Tx, snapshotID int64) (row SnapshotRow, err error)
- type Storage
- func (s *Storage) AccountData(userID, roomID, eventType string) (data *AccountData, err error)
- func (s *Storage) AccountDatas(userID string, roomIDs ...string) (datas []AccountData, err error)
- func (s *Storage) Accumulate(roomID, prevBatch string, timeline []json.RawMessage) (numNew int, latestNID int64, err error)
- func (s *Storage) AllJoinedMembers() (map[string][]string, error)
- func (s *Storage) Initialise(roomID string, state []json.RawMessage) (bool, error)
- func (s *Storage) InsertAccountData(userID, roomID string, events []json.RawMessage) (data []AccountData, err error)
- func (s *Storage) JoinedRoomsAfterPosition(userID string, pos int64) ([]string, error)
- func (s *Storage) LatestEventNID() (int64, error)
- func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string][]json.RawMessage, map[string]string, error)
- func (s *Storage) LatestTypingID() (int64, error)
- func (s *Storage) MetadataForAllRooms() (map[string]internal.RoomMetadata, error)
- func (s *Storage) RoomMembershipDelta(roomID string, from, to int64, limit int) (eventJSON []json.RawMessage, upTo int64, err error)
- func (s *Storage) RoomStateAfterEventPosition(roomID string, pos int64, eventTypes ...string) (events []Event, err error)
- func (s *Storage) RoomStateBeforeEventPosition(roomID string, pos int64) (events []Event, err error)
- func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error)
- type StrippedEvents
- type ToDeviceRow
- type ToDeviceRowChunker
- type ToDeviceTable
- func (t *ToDeviceTable) DeleteMessagesUpToAndIncluding(deviceID string, toIncl int64) error
- func (t *ToDeviceTable) InsertMessages(deviceID string, msgs []json.RawMessage) (pos int64, err error)
- func (t *ToDeviceTable) Messages(deviceID string, from, to, limit int64) (msgs []json.RawMessage, upTo int64, err error)
- type TypingTable
- type UnreadTable
- func (t *UnreadTable) SelectAllNonZeroCountsForUser(userID string, ...) error
- func (t *UnreadTable) SelectUnreadCounters(userID, roomID string) (highlightCount, notificationCount int, err error)
- func (t *UnreadTable) UpdateUnreadCounters(userID, roomID string, highlightCount, notificationCount *int) error
Constants ¶
const ( EventsStart = -1 EventsEnd = math.MaxInt64 - 1 )
const AccountDataGlobalRoom = ""
const MaxPostgresParameters = 65535
Max number of parameters in a single SQL command
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AccountData ¶
type AccountDataChunker ¶
type AccountDataChunker []AccountData
func (AccountDataChunker) Len ¶
func (c AccountDataChunker) Len() int
type AccountDataTable ¶
type AccountDataTable struct{}
AccountDataTable stores the account data for users.
func NewAccountDataTable ¶
func NewAccountDataTable(db *sqlx.DB) *AccountDataTable
func (*AccountDataTable) Insert ¶
func (t *AccountDataTable) Insert(txn *sqlx.Tx, accDatas []AccountData) ([]AccountData, error)
Insert account data.
func (*AccountDataTable) Select ¶
func (t *AccountDataTable) Select(txn *sqlx.Tx, userID, eventType, roomID string) (*AccountData, error)
func (*AccountDataTable) SelectMany ¶
func (t *AccountDataTable) SelectMany(txn *sqlx.Tx, userID string, roomIDs ...string) (datas []AccountData, err error)
type Accumulator ¶
type Accumulator struct {
// contains filtered or unexported fields
}
Accumulator tracks room state and timelines.
In order for it to remain simple(ish), the accumulator DOES NOT SUPPORT arbitrary timeline gaps. There is an Initialise function for new rooms (with some pre-determined state) and then a constant Accumulate function for timeline events. v2 sync must be called with a large enough timeline.limit for this to work!
func NewAccumulator ¶
func NewAccumulator(db *sqlx.DB) *Accumulator
func (*Accumulator) Accumulate ¶
func (a *Accumulator) Accumulate(roomID string, prevBatch string, timeline []json.RawMessage) (numNew int, latestNID int64, err error)
Accumulate internal state from a user's sync response. The timeline order MUST be in the order received from the server. Returns the number of new events in the timeline.
This function does several things:
- It ensures all events are persisted in the database. This is shared amongst users.
- If all events have been stored before, then it short circuits and returns. This is because we must have already processed this part of the timeline in order for the event to exist in the database, and the sync stream is already linearised for us.
- Else it creates a new room state snapshot if the timeline contains state events (as this now represents the current state)
- It adds entries to the membership log for membership events.
func (*Accumulator) Delta ¶
func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (eventsJSON []json.RawMessage, latest int64, err error)
Delta returns a list of events of at most `limit` for the room not including `lastEventNID`. Returns the latest NID of the last event (most recent)
func (*Accumulator) Initialise ¶
func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (bool, error)
Initialise starts a new sync accumulator for the given room using the given state as a baseline. This will only take effect if this is the first time the v3 server has seen this room, and it wasn't possible to get all events up to the create event (e.g. Matrix HQ). Returns true if this call actually added new events.
This function:
- Stores these events.
- Sets up the current snapshot based on the state list given.
type Event ¶
type Event struct { NID int64 `db:"event_nid"` Type string `db:"event_type"` StateKey string `db:"state_key"` Membership string `db:"membership"` // This is a snapshot ID which corresponds to some room state BEFORE this event has been applied. BeforeStateSnapshotID int `db:"before_state_snapshot_id"` ID string `db:"event_id"` RoomID string `db:"room_id"` // not all events include a prev batch (e.g if it was part of state not timeline, and only the first // event in a timeline has a prev_batch attached), but we'll look for the 'closest' prev batch // when returning these tokens to the caller (closest = next newest, assume clients de-dupe) PrevBatch sql.NullString `db:"prev_batch"` // stripped events will be missing this field JSON []byte `db:"event"` }
type EventChunker ¶
type EventChunker []Event
func (EventChunker) Len ¶
func (c EventChunker) Len() int
type EventTable ¶
type EventTable struct {
// contains filtered or unexported fields
}
EventTable stores events. A unique numeric ID is associated with each event.
func NewEventTable ¶
func NewEventTable(db *sqlx.DB) *EventTable
NewEventTable makes a new EventTable
func (*EventTable) BeforeStateSnapshotIDForEventNID ¶
func (*EventTable) Insert ¶
Insert events into the event table. Returns the number of rows added. If the number of rows is >0, and the list of events is in sync stream order, it can be inferred that the last element(s) are new.
func (*EventTable) SelectByIDs ¶
func (*EventTable) SelectByNIDs ¶
func (*EventTable) SelectClosestPrevBatch ¶
func (t *EventTable) SelectClosestPrevBatch(roomID string, eventNID int64) (prevBatch string, err error)
Select the closest prev batch token for the provided event NID. Returns the empty string if there is no closest.
func (*EventTable) SelectClosestPrevBatchByID ¶
func (t *EventTable) SelectClosestPrevBatchByID(roomID string, eventID string) (prevBatch string, err error)
SelectClosestPrevBatchByID is the same as SelectClosestPrevBatch but works on event IDs not NIDs
func (*EventTable) SelectEventNIDsWithTypeInRoom ¶
func (t *EventTable) SelectEventNIDsWithTypeInRoom(txn *sqlx.Tx, eventType string, limit int, targetRoom string, lowerExclusive, upperInclusive int64) (eventNIDs []int64, err error)
Select all events matching the given event type in a room. Used to implement the room member stream (paginated room lists)
func (*EventTable) SelectEventsBetween ¶
func (*EventTable) SelectEventsWithTypeStateKey ¶
func (t *EventTable) SelectEventsWithTypeStateKey(eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
Select all events between the bounds matching the type, state_key given. Used to work out which rooms the user was joined to at a given point in time.
func (*EventTable) SelectEventsWithTypeStateKeyInRooms ¶
func (t *EventTable) SelectEventsWithTypeStateKeyInRooms(roomIDs []string, eventType, stateKey string, lowerExclusive, upperInclusive int64) ([]Event, error)
Select all events between the bounds matching the type, state_key given, in the rooms specified only. Used to work out which rooms the user was joined to at a given point in time.
func (*EventTable) SelectHighestNID ¶
func (t *EventTable) SelectHighestNID() (highest int64, err error)
func (*EventTable) SelectLatestEventsBetween ¶
func (*EventTable) SelectNIDsByIDs ¶
func (*EventTable) SelectStrippedEventsByIDs ¶
func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids []string) (StrippedEvents, error)
func (*EventTable) SelectStrippedEventsByNIDs ¶
func (t *EventTable) SelectStrippedEventsByNIDs(txn *sqlx.Tx, verifyAll bool, nids []int64) (StrippedEvents, error)
func (*EventTable) UpdateBeforeSnapshotID ¶
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error
UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given event NID.
type InvitesTable ¶
type InvitesTable struct {
// contains filtered or unexported fields
}
InvitesTable stores invites for each user. Originally, invites were stored with the main events in a room. We ignored stripped state and just kept the m.room.member invite event. This had many problems though:
- The room would be initialised by the invite event, causing the room to not populate with state correctly when the user joined the room.
- The user could read room data in the room without being joined to the room, e.g. they could pull `required_state` and `timeline` as they would be authorised by the invite to see this data.
Instead, we now completely split out invites from the normal event flow. This fixes the issues outlined above but introduces more problems:
- How do you sort the invite with rooms?
- How do you calculate the room name when you lack heroes?
For now, we say that invites:
- are treated as a highlightable event for the purposes of sorting by highlight count.
- are given the timestamp of when the invite arrived.
- calculate the room name on a best-effort basis given the lack of heroes (same as element-web).
When an invite is rejected, it appears in the `leave` section which then causes the invite to be removed from this table.
func NewInvitesTable ¶
func NewInvitesTable(db *sqlx.DB) *InvitesTable
func (*InvitesTable) InsertInvite ¶
func (t *InvitesTable) InsertInvite(userID, roomID string, inviteRoomState []json.RawMessage) error
func (*InvitesTable) RemoveInvite ¶
func (t *InvitesTable) RemoveInvite(userID, roomID string) error
func (*InvitesTable) SelectAllInvitesForUser ¶
func (t *InvitesTable) SelectAllInvitesForUser(userID string) (map[string][]json.RawMessage, error)
Select all invites for this user. Returns a map of room ID to invite_state (json array).
type RoomsTable ¶
type RoomsTable struct{}
RoomsTable stores the current snapshot for a room.
func NewRoomsTable ¶
func NewRoomsTable(db *sqlx.DB) *RoomsTable
func (*RoomsTable) CurrentAfterSnapshotID ¶
func (t *RoomsTable) CurrentAfterSnapshotID(txn *sqlx.Tx, roomID string) (snapshotID int64, err error)
Return the snapshot for this room AFTER the latest event has been applied.
func (*RoomsTable) SelectEncryptedRooms ¶
func (t *RoomsTable) SelectEncryptedRooms(txn *sqlx.Tx) (encrypted []string, err error)
func (*RoomsTable) SelectTombstonedRooms ¶
func (t *RoomsTable) SelectTombstonedRooms(txn *sqlx.Tx) (tombstones []string, err error)
func (*RoomsTable) UpdateCurrentAfterSnapshotID ¶
type SnapshotRow ¶
type SnapshotRow struct { SnapshotID int64 `db:"snapshot_id"` RoomID string `db:"room_id"` Events pq.Int64Array `db:"events"` }
type SnapshotTable ¶
type SnapshotTable struct {
// contains filtered or unexported fields
}
SnapshotTable stores room state snapshots. Each snapshot has a unique numeric ID. Not every event will be associated with a snapshot.
func NewSnapshotsTable ¶
func NewSnapshotsTable(db *sqlx.DB) *SnapshotTable
func (*SnapshotTable) CurrentSnapshots ¶
func (t *SnapshotTable) CurrentSnapshots() (map[string][]int64, error)
func (*SnapshotTable) Delete ¶
func (s *SnapshotTable) Delete(txn *sqlx.Tx, snapshotIDs []int64) error
Delete the snapshot IDs given
func (*SnapshotTable) Insert ¶
func (s *SnapshotTable) Insert(txn *sqlx.Tx, row *SnapshotRow) error
Insert the row. Modifies SnapshotID to be the inserted primary key.
func (*SnapshotTable) Select ¶
func (s *SnapshotTable) Select(txn *sqlx.Tx, snapshotID int64) (row SnapshotRow, err error)
Select a row based on its snapshot ID.
type Storage ¶
type Storage struct { EventsTable *EventTable TypingTable *TypingTable ToDeviceTable *ToDeviceTable UnreadTable *UnreadTable AccountDataTable *AccountDataTable InvitesTable *InvitesTable // contains filtered or unexported fields }
func NewStorage ¶
func (*Storage) AccountData ¶
func (s *Storage) AccountData(userID, roomID, eventType string) (data *AccountData, err error)
func (*Storage) AccountDatas ¶
func (s *Storage) AccountDatas(userID string, roomIDs ...string) (datas []AccountData, err error)
Pull out all account data for this user. If roomIDs is empty, global account data is returned. If roomIDs is non-empty, all account data for these rooms is extracted.
func (*Storage) Accumulate ¶
func (*Storage) AllJoinedMembers ¶
func (*Storage) Initialise ¶
func (*Storage) InsertAccountData ¶
func (s *Storage) InsertAccountData(userID, roomID string, events []json.RawMessage) (data []AccountData, err error)
func (*Storage) JoinedRoomsAfterPosition ¶
func (*Storage) LatestEventNID ¶
func (*Storage) LatestEventsInRooms ¶
func (*Storage) LatestTypingID ¶
func (*Storage) MetadataForAllRooms ¶
func (s *Storage) MetadataForAllRooms() (map[string]internal.RoomMetadata, error)
Extract hero info for all rooms. MUST BE CALLED AT STARTUP ONLY AS THIS WILL RACE WITH LIVE TRAFFIC.
func (*Storage) RoomMembershipDelta ¶
func (*Storage) RoomStateAfterEventPosition ¶
func (*Storage) RoomStateBeforeEventPosition ¶
func (*Storage) VisibleEventNIDsBetween ¶
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error)
Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position, this function returns a map of room ID to a slice of 2-element from|to positions. These positions are all INCLUSIVE, and the client should be informed of these events at some point. For example:
          Stream Positions
          1    2    3    4    5    6    7    8    9    10
Room A    Maj  E    E                   E
Room B                   E    Maj  E
Room C                                       E    Mal  E    (a already joined to this room at position 0)

E=message event, M=membership event, followed by user letter, followed by 'i' or 'j' or 'l' for invite|join|leave

- For Room A: from=1, to=10, returns { RoomA: [ [1,10] ]} (tests events in joined room)
- For Room B: from=1, to=10, returns { RoomB: [ [5,10] ]} (tests joining a room starts events)
- For Room C: from=1, to=10, returns { RoomC: [ [0,9] ]} (tests leaving a room stops events)
Multiple slices can occur when a user leaves and re-joins the same room, and invites are same-element positions:
          Stream Positions
          1    2    3    4    5    6    7    8    9    10   11   12   13   14   15
Room D    Maj                 E    Mal  E    Maj  E    Mal  E
Room E         E    Mai  E                                       E    Maj  E    E

- For Room D: from=1, to=15 returns { RoomD: [ [1,6], [8,10] ] } (tests multi-join/leave)
- For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
type StrippedEvents ¶
type StrippedEvents []Event
func (StrippedEvents) NIDs ¶
func (se StrippedEvents) NIDs() (result []int64)
type ToDeviceRow ¶
type ToDeviceRowChunker ¶
type ToDeviceRowChunker []ToDeviceRow
func (ToDeviceRowChunker) Len ¶
func (c ToDeviceRowChunker) Len() int
type ToDeviceTable ¶
type ToDeviceTable struct {
// contains filtered or unexported fields
}
ToDeviceTable stores to_device messages for devices.
func NewToDeviceTable ¶
func NewToDeviceTable(db *sqlx.DB) *ToDeviceTable
func (*ToDeviceTable) DeleteMessagesUpToAndIncluding ¶
func (t *ToDeviceTable) DeleteMessagesUpToAndIncluding(deviceID string, toIncl int64) error
func (*ToDeviceTable) InsertMessages ¶
func (t *ToDeviceTable) InsertMessages(deviceID string, msgs []json.RawMessage) (pos int64, err error)
func (*ToDeviceTable) Messages ¶
func (t *ToDeviceTable) Messages(deviceID string, from, to, limit int64) (msgs []json.RawMessage, upTo int64, err error)
Query to-device messages for this device, exclusive of from and inclusive of to. If a to value is unknown, use -1.
type TypingTable ¶
type TypingTable struct {
// contains filtered or unexported fields
}
TypingTable stores who is currently typing. TODO: If 2 users are in the same room and 1 is on a laggy synchrotron, we'll flip-flop who is typing with live / stale data. Maybe do this per user per room?
func NewTypingTable ¶
func NewTypingTable(db *sqlx.DB) *TypingTable
func (*TypingTable) SelectHighestID ¶
func (t *TypingTable) SelectHighestID() (id int64, err error)
type UnreadTable ¶
type UnreadTable struct {
// contains filtered or unexported fields
}
UnreadTable stores unread counts per-user
func NewUnreadTable ¶
func NewUnreadTable(db *sqlx.DB) *UnreadTable
func (*UnreadTable) SelectAllNonZeroCountsForUser ¶
func (t *UnreadTable) SelectAllNonZeroCountsForUser(userID string, callback func(roomID string, highlightCount, notificationCount int)) error
func (*UnreadTable) SelectUnreadCounters ¶
func (t *UnreadTable) SelectUnreadCounters(userID, roomID string) (highlightCount, notificationCount int, err error)
func (*UnreadTable) UpdateUnreadCounters ¶
func (t *UnreadTable) UpdateUnreadCounters(userID, roomID string, highlightCount, notificationCount *int) error