Documentation
¶
Index ¶
- Constants
- Variables
- func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error
- func SequenceForEvent(evt *XRPCStreamEvent) int64
- type ErrorFrame
- type EventHeader
- type EventManager
- func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
- func (em *EventManager) Shutdown(ctx context.Context) error
- func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, ...) (<-chan *XRPCStreamEvent, func(), error)
- func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error
- type EventPersistence
- type InstrumentedRepoStreamCallbacks
- type MemPersister
- func (mp *MemPersister) Flush(ctx context.Context) error
- func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (mp *MemPersister) Shutdown(context.Context) error
- func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error
- type Operation
- type RepoStreamCallbacks
- type Scheduler
- type Subscriber
- type XRPCStreamEvent
Constants ¶
View Source
const ( EvtKindErrorFrame = -1 EvtKindMessage = 1 )
Variables ¶
View Source
var ( AccountStatusActive = "active" AccountStatusTakendown = "takendown" AccountStatusSuspended = "suspended" AccountStatusDeleted = "deleted" AccountStatusDeactivated = "deactivated" )
View Source
var ( ErrPlaybackShutdown = fmt.Errorf("playback shutting down") ErrCaughtUp = fmt.Errorf("caught up") )
View Source
var ErrNoSeq = errors.New("event has no sequence number")
Functions ¶
func HandleRepoStream ¶
func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error
HandleRepoStream reads events from a stream: con is the source of events; sched gets AddWork called for each event; log may be nil, in which case the default logger is used.
func SequenceForEvent ¶
func SequenceForEvent(evt *XRPCStreamEvent) int64
Types ¶
type ErrorFrame ¶
func (*ErrorFrame) MarshalCBOR ¶
func (t *ErrorFrame) MarshalCBOR(w io.Writer) error
func (*ErrorFrame) UnmarshalCBOR ¶
func (t *ErrorFrame) UnmarshalCBOR(r io.Reader) (err error)
type EventHeader ¶
func (*EventHeader) MarshalCBOR ¶
func (t *EventHeader) MarshalCBOR(w io.Writer) error
func (*EventHeader) UnmarshalCBOR ¶
func (t *EventHeader) UnmarshalCBOR(r io.Reader) (err error)
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
func NewEventManager ¶
func NewEventManager(persister EventPersistence) *EventManager
func (*EventManager) AddEvent ¶
func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
func (*EventManager) Subscribe ¶
func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error)
func (*EventManager) TakeDownRepo ¶
type EventPersistence ¶
type EventPersistence interface { Persist(ctx context.Context, e *XRPCStreamEvent) error Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error TakeDownRepo(ctx context.Context, usr models.Uid) error Flush(context.Context) error Shutdown(context.Context) error SetEventBroadcaster(func(*XRPCStreamEvent)) }
Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels.
type InstrumentedRepoStreamCallbacks ¶
type InstrumentedRepoStreamCallbacks struct { Next func(ctx context.Context, xev *XRPCStreamEvent) error // contains filtered or unexported fields }
func NewInstrumentedRepoStreamCallbacks ¶
func NewInstrumentedRepoStreamCallbacks(limiters []*slidingwindow.Limiter, next func(ctx context.Context, xev *XRPCStreamEvent) error) *InstrumentedRepoStreamCallbacks
func (*InstrumentedRepoStreamCallbacks) EventHandler ¶
func (rsc *InstrumentedRepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type MemPersister ¶
type MemPersister struct {
// contains filtered or unexported fields
}
MemPersister is the most naive implementation of event persistence. This EventPersistence option works fine with all event types. I'll do better later.
func NewMemPersister ¶
func NewMemPersister() *MemPersister
func (*MemPersister) Persist ¶
func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*MemPersister) Playback ¶
func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*MemPersister) SetEventBroadcaster ¶
func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*MemPersister) TakeDownRepo ¶
type RepoStreamCallbacks ¶
type RepoStreamCallbacks struct { RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error Error func(evt *ErrorFrame) error }
func (*RepoStreamCallbacks) EventHandler ¶
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type Scheduler ¶
type Scheduler interface { AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error Shutdown() }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
type XRPCStreamEvent ¶
type XRPCStreamEvent struct { Error *ErrorFrame RepoCommit *comatproto.SyncSubscribeRepos_Commit RepoSync *comatproto.SyncSubscribeRepos_Sync RepoHandle *comatproto.SyncSubscribeRepos_Handle RepoIdentity *comatproto.SyncSubscribeRepos_Identity RepoInfo *comatproto.SyncSubscribeRepos_Info RepoMigrate *comatproto.SyncSubscribeRepos_Migrate RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone RepoAccount *comatproto.SyncSubscribeRepos_Account LabelLabels *comatproto.LabelSubscribeLabels_Labels LabelInfo *comatproto.LabelSubscribeLabels_Info // some private fields for internal routing perf PrivUid models.Uid `json:"-" cborgen:"-"` PrivPdsId uint `json:"-" cborgen:"-"` PrivRelevantPds []uint `json:"-" cborgen:"-"` Preserialized []byte `json:"-" cborgen:"-"` }
func (*XRPCStreamEvent) Deserialize ¶
func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error
func (*XRPCStreamEvent) GetSequence ¶
func (evt *XRPCStreamEvent) GetSequence() (int64, bool)
func (*XRPCStreamEvent) Preserialize ¶
func (evt *XRPCStreamEvent) Preserialize() error
Preserialize serializes the event's content into the Preserialized cache.
func (*XRPCStreamEvent) Sequence ¶
func (evt *XRPCStreamEvent) Sequence() int64
Click to show internal directories.
Click to hide internal directories.