Documentation ¶
Index ¶
- Constants
- type Client
- type Device
- type E2EEFetcher
- type EventsResponse
- type HTTPClient
- type Poller
- type PollerMap
- func (h *PollerMap) Accumulate(roomID, prevBatch string, timeline []json.RawMessage)
- func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)
- func (h *PollerMap) EnsurePolling(authHeader, userID, deviceID, v2since string, logger zerolog.Logger)
- func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)
- func (h *PollerMap) LatestE2EEData(deviceID string) (otkCounts map[string]int, changed, left []string)
- func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)
- func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)
- func (h *PollerMap) OnRetireInvite(userID, roomID string)
- func (h *PollerMap) SetTyping(roomID string, userIDs []string)
- func (h *PollerMap) TransactionIDForEvent(userID, eventID string) string
- func (h *PollerMap) UpdateDeviceSince(deviceID, since string)
- func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)
- type Storage
- type SyncResponse
- type SyncRoomsResponse
- type SyncV2InviteResponse
- type SyncV2JoinResponse
- type SyncV2LeaveResponse
- type TimelineResponse
- type TransactionIDCache
- type TransactionIDFetcher
- type UnreadNotifications
- type V2DataReceiver
Constants ¶
const AccountDataGlobalRoom = ""
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type E2EEFetcher ¶
type E2EEFetcher interface {
LatestE2EEData(deviceID string) (otkCounts map[string]int, changed, left []string)
}
Fetcher which PollerMap satisfies used by the E2EE extension
type EventsResponse ¶
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}
type HTTPClient ¶
HTTPClient represents a Sync v2 Client. One client can be shared among many users.
func (*HTTPClient) DoSyncV2 ¶
func (v *HTTPClient) DoSyncV2(authHeader, since string, isFirst bool) (*SyncResponse, int, error)
DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snapiness.
type Poller ¶
type Poller struct { // flag set to true when poll() returns due to expired access tokens Terminated bool // contains filtered or unexported fields }
Poller can automatically poll the sync v2 endpoint and accumulate the responses in storage
func NewPoller ¶
func NewPoller(userID, authHeader, deviceID string, client Client, receiver V2DataReceiver, txnCache *TransactionIDCache, logger zerolog.Logger) *Poller
func (*Poller) DeviceListChanges ¶
func (*Poller) Poll ¶
Poll will block forever, repeatedly calling v2 sync. Do this in a goroutine. Returns if the access token gets invalidated or if there was a fatal error processing v2 responses. Use WaitUntilInitialSync() to wait until the first poll has been processed.
func (*Poller) WaitUntilInitialSync ¶
func (p *Poller) WaitUntilInitialSync()
Blocks until the initial sync has been done on this poller.
type PollerMap ¶
type PollerMap struct { Pollers map[string]*Poller // device_id -> poller // contains filtered or unexported fields }
PollerMap is a map of device ID to Poller
func NewPollerMap ¶
func NewPollerMap(v2Client Client, callbacks V2DataReceiver) *PollerMap
NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race which can result in out of order latest NIDs which, if we assert NIDs only increment, will result in missed events.
Consider these events in the same room, with 3 different pollers getting the data:
1 2 3 4 5 6 7 eventual DB event NID A B C D E F G ----- poll loop 1 = A,B,C new events = A,B,C latest=3 --------- poll loop 2 = A,B,C,D,E new events = D,E latest=5 ------------- poll loop 3 = A,B,C,D,E,F,G new events = F,G latest=7
The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.
This only affects resources which are shared across multiple DEVICES such as:
- room resources: events, EDUs
- user resources: notif counts, account data
NOT to-device messages,or since tokens.
func (*PollerMap) Accumulate ¶
func (h *PollerMap) Accumulate(roomID, prevBatch string, timeline []json.RawMessage)
func (*PollerMap) AddToDeviceMessages ¶
func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)
Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.
func (*PollerMap) EnsurePolling ¶
func (h *PollerMap) EnsurePolling(authHeader, userID, deviceID, v2since string, logger zerolog.Logger)
EnsurePolling makes sure there is a poller for this user, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow for logins on clients to be snappy fast, even though they won't yet have the to-device msgs to decrypt E2EE roms.
func (*PollerMap) Initialise ¶
func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)
func (*PollerMap) LatestE2EEData ¶
func (h *PollerMap) LatestE2EEData(deviceID string) (otkCounts map[string]int, changed, left []string)
LatestE2EEData pulls the latest device_lists and device_one_time_keys_count values from the poller. These bits of data are ephemeral and do not need to be persisted.
func (*PollerMap) OnAccountData ¶
func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)
func (*PollerMap) OnInvite ¶
func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)
func (*PollerMap) OnRetireInvite ¶
func (*PollerMap) TransactionIDForEvent ¶
TransactionIDForEvent returns the transaction ID for this event for this user, if one exists.
func (*PollerMap) UpdateDeviceSince ¶
func (*PollerMap) UpdateUnreadCounts ¶
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage remembers sync v2 tokens per-device
func (*Storage) UpdateDeviceSince ¶
func (*Storage) UpdateUserIDForDevice ¶
type SyncResponse ¶
type SyncResponse struct { NextBatch string `json:"next_batch"` AccountData EventsResponse `json:"account_data"` Presence struct { Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"` } `json:"presence"` Rooms SyncRoomsResponse `json:"rooms"` ToDevice EventsResponse `json:"to_device"` DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` } `json:"device_lists"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` }
type SyncRoomsResponse ¶
type SyncRoomsResponse struct { Join map[string]SyncV2JoinResponse `json:"join"` Invite map[string]SyncV2InviteResponse `json:"invite"` Leave map[string]SyncV2LeaveResponse `json:"leave"` }
type SyncV2InviteResponse ¶
type SyncV2InviteResponse struct {
InviteState EventsResponse `json:"invite_state"`
}
InviteResponse represents a /sync response for a room which is under the 'invite' key.
type SyncV2JoinResponse ¶
type SyncV2JoinResponse struct { State EventsResponse `json:"state"` Timeline TimelineResponse `json:"timeline"` Ephemeral EventsResponse `json:"ephemeral"` AccountData EventsResponse `json:"account_data"` UnreadNotifications UnreadNotifications `json:"unread_notifications"` }
JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2LeaveResponse ¶
type SyncV2LeaveResponse struct { State struct { Events []json.RawMessage `json:"events"` } `json:"state"` Timeline struct { Events []json.RawMessage `json:"events"` Limited bool `json:"limited"` PrevBatch string `json:"prev_batch,omitempty"` } `json:"timeline"` }
LeaveResponse represents a /sync response for a room which is under the 'leave' key.
type TimelineResponse ¶
type TimelineResponse struct { Events []json.RawMessage `json:"events"` Limited bool `json:"limited"` PrevBatch string `json:"prev_batch,omitempty"` }
type TransactionIDCache ¶
type TransactionIDCache struct {
// contains filtered or unexported fields
}
func NewTransactionIDCache ¶
func NewTransactionIDCache() *TransactionIDCache
func (*TransactionIDCache) Get ¶
func (c *TransactionIDCache) Get(userID, eventID string) string
Get a transaction ID previously stored.
func (*TransactionIDCache) Store ¶
func (c *TransactionIDCache) Store(userID, eventID, txnID string)
Store a new transaction ID received via v2 /sync
type TransactionIDFetcher ¶
type UnreadNotifications ¶
type V2DataReceiver ¶
type V2DataReceiver interface { UpdateDeviceSince(deviceID, since string) Accumulate(roomID, prevBatch string, timeline []json.RawMessage) Initialise(roomID string, state []json.RawMessage) SetTyping(roomID string, userIDs []string) // Add messages for this device. If an error is returned, the poll loop is terminated as continuing // would implicitly acknowledge these messages. AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int) OnAccountData(userID, roomID string, events []json.RawMessage) OnInvite(userID, roomID string, inviteState []json.RawMessage) OnRetireInvite(userID, roomID string) }
V2DataReceiver is the receiver for all the v2 sync data the poller gets