events

package
v0.0.0-...-388bbc9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2025 License: Apache-2.0, MIT Imports: 22 Imported by: 58

Documentation

Index

Constants

View Source
const (
	EvtKindErrorFrame = -1
	EvtKindMessage    = 1
)

Variables

View Source
var (
	AccountStatusActive      = "active"
	AccountStatusTakendown   = "takendown"
	AccountStatusSuspended   = "suspended"
	AccountStatusDeleted     = "deleted"
	AccountStatusDeactivated = "deactivated"
)
View Source
var (
	ErrPlaybackShutdown = fmt.Errorf("playback shutting down")
	ErrCaughtUp         = fmt.Errorf("caught up")
)
View Source
var ErrNoSeq = errors.New("event has no sequence number")

Functions

func HandleRepoStream

func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error

HandleRepoStream con is source of events sched gets AddWork for each event log may be nil for default logger

func SequenceForEvent

func SequenceForEvent(evt *XRPCStreamEvent) int64

Types

type ErrorFrame

type ErrorFrame struct {
	Error   string `cborgen:"error"`
	Message string `cborgen:"message"`
}

func (*ErrorFrame) MarshalCBOR

func (t *ErrorFrame) MarshalCBOR(w io.Writer) error

func (*ErrorFrame) UnmarshalCBOR

func (t *ErrorFrame) UnmarshalCBOR(r io.Reader) (err error)

type EventHeader

type EventHeader struct {
	Op      int64  `cborgen:"op"`
	MsgType string `cborgen:"t,omitempty"`
}

func (*EventHeader) MarshalCBOR

func (t *EventHeader) MarshalCBOR(w io.Writer) error

func (*EventHeader) UnmarshalCBOR

func (t *EventHeader) UnmarshalCBOR(r io.Reader) (err error)

type EventManager

type EventManager struct {
	// contains filtered or unexported fields
}

func NewEventManager

func NewEventManager(persister EventPersistence) *EventManager

func (*EventManager) AddEvent

func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error

func (*EventManager) Shutdown

func (em *EventManager) Shutdown(ctx context.Context) error

func (*EventManager) Subscribe

func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error)

func (*EventManager) TakeDownRepo

func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error

type EventPersistence

type EventPersistence interface {
	Persist(ctx context.Context, e *XRPCStreamEvent) error
	Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
	TakeDownRepo(ctx context.Context, usr models.Uid) error
	Flush(context.Context) error
	Shutdown(context.Context) error

	SetEventBroadcaster(func(*XRPCStreamEvent))
}

Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels

type InstrumentedRepoStreamCallbacks

type InstrumentedRepoStreamCallbacks struct {
	Next func(ctx context.Context, xev *XRPCStreamEvent) error
	// contains filtered or unexported fields
}

func NewInstrumentedRepoStreamCallbacks

func NewInstrumentedRepoStreamCallbacks(limiters []*slidingwindow.Limiter, next func(ctx context.Context, xev *XRPCStreamEvent) error) *InstrumentedRepoStreamCallbacks

func (*InstrumentedRepoStreamCallbacks) EventHandler

type MemPersister

type MemPersister struct {
	// contains filtered or unexported fields
}

MemPersister is the most naive implementation of event persistence This EventPersistence option works fine with all event types ill do better later

func NewMemPersister

func NewMemPersister() *MemPersister

func (*MemPersister) Flush

func (mp *MemPersister) Flush(ctx context.Context) error

func (*MemPersister) Persist

func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error

func (*MemPersister) Playback

func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error

func (*MemPersister) SetEventBroadcaster

func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent))

func (*MemPersister) Shutdown

func (mp *MemPersister) Shutdown(context.Context) error

func (*MemPersister) TakeDownRepo

func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error

type Operation

type Operation struct {
	// contains filtered or unexported fields
}

type RepoStreamCallbacks

type RepoStreamCallbacks struct {
	RepoCommit    func(evt *comatproto.SyncSubscribeRepos_Commit) error
	RepoSync      func(evt *comatproto.SyncSubscribeRepos_Sync) error
	RepoHandle    func(evt *comatproto.SyncSubscribeRepos_Handle) error
	RepoIdentity  func(evt *comatproto.SyncSubscribeRepos_Identity) error
	RepoAccount   func(evt *comatproto.SyncSubscribeRepos_Account) error
	RepoInfo      func(evt *comatproto.SyncSubscribeRepos_Info) error
	RepoMigrate   func(evt *comatproto.SyncSubscribeRepos_Migrate) error
	RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
	LabelLabels   func(evt *comatproto.LabelSubscribeLabels_Labels) error
	LabelInfo     func(evt *comatproto.LabelSubscribeLabels_Info) error
	Error         func(evt *ErrorFrame) error
}

func (*RepoStreamCallbacks) EventHandler

func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error

type Scheduler

type Scheduler interface {
	AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error
	Shutdown()
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

type XRPCStreamEvent

type XRPCStreamEvent struct {
	Error         *ErrorFrame
	RepoCommit    *comatproto.SyncSubscribeRepos_Commit
	RepoSync      *comatproto.SyncSubscribeRepos_Sync
	RepoHandle    *comatproto.SyncSubscribeRepos_Handle
	RepoIdentity  *comatproto.SyncSubscribeRepos_Identity
	RepoInfo      *comatproto.SyncSubscribeRepos_Info
	RepoMigrate   *comatproto.SyncSubscribeRepos_Migrate
	RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone
	RepoAccount   *comatproto.SyncSubscribeRepos_Account
	LabelLabels   *comatproto.LabelSubscribeLabels_Labels
	LabelInfo     *comatproto.LabelSubscribeLabels_Info

	// some private fields for internal routing perf
	PrivUid         models.Uid `json:"-" cborgen:"-"`
	PrivPdsId       uint       `json:"-" cborgen:"-"`
	PrivRelevantPds []uint     `json:"-" cborgen:"-"`
	Preserialized   []byte     `json:"-" cborgen:"-"`
}

func (*XRPCStreamEvent) Deserialize

func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error

func (*XRPCStreamEvent) GetSequence

func (evt *XRPCStreamEvent) GetSequence() (int64, bool)

func (*XRPCStreamEvent) Preserialize

func (evt *XRPCStreamEvent) Preserialize() error

serialize content into Preserialized cache

func (*XRPCStreamEvent) Sequence

func (evt *XRPCStreamEvent) Sequence() int64

func (*XRPCStreamEvent) Serialize

func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL