notifier

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier will wake up sleeping requests when there is some new data. It does not tell requests what that data is, only the sync position which they can use to get at it. This is done to prevent races whereby we tell the caller the event, but the token has already advanced by the time they fetch it, resulting in missed events.

func NewNotifier

func NewNotifier(rsAPI api.SyncRoomserverAPI) *Notifier

NewNotifier creates a new notifier set to the given sync position. In order for this to be of any use, the Notifier needs to be told all rooms and the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).

func (*Notifier) CurrentPosition

func (n *Notifier) CurrentPosition() types.StreamingToken

CurrentPosition returns the current sync position

func (*Notifier) GetListener

func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener

GetListener returns a UserStreamListener that can be used to wait for updates for a user. Must be closed. notify for anything before sincePos

func (*Notifier) IsSharedUser

func (n *Notifier) IsSharedUser(userA, userB string) bool

func (*Notifier) JoinedUsers

func (n *Notifier) JoinedUsers(roomID string) (userIDs []string)

func (*Notifier) Load

func (n *Notifier) Load(ctx context.Context, db storage.Database) error

Load the membership states required to notify users correctly.

func (*Notifier) LoadRooms

func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error

LoadRooms loads the membership states required to notify users correctly.

func (*Notifier) OnNewAccountData

func (n *Notifier) OnNewAccountData(
	userID string, posUpdate types.StreamingToken,
)

func (*Notifier) OnNewEvent

func (n *Notifier) OnNewEvent(
	ev *rstypes.HeaderedEvent, roomID string, userIDs []string,
	posUpdate types.StreamingToken,
)

OnNewEvent is called when a new event is received from the room server. Must only be called from a single goroutine, to avoid races between updates which could set the current sync position incorrectly. Chooses which user sync streams to update by a provided gomatrixserverlib.PDU (based on the users in the event's room), a roomID directly, or a list of user IDs, prioritised by parameter ordering. posUpdate contains the latest position(s) for one or more types of events. If a position in posUpdate is 0, it means no updates are available of that type. Typically a consumer supplies a posUpdate with the latest sync position for the event type it handles, leaving other fields as 0.

func (*Notifier) OnNewInvite

func (n *Notifier) OnNewInvite(
	posUpdate types.StreamingToken, wakeUserID string,
)

func (*Notifier) OnNewKeyChange

func (n *Notifier) OnNewKeyChange(
	posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
)

func (*Notifier) OnNewNotificationData

func (n *Notifier) OnNewNotificationData(
	userID string,
	posUpdate types.StreamingToken,
)

func (*Notifier) OnNewPeek

func (n *Notifier) OnNewPeek(
	roomID, userID, deviceID string,
	posUpdate types.StreamingToken,
)

func (*Notifier) OnNewPresence

func (n *Notifier) OnNewPresence(
	posUpdate types.StreamingToken, userID string,
)

func (*Notifier) OnNewReceipt

func (n *Notifier) OnNewReceipt(
	roomID string,
	posUpdate types.StreamingToken,
)

OnNewReceipt updates the current position

func (*Notifier) OnNewSendToDevice

func (n *Notifier) OnNewSendToDevice(
	userID string, deviceIDs []string,
	posUpdate types.StreamingToken,
)

func (*Notifier) OnNewTyping

func (n *Notifier) OnNewTyping(
	roomID string,
	posUpdate types.StreamingToken,
)

OnNewReceipt updates the current position

func (*Notifier) OnRetirePeek

func (n *Notifier) OnRetirePeek(
	roomID, userID, deviceID string,
	posUpdate types.StreamingToken,
)

func (*Notifier) PeekingDevices

func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)

func (*Notifier) SetCurrentPosition

func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken)

SetCurrentPosition sets the current streaming positions. This must be called directly after NewNotifier and initialising the streams.

func (*Notifier) SharedUsers

func (n *Notifier) SharedUsers(userID string) []string

type UserDeviceStream

type UserDeviceStream struct {
	UserID   string
	DeviceID string
	// contains filtered or unexported fields
}

UserDeviceStream represents a communication mechanism between the /sync request goroutine and the underlying sync server goroutines. Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() updates.

func NewUserDeviceStream

func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream

NewUserDeviceStream creates a new user stream

func (*UserDeviceStream) Broadcast

func (s *UserDeviceStream) Broadcast(pos types.StreamingToken)

Broadcast a new sync position for this user.

func (*UserDeviceStream) GetListener

GetListener returns UserStreamListener that a sync request can use to wait for new updates with. UserStreamListener must be closed

func (*UserDeviceStream) NumWaiting

func (s *UserDeviceStream) NumWaiting() uint

NumWaiting returns the number of goroutines waiting for waiting for updates. Used for metrics and testing.

func (*UserDeviceStream) TimeOfLastNonEmpty

func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time

TimeOfLastNonEmpty returns the last time that the number of waiting listeners was non-empty, may be time.Now() if number of waiting listeners is currently non-empty.

type UserDeviceStreamListener

type UserDeviceStreamListener struct {
	// contains filtered or unexported fields
}

UserDeviceStreamListener allows a sync request to wait for updates for a user.

func (*UserDeviceStreamListener) Close

func (s *UserDeviceStreamListener) Close()

Close cleans up resources used

func (*UserDeviceStreamListener) GetNotifyChannel

func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{}

GetNotifyChannel returns a channel that is closed when there may be an update for the user. sincePos specifies from which point we want to be notified about. If there has already been an update after sincePos we'll return a closed channel immediately.

func (*UserDeviceStreamListener) GetSyncPosition

func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken

GetSyncPosition returns last sync position which the UserStream was notified about

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL