downloader

package
v0.0.0-...-ea4e456 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2024 License: MPL-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	TimeWindowTaskStateNew     = TimeWindowTaskState{"new"}
	TimeWindowTaskStateStarted = TimeWindowTaskState{"started"}
	TimeWindowTaskStateDone    = TimeWindowTaskState{"done"}
	TimeWindowTaskStateError   = TimeWindowTaskState{"error"}
)

Functions

func IsGlobalEventKindToDownload

func IsGlobalEventKindToDownload(eventKind domain.EventKind) bool

Types

type BootstrapRelaySource

type BootstrapRelaySource interface {
	GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}

type CurrentTimeProvider

type CurrentTimeProvider interface {
	GetCurrentTime() time.Time
}

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

func NewDownloader

func NewDownloader(
	bootstrapRelaySource BootstrapRelaySource,
	relaySource RelaySource,
	publicKeySource PublicKeySource,
	logger logging.Logger,
	metrics Metrics,
	relayDownloaderFactory *RelayDownloaderFactory,
) *Downloader

func (*Downloader) Run

func (d *Downloader) Run(ctx context.Context) error

Will fetch pubkeys from the database and a list of hardcoded kinds from each relay found in out database. These will be used to create tasks that specify nostr filters and contain a time window to control the since and until filter keys. Theses filters are used to start queries for each relay. Events found this way will be published to all subscribers of the downloader publisher.

type Metrics

type Metrics interface {
	ReportNumberOfRelayDownloaders(n int)
	ReportReceivedEvent(address domain.RelayAddress)
}

type PublicKeySource

type PublicKeySource interface {
	GetPublicKeys(ctx context.Context) (PublicKeys, error)
}

type PublicKeys

type PublicKeys struct {
	// contains filtered or unexported fields
}

func NewPublicKeys

func NewPublicKeys(publicKeysToMonitor []domain.PublicKey, publicKeysToMonitorFollowees []domain.PublicKey) PublicKeys

func (PublicKeys) All

func (p PublicKeys) All() []domain.PublicKey

func (PublicKeys) Equal

func (p PublicKeys) Equal(o PublicKeys) bool

func (PublicKeys) PublicKeysToMonitor

func (p PublicKeys) PublicKeysToMonitor() []domain.PublicKey

func (PublicKeys) PublicKeysToMonitorFollowees

func (p PublicKeys) PublicKeysToMonitorFollowees() []domain.PublicKey

type PublicKeysToReplicate

type PublicKeysToReplicate struct {
	// contains filtered or unexported fields
}

func NewPublicKeysToReplicate

func NewPublicKeysToReplicate(authors []domain.PublicKey, tagged []domain.PublicKey) *PublicKeysToReplicate

func (PublicKeysToReplicate) Authors

func (p PublicKeysToReplicate) Authors() []domain.PublicKey

func (PublicKeysToReplicate) Tagged

func (p PublicKeysToReplicate) Tagged() []domain.PublicKey

type ReceivedEventPublisher

type ReceivedEventPublisher interface {
	Publish(relay domain.RelayAddress, event domain.UnverifiedEvent)
}

type RelayConnections

type RelayConnections interface {
	GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
}

type RelayDownloader

type RelayDownloader struct {
	// contains filtered or unexported fields
}

func NewRelayDownloader

func NewRelayDownloader(
	address domain.RelayAddress,
	scheduler Scheduler,
	receivedEventPublisher ReceivedEventPublisher,
	relayConnections RelayConnections,
	logger logging.Logger,
	metrics Metrics,
) *RelayDownloader

func (*RelayDownloader) Start

func (d *RelayDownloader) Start(ctx context.Context) error

Will fetch tasks for the current relay and use them to query it and then publish the event to a pubsub.

type RelayDownloaderFactory

type RelayDownloaderFactory struct {
	// contains filtered or unexported fields
}

func NewRelayDownloaderFactory

func NewRelayDownloaderFactory(
	relayConnections RelayConnections,
	receivedEventPublisher ReceivedEventPublisher,
	scheduler Scheduler,
	logger logging.Logger,
	metrics Metrics,
) *RelayDownloaderFactory

func (*RelayDownloaderFactory) CreateRelayDownloader

func (r *RelayDownloaderFactory) CreateRelayDownloader(address domain.RelayAddress) (*RelayDownloader, error)

type RelaySource

type RelaySource interface {
	GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}

type RelayTaskGenerator

type RelayTaskGenerator struct {
	// contains filtered or unexported fields
}

RelayTaskGenerator maintains 3 TimeWindowTaskGenerator, one for each query type. Each TimeWindowTaskGenerator maintains a list of TimeWindowTaskTracker, one for each time window. Each TimeWindowTaskTracker maintains a list of runningRelayDownloader, one for each concurrency setting. Each TimeWindowTaskTracker uses a TimeWindow

func NewRelayTaskGenerator

func NewRelayTaskGenerator(
	currentTimeProvider CurrentTimeProvider,
	logger logging.Logger,
) (*RelayTaskGenerator, error)

func (*RelayTaskGenerator) AddSubscription

func (t *RelayTaskGenerator) AddSubscription(ctx context.Context, ch chan Task)

func (*RelayTaskGenerator) SendOutTasks

func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bool, error)

type Scheduler

type Scheduler interface {
	GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)
}

type Task

type Task interface {
	Ctx() context.Context
	Filter() domain.Filter

	OnReceivedEOSE()
	OnError(err error)
}

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

A TaskScheduler is responsible for generating tasks for all relays by maintaing a list of TaskGenerators.

func NewTaskScheduler

func NewTaskScheduler(
	publicKeySource PublicKeySource,
	currentTimeProvider CurrentTimeProvider,
	logger logging.Logger,
) *TaskScheduler

func (*TaskScheduler) GetTasks

func (t *TaskScheduler) GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)

func (*TaskScheduler) Run

func (t *TaskScheduler) Run(ctx context.Context) error

type TimeWindow

type TimeWindow struct {
	// contains filtered or unexported fields
}

func MustNewTimeWindow

func MustNewTimeWindow(start time.Time, duration time.Duration) TimeWindow

func NewTimeWindow

func NewTimeWindow(start time.Time, duration time.Duration) (TimeWindow, error)

func (TimeWindow) Advance

func (t TimeWindow) Advance() TimeWindow

func (TimeWindow) End

func (t TimeWindow) End() time.Time

func (TimeWindow) Start

func (t TimeWindow) Start() time.Time

type TimeWindowTask

type TimeWindowTask struct {
	// contains filtered or unexported fields
}

func NewTimeWindowTask

func NewTimeWindowTask(ctx context.Context, filter domain.Filter, tracker tracker) *TimeWindowTask

func (*TimeWindowTask) Ctx

func (t *TimeWindowTask) Ctx() context.Context

func (*TimeWindowTask) Filter

func (t *TimeWindowTask) Filter() domain.Filter

func (*TimeWindowTask) OnError

func (t *TimeWindowTask) OnError(err error)

func (*TimeWindowTask) OnReceivedEOSE

func (t *TimeWindowTask) OnReceivedEOSE()

type TimeWindowTaskGenerator

type TimeWindowTaskGenerator struct {
	// contains filtered or unexported fields
}

func NewTimeWindowTaskGenerator

func NewTimeWindowTaskGenerator(
	currentTimeProvider CurrentTimeProvider,
	logger logging.Logger,
) (*TimeWindowTaskGenerator, error)

func (*TimeWindowTaskGenerator) Generate

func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error)

A task generator creates a task tracker per concurrency setting. The tracker will be used to return the corresponding task, if the task is still runnning it will return no task. If the task is done it will discard the current tracker, create a new one and return a new task. Each task generated will be pushed to all subscribers of the scheduler

type TimeWindowTaskState

type TimeWindowTaskState struct {
	// contains filtered or unexported fields
}

type TimeWindowTaskTracker

type TimeWindowTaskTracker struct {
	// contains filtered or unexported fields
}

func NewTimeWindowTaskTracker

func NewTimeWindowTaskTracker(window TimeWindow) (*TimeWindowTaskTracker, error)

func (*TimeWindowTaskTracker) CheckIfDoneAndEnd

func (t *TimeWindowTaskTracker) CheckIfDoneAndEnd() bool

func (*TimeWindowTaskTracker) MaybeStart

func (t *TimeWindowTaskTracker) MaybeStart(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) (Task, bool, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL