Documentation ¶
Index ¶
- type Notifier
- func (n *Notifier) CurrentPosition() types.StreamingToken
- func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener
- func (n *Notifier) IsSharedUser(userA, userB string) bool
- func (n *Notifier) JoinedUsers(roomID string) (userIDs []string)
- func (n *Notifier) Load(ctx context.Context, db storage.Database) error
- func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error
- func (n *Notifier) OnNewAccountData(userID string, posUpdate types.StreamingToken)
- func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, ...)
- func (n *Notifier) OnNewInvite(posUpdate types.StreamingToken, wakeUserID string)
- func (n *Notifier) OnNewKeyChange(posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string)
- func (n *Notifier) OnNewNotificationData(userID string, posUpdate types.StreamingToken)
- func (n *Notifier) OnNewPeek(roomID, userID, deviceID string, posUpdate types.StreamingToken)
- func (n *Notifier) OnNewPresence(posUpdate types.StreamingToken, userID string)
- func (n *Notifier) OnNewReceipt(roomID string, posUpdate types.StreamingToken)
- func (n *Notifier) OnNewSendToDevice(userID string, deviceIDs []string, posUpdate types.StreamingToken)
- func (n *Notifier) OnNewTyping(roomID string, posUpdate types.StreamingToken)
- func (n *Notifier) OnRetirePeek(roomID, userID, deviceID string, posUpdate types.StreamingToken)
- func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)
- func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken)
- func (n *Notifier) SharedUsers(userID string) []string
- type UserDeviceStream
- type UserDeviceStreamListener
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier will wake up sleeping requests when there is some new data. It does not tell requests what that data is, only the sync position which they can use to get at it. This is done to prevent races whereby we tell the caller the event, but the token has already advanced by the time they fetch it, resulting in missed events.
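The idea of waking requests with a position rather than with the data itself can be sketched in isolation. The following is a minimal, self-contained illustration and not the package's implementation: `eventLog`, `Append`, `Range`, and `consume` are hypothetical names, and a plain `int` stands in for `types.StreamingToken`. It shows that a reader which always fetches from its own last position up to the announced one does not miss events, even when a wake-up is skipped.

```go
package main

import (
	"fmt"
	"sync"
)

// eventLog is a hypothetical append-only store. Positions are 1-based:
// the first appended event has position 1.
type eventLog struct {
	mu     sync.Mutex
	events []string
}

// Append stores ev and returns the position it was written at.
func (l *eventLog) Append(ev string) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, ev)
	return len(l.events)
}

// Range returns a copy of the events with positions in (from, to].
func (l *eventLog) Range(from, to int) []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	if to > len(l.events) {
		to = len(l.events)
	}
	if from >= to {
		return nil
	}
	return append([]string(nil), l.events[from:to]...)
}

// consume reads wake-ups that carry only a position. It fetches everything
// between its own last position and the announced one, so the payload is
// always read from the log and never from the wake-up itself.
func consume(l *eventLog, wake <-chan int) []string {
	var seen []string
	from := 0
	for pos := range wake {
		if pos <= from {
			continue // stale wake-up, nothing new
		}
		seen = append(seen, l.Range(from, pos)...)
		from = pos
	}
	return seen
}

func main() {
	l := &eventLog{}
	wake := make(chan int, 4)

	wake <- l.Append("m1")
	l.Append("m2") // deliberately no wake-up for this event
	wake <- l.Append("m3")
	close(wake)

	fmt.Println(consume(l, wake)) // prints "[m1 m2 m3]"
}
```

The second event is still delivered because the reader asks the log for the whole range it has not yet seen.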
func NewNotifier ¶
func NewNotifier() *Notifier
NewNotifier creates a new Notifier. In order for this to be of any use, the Notifier needs to be told all rooms and the joined users within each of them by calling Notifier.Load(ctx, db), and its sync position must be set with Notifier.SetCurrentPosition.
func (*Notifier) CurrentPosition ¶
func (n *Notifier) CurrentPosition() types.StreamingToken
CurrentPosition returns the current sync position.
func (*Notifier) GetListener ¶
func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener
GetListener returns a UserDeviceStreamListener that can be used to wait for updates for a user. The listener must be closed once it is no longer needed.
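func (*Notifier) IsSharedUser ¶
func (n *Notifier) IsSharedUser(userA, userB string) bool
func (*Notifier) JoinedUsers ¶
func (n *Notifier) JoinedUsers(roomID string) (userIDs []string)
func (*Notifier) Load ¶
func (n *Notifier) Load(ctx context.Context, db storage.Database) error
func (*Notifier) LoadRooms ¶
func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error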
LoadRooms loads the membership states required to notify users correctly.
func (*Notifier) OnNewAccountData ¶
func (n *Notifier) OnNewAccountData(userID string, posUpdate types.StreamingToken)
func (*Notifier) OnNewEvent ¶
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, posUpdate types.StreamingToken)
OnNewEvent is called when a new event is received from the room server. It must only be called from a single goroutine, to avoid races between updates which could set the current sync position incorrectly. It chooses which user sync streams to update from a provided *gomatrixserverlib.HeaderedEvent (based on the users in the event's room), a roomID directly, or a list of user IDs, prioritised by parameter ordering. posUpdate contains the latest position(s) for one or more types of events. A position of 0 in posUpdate means that no updates of that type are available. Typically a consumer supplies a posUpdate with the latest sync position for the event type it handles, leaving the other fields as 0.
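The "0 means no update" convention for posUpdate can be illustrated with a toy token. This is only a sketch under stated assumptions: `token`, its three fields, and `applyUpdates` are hypothetical stand-ins, not the real `types.StreamingToken` or its methods, and the rule that a position only ever moves forward is an assumption made for the example.

```go
package main

import "fmt"

// token is a hypothetical stand-in for a multi-stream sync position.
// Each field tracks the latest position of one kind of data.
type token struct {
	PDU     int64
	Typing  int64
	Receipt int64
}

// applyUpdates merges upd into t. A zero field in upd carries no update,
// and (by assumption in this sketch) a position never moves backwards.
func (t *token) applyUpdates(upd token) {
	if upd.PDU > t.PDU {
		t.PDU = upd.PDU
	}
	if upd.Typing > t.Typing {
		t.Typing = upd.Typing
	}
	if upd.Receipt > t.Receipt {
		t.Receipt = upd.Receipt
	}
}

func main() {
	cur := token{PDU: 10, Typing: 3, Receipt: 7}

	// A typing consumer only fills in the field it is responsible for.
	cur.applyUpdates(token{Typing: 5})

	fmt.Printf("%+v\n", cur) // prints "{PDU:10 Typing:5 Receipt:7}"
}
```

Because unrelated fields are left at 0, independent consumers can each advance their own stream without overwriting positions owned by the others.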
func (*Notifier) OnNewInvite ¶
func (n *Notifier) OnNewInvite(posUpdate types.StreamingToken, wakeUserID string)
func (*Notifier) OnNewKeyChange ¶
func (n *Notifier) OnNewKeyChange(posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string)
func (*Notifier) OnNewNotificationData ¶
func (n *Notifier) OnNewNotificationData(userID string, posUpdate types.StreamingToken)
func (*Notifier) OnNewPeek ¶
func (n *Notifier) OnNewPeek(roomID, userID, deviceID string, posUpdate types.StreamingToken)
func (*Notifier) OnNewPresence ¶
func (n *Notifier) OnNewPresence(posUpdate types.StreamingToken, userID string)
func (*Notifier) OnNewReceipt ¶
func (n *Notifier) OnNewReceipt(roomID string, posUpdate types.StreamingToken)
OnNewReceipt updates the current position.
func (*Notifier) OnNewSendToDevice ¶
func (n *Notifier) OnNewSendToDevice(userID string, deviceIDs []string, posUpdate types.StreamingToken)
func (*Notifier) OnNewTyping ¶
func (n *Notifier) OnNewTyping(roomID string, posUpdate types.StreamingToken)
OnNewTyping updates the current position.
func (*Notifier) OnRetirePeek ¶
func (n *Notifier) OnRetirePeek(roomID, userID, deviceID string, posUpdate types.StreamingToken)
func (*Notifier) PeekingDevices ¶
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)
func (*Notifier) SetCurrentPosition ¶
func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken)
SetCurrentPosition sets the current streaming positions. This must be called directly after NewNotifier and initialising the streams.
func (*Notifier) SharedUsers ¶
func (n *Notifier) SharedUsers(userID string) []string
type UserDeviceStream ¶
type UserDeviceStream struct {
    UserID   string
    DeviceID string
    // contains filtered or unexported fields
}
UserDeviceStream represents a communication mechanism between the /sync request goroutine and the underlying sync server goroutines. Goroutines can get a UserDeviceStreamListener to wait for updates, and can Broadcast() updates.
func NewUserDeviceStream ¶
func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream
NewUserDeviceStream creates a new user device stream.
func (*UserDeviceStream) Broadcast ¶
func (s *UserDeviceStream) Broadcast(pos types.StreamingToken)
Broadcast broadcasts a new sync position for this user.
func (*UserDeviceStream) GetListener ¶
func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener
GetListener returns a UserDeviceStreamListener that a sync request can use to wait for new updates. The UserDeviceStreamListener must be closed.
func (*UserDeviceStream) NumWaiting ¶
func (s *UserDeviceStream) NumWaiting() uint
NumWaiting returns the number of goroutines waiting for updates. Used for metrics and testing.
func (*UserDeviceStream) TimeOfLastNonEmpty ¶
func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time
TimeOfLastNonEmpty returns the last time that the number of waiting listeners was non-zero. This may be time.Now() if listeners are currently waiting.
type UserDeviceStreamListener ¶
type UserDeviceStreamListener struct {
// contains filtered or unexported fields
}
UserDeviceStreamListener allows a sync request to wait for updates for a user.
func (*UserDeviceStreamListener) Close ¶
func (s *UserDeviceStreamListener) Close()
Close cleans up the resources used by the listener.
func (*UserDeviceStreamListener) GetNotifyChannel ¶
func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{}
GetNotifyChannel returns a channel that is closed when there may be an update for the user. sincePos specifies from which point we want to be notified about. If there has already been an update after sincePos we'll return a closed channel immediately.
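The "closed channel as a wake-up" pattern described above can be sketched with the standard library alone. This is a simplified illustration, not the package's code: `stream`, `newStream`, and `NotifyChannel` are hypothetical names, and a single `int64` stands in for the multi-field `types.StreamingToken`.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical, single-position analogue of a user device stream.
type stream struct {
	mu     sync.Mutex
	pos    int64
	signal chan struct{} // closed on the next Broadcast
}

func newStream(pos int64) *stream {
	return &stream{pos: pos, signal: make(chan struct{})}
}

// Broadcast advances the position and wakes everyone holding the current
// signal channel by closing it, then installs a fresh channel.
func (s *stream) Broadcast(pos int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if pos > s.pos {
		s.pos = pos
	}
	close(s.signal)
	s.signal = make(chan struct{})
}

// NotifyChannel returns a channel that is closed when there may be an update
// after since. If the stream is already ahead of since, the returned channel
// is closed immediately so the caller does not block.
func (s *stream) NotifyChannel(since int64) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.pos > since {
		closed := make(chan struct{})
		close(closed)
		return closed
	}
	return s.signal
}

func main() {
	s := newStream(5)

	// The stream is already past position 3, so this does not block.
	<-s.NotifyChannel(3)
	fmt.Println("update available after 3")

	// At position 5 there is nothing new yet; the channel stays open
	// until the next Broadcast.
	ch := s.NotifyChannel(5)
	s.Broadcast(6)
	<-ch
	fmt.Println("woken after broadcast")
}
```

Closing a channel wakes every goroutine receiving from it, which is why one Broadcast is enough for any number of waiting requests. In a real sync handler the receive would normally sit in a `select` alongside a timeout and the request context.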
func (*UserDeviceStreamListener) GetSyncPosition ¶
func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken
GetSyncPosition returns the last sync position that the UserDeviceStream was notified about.