Documentation ¶
Index ¶
- Constants
- Variables
- func ConsumeRepoStreamLite2(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error
- func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error
- func SequenceForEvent(evt *XRPCStreamEvent) int64
- type DbPersistence
- func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error
- func (p *DbPersistence) Flush(ctx context.Context) error
- func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromRepoAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error)
- func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error)
- func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (p *DbPersistence) Shutdown(context.Context) error
- func (p *DbPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error
- type DiskPersistOptions
- type DiskPersistence
- func (dp *DiskPersistence) Flush(ctx context.Context) error
- func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, ...) (*int64, error)
- func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent))
- func (dp *DiskPersistence) Shutdown(ctx context.Context) error
- func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error
- type ErrorFrame
- type EventHeader
- type EventManager
- func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
- func (em *EventManager) Shutdown(ctx context.Context) error
- func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, ...) (<-chan *XRPCStreamEvent, func(), error)
- func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error
- type EventPersistence
- type InstrumentedRepoStreamCallbacks
- type LiteStreamHandleFunc
- type LogFileRef
- type MemPersister
- func (mp *MemPersister) Flush(ctx context.Context) error
- func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (mp *MemPersister) Shutdown(context.Context) error
- func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error
- type Operation
- type Options
- type PebblePersist
- func (pp *PebblePersist) Flush(context.Context) error
- func (pp *PebblePersist) GCThread(ctx context.Context)
- func (pp *PebblePersist) GarbageCollect(ctx context.Context) error
- func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error)
- func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent))
- func (pp *PebblePersist) Shutdown(context.Context) error
- func (pp *PebblePersist) TakeDownRepo(ctx context.Context, usr models.Uid) error
- type PebblePersistOptions
- type PersistenceBatchItem
- type RepoEventRecord
- type RepoStreamCallbacks
- type Scheduler
- type Subscriber
- type UserAction
- type XRPCStreamEvent
- type YoloPersister
- func (yp *YoloPersister) Flush(ctx context.Context) error
- func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
- func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
- func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
- func (yp *YoloPersister) Shutdown(ctx context.Context) error
- func (yp *YoloPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error
Constants ¶
const ( EvtFlagTakedown = 1 << iota EvtFlagRebased )
const ( EvtKindErrorFrame = -1 EvtKindMessage = 1 )
Variables ¶
var ( AccountStatusActive = "active" AccountStatusTakendown = "takendown" AccountStatusSuspended = "suspended" AccountStatusDeleted = "deleted" AccountStatusDeactivated = "deactivated" )
var ( ErrPlaybackShutdown = fmt.Errorf("playback shutting down") ErrCaughtUp = fmt.Errorf("caught up") )
var DefaultPebblePersistOptions = PebblePersistOptions{ PersistDuration: time.Minute * 20, GCPeriod: time.Minute * 5, MaxBytes: 1024 * 1024 * 1024, }
var ErrNoLast = errors.New("no last event")
var ErrNoSeq = errors.New("event has no sequence number")
Functions ¶
func ConsumeRepoStreamLite2 ¶
func HandleRepoStream ¶
func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error
HandleRepoStream processes a stream of repo events. con is the source of events. sched gets AddWork called for each event. log may be nil, in which case the default logger is used.
func SequenceForEvent ¶
func SequenceForEvent(evt *XRPCStreamEvent) int64
Types ¶
type DbPersistence ¶
type DbPersistence struct {
// contains filtered or unexported fields
}
func NewDbPersistence ¶
func (*DbPersistence) AddItemToBatch ¶
func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error
func (*DbPersistence) Persist ¶
func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*DbPersistence) Playback ¶
func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*DbPersistence) RecordFromHandleChange ¶
func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromRepoAccount ¶
func (p *DbPersistence) RecordFromRepoAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromRepoCommit ¶
func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromRepoIdentity ¶
func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error)
func (*DbPersistence) RecordFromTombstone ¶
func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error)
func (*DbPersistence) SetEventBroadcaster ¶
func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*DbPersistence) TakeDownRepo ¶
type DiskPersistOptions ¶
type DiskPersistOptions struct { UIDCacheSize int DIDCacheSize int EventsPerFile int64 WriteBufferSize int Retention time.Duration }
func DefaultDiskPersistOptions ¶
func DefaultDiskPersistOptions() *DiskPersistOptions
type DiskPersistence ¶
type DiskPersistence struct {
// contains filtered or unexported fields
}
func NewDiskPersistence ¶
func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error)
func (*DiskPersistence) Persist ¶
func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*DiskPersistence) Playback ¶
func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*DiskPersistence) PlaybackLogfiles ¶
func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error)
func (*DiskPersistence) SetEventBroadcaster ¶
func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent))
func (*DiskPersistence) TakeDownRepo ¶
type ErrorFrame ¶
func (*ErrorFrame) MarshalCBOR ¶
func (t *ErrorFrame) MarshalCBOR(w io.Writer) error
func (*ErrorFrame) UnmarshalCBOR ¶
func (t *ErrorFrame) UnmarshalCBOR(r io.Reader) (err error)
type EventHeader ¶
func (*EventHeader) MarshalCBOR ¶
func (t *EventHeader) MarshalCBOR(w io.Writer) error
func (*EventHeader) UnmarshalCBOR ¶
func (t *EventHeader) UnmarshalCBOR(r io.Reader) (err error)
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
func NewEventManager ¶
func NewEventManager(persister EventPersistence) *EventManager
func (*EventManager) AddEvent ¶
func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error
func (*EventManager) Subscribe ¶
func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error)
func (*EventManager) TakeDownRepo ¶
type EventPersistence ¶
type EventPersistence interface { Persist(ctx context.Context, e *XRPCStreamEvent) error Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error TakeDownRepo(ctx context.Context, usr models.Uid) error Flush(context.Context) error Shutdown(context.Context) error SetEventBroadcaster(func(*XRPCStreamEvent)) }
Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels
type InstrumentedRepoStreamCallbacks ¶
type InstrumentedRepoStreamCallbacks struct { Next func(ctx context.Context, xev *XRPCStreamEvent) error // contains filtered or unexported fields }
func NewInstrumentedRepoStreamCallbacks ¶
func NewInstrumentedRepoStreamCallbacks(limiters []*slidingwindow.Limiter, next func(ctx context.Context, xev *XRPCStreamEvent) error) *InstrumentedRepoStreamCallbacks
func (*InstrumentedRepoStreamCallbacks) EventHandler ¶
func (rsc *InstrumentedRepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type LiteStreamHandleFunc ¶
type MemPersister ¶
type MemPersister struct {
// contains filtered or unexported fields
}
MemPersister is the most naive implementation of event persistence. This EventPersistence option works fine with all event types. I'll do better later.
func NewMemPersister ¶
func NewMemPersister() *MemPersister
func (*MemPersister) Persist ¶
func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*MemPersister) Playback ¶
func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*MemPersister) SetEventBroadcaster ¶
func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))
func (*MemPersister) TakeDownRepo ¶
type Options ¶
type Options struct { MaxBatchSize int MinBatchSize int MaxTimeBetweenFlush time.Duration CheckBatchInterval time.Duration UIDCacheSize int DIDCacheSize int PlaybackBatchSize int HydrationConcurrency int }
func DefaultOptions ¶
func DefaultOptions() *Options
type PebblePersist ¶
type PebblePersist struct {
// contains filtered or unexported fields
}
func NewPebblePersistance ¶
func NewPebblePersistance(opts *PebblePersistOptions) (*PebblePersist, error)
NewPebblePersistance creates a new EventPersistence which stores data in pebbledb. nil opts is ok.
func (*PebblePersist) GCThread ¶
func (pp *PebblePersist) GCThread(ctx context.Context)
Example:
```
pp, err := NewPebblePersistance(&PebblePersistOptions{
	DbPath:          "/tmp/foo.pebble",
	PersistDuration: 48 * time.Hour,
	GCPeriod:        5 * time.Minute,
})
if err != nil {
	return err
}
go pp.GCThread(context.Background())
```
func (*PebblePersist) GarbageCollect ¶
func (pp *PebblePersist) GarbageCollect(ctx context.Context) error
func (*PebblePersist) GetLast ¶
func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error)
func (*PebblePersist) Persist ¶
func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*PebblePersist) Playback ¶
func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*PebblePersist) SetEventBroadcaster ¶
func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent))
func (*PebblePersist) TakeDownRepo ¶
type PebblePersistOptions ¶
type PebblePersistOptions struct { // path where pebble will create a directory full of files DbPath string // Throw away posts older than some time ago PersistDuration time.Duration // Throw away old posts every so often GCPeriod time.Duration // MaxBytes is what we _try_ to keep disk usage under MaxBytes uint64 }
type PersistenceBatchItem ¶
type PersistenceBatchItem struct { Record *RepoEventRecord Event *XRPCStreamEvent }
type RepoEventRecord ¶
type RepoEventRecord struct { Seq uint `gorm:"primarykey"` Rev string Since *string Commit *models.DbCID Prev *models.DbCID NewHandle *string // NewHandle is only set if this is a handle change event Time time.Time Blobs []byte Repo models.Uid Type string Rebase bool // Active and Status are only set on RepoAccount events Active bool Status *string Ops []byte }
type RepoStreamCallbacks ¶
type RepoStreamCallbacks struct { RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error Error func(evt *ErrorFrame) error }
func (*RepoStreamCallbacks) EventHandler ¶
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error
type Scheduler ¶
type Scheduler interface { AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error Shutdown() }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
type XRPCStreamEvent ¶
type XRPCStreamEvent struct { Error *ErrorFrame RepoCommit *comatproto.SyncSubscribeRepos_Commit RepoHandle *comatproto.SyncSubscribeRepos_Handle RepoIdentity *comatproto.SyncSubscribeRepos_Identity RepoInfo *comatproto.SyncSubscribeRepos_Info RepoMigrate *comatproto.SyncSubscribeRepos_Migrate RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone RepoAccount *comatproto.SyncSubscribeRepos_Account LabelLabels *comatproto.LabelSubscribeLabels_Labels LabelInfo *comatproto.LabelSubscribeLabels_Info // some private fields for internal routing perf PrivUid models.Uid `json:"-" cborgen:"-"` PrivPdsId uint `json:"-" cborgen:"-"` PrivRelevantPds []uint `json:"-" cborgen:"-"` Preserialized []byte `json:"-" cborgen:"-"` }
func (*XRPCStreamEvent) Deserialize ¶
func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error
func (*XRPCStreamEvent) Preserialize ¶
func (evt *XRPCStreamEvent) Preserialize() error
Preserialize serializes the event content into the Preserialized cache.
func (*XRPCStreamEvent) Sequence ¶
func (evt *XRPCStreamEvent) Sequence() int64
type YoloPersister ¶
type YoloPersister struct {
// contains filtered or unexported fields
}
YoloPersister is used for benchmarking. It has no persistence; it just emits events and forgets them.
func NewYoloPersister ¶
func NewYoloPersister() *YoloPersister
func (*YoloPersister) Persist ¶
func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error
func (*YoloPersister) Playback ¶
func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
func (*YoloPersister) SetEventBroadcaster ¶
func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))