events

package
v0.0.0-...-82305ef Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0, MIT Imports: 34 Imported by: 39

Documentation

Index

Constants

View Source
// Event flag bits. Each constant occupies its own bit (1 << iota), so the
// values can be combined with bitwise OR into a single integer.
const (
	// EvtFlagTakedown is bit 0 (value 1).
	EvtFlagTakedown = 1 << iota
	// EvtFlagRebased is bit 1 (value 2); it implicitly repeats the
	// 1 << iota expression from the line above.
	EvtFlagRebased
)
View Source
// Event kind codes.
// NOTE(review): presumably these are the values carried in EventHeader.Op
// to tell an error frame from a regular message — confirm against the
// serialization code.
const (
	// EvtKindErrorFrame is the kind code -1.
	EvtKindErrorFrame = -1
	// EvtKindMessage is the kind code 1.
	EvtKindMessage    = 1
)

Variables

View Source
// Account status strings.
//
// These are declared as variables rather than constants, so they are
// addressable (e.g. &AccountStatusActive yields a *string) but also
// assignable at runtime; callers should treat them as read-only.
// NOTE(review): presumably these are the values stored in
// RepoEventRecord.Status for account events — confirm.
var (
	AccountStatusActive      = "active"
	AccountStatusTakendown   = "takendown"
	AccountStatusSuspended   = "suspended"
	AccountStatusDeleted     = "deleted"
	AccountStatusDeactivated = "deactivated"
)
View Source
// Sentinel errors. Both are created once at package initialization, so
// callers can match them by identity (errors.Is).
// NOTE(review): the names suggest they are produced during event playback;
// confirm at the call sites.
var (
	// ErrPlaybackShutdown carries the message "playback shutting down".
	ErrPlaybackShutdown = fmt.Errorf("playback shutting down")
	// ErrCaughtUp carries the message "caught up".
	ErrCaughtUp         = fmt.Errorf("caught up")
)
View Source
// DefaultPebblePersistOptions is a ready-made PebblePersistOptions value.
// DbPath is not set here and therefore keeps its zero value (empty string).
var DefaultPebblePersistOptions = PebblePersistOptions{
	// 20 minutes.
	PersistDuration: time.Minute * 20,
	// 5 minutes.
	GCPeriod:        time.Minute * 5,
	// 1024^3 bytes = 1 GiB.
	MaxBytes:        1024 * 1024 * 1024,
}
View Source
// ErrNoLast is a sentinel error with the message "no last event".
// NOTE(review): presumably returned by PebblePersist.GetLast when nothing
// has been persisted yet — confirm.
var ErrNoLast = errors.New("no last event")
View Source
// ErrNoSeq is a sentinel error with the message "event has no sequence number".
// NOTE(review): which functions return it is not visible here — confirm.
var ErrNoSeq = errors.New("event has no sequence number")

Functions

func ConsumeRepoStreamLite2

func ConsumeRepoStreamLite2(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error

func HandleRepoStream

func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error

HandleRepoStream reads events from con, which is the source of events, and calls sched.AddWork for each event. log may be nil, in which case the default logger is used.

func SequenceForEvent

func SequenceForEvent(evt *XRPCStreamEvent) int64

Types

type DbPersistence

type DbPersistence struct {
	// contains filtered or unexported fields
}

func NewDbPersistence

func NewDbPersistence(db *gorm.DB, cs carstore.CarStore, options *Options) (*DbPersistence, error)

func (*DbPersistence) AddItemToBatch

func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error

func (*DbPersistence) Flush

func (p *DbPersistence) Flush(ctx context.Context) error

func (*DbPersistence) Persist

func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*DbPersistence) Playback

func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*DbPersistence) RecordFromHandleChange

func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error)

func (*DbPersistence) RecordFromRepoAccount

func (*DbPersistence) RecordFromRepoCommit

func (*DbPersistence) RecordFromRepoIdentity

func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error)

func (*DbPersistence) RecordFromTombstone

func (*DbPersistence) SetEventBroadcaster

func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent))

func (*DbPersistence) Shutdown

func (p *DbPersistence) Shutdown(context.Context) error

func (*DbPersistence) TakeDownRepo

func (p *DbPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error

type DiskPersistOptions

// DiskPersistOptions is the options type accepted by NewDiskPersistence.
// DefaultDiskPersistOptions returns a *DiskPersistOptions.
type DiskPersistOptions struct {
	// NOTE(review): presumably the capacity of a UID lookup cache — confirm.
	UIDCacheSize    int
	// NOTE(review): presumably the capacity of a DID lookup cache — confirm.
	DIDCacheSize    int
	// NOTE(review): presumably the number of events stored per log file
	// before a new file is started — confirm.
	EventsPerFile   int64
	// NOTE(review): presumably the write buffer size in bytes — confirm the unit.
	WriteBufferSize int
	// NOTE(review): presumably how long persisted events are kept — confirm.
	Retention       time.Duration
}

func DefaultDiskPersistOptions

func DefaultDiskPersistOptions() *DiskPersistOptions

type DiskPersistence

type DiskPersistence struct {
	// contains filtered or unexported fields
}

func NewDiskPersistence

func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error)

func (*DiskPersistence) Flush

func (dp *DiskPersistence) Flush(ctx context.Context) error

func (*DiskPersistence) Persist

func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*DiskPersistence) Playback

func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*DiskPersistence) PlaybackLogfiles

func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error)

func (*DiskPersistence) SetEventBroadcaster

func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent))

func (*DiskPersistence) Shutdown

func (dp *DiskPersistence) Shutdown(ctx context.Context) error

func (*DiskPersistence) TakeDownRepo

func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error

type ErrorFrame

// ErrorFrame is the error payload type. It is the type of
// XRPCStreamEvent.Error and the argument of RepoStreamCallbacks.Error, and it
// has MarshalCBOR/UnmarshalCBOR methods.
// The cborgen struct tags name the CBOR keys used for each field.
type ErrorFrame struct {
	// Error is encoded under the key "error".
	Error   string `cborgen:"error"`
	// Message is encoded under the key "message".
	Message string `cborgen:"message"`
}

func (*ErrorFrame) MarshalCBOR

func (t *ErrorFrame) MarshalCBOR(w io.Writer) error

func (*ErrorFrame) UnmarshalCBOR

func (t *ErrorFrame) UnmarshalCBOR(r io.Reader) (err error)

type EventHeader

// EventHeader is a CBOR-serializable header (it has MarshalCBOR and
// UnmarshalCBOR methods). The cborgen struct tags name the CBOR keys used for
// each field.
type EventHeader struct {
	// Op is encoded under the key "op".
	// NOTE(review): presumably holds EvtKindMessage or EvtKindErrorFrame — confirm.
	Op      int64  `cborgen:"op"`
	// MsgType is encoded under the key "t".
	// NOTE(review): presumably identifies which payload type follows — confirm.
	MsgType string `cborgen:"t"`
}

func (*EventHeader) MarshalCBOR

func (t *EventHeader) MarshalCBOR(w io.Writer) error

func (*EventHeader) UnmarshalCBOR

func (t *EventHeader) UnmarshalCBOR(r io.Reader) (err error)

type EventManager

type EventManager struct {
	// contains filtered or unexported fields
}

func NewEventManager

func NewEventManager(persister EventPersistence) *EventManager

func (*EventManager) AddEvent

func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error

func (*EventManager) Shutdown

func (em *EventManager) Shutdown(ctx context.Context) error

func (*EventManager) Subscribe

func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error)

func (*EventManager) TakeDownRepo

func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error

type EventPersistence

// EventPersistence is the storage backend interface accepted by
// NewEventManager. DbPersistence, DiskPersistence, MemPersister,
// PebblePersist and YoloPersister each declare all six of these methods.
type EventPersistence interface {
	// Persist takes a single event to store.
	Persist(ctx context.Context, e *XRPCStreamEvent) error
	// Playback passes stored events to cb.
	// NOTE(review): presumably since is a sequence number and only later
	// events are replayed; whether the bound is inclusive is not visible
	// here — confirm.
	Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
	// TakeDownRepo takes the Uid of the user whose repo is being taken down.
	TakeDownRepo(ctx context.Context, usr models.Uid) error
	// NOTE(review): presumably forces any buffered events to be written — confirm.
	Flush(context.Context) error
	// Shutdown stops the persister.
	Shutdown(context.Context) error

	// SetEventBroadcaster registers a function that receives events.
	// NOTE(review): presumably called by the persister once an event has
	// been persisted — confirm.
	SetEventBroadcaster(func(*XRPCStreamEvent))
}

Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels

type InstrumentedRepoStreamCallbacks

// InstrumentedRepoStreamCallbacks is constructed by
// NewInstrumentedRepoStreamCallbacks, which takes a slice of
// *slidingwindow.Limiter and a next callback.
type InstrumentedRepoStreamCallbacks struct {
	// Next has the same signature as the next argument of
	// NewInstrumentedRepoStreamCallbacks.
	// NOTE(review): presumably it is set from that argument and invoked by
	// EventHandler — confirm.
	Next func(ctx context.Context, xev *XRPCStreamEvent) error
	// contains filtered or unexported fields
}

func NewInstrumentedRepoStreamCallbacks

func NewInstrumentedRepoStreamCallbacks(limiters []*slidingwindow.Limiter, next func(ctx context.Context, xev *XRPCStreamEvent) error) *InstrumentedRepoStreamCallbacks

func (*InstrumentedRepoStreamCallbacks) EventHandler

type LiteStreamHandleFunc

// LiteStreamHandleFunc is the callback type accepted by ConsumeRepoStreamLite2.
// It receives an operation kind, a sequence number, a path, a DID, an
// optional record CID and a record value, and returns an error.
// NOTE(review): when rcid or rec may be nil is not visible here — confirm.
type LiteStreamHandleFunc func(op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error

type LogFileRef

// LogFileRef embeds gorm.Model, so it is a gorm-managed record.
// DiskPersistence.PlaybackLogfiles takes a slice of these.
type LogFileRef struct {
	gorm.Model
	// NOTE(review): presumably the location of the log file on disk — confirm.
	Path     string
	// NOTE(review): presumably true once the file has been moved to the
	// archive directory given to NewDiskPersistence — confirm.
	Archived bool
	// NOTE(review): presumably the sequence number of the first event in
	// the file — confirm.
	SeqStart int64
}

type MemPersister

type MemPersister struct {
	// contains filtered or unexported fields
}

MemPersister is the most naive implementation of event persistence. This EventPersistence option works fine with all event types. I'll do better later.

func NewMemPersister

func NewMemPersister() *MemPersister

func (*MemPersister) Flush

func (mp *MemPersister) Flush(ctx context.Context) error

func (*MemPersister) Persist

func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*MemPersister) Playback

func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*MemPersister) SetEventBroadcaster

func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))

func (*MemPersister) Shutdown

func (mp *MemPersister) Shutdown(context.Context) error

func (*MemPersister) TakeDownRepo

func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error

type Operation

type Operation struct {
	// contains filtered or unexported fields
}

type Options

// Options is the options type accepted by NewDbPersistence.
// DefaultOptions returns an *Options.
// NOTE(review): the per-field meanings below are inferred from the names
// only — confirm against the DbPersistence implementation.
type Options struct {
	// Presumably the upper bound on items in one write batch.
	MaxBatchSize         int
	// Presumably the batch size that triggers a flush.
	MinBatchSize         int
	// Presumably the longest a batch may wait before being flushed.
	MaxTimeBetweenFlush  time.Duration
	// Presumably how often the pending batch is checked.
	CheckBatchInterval   time.Duration
	// Presumably the capacity of a UID lookup cache.
	UIDCacheSize         int
	// Presumably the capacity of a DID lookup cache.
	DIDCacheSize         int
	// Presumably the number of records loaded per batch during Playback.
	PlaybackBatchSize    int
	// Presumably the number of records turned back into events in parallel.
	HydrationConcurrency int
}

func DefaultOptions

func DefaultOptions() *Options

type PebblePersist

type PebblePersist struct {
	// contains filtered or unexported fields
}

func NewPebblePersistance

func NewPebblePersistance(opts *PebblePersistOptions) (*PebblePersist, error)

NewPebblePersistance creates a new EventPersistence which stores data in pebbledb. A nil opts is ok.

func (*PebblePersist) Flush

func (pp *PebblePersist) Flush(context.Context) error

func (*PebblePersist) GCThread

func (pp *PebblePersist) GCThread(ctx context.Context)

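Example (updated to match the current signatures; the mapping of the former 48h / 5m arguments onto PersistDuration / GCPeriod should be confirmed):

```
pp, err := NewPebblePersistance(&PebblePersistOptions{
	DbPath:          "/tmp/foo.pebble",
	PersistDuration: 48 * time.Hour,
	GCPeriod:        5 * time.Minute,
})
if err != nil {
	// handle error
}
go pp.GCThread(context.Background())
```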

func (*PebblePersist) GarbageCollect

func (pp *PebblePersist) GarbageCollect(ctx context.Context) error

func (*PebblePersist) GetLast

func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error)

func (*PebblePersist) Persist

func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*PebblePersist) Playback

func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*PebblePersist) SetEventBroadcaster

func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent))

func (*PebblePersist) Shutdown

func (pp *PebblePersist) Shutdown(context.Context) error

func (*PebblePersist) TakeDownRepo

func (pp *PebblePersist) TakeDownRepo(ctx context.Context, usr models.Uid) error

type PebblePersistOptions

// PebblePersistOptions is the options type accepted by NewPebblePersistance.
// DefaultPebblePersistOptions is a package-level value of this type.
type PebblePersistOptions struct {
	// DbPath is the path where pebble will create a directory full of files.
	DbPath string

	// PersistDuration is the retention window: posts older than this are
	// thrown away.
	PersistDuration time.Duration

	// GCPeriod is how often old posts are thrown away.
	GCPeriod time.Duration

	// MaxBytes is what we _try_ to keep disk usage under; it is a target,
	// not a hard limit.
	MaxBytes uint64
}

type PersistenceBatchItem

// PersistenceBatchItem pairs a RepoEventRecord with an XRPCStreamEvent, the
// same pair of values that DbPersistence.AddItemToBatch takes.
// NOTE(review): presumably Record is the database form of Event — confirm.
type PersistenceBatchItem struct {
	Record *RepoEventRecord
	Event  *XRPCStreamEvent
}

type RepoEventRecord

// RepoEventRecord is a gorm-mapped record for one repo event; Seq is tagged
// as its primary key. The DbPersistence.RecordFrom* methods return values of
// this type.
type RepoEventRecord struct {
	// Seq is the primary key.
	// NOTE(review): Seq is a uint here, whereas XRPCStreamEvent.Sequence and
	// the since argument of Playback are int64 — conversions are needed
	// where they meet.
	Seq       uint `gorm:"primarykey"`
	Rev       string
	// Since, Commit and Prev are pointers, so they may be nil.
	Since     *string
	Commit    *models.DbCID
	Prev      *models.DbCID
	NewHandle *string // NewHandle is only set if this is a handle change event

	Time   time.Time
	// NOTE(review): Blobs is a raw byte slice; its encoding is not visible here.
	Blobs  []byte
	// Repo is the Uid of the repo.
	Repo   models.Uid
	// NOTE(review): presumably names the event type this row represents — confirm.
	Type   string
	Rebase bool

	// Active and Status are only set on RepoAccount events
	Active bool
	Status *string

	// NOTE(review): Ops is a raw byte slice; its encoding is not visible here.
	Ops []byte
}

type RepoStreamCallbacks

// RepoStreamCallbacks holds one callback per event payload type. Each field
// has the same name and payload type as the corresponding pointer field of
// XRPCStreamEvent.
// NOTE(review): presumably EventHandler invokes the callback matching the
// populated field of the event; how a nil callback is treated is not visible
// here — confirm.
type RepoStreamCallbacks struct {
	RepoCommit    func(evt *comatproto.SyncSubscribeRepos_Commit) error
	RepoHandle    func(evt *comatproto.SyncSubscribeRepos_Handle) error
	RepoIdentity  func(evt *comatproto.SyncSubscribeRepos_Identity) error
	RepoAccount   func(evt *comatproto.SyncSubscribeRepos_Account) error
	RepoInfo      func(evt *comatproto.SyncSubscribeRepos_Info) error
	RepoMigrate   func(evt *comatproto.SyncSubscribeRepos_Migrate) error
	RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
	LabelLabels   func(evt *comatproto.LabelSubscribeLabels_Labels) error
	LabelInfo     func(evt *comatproto.LabelSubscribeLabels_Info) error
	// Error receives error frames.
	Error         func(evt *ErrorFrame) error
}

func (*RepoStreamCallbacks) EventHandler

func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error

type Scheduler

// Scheduler is the work-dispatch interface passed to HandleRepoStream, which
// calls AddWork for each event it reads.
type Scheduler interface {
	// AddWork submits one event together with a repo string.
	// NOTE(review): presumably repo is the DID of the repo the event belongs
	// to — confirm.
	AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error
	// Shutdown stops the scheduler. It takes no context and returns no error.
	Shutdown()
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

type UserAction

// UserAction embeds gorm.Model, so it is a gorm-managed record.
// NOTE(review): the field meanings below are inferred from the names
// only — confirm against the code that reads and writes this table.
type UserAction struct {
	gorm.Model

	// Usr is the Uid of the user the action applies to.
	Usr      models.Uid
	// Presumably the sequence number at which the user's repo was rebased.
	RebaseAt int64
	// Presumably true when the user's repo has been taken down.
	Takedown bool
}

type XRPCStreamEvent

// XRPCStreamEvent is the event envelope passed through this package. It has
// one pointer field per possible payload type, plus some internal fields.
// NOTE(review): presumably exactly one payload pointer is non-nil for any
// given event — confirm.
type XRPCStreamEvent struct {
	Error         *ErrorFrame
	RepoCommit    *comatproto.SyncSubscribeRepos_Commit
	RepoHandle    *comatproto.SyncSubscribeRepos_Handle
	RepoIdentity  *comatproto.SyncSubscribeRepos_Identity
	RepoInfo      *comatproto.SyncSubscribeRepos_Info
	RepoMigrate   *comatproto.SyncSubscribeRepos_Migrate
	RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone
	RepoAccount   *comatproto.SyncSubscribeRepos_Account
	LabelLabels   *comatproto.LabelSubscribeLabels_Labels
	LabelInfo     *comatproto.LabelSubscribeLabels_Info

	// some private fields for internal routing perf
	// The json:"-" tag excludes these fields from encoding/json output.
	// NOTE(review): cborgen:"-" presumably does the same for the generated
	// CBOR code — confirm against cbor-gen.
	PrivUid         models.Uid `json:"-" cborgen:"-"`
	PrivPdsId       uint       `json:"-" cborgen:"-"`
	PrivRelevantPds []uint     `json:"-" cborgen:"-"`
	// Preserialized is the cache that the Preserialize method serializes
	// the event's content into.
	Preserialized   []byte     `json:"-" cborgen:"-"`
}

func (*XRPCStreamEvent) Deserialize

func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error

func (*XRPCStreamEvent) Preserialize

func (evt *XRPCStreamEvent) Preserialize() error

Preserialize serializes the event's content into the Preserialized cache.

func (*XRPCStreamEvent) Sequence

func (evt *XRPCStreamEvent) Sequence() int64

func (*XRPCStreamEvent) Serialize

func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error

type YoloPersister

type YoloPersister struct {
	// contains filtered or unexported fields
}

YoloPersister is used for benchmarking. It has no persistence: it just emits events and forgets them.

func NewYoloPersister

func NewYoloPersister() *YoloPersister

func (*YoloPersister) Flush

func (yp *YoloPersister) Flush(ctx context.Context) error

func (*YoloPersister) Persist

func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*YoloPersister) Playback

func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*YoloPersister) SetEventBroadcaster

func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))

func (*YoloPersister) Shutdown

func (yp *YoloPersister) Shutdown(ctx context.Context) error

func (*YoloPersister) TakeDownRepo

func (yp *YoloPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL