sync2

package
v0.4.0
Warning

This package is not in the latest version of its module.

Published: Aug 23, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation


Constants

const AccountDataGlobalRoom = ""

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	WhoAmI(accessToken string) (string, error)
	DoSyncV2(accessToken, since string, isFirst bool) (*SyncResponse, int, error)
}
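
Because Client is an interface, a poller can be driven by a stand-in during tests. The following is a minimal sketch only: fakeClient, its users map and the trimmed SyncResponse are hypothetical names introduced for illustration and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// SyncResponse is a trimmed stand-in for the package's SyncResponse type.
type SyncResponse struct {
	NextBatch string
}

// Client mirrors the interface documented above.
type Client interface {
	WhoAmI(accessToken string) (string, error)
	DoSyncV2(accessToken, since string, isFirst bool) (*SyncResponse, int, error)
}

// fakeClient is a hypothetical in-memory Client for tests.
type fakeClient struct {
	users map[string]string // access token -> user ID
}

func (f *fakeClient) WhoAmI(accessToken string) (string, error) {
	userID, ok := f.users[accessToken]
	if !ok {
		return "", errors.New("unknown access token")
	}
	return userID, nil
}

func (f *fakeClient) DoSyncV2(accessToken, since string, isFirst bool) (*SyncResponse, int, error) {
	if _, ok := f.users[accessToken]; !ok {
		// Assumption: an invalid token surfaces as a 401 status plus an error.
		return nil, 401, errors.New("unknown access token")
	}
	return &SyncResponse{NextBatch: since + "+1"}, 200, nil
}

// Compile-time check that fakeClient satisfies Client.
var _ Client = (*fakeClient)(nil)

func main() {
	c := &fakeClient{users: map[string]string{"token-a": "@alice:example.org"}}
	userID, err := c.WhoAmI("token-a")
	fmt.Println(userID, err)
	_, status, err := c.DoSyncV2("expired", "", true)
	fmt.Println(status, err)
}
```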

type Device

type Device struct {
	UserID               string `db:"user_id"`
	DeviceID             string `db:"device_id"`
	Since                string `db:"since"`
	AccessToken          string
	AccessTokenEncrypted string `db:"v2_token_encrypted"`
}

type E2EEFetcher

type E2EEFetcher interface {
	LatestE2EEData(deviceID string) (otkCounts map[string]int, fallbackKeyTypes, changed, left []string)
}

E2EEFetcher is the fetcher used by the E2EE extension. PollerMap satisfies it.

type EventsResponse

type EventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

type HTTPClient

type HTTPClient struct {
	Client            *http.Client
	DestinationServer string
}

HTTPClient represents a Sync v2 Client. One client can be shared among many users.

func (*HTTPClient) DoSyncV2

func (v *HTTPClient) DoSyncV2(accessToken, since string, isFirst bool) (*SyncResponse, int, error)

DoSyncV2 performs a sync v2 request. It returns the sync response and the response status code, or an error. Set isFirst=true on the first sync to force a timeout=0 sync and ensure snappiness.
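
To illustrate how isFirst might map onto the request, here is a hedged sketch of building the query string. The helper syncURL, the /_matrix/client/v3/sync path and the 30000 ms long-poll value are assumptions made for illustration. The documentation above only states that the first sync uses timeout=0.

```go
package main

import (
	"fmt"
	"net/url"
)

// syncURL is a hypothetical helper that builds a sync v2 request URL.
// The endpoint path and the 30s long-poll timeout are assumptions.
func syncURL(destinationServer, since string, isFirst bool) string {
	query := url.Values{}
	if isFirst {
		// Return immediately so the first response arrives quickly.
		query.Set("timeout", "0")
	} else {
		query.Set("timeout", "30000")
	}
	if since != "" {
		query.Set("since", since)
	}
	// Encode sorts keys, so "since" precedes "timeout" when both are set.
	return destinationServer + "/_matrix/client/v3/sync?" + query.Encode()
}

func main() {
	fmt.Println(syncURL("https://hs.example.org", "", true))
	fmt.Println(syncURL("https://hs.example.org", "s72595", false))
}
```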

func (*HTTPClient) WhoAmI

func (v *HTTPClient) WhoAmI(accessToken string) (string, error)

type Poller

type Poller struct {

	// flag set to true when poll() returns due to expired access tokens
	Terminated bool
	// contains filtered or unexported fields
}

Poller automatically polls the sync v2 endpoint and accumulates the responses in storage.

func NewPoller

func NewPoller(userID, accessToken, deviceID string, client Client, receiver V2DataReceiver, txnCache *TransactionIDCache, logger zerolog.Logger) *Poller

func (*Poller) DeviceListChanges

func (p *Poller) DeviceListChanges() (changed, left []string)

func (*Poller) FallbackKeyTypes added in v0.3.1

func (p *Poller) FallbackKeyTypes() []string

func (*Poller) OTKCounts

func (p *Poller) OTKCounts() map[string]int

func (*Poller) Poll

func (p *Poller) Poll(since string)

Poll blocks, repeatedly calling v2 sync, so run it in a goroutine. It returns only if the access token is invalidated or if there was a fatal error processing v2 responses. Use WaitUntilInitialSync() to wait until the first poll has been processed.

func (*Poller) WaitUntilInitialSync

func (p *Poller) WaitUntilInitialSync()

WaitUntilInitialSync blocks until the initial sync has been done on this poller.
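
The Poll / WaitUntilInitialSync pairing can be pictured with a small stand-in. This is a sketch under stated assumptions: miniPoller, its rounds parameter and the onSync callback are hypothetical, and the real Poller loops until terminated rather than for a fixed number of rounds.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPoller is a hypothetical stand-in for Poller.
type miniPoller struct {
	initialSync chan struct{} // closed once the first round has been processed
	once        sync.Once
}

func newMiniPoller() *miniPoller {
	return &miniPoller{initialSync: make(chan struct{})}
}

// Poll runs a fixed number of rounds and calls onSync for each one.
func (p *miniPoller) Poll(rounds int, onSync func(round int)) {
	for i := 0; i < rounds; i++ {
		onSync(i)
		// Signal exactly once, after the first round's data has been handled.
		p.once.Do(func() { close(p.initialSync) })
	}
}

// WaitUntilInitialSync blocks until the first round has been processed.
func (p *miniPoller) WaitUntilInitialSync() {
	<-p.initialSync
}

func main() {
	p := newMiniPoller()
	stored := make(chan int, 3) // buffered so Poll never blocks on the demo sink
	go p.Poll(3, func(round int) { stored <- round })

	p.WaitUntilInitialSync()
	// The first round's data is guaranteed to be available here.
	fmt.Println("first stored round:", <-stored)
}
```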

type PollerMap

type PollerMap struct {
	Pollers map[string]*Poller // device_id -> poller
	// contains filtered or unexported fields
}

PollerMap is a map of device ID to Poller

func NewPollerMap

func NewPollerMap(v2Client Client, callbacks V2DataReceiver) *PollerMap

NewPollerMap makes a new PollerMap. It guarantees that the V2DataReceiver will be called on the same goroutine for all pollers, which is required to avoid race conditions at the Go level. Whilst SQL transactions ensure that the DB doesn't race, the new events from each DB call are subsequently fed into a global cache. That step can race, producing out-of-order latest NIDs which, if we assert that NIDs only increment, will result in missed events.

Consider these events in the same room, with 3 different pollers getting the data:

1 2 3 4 5 6 7 eventual DB event NID
A B C D E F G
-----          poll loop 1 = A,B,C          new events = A,B,C latest=3
---------      poll loop 2 = A,B,C,D,E      new events = D,E   latest=5
-------------  poll loop 3 = A,B,C,D,E,F,G  new events = F,G   latest=7

The DB layer will correctly assign NIDs and stop duplicates, resulting in a set of new events which do not overlap. However, there is a gap between this point and updating the cache, where variable delays can be introduced, so F,G latest=7 could be injected first. If we then never walk back to earlier NIDs, A,B,C,D,E will be dropped from the cache.

This only affects resources which are shared across multiple DEVICES such as:

  • room resources: events, EDUs
  • user resources: notif counts, account data

This does NOT affect to-device messages or since tokens.
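
The single-goroutine guarantee described above can be sketched with a channel of callbacks. Everything below is illustrative: serialExecutor, store and cache are hypothetical types, not the package's internals. The point is only that the NID assignment and the cache update run as one unit on one goroutine, so the cache never sees NIDs out of order.

```go
package main

import (
	"fmt"
	"sync"
)

// serialExecutor runs submitted callbacks one at a time on a single goroutine.
type serialExecutor struct {
	jobs chan func()
	done chan struct{}
}

func newSerialExecutor() *serialExecutor {
	e := &serialExecutor{jobs: make(chan func()), done: make(chan struct{})}
	go func() {
		defer close(e.done)
		for job := range e.jobs {
			job()
		}
	}()
	return e
}

// Do blocks until fn has finished running on the executor goroutine.
func (e *serialExecutor) Do(fn func()) {
	finished := make(chan struct{})
	e.jobs <- func() { fn(); close(finished) }
	<-finished
}

// Close stops the executor after all submitted jobs have run.
func (e *serialExecutor) Close() { close(e.jobs); <-e.done }

// store assigns increasing NIDs to events it has not seen before.
type store struct {
	nids map[string]int
	next int
}

func (s *store) insert(events []string) []int {
	var fresh []int
	for _, ev := range events {
		if _, seen := s.nids[ev]; seen {
			continue
		}
		s.next++
		s.nids[ev] = s.next
		fresh = append(fresh, s.next)
	}
	return fresh
}

// cache asserts that NIDs only increment and counts anything it drops.
type cache struct{ latestNID, dropped int }

func (c *cache) feed(nids []int) {
	for _, nid := range nids {
		if nid <= c.latestNID {
			c.dropped++
			continue
		}
		c.latestNID = nid
	}
}

// runPollers replays the three poll loops from the diagram concurrently.
func runPollers() (latestNID, dropped int) {
	exec := newSerialExecutor()
	db := &store{nids: map[string]int{}}
	c := &cache{}
	timelines := [][]string{
		{"A", "B", "C"},
		{"A", "B", "C", "D", "E"},
		{"A", "B", "C", "D", "E", "F", "G"},
	}
	var wg sync.WaitGroup
	for _, timeline := range timelines {
		wg.Add(1)
		go func(tl []string) {
			defer wg.Done()
			// Insert and cache update happen together on the executor goroutine.
			exec.Do(func() { c.feed(db.insert(tl)) })
		}(timeline)
	}
	wg.Wait()
	exec.Close()
	return c.latestNID, c.dropped
}

func main() {
	latest, dropped := runPollers()
	fmt.Println("latest NID:", latest, "dropped:", dropped)
}
```

Whichever poller reaches the executor first, no event is dropped, because no other callback can slip in between the insert and the cache update.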

func (*PollerMap) Accumulate

func (h *PollerMap) Accumulate(roomID, prevBatch string, timeline []json.RawMessage)

func (*PollerMap) AddToDeviceMessages

func (h *PollerMap) AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)

Add messages for this device. If an error is returned, the poll loop is terminated as continuing would implicitly acknowledge these messages.

func (*PollerMap) EnsurePolling

func (h *PollerMap) EnsurePolling(accessToken, userID, deviceID, v2since string, logger zerolog.Logger)

EnsurePolling makes sure there is a poller for this user, making one if need be. It blocks until at least 1 sync is done if and only if the poller was just created. This ensures that calls to the database will return data. It guarantees that only 1 poller will be running per deviceID. Note that we will immediately return if there is a poller for the same user but a different device. We do this to keep logins on clients snappy, even though they won't yet have the to-device msgs to decrypt E2EE rooms.
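
A minimal sketch of the "one poller per device, block only on creation" behaviour follows. The registry type, its boolean return value and the firstSyncDone callback are assumptions for illustration; the real EnsurePolling has a different signature and returns nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for PollerMap.
type registry struct {
	mu      sync.Mutex
	pollers map[string]chan struct{} // device ID -> closed after the first sync
}

func newRegistry() *registry {
	return &registry{pollers: make(map[string]chan struct{})}
}

// EnsurePolling starts poll for deviceID unless a poller is already registered.
// It reports whether a poller was created, and waits for the first sync only
// in that case.
func (r *registry) EnsurePolling(deviceID string, poll func(firstSyncDone func())) bool {
	r.mu.Lock()
	if _, exists := r.pollers[deviceID]; exists {
		r.mu.Unlock()
		return false // already polling: return immediately
	}
	ready := make(chan struct{})
	r.pollers[deviceID] = ready
	r.mu.Unlock()

	var once sync.Once
	go poll(func() { once.Do(func() { close(ready) }) })
	<-ready // block until the new poller has completed one sync
	return true
}

func main() {
	r := newRegistry()
	poll := func(firstSyncDone func()) { firstSyncDone() }
	fmt.Println(r.EnsurePolling("DEVICE_A", poll))
	fmt.Println(r.EnsurePolling("DEVICE_A", poll))
	fmt.Println(r.EnsurePolling("DEVICE_B", poll))
}
```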

func (*PollerMap) Initialise

func (h *PollerMap) Initialise(roomID string, state []json.RawMessage)

func (*PollerMap) LatestE2EEData

func (h *PollerMap) LatestE2EEData(deviceID string) (otkCounts map[string]int, fallbackKeyTypes, changed, left []string)

LatestE2EEData pulls the latest device_lists and device_one_time_keys_count values from the poller. These bits of data are ephemeral and do not need to be persisted.

func (*PollerMap) OnAccountData

func (h *PollerMap) OnAccountData(userID, roomID string, events []json.RawMessage)

func (*PollerMap) OnInvite

func (h *PollerMap) OnInvite(userID, roomID string, inviteState []json.RawMessage)

func (*PollerMap) OnRetireInvite

func (h *PollerMap) OnRetireInvite(userID, roomID string)

func (*PollerMap) SetTyping

func (h *PollerMap) SetTyping(roomID string, userIDs []string)

func (*PollerMap) TransactionIDForEvent

func (h *PollerMap) TransactionIDForEvent(userID, eventID string) string

TransactionIDForEvent returns the transaction ID for this event for this user, if one exists.

func (*PollerMap) UpdateDeviceSince

func (h *PollerMap) UpdateDeviceSince(deviceID, since string)

func (*PollerMap) UpdateUnreadCounts

func (h *PollerMap) UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage remembers sync v2 tokens per-device

func NewStore

func NewStore(postgresURI, secret string) *Storage

func (*Storage) AllDevices added in v0.2.0

func (s *Storage) AllDevices() (devices []Device, err error)

func (*Storage) Device

func (s *Storage) Device(deviceID string) (*Device, error)

func (*Storage) InsertDevice

func (s *Storage) InsertDevice(deviceID, accessToken string) (*Device, error)

func (*Storage) UpdateDeviceSince

func (s *Storage) UpdateDeviceSince(deviceID, since string) error

func (*Storage) UpdateUserIDForDevice

func (s *Storage) UpdateUserIDForDevice(deviceID, userID string) error

type SyncResponse

type SyncResponse struct {
	NextBatch   string         `json:"next_batch"`
	AccountData EventsResponse `json:"account_data"`
	Presence    struct {
		Events []gomatrixserverlib.ClientEvent `json:"events,omitempty"`
	} `json:"presence"`
	Rooms       SyncRoomsResponse `json:"rooms"`
	ToDevice    EventsResponse    `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount          map[string]int `json:"device_one_time_keys_count,omitempty"`
	DeviceUnusedFallbackKeyTypes []string       `json:"device_unused_fallback_key_types,omitempty"`
}
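
As a rough illustration of how a /sync body maps onto these fields, the sketch below decodes a sample payload into a trimmed copy of the struct. The reduced syncResponse type, the decodeSync helper and the sample JSON are assumptions for illustration. The presence and rooms fields are left out so the example needs only the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventsResponse mirrors EventsResponse above.
type eventsResponse struct {
	Events []json.RawMessage `json:"events"`
}

// syncResponse is a trimmed copy of SyncResponse with the same JSON tags.
type syncResponse struct {
	NextBatch   string         `json:"next_batch"`
	ToDevice    eventsResponse `json:"to_device"`
	DeviceLists struct {
		Changed []string `json:"changed,omitempty"`
		Left    []string `json:"left,omitempty"`
	} `json:"device_lists"`
	DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
}

// decodeSync is a hypothetical helper that parses a /sync response body.
func decodeSync(body []byte) (*syncResponse, error) {
	var res syncResponse
	if err := json.Unmarshal(body, &res); err != nil {
		return nil, err
	}
	return &res, nil
}

func main() {
	body := []byte(`{
		"next_batch": "s72595_4483_1934",
		"to_device": {"events": [{"type": "m.room_key_request"}]},
		"device_lists": {"changed": ["@alice:example.org"]},
		"device_one_time_keys_count": {"signed_curve25519": 50}
	}`)
	res, err := decodeSync(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// Events stay as raw JSON, so they can be stored without re-encoding.
	fmt.Println(res.NextBatch, len(res.ToDevice.Events), res.DeviceLists.Changed)
}
```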

type SyncRoomsResponse

type SyncRoomsResponse struct {
	Join   map[string]SyncV2JoinResponse   `json:"join"`
	Invite map[string]SyncV2InviteResponse `json:"invite"`
	Leave  map[string]SyncV2LeaveResponse  `json:"leave"`
}

type SyncV2InviteResponse

type SyncV2InviteResponse struct {
	InviteState EventsResponse `json:"invite_state"`
}

SyncV2InviteResponse represents a /sync response for a room which is under the 'invite' key.

type SyncV2JoinResponse

type SyncV2JoinResponse struct {
	State               EventsResponse      `json:"state"`
	Timeline            TimelineResponse    `json:"timeline"`
	Ephemeral           EventsResponse      `json:"ephemeral"`
	AccountData         EventsResponse      `json:"account_data"`
	UnreadNotifications UnreadNotifications `json:"unread_notifications"`
}

SyncV2JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.

type SyncV2LeaveResponse

type SyncV2LeaveResponse struct {
	State struct {
		Events []json.RawMessage `json:"events"`
	} `json:"state"`
	Timeline struct {
		Events    []json.RawMessage `json:"events"`
		Limited   bool              `json:"limited"`
		PrevBatch string            `json:"prev_batch,omitempty"`
	} `json:"timeline"`
}

SyncV2LeaveResponse represents a /sync response for a room which is under the 'leave' key.

type TimelineResponse

type TimelineResponse struct {
	Events    []json.RawMessage `json:"events"`
	Limited   bool              `json:"limited"`
	PrevBatch string            `json:"prev_batch,omitempty"`
}

type TransactionIDCache

type TransactionIDCache struct {
	// contains filtered or unexported fields
}

func NewTransactionIDCache

func NewTransactionIDCache() *TransactionIDCache

func (*TransactionIDCache) Get

func (c *TransactionIDCache) Get(userID, eventID string) string

Get a transaction ID previously stored.

func (*TransactionIDCache) Store

func (c *TransactionIDCache) Store(userID, eventID, txnID string)

Store a new transaction ID received via v2 /sync.
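
The Get/Store contract can be sketched as a mutex-guarded map keyed by user and event. This is an assumption-laden illustration: txnCache and its key type are hypothetical, and the real TransactionIDCache may differ, for example by expiring entries. Returning an empty string for a missing entry is inferred from the string-only return type.

```go
package main

import (
	"fmt"
	"sync"
)

// txnKey identifies a transaction ID by the sending user and the event.
type txnKey struct {
	userID  string
	eventID string
}

// txnCache is a hypothetical stand-in for TransactionIDCache.
type txnCache struct {
	mu      sync.RWMutex
	entries map[txnKey]string
}

func newTxnCache() *txnCache {
	return &txnCache{entries: make(map[txnKey]string)}
}

// Store records the transaction ID seen for this user and event.
func (c *txnCache) Store(userID, eventID, txnID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[txnKey{userID, eventID}] = txnID
}

// Get returns the stored transaction ID, or "" if none exists.
func (c *txnCache) Get(userID, eventID string) string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.entries[txnKey{userID, eventID}]
}

func main() {
	cache := newTxnCache()
	cache.Store("@alice:example.org", "$event1", "txn-42")
	fmt.Printf("%q\n", cache.Get("@alice:example.org", "$event1"))
	fmt.Printf("%q\n", cache.Get("@bob:example.org", "$event1"))
}
```

Keying on the user as well as the event matters because a transaction ID is only meaningful to the user who sent the event.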

type TransactionIDFetcher

type TransactionIDFetcher interface {
	TransactionIDForEvent(userID, eventID string) (txnID string)
}

type UnreadNotifications

type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}
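
The pointer fields let a consumer tell "count not sent" apart from "count is zero". A short sketch of that distinction, where parseUnread and describe are hypothetical helpers added for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// UnreadNotifications mirrors the struct documented above.
type UnreadNotifications struct {
	HighlightCount    *int `json:"highlight_count,omitempty"`
	NotificationCount *int `json:"notification_count,omitempty"`
}

// parseUnread decodes an unread_notifications JSON object.
func parseUnread(body string) (UnreadNotifications, error) {
	var u UnreadNotifications
	err := json.Unmarshal([]byte(body), &u)
	return u, err
}

// describe renders a count, distinguishing a missing field from an explicit 0.
func describe(count *int) string {
	if count == nil {
		return "absent"
	}
	return strconv.Itoa(*count)
}

func main() {
	u, err := parseUnread(`{"notification_count": 0}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("highlight:", describe(u.HighlightCount))
	fmt.Println("notification:", describe(u.NotificationCount))
}
```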

type V2DataReceiver

type V2DataReceiver interface {
	UpdateDeviceSince(deviceID, since string)
	Accumulate(roomID, prevBatch string, timeline []json.RawMessage)
	Initialise(roomID string, state []json.RawMessage)
	SetTyping(roomID string, userIDs []string)
	// Add messages for this device. If an error is returned, the poll loop is terminated as continuing
	// would implicitly acknowledge these messages.
	AddToDeviceMessages(userID, deviceID string, msgs []json.RawMessage)

	UpdateUnreadCounts(roomID, userID string, highlightCount, notifCount *int)

	OnAccountData(userID, roomID string, events []json.RawMessage)
	OnInvite(userID, roomID string, inviteState []json.RawMessage)
	OnRetireInvite(userID, roomID string)
}

V2DataReceiver is the receiver for all the v2 sync data the poller gets
