sync3

package
v0.99.1
Warning

This package is not in the latest version of its module.

Published: Feb 24, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Constants

const (
	OpSync       = "SYNC"
	OpInvalidate = "INVALIDATE"
	OpInsert     = "INSERT"
	OpDelete     = "DELETE"
)
const DispatcherAllUsers = "-"

Variables

var (
	SortByName              = "by_name"
	SortByRecency           = "by_recency"
	SortByNotificationLevel = "by_notification_level"
	SortByNotificationCount = "by_notification_count" // deprecated
	SortByHighlightCount    = "by_highlight_count"    // deprecated
	SortBy                  = []string{SortByHighlightCount, SortByName, SortByNotificationCount, SortByRecency, SortByNotificationLevel}

	Wildcard     = "*"
	StateKeyLazy = "$LAZY"
	StateKeyMe   = "$ME"

	DefaultTimelineLimit = int64(20)
	DefaultTimeoutMSecs  = 10 * 1000 // 10s
)

Functions

This section is empty.

Types

type Conn

type Conn struct {
	ConnID ConnID
	// contains filtered or unexported fields
}

Conn is an abstraction of a long-poll connection. It automatically handles the position values of the /sync request, including sending cached data in the event of retries. It does not handle the contents of the data at all.

func NewConn

func NewConn(connID ConnID, h ConnHandler) *Conn

func (*Conn) Alive

func (c *Conn) Alive() bool

func (*Conn) OnIncomingRequest

func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError)

OnIncomingRequest advances the client's position in the stream, returning the response position and data.

func (*Conn) OnUpdate added in v0.98.1

func (c *Conn) OnUpdate(ctx context.Context, update caches.Update)

func (*Conn) UserID

func (c *Conn) UserID() string

type ConnHandler

type ConnHandler interface {
	// Callback which is allowed to block as long as the context is active. Return the response
	// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
	// status code to send back.
	OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error)
	OnUpdate(ctx context.Context, update caches.Update)
	UserID() string
	Destroy()
	Alive() bool
}

type ConnID

type ConnID struct {
	DeviceID string
}

func (*ConnID) String

func (c *ConnID) String() string

type ConnMap

type ConnMap struct {
	// contains filtered or unexported fields
}

ConnMap stores a collection of Conns.

func NewConnMap

func NewConnMap() *ConnMap

func (*ConnMap) CloseConn

func (m *ConnMap) CloseConn(connID ConnID)

func (*ConnMap) Conn

func (m *ConnMap) Conn(cid ConnID) *Conn

Conn returns a connection with this ConnID. Returns nil if no connection exists.

func (*ConnMap) CreateConn

func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool)

Atomically gets or creates a connection with this connection ID. Calls newConnHandler if a new connection is required.
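
As an illustration of the get-or-create pattern described above, here is a minimal, self-contained sketch. The `conn` and `connMap` types and the `getOrCreate` method are hypothetical stand-ins, not the package's code, and the meaning of the returned bool (true when a new connection was created) is an assumption made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for sync3.Conn.
type conn struct{ id string }

// connMap is a hypothetical stand-in for sync3.ConnMap.
type connMap struct {
	mu    sync.Mutex
	conns map[string]*conn
}

// getOrCreate returns the existing conn for id, or builds one with newConn
// while holding the lock, so concurrent callers never create duplicates.
// The bool reports whether a new conn was created (an assumption).
func (m *connMap) getOrCreate(id string, newConn func() *conn) (*conn, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if c, ok := m.conns[id]; ok {
		return c, false
	}
	c := newConn()
	m.conns[id] = c
	return c, true
}

func main() {
	m := &connMap{conns: map[string]*conn{}}
	mk := func() *conn { return &conn{id: "DEVICE1"} }
	c1, created1 := m.getOrCreate("DEVICE1", mk)
	c2, created2 := m.getOrCreate("DEVICE1", mk)
	fmt.Println(created1, created2, c1 == c2) // true false true
}
```

Running the factory inside the lock keeps the check and the insert in one critical section, at the cost of blocking other callers while the factory runs.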

func (*ConnMap) Len

func (m *ConnMap) Len() int

func (*ConnMap) Teardown

func (m *ConnMap) Teardown()

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher dispatches live events to caches.

func NewDispatcher

func NewDispatcher() *Dispatcher

func (*Dispatcher) IsUserJoined

func (d *Dispatcher) IsUserJoined(userID, roomID string) bool

func (*Dispatcher) OnEphemeralEvent

func (d *Dispatcher) OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)

func (*Dispatcher) OnNewEvents

func (d *Dispatcher) OnNewEvents(
	ctx context.Context, roomID string, events []json.RawMessage, latestPos int64,
)

Called by v2 pollers when we receive new events

func (*Dispatcher) OnNewInitialRoomState

func (d *Dispatcher) OnNewInitialRoomState(ctx context.Context, roomID string, state []json.RawMessage)

Called by v2 pollers when we receive an initial state block. Very similar to OnNewEvents but done in bulk for speed.

func (*Dispatcher) OnReceipt added in v0.99.1

func (d *Dispatcher) OnReceipt(ctx context.Context, receipt internal.Receipt)

func (*Dispatcher) Register

func (d *Dispatcher) Register(userID string, r Receiver) error

func (*Dispatcher) Startup

func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error

Load joined members into the dispatcher. MUST BE CALLED BEFORE V2 POLL LOOPS START.

func (*Dispatcher) Unregister

func (d *Dispatcher) Unregister(userID string)

type FilteredSortableRooms

type FilteredSortableRooms struct {
	*SortableRooms
	// contains filtered or unexported fields
}

FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list. Updates to room metadata may result in rooms being added/removed.

func NewFilteredSortableRooms

func NewFilteredSortableRooms(finder RoomFinder, roomIDs []string, filter *RequestFilters) *FilteredSortableRooms

func (*FilteredSortableRooms) Add

func (f *FilteredSortableRooms) Add(roomID string) bool

type InternalRequestLists

type InternalRequestLists struct {
	// contains filtered or unexported fields
}

InternalRequestLists is a list of lists which matches each index position in the request JSON 'lists'. It contains all the internal metadata for rooms and controls access to, and updating of, said lists.

func NewInternalRequestLists

func NewInternalRequestLists() *InternalRequestLists

func (*InternalRequestLists) AssignList

func (s *InternalRequestLists) AssignList(listKey string, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool)

Assign a new list at the given key. If Overwrite, any existing list is replaced. If DoNotOverwrite, the existing list is returned if one exists, else a new list is created. Returns the list and true if the list was overwritten.

func (*InternalRequestLists) Count

func (s *InternalRequestLists) Count(listKey string) int

Count returns the count of total rooms in this list

func (*InternalRequestLists) DeleteList

func (s *InternalRequestLists) DeleteList(listKey string)

func (*InternalRequestLists) Get

func (*InternalRequestLists) Len

func (s *InternalRequestLists) Len() int

func (*InternalRequestLists) ReadOnlyRoom added in v0.99.1

func (s *InternalRequestLists) ReadOnlyRoom(roomID string) *RoomConnMetadata

Returns the underlying Room object. Returns a shared pointer, not a copy. It is only safe to read this data, never to write.

func (*InternalRequestLists) RemoveRoom

func (s *InternalRequestLists) RemoveRoom(roomID string)

Remove a room from all lists, e.g. retired an invite, left a room.

func (*InternalRequestLists) SetRoom

func (s *InternalRequestLists) SetRoom(r RoomConnMetadata) (delta RoomDelta)

type JoinedRoomsTracker

type JoinedRoomsTracker struct {
	// contains filtered or unexported fields
}

Tracks who is joined to which rooms. This is critical from a security perspective in order to ensure that only the users joined to the room receive events in that room. Consider the situation where Alice and Bob are joined to room X. If Alice gets kicked from X, the proxy server will still receive messages for room X due to Bob being joined to the room. We therefore need to decide which active connections should be pushed events, which is what this tracker does.
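
The Alice and Bob scenario can be sketched with a small in-memory tracker. Everything below (`tracker`, `join`, `leave`, `isJoined`, `recipients`, and the example user and room IDs) is hypothetical and simplified: it is not goroutine-safe and ignores invites, so treat it as an illustration of the idea rather than the package's implementation.

```go
package main

import "fmt"

// tracker is a simplified, hypothetical stand-in for JoinedRoomsTracker.
type tracker struct {
	roomToUsers map[string]map[string]struct{}
}

func newTracker() *tracker {
	return &tracker{roomToUsers: map[string]map[string]struct{}{}}
}

// join records the membership and reports whether the state changed.
func (t *tracker) join(userID, roomID string) bool {
	users, ok := t.roomToUsers[roomID]
	if !ok {
		users = map[string]struct{}{}
		t.roomToUsers[roomID] = users
	}
	if _, joined := users[userID]; joined {
		return false
	}
	users[userID] = struct{}{}
	return true
}

// leave forgets the membership; it is a no-op for unknown rooms or users.
func (t *tracker) leave(userID, roomID string) {
	delete(t.roomToUsers[roomID], userID)
}

func (t *tracker) isJoined(userID, roomID string) bool {
	_, ok := t.roomToUsers[roomID][userID]
	return ok
}

// recipients filters connected users down to those still joined to roomID.
func (t *tracker) recipients(roomID string, connected []string) []string {
	var out []string
	for _, u := range connected {
		if t.isJoined(u, roomID) {
			out = append(out, u)
		}
	}
	return out
}

func main() {
	const roomX = "!x:example.org"
	t := newTracker()
	t.join("@alice:example.org", roomX)
	t.join("@bob:example.org", roomX)
	t.leave("@alice:example.org", roomX) // Alice is kicked from X

	// Events for X still arrive because Bob is joined, but only Bob may see them.
	fmt.Println(t.recipients(roomX, []string{"@alice:example.org", "@bob:example.org"}))
	// [@bob:example.org]
}
```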

func NewJoinedRoomsTracker

func NewJoinedRoomsTracker() *JoinedRoomsTracker

func (*JoinedRoomsTracker) IsUserJoined

func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool

func (*JoinedRoomsTracker) JoinedRoomsForUser

func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string

func (*JoinedRoomsTracker) JoinedUsersForRoom

func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string, filter func(userID string) bool) (matchedUserIDs []string, joinCount int)

JoinedUsersForRoom returns the joined users in the given room, filtered by the filter function if provided. If one is not provided, all joined users are returned. Returns the join count at the time this function was called.

func (*JoinedRoomsTracker) NumInvitedUsersForRoom

func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int

func (*JoinedRoomsTracker) Startup

func (t *JoinedRoomsTracker) Startup(roomToJoinedUsers map[string][]string)

Startup efficiently sets up the joined rooms tracker, but isn't safe to call with live traffic, as it replaces all known in-memory state. Panics if called on a non-empty tracker.

func (*JoinedRoomsTracker) UserJoinedRoom

func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool

UserJoinedRoom returns true if the state changed.

func (*JoinedRoomsTracker) UserLeftRoom

func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)

func (*JoinedRoomsTracker) UsersInvitedToRoom

func (t *JoinedRoomsTracker) UsersInvitedToRoom(userIDs []string, roomID string)

func (*JoinedRoomsTracker) UsersJoinedRoom

func (t *JoinedRoomsTracker) UsersJoinedRoom(userIDs []string, roomID string) bool

UsersJoinedRoom returns true if the state changed for any user in userIDs.

type List

type List interface {
	IndexOf(roomID string) (int, bool)
	Len() int64
	Sort(sortBy []string) error
	Add(roomID string) bool
	Remove(roomID string) int
	Get(index int) string
}

type ListOp

type ListOp uint8

ListOp represents the possible operations on a list

var (
	// The room is added to the list
	ListOpAdd ListOp = 1
	// The room is removed from the list
	ListOpDel ListOp = 2
	// The room may change position in the list
	ListOpChange ListOp = 3
)

type OverwriteVal

type OverwriteVal bool

var (
	DoNotOverwrite OverwriteVal = false
	Overwrite      OverwriteVal = true
)

type Receiver

type Receiver interface {
	OnNewEvent(ctx context.Context, event *caches.EventData)
	OnReceipt(ctx context.Context, receipt internal.Receipt)
	OnEphemeralEvent(ctx context.Context, roomID string, ephEvent json.RawMessage)
	OnRegistered(latestPos int64) error
}

type Request

type Request struct {
	TxnID             string                      `json:"txn_id"`
	Lists             map[string]RequestList      `json:"lists"`
	RoomSubscriptions map[string]RoomSubscription `json:"room_subscriptions"`
	UnsubscribeRooms  []string                    `json:"unsubscribe_rooms"`
	Extensions        extensions.Request          `json:"extensions"`
	// contains filtered or unexported fields
}

func (*Request) ApplyDelta

func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestDelta)

Apply this delta on top of the request. Returns a new Request with the combined output, along with the delta operations. `nextReq` cannot be nil, but `r` can be nil in the case of an initial request.

func (*Request) Same

func (r *Request) Same(other *Request) bool

func (*Request) SetPos

func (r *Request) SetPos(pos int64)

func (*Request) SetTimeoutMSecs

func (r *Request) SetTimeoutMSecs(timeout int)

func (*Request) TimeoutMSecs

func (r *Request) TimeoutMSecs() int

type RequestDelta

type RequestDelta struct {
	// new room IDs to subscribe to
	Subs []string
	// room IDs to unsubscribe from
	Unsubs []string
	// The complete union of both lists (contains max(a,b) lists)
	Lists map[string]RequestListDelta
}

Internal struct used to represent the diffs between 2 requests

type RequestFilters

type RequestFilters struct {
	Spaces         []string  `json:"spaces"`
	IsDM           *bool     `json:"is_dm"`
	IsEncrypted    *bool     `json:"is_encrypted"`
	IsInvite       *bool     `json:"is_invite"`
	IsTombstoned   *bool     `json:"is_tombstoned"` // deprecated
	RoomTypes      []*string `json:"room_types"`
	NotRoomTypes   []*string `json:"not_room_types"`
	RoomNameFilter string    `json:"room_name_like"`
	Tags           []string  `json:"tags"`
	NotTags        []string  `json:"not_tags"`
}

func (*RequestFilters) Include

func (rf *RequestFilters) Include(r *RoomConnMetadata, finder RoomFinder) bool

type RequestList

type RequestList struct {
	RoomSubscription
	Ranges          SliceRanges     `json:"ranges"`
	Sort            []string        `json:"sort"`
	Filters         *RequestFilters `json:"filters"`
	SlowGetAllRooms *bool           `json:"slow_get_all_rooms,omitempty"`
	Deleted         bool            `json:"deleted,omitempty"`
}

func (*RequestList) CalculateMoveIndexes

func (rl *RequestList) CalculateMoveIndexes(fromIndex, toIndex int) (fromTos [][2]int)

Calculate the real from -> to index positions for the two input index positions. This takes into account the ranges on the list.

func (*RequestList) FiltersChanged

func (rl *RequestList) FiltersChanged(next *RequestList) bool

func (*RequestList) ShouldGetAllRooms

func (rl *RequestList) ShouldGetAllRooms() bool

func (*RequestList) SortOrderChanged

func (rl *RequestList) SortOrderChanged(next *RequestList) bool

func (*RequestList) TimelineLimitChanged added in v0.99.0

func (rl *RequestList) TimelineLimitChanged(next *RequestList) bool

func (*RequestList) WriteDeleteOp

func (rl *RequestList) WriteDeleteOp(deletedIndex int) *ResponseOpSingle

Write a delete operation for this list. Can return nil for invalid indexes or if this index isn't being tracked. Useful when rooms are removed from the list e.g left rooms.

func (*RequestList) WriteInsertOp

func (rl *RequestList) WriteInsertOp(insertedIndex int, roomID string) *ResponseOpSingle

Write an insert operation for this list. Can return nil for indexes not being tracked. Useful when rooms are added to the list e.g newly joined rooms.

func (*RequestList) WriteSwapOp

func (rl *RequestList) WriteSwapOp(
	roomID string, fromIndex, toIndex int,
) []ResponseOp

Move a room from an absolute index position to another absolute position. These positions do not need to be inside a valid range. Returns 0-2 operations. For example:

1,2,3,4,5 tracking range [0,4]
3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
7 bumps to top again -> 7,1,2,3,4 -> no-op as from == to index
new room 8 in i=5 -> 7,1,2,3,4,8 -> no-op as 8 is outside the range.

Returns the list of ops as well as the new toIndex if it wasn't inside a range.

type RequestListDelta

type RequestListDelta struct {
	// What was there before, nullable
	Prev *RequestList
	// What is there now, nullable. Combined result.
	Curr *RequestList
}

Internal struct used to represent a single list delta.

type Response

type Response struct {
	Lists map[string]ResponseList `json:"lists"`

	Rooms      map[string]Room     `json:"rooms"`
	Extensions extensions.Response `json:"extensions"`

	Pos     string `json:"pos"`
	TxnID   string `json:"txn_id,omitempty"`
	Session string `json:"session_id,omitempty"`
}

func (*Response) ListOps

func (r *Response) ListOps() int

func (*Response) PosInt

func (r *Response) PosInt() int64

func (*Response) RoomIDsToTimelineEventIDs added in v0.99.1

func (r *Response) RoomIDsToTimelineEventIDs() map[string][]string

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

Custom unmarshal so we can dynamically create the right ResponseOp for Ops

type ResponseList

type ResponseList struct {
	Ops   []ResponseOp `json:"ops,omitempty"`
	Count int          `json:"count"`
}

type ResponseOp

type ResponseOp interface {
	Op() string
	// which rooms are we giving data about
	IncludedRoomIDs() []string
}

func CalculateListOps

func CalculateListOps(reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string)

CalculateListOps contains the core list moving algorithm. It accepts the client's list of ranges, the underlying list on which to perform operations on, and the room which was modified and in what way. It returns a list of INSERT/DELETE operations, which may be zero length, as well as which rooms are newly added into the window.

A,B,C,D,E,F,G,H,I        <-- List
`----`    `----`         <-- RequestList.Ranges
DEL E | ADD J | CHANGE C <-- ListOp RoomID

returns:

[ {op:DELETE, index:2}, {op:INSERT, index:0, room_id:A} ] <--- []ResponseOp
[ "A" ] <--- []string, new room subscriptions, if it wasn't in the window before

This function will modify List to Add/Delete/Sort appropriately.

type ResponseOpRange

type ResponseOpRange struct {
	Operation string   `json:"op"`
	Range     []int64  `json:"range,omitempty"`
	RoomIDs   []string `json:"room_ids,omitempty"`
}

func (*ResponseOpRange) IncludedRoomIDs

func (r *ResponseOpRange) IncludedRoomIDs() []string

func (*ResponseOpRange) Op

func (r *ResponseOpRange) Op() string

type ResponseOpSingle

type ResponseOpSingle struct {
	Operation string `json:"op"`
	Index     *int   `json:"index,omitempty"` // 0 is a valid value, hence *int
	RoomID    string `json:"room_id,omitempty"`
}
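
The `*int` comment on Index reflects how encoding/json treats `omitempty`: a plain int of 0 is dropped from the output, while a non-nil pointer to 0 is kept. The two struct types below are hypothetical copies made only to show the difference side by side.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// opWithPointer mirrors the documented field shape: Index is a *int.
type opWithPointer struct {
	Operation string `json:"op"`
	Index     *int   `json:"index,omitempty"`
}

// opWithValue is a hypothetical variant using a plain int, for contrast.
type opWithValue struct {
	Operation string `json:"op"`
	Index     int    `json:"index,omitempty"`
}

func mustJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	zero := 0
	fmt.Println(mustJSON(opWithPointer{Operation: "DELETE", Index: &zero}))
	// {"op":"DELETE","index":0}
	fmt.Println(mustJSON(opWithValue{Operation: "DELETE", Index: 0}))
	// {"op":"DELETE"}
}
```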

func (*ResponseOpSingle) IncludedRoomIDs

func (r *ResponseOpSingle) IncludedRoomIDs() []string

func (*ResponseOpSingle) Op

func (r *ResponseOpSingle) Op() string

type Room

type Room struct {
	Name              string            `json:"name,omitempty"`
	RequiredState     []json.RawMessage `json:"required_state,omitempty"`
	Timeline          []json.RawMessage `json:"timeline,omitempty"`
	InviteState       []json.RawMessage `json:"invite_state,omitempty"`
	NotificationCount int64             `json:"notification_count"`
	HighlightCount    int64             `json:"highlight_count"`
	Initial           bool              `json:"initial,omitempty"`
	IsDM              bool              `json:"is_dm,omitempty"`
	JoinedCount       int               `json:"joined_count,omitempty"`
	InvitedCount      int               `json:"invited_count,omitempty"`
	PrevBatch         string            `json:"prev_batch,omitempty"`
	NumLive           int               `json:"num_live,omitempty"`
}

type RoomConnMetadata

type RoomConnMetadata struct {
	internal.RoomMetadata
	caches.UserRoomData
}

type RoomDelta

type RoomDelta struct {
	RoomNameChanged          bool
	JoinCountChanged         bool
	InviteCountChanged       bool
	NotificationCountChanged bool
	HighlightCountChanged    bool
	Lists                    []RoomListDelta
}

type RoomFinder

type RoomFinder interface {
	ReadOnlyRoom(roomID string) *RoomConnMetadata
}

type RoomListDelta

type RoomListDelta struct {
	ListKey string
	Op      ListOp
}

type RoomSubscription

type RoomSubscription struct {
	RequiredState   [][2]string       `json:"required_state"`
	TimelineLimit   int64             `json:"timeline_limit"`
	IncludeOldRooms *RoomSubscription `json:"include_old_rooms"`
}

func (RoomSubscription) Combine

Combine this subscription with another, returning a union of both as a copy.

func (RoomSubscription) LazyLoadMembers

func (rs RoomSubscription) LazyLoadMembers() bool

func (RoomSubscription) RequiredStateChanged added in v0.99.0

func (rs RoomSubscription) RequiredStateChanged(other RoomSubscription) bool

func (RoomSubscription) RequiredStateMap

func (rs RoomSubscription) RequiredStateMap(userID string) *internal.RequiredStateMap

Calculate the required state map for this room subscription. Given event types A,B,C and state keys 1,2,3, the following Venn diagrams are possible:

.---------[*,*]----------.
|      .---------.       |
|      |   A,2   | A,3   |
| .----+--[B,*]--+-----. |
| |    | .-----. |     | |
| |B,1 | | B,2 | | B,3 | |
| |    | `[B,2]` |     | |
| `----+---------+-----` |
|      |   C,2   | C,3   |
|      `--[*,2]--`       |
`------------------------`

The largest set will be used when returning the required state map. For example, [B,2] + [B,*] = [B,*] because [B,*] encompasses [B,2]. This means [*,*] encompasses everything. 'userID' is the ID of the user performing this request, so $ME can be replaced.
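
The set relationships in the diagram can be illustrated with a small matcher over `required_state` tuples. This is a sketch under assumptions, not the package's RequiredStateMap: the `matches` function and the example user ID are hypothetical, only whole-field wildcards are handled, and `$LAZY` is deliberately left out.

```go
package main

import "fmt"

const (
	wildcard   = "*"   // same value as the package's Wildcard
	stateKeyMe = "$ME" // same value as the package's StateKeyMe
)

// matches reports whether (evType, stateKey) is covered by any tuple in
// required, treating "*" as matching anything and "$ME" as userID.
func matches(required [][2]string, userID, evType, stateKey string) bool {
	for _, rs := range required {
		wantType, wantKey := rs[0], rs[1]
		if wantKey == stateKeyMe {
			wantKey = userID
		}
		typeOK := wantType == wildcard || wantType == evType
		keyOK := wantKey == wildcard || wantKey == stateKey
		if typeOK && keyOK {
			return true
		}
	}
	return false
}

func main() {
	me := "@me:example.org"
	required := [][2]string{
		{"B", "2"}, {"B", "*"}, {"*", "2"}, {"m.room.member", "$ME"},
	}
	fmt.Println(matches(required, me, "B", "3"))                                  // true: [B,*] covers it
	fmt.Println(matches(required, me, "A", "2"))                                  // true: [*,2] covers it
	fmt.Println(matches(required, me, "A", "3"))                                  // false: only [*,*] would
	fmt.Println(matches(required, me, "m.room.member", me))                       // true: $ME is replaced
	fmt.Println(matches(required, me, "m.room.member", "@other:example.org"))     // false
}
```

Because any matching tuple is enough, listing both [B,2] and [B,*] behaves exactly like listing [B,*] alone, which is the "largest set wins" rule described above.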

type SliceRanges

type SliceRanges [][2]int64

func (SliceRanges) ClosestInDirection

func (r SliceRanges) ClosestInDirection(i int64, towardsZero bool) (closestIndex int64)

ClosestInDirection returns the index position of a range bound that is closest to `i`, heading either towards 0 or towards infinity. If there is no range boundary in that direction, -1 is returned. For example:

[0,20] i=25,towardsZero=true => 20
[0,20] i=15,towardsZero=true => 0
[0,20] i=15,towardsZero=false => 20
[0,20] i=25,towardsZero=false => -1
[0,20],[40,60] i=25,towardsZero=true => 20
[0,20],[40,60] i=25,towardsZero=false => 40
[0,20],[40,60] i=40,towardsZero=true => 40
[20,40] i=40,towardsZero=true => 20
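
The examples above are consistent with the following rule: if i lies inside a range, return that range's bound in the requested direction; otherwise return the nearest bound strictly beyond i in that direction, or -1 if there is none. The sketch below is a reconstruction from those examples only, with hypothetical lower-case names, and it assumes non-negative, non-overlapping ranges. It is not taken from the package source.

```go
package main

import "fmt"

type sliceRanges [][2]int64

// closestInDirection is a hypothetical re-implementation that reproduces the
// documented examples.
func (r sliceRanges) closestInDirection(i int64, towardsZero bool) int64 {
	// If i sits inside a range, the answer is that range's bound in the
	// requested direction.
	for _, rng := range r {
		if i >= rng[0] && i <= rng[1] {
			if towardsZero {
				return rng[0]
			}
			return rng[1]
		}
	}
	// Otherwise pick the nearest bound strictly beyond i in that direction.
	closest := int64(-1)
	for _, rng := range r {
		for _, b := range rng {
			if towardsZero && b < i && b > closest {
				closest = b
			}
			if !towardsZero && b > i && (closest == -1 || b < closest) {
				closest = b
			}
		}
	}
	return closest
}

func main() {
	one := sliceRanges{{0, 20}}
	two := sliceRanges{{0, 20}, {40, 60}}
	fmt.Println(one.closestInDirection(25, true))                  // 20
	fmt.Println(one.closestInDirection(15, true))                  // 0
	fmt.Println(one.closestInDirection(15, false))                 // 20
	fmt.Println(one.closestInDirection(25, false))                 // -1
	fmt.Println(two.closestInDirection(25, true))                  // 20
	fmt.Println(two.closestInDirection(25, false))                 // 40
	fmt.Println(two.closestInDirection(40, true))                  // 40
	fmt.Println(sliceRanges{{20, 40}}.closestInDirection(40, true)) // 20
}
```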

func (SliceRanges) Delta

func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)

Delta returns the ranges which are added, removed and unchanged, in that order. Intelligently handles overlaps.

func (SliceRanges) Inside

func (r SliceRanges) Inside(i int64) ([2]int64, bool)

Inside returns true if i is inside the range

func (SliceRanges) SliceInto

func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer

Slice into this range, returning subslices of slice

func (SliceRanges) Valid

func (r SliceRanges) Valid() bool

type SortableRooms

type SortableRooms struct {
	// contains filtered or unexported fields
}

SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of room IDs to current index positions after sorting.

func NewSortableRooms

func NewSortableRooms(finder RoomFinder, rooms []string) *SortableRooms

func (*SortableRooms) Add

func (s *SortableRooms) Add(roomID string) bool

Add a room to the list. Returns true if the room was added.

func (*SortableRooms) Get

func (s *SortableRooms) Get(index int) string

func (*SortableRooms) IndexOf

func (s *SortableRooms) IndexOf(roomID string) (int, bool)

func (*SortableRooms) Len

func (s *SortableRooms) Len() int64

func (*SortableRooms) Remove

func (s *SortableRooms) Remove(roomID string) int

func (*SortableRooms) RoomIDs

func (s *SortableRooms) RoomIDs() []string

func (*SortableRooms) Sort

func (s *SortableRooms) Sort(sortBy []string) error

func (*SortableRooms) Subslice

func (s *SortableRooms) Subslice(i, j int64) Subslicer

type Subslicer

type Subslicer interface {
	Len() int64
	Subslice(i, j int64) Subslicer
}

Directories

Path Synopsis
package extensions contains the interface and concrete implementations for all sliding sync extensions
