Documentation ¶
Index ¶
- Constants
- Variables
- type Client
- type Device
- type DeviceDataTicker
- type DevicesTable
- type EventsResponse
- type HTTPClient
- type IPollerMap
- type PendingTransactionIDs
- type PollerID
- type PollerMap
- func (h *PollerMap) Accumulate(ctx context.Context, userID, deviceID, roomID string, ...) (err error)
- func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error
- func (h *PollerMap) DeviceIDs(userID string) []string
- func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, ...) (bool, error)
- func (h *PollerMap) ExpirePollers(pids []PollerID) int
- func (h *PollerMap) Initialise(ctx context.Context, roomID string, state []json.RawMessage) (err error)
- func (h *PollerMap) NumPollers() (count int)
- func (h *PollerMap) OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) (err error)
- func (h *PollerMap) OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, ...) error
- func (h *PollerMap) OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string)
- func (h *PollerMap) OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) (err error)
- func (h *PollerMap) OnLeftRoom(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) (err error)
- func (h *PollerMap) OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ...)
- func (h *PollerMap) OnTerminated(ctx context.Context, pollerID PollerID)
- func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
- func (h *PollerMap) SetTyping(ctx context.Context, pollerID PollerID, roomID string, ...)
- func (h *PollerMap) Terminate()
- func (h *PollerMap) UpdateDeviceSince(ctx context.Context, userID, deviceID, since string)
- func (h *PollerMap) UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int)
- type Storage
- type SyncResponse
- type SyncRoomsResponse
- type SyncV2InviteResponse
- type SyncV2JoinResponse
- type SyncV2LeaveResponse
- type TimelineResponse
- type Token
- type TokenForPoller
- type TokensTable
- func (t *TokensTable) Delete(accessTokenHash string) error
- func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)
- func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)
- func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error
- func (t *TokensTable) Token(plaintextToken string) (*Token, error)
- func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)
- type UnreadNotifications
- type V2DataReceiver
Constants ¶
const AccountDataGlobalRoom = ""
Variables ¶
var HTTP401 error = fmt.Errorf("HTTP 401")
var ProxyVersion = ""
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface { // Versions fetches and parses the list of Matrix versions that the homeserver // advertises itself as supporting. Versions(ctx context.Context) (version []string, err error) // WhoAmI asks the homeserver to lookup the access token using the CSAPI /whoami // endpoint. The response must contain a device ID (meaning that we assume the // homeserver supports Matrix >= 1.1.) WhoAmI(ctx context.Context, accessToken string) (userID, deviceID string, err error) DoSyncV2(ctx context.Context, accessToken, since string, isFirst bool, toDeviceOnly bool) (*SyncResponse, int, error) }
type DeviceDataTicker ¶ added in v0.99.4
type DeviceDataTicker struct {
// contains filtered or unexported fields
}
This struct remembers user+device IDs to notify for then periodically emits them all to the caller. Use to rate limit the frequency of device list updates.
func NewDeviceDataTicker ¶ added in v0.99.4
func NewDeviceDataTicker(d time.Duration) *DeviceDataTicker
Create a new device data ticker, which batches calls to Remember and invokes a callback every d duration. If d is 0, no batching is performed and the callback is invoked synchronously, which is useful for testing.
func (*DeviceDataTicker) Remember ¶ added in v0.99.4
func (t *DeviceDataTicker) Remember(pid PollerID)
Remember this user/device ID, and emit it later on.
func (*DeviceDataTicker) Run ¶ added in v0.99.4
func (t *DeviceDataTicker) Run()
Blocks forever, ticking until Stop() is called.
func (*DeviceDataTicker) SetCallback ¶ added in v0.99.4
func (t *DeviceDataTicker) SetCallback(fn func(payload *pubsub.V2DeviceData))
Set the function which should be called when the tick happens.
type DevicesTable ¶ added in v0.99.3
type DevicesTable struct {
// contains filtered or unexported fields
}
DevicesTable remembers syncv2 since positions per-device
func NewDevicesTable ¶ added in v0.99.3
func NewDevicesTable(db *sqlx.DB) *DevicesTable
func (*DevicesTable) FindOldDevices ¶ added in v0.99.6
func (t *DevicesTable) FindOldDevices(inactivityPeriod time.Duration) (devices []Device, err error)
FindOldDevices fetches the user_id and device_id of all devices which haven't /synced for at least as long as the given inactivityPeriod. Such devices are returned in no particular order.
This is determined using the syncv3_sync2_tokens.last_seen column, which is updated at most once per day to save DB throughtput (see TokensTable.MaybeUpdateLastSeen). The caller should therefore use an inactivityPeriod of at least two days to avoid considering a recently-used device as old.
func (*DevicesTable) InsertDevice ¶ added in v0.99.3
func (t *DevicesTable) InsertDevice(txn *sqlx.Tx, userID, deviceID string) error
InsertDevice creates a new devices row with a blank since token if no such row exists. Otherwise, it does nothing.
func (*DevicesTable) UpdateDeviceSince ¶ added in v0.99.3
func (t *DevicesTable) UpdateDeviceSince(userID, deviceID, since string) error
type EventsResponse ¶
type EventsResponse struct {
Events []json.RawMessage `json:"events"`
}
type HTTPClient ¶
type HTTPClient struct { Client *http.Client LongTimeoutClient *http.Client DestinationServer string }
HTTPClient represents a Sync v2 Client. One client can be shared among many users.
func NewHTTPClient ¶ added in v0.99.11
func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer string) *HTTPClient
func (*HTTPClient) DoSyncV2 ¶
func (v *HTTPClient) DoSyncV2(ctx context.Context, accessToken, since string, isFirst, toDeviceOnly bool) (*SyncResponse, int, error)
DoSyncV2 performs a sync v2 request. Returns the sync response and the response status code or an error. Set isFirst=true on the first sync to force a timeout=0 sync to ensure snapiness.
type IPollerMap ¶ added in v0.99.3
type IPollerMap interface { EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (created bool, err error) NumPollers() int Terminate() DeviceIDs(userID string) []string // ExpirePollers requests that the given pollers are terminated as if their access // tokens had expired. Returns the number of pollers successfully terminated. ExpirePollers(ids []PollerID) int }
type PendingTransactionIDs ¶ added in v0.99.5
type PendingTransactionIDs struct {
// contains filtered or unexported fields
}
PendingTransactionIDs is (conceptually) a map from event IDs to a list of device IDs. Its keys are the IDs of event we've seen which a) lack a transaction ID, and b) were sent by one of the users we are polling for. The values are the list of the sender's devices whose pollers are yet to see a transaction ID.
If another poller sees the same event
with a transaction ID, it emits a V2TransactionID payload with that ID and removes the event ID from this map.
without a transaction ID, it removes the polling device ID from the values list. If the device ID list is now empty, the poller emits an "all clear" V2TransactionID payload.
This is a best-effort affair to ensure that the rest of the proxy can wait for transaction IDs to appear before transmitting an event down /sync to its sender.
It's possible that we add an entry to this map and then the list of remaining device IDs becomes out of date, either due to a new device creation or an existing device expiring. We choose not to handle this case, because it is relatively rare.
To avoid the map growing without bound, we use a ttlcache and drop entries after a short period of time.
func NewPendingTransactionIDs ¶ added in v0.99.5
func NewPendingTransactionIDs(loader loaderFunc) *PendingTransactionIDs
func (*PendingTransactionIDs) MissingTxnID ¶ added in v0.99.5
func (c *PendingTransactionIDs) MissingTxnID(eventID, userID, myDeviceID string) (bool, error)
MissingTxnID should be called to report that this device ID did not see a transaction ID for this event ID. Returns true if this is the first time we know for sure that we'll never see a txn ID for this event.
func (*PendingTransactionIDs) SeenTxnID ¶ added in v0.99.5
func (c *PendingTransactionIDs) SeenTxnID(eventID string) error
SeenTxnID should be called to report that this device saw a transaction ID for this event.
type PollerMap ¶
type PollerMap struct { Pollers map[PollerID]*poller // contains filtered or unexported fields }
PollerMap is a map of device ID to Poller
func NewPollerMap ¶
NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same goroutine for all pollers. This is required to avoid race conditions at the Go level. Whilst we use SQL transactions to ensure that the DB doesn't race, we then subsequently feed new events from that call into a global cache. This can race which can result in out of order latest NIDs which, if we assert NIDs only increment, will result in missed events.
Consider these events in the same room, with 3 different pollers getting the data:
1 2 3 4 5 6 7 eventual DB event NID A B C D E F G ----- poll loop 1 = A,B,C new events = A,B,C latest=3 --------- poll loop 2 = A,B,C,D,E new events = D,E latest=5 ------------- poll loop 3 = A,B,C,D,E,F,G new events = F,G latest=7
The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.
This only affects resources which are shared across multiple DEVICES such as:
- room resources: events, EDUs
- user resources: notif counts, account data
NOT to-device messages,or since tokens.
func (*PollerMap) Accumulate ¶
func (*PollerMap) AddToDeviceMessages ¶
func (h *PollerMap) AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error
Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.
func (*PollerMap) DeviceIDs ¶ added in v0.99.5
DeviceIDs returns the slice of all devices currently being polled for by this user. The return value is brand-new and is fully owned by the caller.
func (*PollerMap) EnsurePolling ¶
func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isStartup bool, logger zerolog.Logger) (bool, error)
EnsurePolling makes sure there is a poller for this device, making one if need be. Blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. Guarantees only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to allow for logins on clients to be snappy fast, even though they won't yet have the to-device msgs to decrypt E2EE rooms.
func (*PollerMap) ExpirePollers ¶ added in v0.99.6
func (*PollerMap) Initialise ¶
func (*PollerMap) NumPollers ¶
func (*PollerMap) OnAccountData ¶
func (*PollerMap) OnE2EEData ¶
func (*PollerMap) OnExpiredToken ¶ added in v0.99.2
func (*PollerMap) OnLeftRoom ¶
func (*PollerMap) OnTerminated ¶
func (*PollerMap) SetCallbacks ¶
func (h *PollerMap) SetCallbacks(callbacks V2DataReceiver)
func (*PollerMap) Terminate ¶
func (h *PollerMap) Terminate()
Terminate all pollers. Useful in tests.
func (*PollerMap) UpdateDeviceSince ¶
type Storage ¶
type Storage struct { DevicesTable *DevicesTable TokensTable *TokensTable DB *sqlx.DB }
type SyncResponse ¶
type SyncResponse struct { NextBatch string `json:"next_batch"` AccountData EventsResponse `json:"account_data"` Presence struct { Events []json.RawMessage `json:"events,omitempty"` } `json:"presence"` Rooms SyncRoomsResponse `json:"rooms"` ToDevice EventsResponse `json:"to_device"` DeviceLists struct { Changed []string `json:"changed,omitempty"` Left []string `json:"left,omitempty"` } `json:"device_lists"` DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"` DeviceUnusedFallbackKeyTypes []string `json:"device_unused_fallback_key_types,omitempty"` }
type SyncRoomsResponse ¶
type SyncRoomsResponse struct { Join map[string]SyncV2JoinResponse `json:"join"` Invite map[string]SyncV2InviteResponse `json:"invite"` Leave map[string]SyncV2LeaveResponse `json:"leave"` }
type SyncV2InviteResponse ¶
type SyncV2InviteResponse struct {
InviteState EventsResponse `json:"invite_state"`
}
InviteResponse represents a /sync response for a room which is under the 'invite' key.
type SyncV2JoinResponse ¶
type SyncV2JoinResponse struct { State EventsResponse `json:"state"` Timeline TimelineResponse `json:"timeline"` Ephemeral EventsResponse `json:"ephemeral"` AccountData EventsResponse `json:"account_data"` UnreadNotifications UnreadNotifications `json:"unread_notifications"` }
JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type SyncV2LeaveResponse ¶
type SyncV2LeaveResponse struct { State struct { Events []json.RawMessage `json:"events"` } `json:"state"` Timeline struct { Events []json.RawMessage `json:"events"` Limited bool `json:"limited"` PrevBatch string `json:"prev_batch,omitempty"` } `json:"timeline"` }
LeaveResponse represents a /sync response for a room which is under the 'leave' key.
type TimelineResponse ¶
type TimelineResponse struct { Events []json.RawMessage `json:"events"` Limited bool `json:"limited"` PrevBatch string `json:"prev_batch,omitempty"` }
type TokenForPoller ¶ added in v0.99.3
TokenForPoller represents a row of the tokens table, together with any data maintained by pollers for that token's device.
type TokensTable ¶ added in v0.99.3
type TokensTable struct {
// contains filtered or unexported fields
}
TokensTable remembers sync v2 tokens
func NewTokensTable ¶ added in v0.99.3
func NewTokensTable(db *sqlx.DB, secret string) *TokensTable
NewTokensTable creates the syncv3_sync2_tokens table if it does not already exist.
func (*TokensTable) Delete ¶ added in v0.99.3
func (t *TokensTable) Delete(accessTokenHash string) error
Delete looks up a token by its hash and deletes the row. If no token exists with the given hash, a warning is logged but no error is returned.
func (*TokensTable) GetTokenAndSince ¶ added in v0.99.3
func (t *TokensTable) GetTokenAndSince(userID, deviceID, tokenHash string) (accessToken, since string, err error)
func (*TokensTable) Insert ¶ added in v0.99.3
func (t *TokensTable) Insert(txn *sqlx.Tx, plaintextToken, userID, deviceID string, lastSeen time.Time) (*Token, error)
Insert a new token into the table.
func (*TokensTable) MaybeUpdateLastSeen ¶ added in v0.99.3
func (t *TokensTable) MaybeUpdateLastSeen(token *Token, newLastSeen time.Time) error
MaybeUpdateLastSeen actions a request to update a Token struct with its last_seen value in the DB. To avoid spamming the DB with a write every time a sync3 request arrives, we only update the last seen timestamp or the if it is at least 24 hours old. The timestamp is updated on the Token struct if and only if it is updated in the DB.
func (*TokensTable) Token ¶ added in v0.99.3
func (t *TokensTable) Token(plaintextToken string) (*Token, error)
Token retrieves a tokens row from the database if it exists. Errors with sql.NoRowsError if the token does not exist. Errors with an unspecified error otherwise.
func (*TokensTable) TokenForEachDevice ¶ added in v0.99.3
func (t *TokensTable) TokenForEachDevice(txn *sqlx.Tx) (tokens []TokenForPoller, err error)
TokenForEachDevice loads the most recently used token for each device. If given a transaction, it will SELECT inside that transaction.
type UnreadNotifications ¶
type V2DataReceiver ¶
type V2DataReceiver interface { // Update the since token for this device. Called AFTER all other data in this sync response has been processed. UpdateDeviceSince(ctx context.Context, userID, deviceID, since string) // Accumulate data for this room. This means the timeline section of the v2 response. // Return an error to stop the since token advancing. Accumulate(ctx context.Context, userID, deviceID, roomID string, timeline TimelineResponse) error // latest pos with event nids of timeline entries // Initialise the room, if it hasn't been already. This means the state section of the v2 response. // If given a state delta from an incremental sync, returns the slice of all state events unknown to the DB. // Return an error to stop the since token advancing. Initialise(ctx context.Context, roomID string, state []json.RawMessage) error // snapshot ID? // SetTyping indicates which users are typing. SetTyping(ctx context.Context, pollerID PollerID, roomID string, ephEvent json.RawMessage) // Sent when there is a new receipt OnReceipt(ctx context.Context, userID, roomID, ephEventType string, ephEvent json.RawMessage) // AddToDeviceMessages adds this chunk of to_device messages. Preserve the ordering. // Return an error to stop the since token advancing. AddToDeviceMessages(ctx context.Context, userID, deviceID string, msgs []json.RawMessage) error // UpdateUnreadCounts sets the highlight_count and notification_count for this user in this room. UpdateUnreadCounts(ctx context.Context, roomID, userID string, highlightCount, notifCount *int) // Set the latest account data for this user. // Return an error to stop the since token advancing. OnAccountData(ctx context.Context, userID, roomID string, events []json.RawMessage) error // ping update with types? Can you race when re-querying? // Sent when there is a room in the `invite` section of the v2 response. // Return an error to stop the since token advancing. OnInvite(ctx context.Context, userID, roomID string, inviteState []json.RawMessage) error // invitestate in db // Sent when there is a room in the `leave` section of the v2 response. // Return an error to stop the since token advancing. OnLeftRoom(ctx context.Context, userID, roomID string, leaveEvent json.RawMessage) error // Sent when there is a _change_ in E2EE data, not all the time OnE2EEData(ctx context.Context, userID, deviceID string, otkCounts map[string]int, fallbackKeyTypes []string, deviceListChanges map[string]int) error // Sent when the poll loop terminates OnTerminated(ctx context.Context, pollerID PollerID) // Sent when the token gets a 401 response OnExpiredToken(ctx context.Context, accessTokenHash, userID, deviceID string) }
V2DataReceiver is the receiver for all the v2 sync data the poller gets