Documentation ¶
Index ¶
- Constants
- type Client
- type Device
- type E2EEFetcher
- type EventsResponse
- type HTTPClient
- type PollerMap
- func (h *PollerMap) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage)
- func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)
- func (h *PollerMap) EnsurePolling(accessToken, userID, deviceID, v2since string, logger zerolog.Logger)
- func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)
- func (h *PollerMap) NumPollers() (count int)
- func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)
- func (h *PollerMap) OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, ...)
- func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)
- func (h *PollerMap) OnLeftRoom(userID, roomID string)
- func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
- func (h *PollerMap) OnTerminated(userID, deviceID string)
- func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
- func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage)
- func (h *PollerMap) Terminate()
- func (h *PollerMap) UpdateDeviceSince(deviceID, since string)
- func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
- type Storage
- func (s *Storage) AllDevices() (devices []Device, err error)
- func (s *Storage) Device(deviceID string) (*Device, error)
- func (s *Storage) InsertDevice(deviceID, accessToken string) (*Device, error)
- func (s *Storage) Teardown()
- func (s *Storage) UpdateDeviceSince(deviceID, since string) error
- func (s *Storage) UpdateUserIDForDevice(deviceID, userID string) error
- type SyncResponse
- type SyncRoomsResponse
- type SyncV2InviteResponse
- type SyncV2JoinResponse
- type SyncV2LeaveResponse
- type TimelineResponse
- type TransactionIDFetcher
- type UnreadNotifications
- type V2DataReceiver
Constants ¶
const AccountDataGlobalRoom = ""
Types ¶
type E2EEFetcher ¶
type E2EEFetcher interface {
DeviceData(userID, deviceID string, isInitial bool) *internal.DeviceData
}
Fetcher used by the E2EE extension
type EventsResponse ¶
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}
type HTTPClient ¶
HTTPClient represents a Sync v2 Client. One client can be shared among many users.
func (*HTTPClient) DoSyncV2 ¶
func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool) (*SyncResponse, int, error)
DoSyncV2 performs a sync v2 request. It returns the sync response and the response status code, or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snappiness.
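As a rough illustration of the isFirst behaviour, the sketch below builds a sync v2 request URL. It is not this package's implementation: buildSyncURL is a hypothetical helper, the /_matrix/client/v3/sync path is taken from the Matrix client-server API, and the 30000 ms long-poll timeout for later syncs is an assumed value.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildSyncURL is a hypothetical helper (not part of this package).
// The first sync uses timeout=0 so the homeserver replies immediately;
// later syncs long-poll with an assumed 30s timeout.
func buildSyncURL(baseURL, since string, isFirst bool) string {
	q := url.Values{}
	if isFirst {
		q.Set("timeout", "0")
	} else {
		q.Set("timeout", "30000") // assumption: 30s long-poll
	}
	if since != "" {
		q.Set("since", since)
	}
	// Encode sorts parameters by key, so the output is deterministic.
	return baseURL + "/_matrix/client/v3/sync?" + q.Encode()
}

func main() {
	fmt.Println(buildSyncURL("https://example.org", "", true))
	fmt.Println(buildSyncURL("https://example.org", "abc", false))
}
```

With no since token and isFirst=true, the only query parameter is timeout=0, which is what makes the initial response return without waiting for new events.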
type PollerMap ¶
type PollerMap struct {
	Pollers map[string]*poller // device_id -> poller
	// contains filtered or unexported fields
}
PollerMap is a map of device ID to Poller
func NewPollerMap ¶
NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race which can result in out of order latest NIDs which, if we assert NIDs only increment, will result in missed events.
Consider these events in the same room, with 3 different pollers getting the data:
	1 2 3 4 5 6 7 eventual DB event NID
	A B C D E F G
	-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
	---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
	-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7
The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.
This only affects resources which are shared across multiple DEVICES such as:
- room resources: events, EDUs
- user resources: notif counts, account data
It does NOT affect to-device messages or since tokens, which are per-device.
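The race described above can be reproduced with a toy model. The sketch below is not the package's code: update, cache and replay are hypothetical names, and the cache simply asserts that NIDs only increment. Feeding the three poll loops' results in the wrong order drops A to E, while feeding them in NID order through a single consumer goroutine keeps everything.

```go
package main

import "fmt"

// update is what one poll loop hands to the cache after its DB transaction.
type update struct {
	latestNID int
	events    []string
}

// cache is a toy stand-in for the global cache. It asserts NIDs only increment.
type cache struct {
	latestNID int
	events    []string
}

// apply drops any update that is not newer than what the cache has seen.
func (c *cache) apply(u update) {
	if u.latestNID <= c.latestNID {
		return
	}
	c.latestNID = u.latestNID
	c.events = append(c.events, u.events...)
}

// replay feeds updates to a fresh cache through a single consumer goroutine,
// in exactly the order given, and returns the events the cache retained.
func replay(order []update) []string {
	c := &cache{}
	ch := make(chan update)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for u := range ch {
			c.apply(u)
		}
	}()
	for _, u := range order {
		ch <- u
	}
	close(ch)
	<-done
	return c.events
}

func main() {
	loop1 := update{3, []string{"A", "B", "C"}}
	loop2 := update{5, []string{"D", "E"}}
	loop3 := update{7, []string{"F", "G"}}

	// Variable delays let loop 3 reach the cache first: A..E are dropped.
	fmt.Println(replay([]update{loop3, loop1, loop2})) // prints [F G]
	// Same order as the DB assigned NIDs: nothing is lost.
	fmt.Println(replay([]update{loop1, loop2, loop3})) // prints [A B C D E F G]
}
```

The single consumer only helps if the DB write and the cache update for one sync response happen together on that goroutine, which is why the receiver is called on the same goroutine for all pollers.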
func (*PollerMap) Accumulate ¶
func (h *PollerMap) Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage)
func (*PollerMap) AddToDeviceMessages ¶
func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)
Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.
func (*PollerMap) EnsurePolling ¶
func (h *PollerMap) EnsurePolling(accessToken, userID, deviceID, v2since string, logger zerolog.Logger)
EnsurePolling makes sure there is a poller for this user, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow logins on clients to be snappy, even though they won't yet have the to-device messages needed to decrypt E2EE rooms.
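The blocking behaviour can be pictured with a simplified sketch. Everything here is hypothetical (toyPoller, toyPollerMap, ensurePolling) and it models only two of the guarantees above: one poller per device ID, and blocking on the first sync only when the poller was just created. The same-user, different-device shortcut is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// toyPoller is a hypothetical stand-in for the unexported poller type.
type toyPoller struct {
	firstSync chan struct{} // closed once the first sync has completed
}

// toyPollerMap is a hypothetical, heavily simplified PollerMap.
type toyPollerMap struct {
	mu      sync.Mutex
	pollers map[string]*toyPoller // device_id -> poller
}

func newToyPollerMap() *toyPollerMap {
	return &toyPollerMap{pollers: make(map[string]*toyPoller)}
}

// ensurePolling starts at most one poller per device. It blocks until doSync
// has run once only when it created the poller; otherwise it returns at once.
func (m *toyPollerMap) ensurePolling(deviceID string, doSync func()) (created bool) {
	m.mu.Lock()
	if _, ok := m.pollers[deviceID]; ok {
		m.mu.Unlock()
		return false
	}
	p := &toyPoller{firstSync: make(chan struct{})}
	m.pollers[deviceID] = p
	m.mu.Unlock()

	go func() {
		doSync() // a real poller would keep looping after this
		close(p.firstSync)
	}()
	<-p.firstSync // wait for the first sync so later DB reads return data
	return true
}

func main() {
	m := newToyPollerMap()
	syncs := 0
	first := m.ensurePolling("DEVICE_A", func() { syncs++ })
	second := m.ensurePolling("DEVICE_A", func() { syncs++ })
	fmt.Println(first, second, syncs, len(m.pollers)) // prints true false 1 1
}
```

The map entry is registered before the lock is released, so a concurrent caller for the same device cannot start a second poller.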
func (*PollerMap) Initialise ¶
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)
func (*PollerMap) NumPollers ¶ added in v0.5.1
func (*PollerMap) OnAccountData ¶
func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)
func (*PollerMap) OnE2EEData ¶ added in v0.5.0
func (*PollerMap) OnInvite ¶
func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)
func (*PollerMap) OnLeftRoom ¶ added in v0.4.1
func (*PollerMap) OnReceipt ¶ added in v0.7.3
func (h *PollerMap) OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
func (*PollerMap) OnTerminated ¶ added in v0.5.1
func (*PollerMap) SetCallbacks ¶ added in v0.5.0
func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
func (*PollerMap) SetTyping ¶
func (h *PollerMap) SetTyping(roomID string, ephEvent json.RawMessage)
func (*PollerMap) Terminate ¶ added in v0.5.0
func (h *PollerMap) Terminate()
Terminate all pollers. Useful in tests.
func (*PollerMap) UpdateDeviceSince ¶
func (*PollerMap) UpdateUnreadCounts ¶
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage remembers sync v2 tokens per-device
func (*Storage) AllDevices ¶ added in v0.2.0
func (*Storage) InsertDevice ¶
func (*Storage) UpdateDeviceSince ¶
func (*Storage) UpdateUserIDForDevice ¶
type SyncResponse ¶
type SyncResponse struct {
	NextBatch   string         `json:"next_batch"`
	AccountData EventsResponse `json:"account_data"`
	Presence    struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"presence"`
	Rooms       SyncRoomsResponse `json:"rooms"`
	ToDevice    EventsResponse    `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount          map[string]int `json:"device_one_time_keys_count,omitempty"`
	DeviceUnusedFallbackKeyTypes []string       `json:"device_unused_fallback_key_types,omitempty"`
}
type SyncRoomsResponse ¶
type SyncRoomsResponse struct {
	Join   map[string]SyncV2JoinResponse   `json:"join"`
	Invite map[string]SyncV2InviteResponse `json:"invite"`
	Leave  map[string]SyncV2LeaveResponse  `json:"leave"`
}
type SyncV2InviteResponse ¶
type SyncV2InviteResponse struct {
InviteState EventsResponse `json:"invite_state"`
}
SyncV2InviteResponse represents a /sync response for a room which is under the 'invite' key.
type SyncV2JoinResponse ¶
type SyncV2JoinResponse struct {
	State               EventsResponse      `json:"state"`
	Timeline            TimelineResponse    `json:"timeline"`
	Ephemeral           EventsResponse      `json:"ephemeral"`
	AccountData         EventsResponse      `json:"account_data"`
	UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}
SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
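To show how the JSON tags map onto a sync v2 payload, the sketch below decodes a small hand-written 'join' room body. The lower-case types are trimmed local copies of the structs documented here, defined only to keep the example self-contained; decodeJoin and the sample JSON are illustrative and not taken from a real homeserver.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed local copies of EventsResponse, TimelineResponse and
// SyncV2JoinResponse, so the example compiles on its own.
type eventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type timelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type joinResponse struct {
	State    eventsResponse   `json:"state"`
	Timeline timelineResponse `json:"timeline"`
}

// decodeJoin is a hypothetical helper that unmarshals one joined-room body.
func decodeJoin(raw []byte) (joinResponse, error) {
	var jr joinResponse
	err := json.Unmarshal(raw, &jr)
	return jr, err
}

func main() {
	raw := []byte(`{
		"state": {"events": [{"type": "m.room.create"}]},
		"timeline": {
			"events": [{"type": "m.room.message"}, {"type": "m.room.message"}],
			"limited": true,
			"prev_batch": "t1"
		}
	}`)
	jr, err := decodeJoin(raw)
	if err != nil {
		panic(err)
	}
	// Events stay as json.RawMessage, so they are not parsed until needed.
	fmt.Println(len(jr.State.Events), len(jr.Timeline.Events), jr.Timeline.Limited, jr.Timeline.PrevBatch)
}
```

Keeping events as json.RawMessage defers parsing of each event and passes the original bytes through unchanged.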
type SyncV2LeaveResponse ¶
type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}
SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.
type TimelineResponse ¶
type TimelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}
type TransactionIDFetcher ¶
type UnreadNotifications ¶
type V2DataReceiver ¶
type V2DataReceiver interface {
	// Update the since token for this device. Called AFTER all other data in this sync response has been processed.
	UpdateDeviceSince(deviceID, since string)
	// Accumulate data for this room. This means the timeline section of the v2 response.
	Accumulate(userID, roomID, prevBatch string, timeline []json.RawMessage) // latest pos with event nids of timeline entries
	// Initialise the room, if it hasn't been already. This means the state section of the v2 response.
	Initialise(roomID string, state []json.RawMessage) // snapshot ID?
	// SetTyping indicates which users are typing.
	SetTyping(roomID string, ephEvent json.RawMessage)
	// Sent when there is a new receipt
	OnReceipt(userID, roomID, ephEventType string, ephEvent json.RawMessage)
	// AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering.
	AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) // start/end stream pos
	// UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room.
	UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
	// Set the latest account data for this user.
	OnAccountData(userID, roomID string, events []json.RawMessage) // ping update with types? Can you race when re-querying?
	// Sent when there is a room in the `invite` section of the v2 response.
	OnInvite(userID, roomID string, inviteState []json.RawMessage) // invitestate in db
	// Sent when there is a room in the `leave` section of the v2 response.
	OnLeftRoom(userID, roomID string)
	// Sent when there is a _change_ in E2EE data, not all the time
	OnE2EEData(userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int)
	// Sent when the upstream homeserver sends back a 401 invalidating the token
	OnTerminated(userID, deviceID string)
}
V2DataReceiver is the receiver for all the v2 sync data the poller gets. There are two concrete implementations of this interface:
- The sync2 code which dumps this data into a database and issues a NOTIFY
- The sync3 code which LISTENs and repopulates this data and passes it through to handlers