sqlite

package
v0.0.0-...-2e97daa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2024 License: MPL-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const TweetCreatedTopic = "tweet_created"

Variables

This section is empty.

Functions

func NewMigrations

func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)

func Open

func Open(conf config.Config) (*sql.DB, error)

Types

type AccountRepository

type AccountRepository struct {
	// contains filtered or unexported fields
}

func NewAccountRepository

func NewAccountRepository(tx *sql.Tx) (*AccountRepository, error)

func (*AccountRepository) Count

func (m *AccountRepository) Count() (int, error)

func (*AccountRepository) GetByAccountID

func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error)

func (*AccountRepository) GetByTwitterID

func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error)

func (*AccountRepository) Save

func (m *AccountRepository) Save(account *accounts.Account) error

type AdaptersFactoryFn

type AdaptersFactoryFn = GenericAdaptersFactoryFn[app.Adapters]

type BackoffManager

type BackoffManager interface {
	// GetMessageErrorBackoff is used to backoff reprocessing of a single
	// specific message if its processing fails. The first time message
	// processing fails 1 is passed to this function, the second time 2 etc.
	GetMessageErrorBackoff(nackCount int) time.Duration

	// GetNoMessagesBackoff is used to backoff querying for new messages on the
	// queue. The first time in a row where the query returns no messages 1 is
	// passed to this function, then 2 is passed etc.
	GetNoMessagesBackoff(tick int) time.Duration
}

type DefaultBackoffManager

type DefaultBackoffManager struct {
}

func NewDefaultBackoffManager

func NewDefaultBackoffManager() DefaultBackoffManager

func (DefaultBackoffManager) GetMessageErrorBackoff

func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration

func (DefaultBackoffManager) GetNoMessagesBackoff

func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration

type GenericAdaptersFactoryFn

type GenericAdaptersFactoryFn[T any] func(*sql.DB, *sql.Tx) (T, error)

type GenericTransactionProvider

type GenericTransactionProvider[T any] struct {
	// contains filtered or unexported fields
}

func NewGenericTransactionProvider

func NewGenericTransactionProvider[T any](db *sql.DB, fn GenericAdaptersFactoryFn[T]) *GenericTransactionProvider[T]

func (*GenericTransactionProvider[T]) Transact

func (t *GenericTransactionProvider[T]) Transact(ctx context.Context, f func(context.Context, T) error) error

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(uuid string, payload []byte) (Message, error)

func (Message) Payload

func (m Message) Payload() []byte

func (Message) UUID

func (m Message) UUID() string

type MigrationFns

type MigrationFns struct {
	// contains filtered or unexported fields
}

func NewMigrationFns

func NewMigrationFns(db *sql.DB, pubsub *PubSub) *MigrationFns

func (*MigrationFns) CreatePubsubTables

func (m *MigrationFns) CreatePubsubTables(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error

func (*MigrationFns) Initial

func (m *MigrationFns) Initial(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error

type MigrationsStorage

type MigrationsStorage struct {
	// contains filtered or unexported fields
}

func NewMigrationsStorage

func NewMigrationsStorage(db *sql.DB) (*MigrationsStorage, error)

func (*MigrationsStorage) LoadState

func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)

func (*MigrationsStorage) LoadStatus

func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)

func (*MigrationsStorage) SaveState

func (b *MigrationsStorage) SaveState(name string, state migrations.State) error

func (*MigrationsStorage) SaveStatus

func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error

type ProcessedEventRepository

type ProcessedEventRepository struct {
	// contains filtered or unexported fields
}

func NewProcessedEventRepository

func NewProcessedEventRepository(tx *sql.Tx) (*ProcessedEventRepository, error)

func (*ProcessedEventRepository) Save

func (m *ProcessedEventRepository) Save(eventID domain.EventId, twitterID accounts.TwitterID) error

func (*ProcessedEventRepository) WasProcessed

func (m *ProcessedEventRepository) WasProcessed(eventID domain.EventId, twitterID accounts.TwitterID) (bool, error)

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

func NewPubSub

func NewPubSub(db *sql.DB, logger logging.Logger) *PubSub

func (*PubSub) InitializingQueries

func (p *PubSub) InitializingQueries() []string

func (*PubSub) Publish

func (p *PubSub) Publish(topic string, msg Message) error

func (*PubSub) PublishTx

func (p *PubSub) PublishTx(tx *sql.Tx, topic string, msg Message) error

func (*PubSub) QueueLength

func (p *PubSub) QueueLength(topic string) (int, error)

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMessage

type PublicKeyRepository

type PublicKeyRepository struct {
	// contains filtered or unexported fields
}

func NewPublicKeyRepository

func NewPublicKeyRepository(tx *sql.Tx) (*PublicKeyRepository, error)

func (*PublicKeyRepository) Count

func (m *PublicKeyRepository) Count() (int, error)

func (*PublicKeyRepository) Delete

func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error

func (*PublicKeyRepository) DeleteByPublicKey

func (m *PublicKeyRepository) DeleteByPublicKey(publicKey domain.PublicKey) error

func (*PublicKeyRepository) List

func (*PublicKeyRepository) ListByAccountID

func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error)

func (*PublicKeyRepository) ListByPublicKey

func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error)

func (*PublicKeyRepository) Save

func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(pubsub *PubSub, tx *sql.Tx) *Publisher

func (*Publisher) PublishTweetCreated

func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error

type ReceivedMessage

type ReceivedMessage struct {
	Message
	// contains filtered or unexported fields
}

func NewReceivedMessage

func NewReceivedMessage(message Message) *ReceivedMessage

func (*ReceivedMessage) Ack

func (m *ReceivedMessage) Ack() error

func (*ReceivedMessage) Nack

func (m *ReceivedMessage) Nack() error

type SessionRepository

type SessionRepository struct {
	// contains filtered or unexported fields
}

func NewSessionRepository

func NewSessionRepository(tx *sql.Tx) (*SessionRepository, error)

func (*SessionRepository) Delete

func (m *SessionRepository) Delete(id sessions.SessionID) error

func (*SessionRepository) Get

func (*SessionRepository) Save

func (m *SessionRepository) Save(session *sessions.Session) error

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	pubsub *PubSub,
	db *sql.DB,
) *Subscriber

func (*Subscriber) SubscribeToTweetCreated

func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) <-chan *ReceivedMessage

func (*Subscriber) TweetCreatedAnalysis

func (s *Subscriber) TweetCreatedAnalysis(ctx context.Context) (app.TweetCreatedAnalysis, error)

func (*Subscriber) TweetCreatedQueueLength

func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error)

type TestAdapters

type TestAdapters struct {
	SessionRepository        *SessionRepository
	AccountRepository        *AccountRepository
	PublicKeyRepository      *PublicKeyRepository
	ProcessedEventRepository *ProcessedEventRepository
	UserTokensRepository     *UserTokensRepository
	Publisher                *Publisher
}

type TestAdaptersFactoryFn

type TestAdaptersFactoryFn = GenericAdaptersFactoryFn[TestAdapters]

type TestTransactionProvider

type TestTransactionProvider = GenericTransactionProvider[TestAdapters]

func NewTestTransactionProvider

func NewTestTransactionProvider(db *sql.DB, fn TestAdaptersFactoryFn) *TestTransactionProvider

type TestedItems

type TestedItems struct {
	TransactionProvider *TestTransactionProvider
	Subscriber          *Subscriber
	MigrationsStorage   *MigrationsStorage
	PubSub              *PubSub

	MigrationsRunner           *migrations.Runner
	Migrations                 migrations.Migrations
	MigrationsProgressCallback migrations.ProgressCallback
}

type TransactionProvider

type TransactionProvider = GenericTransactionProvider[app.Adapters]

func NewTransactionProvider

func NewTransactionProvider(db *sql.DB, fn AdaptersFactoryFn) *TransactionProvider

type TweetCreatedEventTransport

type TweetCreatedEventTransport struct {
	AccountID string         `json:"accountID"`
	Tweet     TweetTransport `json:"tweet"`
	Event     []byte         `json:"event"`
	CreatedAt time.Time      `json:"createdAt"`
}

type TweetTransport

type TweetTransport struct {
	Text string `json:"text"`
}

type UserTokensRepository

type UserTokensRepository struct {
	// contains filtered or unexported fields
}

func NewUserTokensRepository

func NewUserTokensRepository(tx *sql.Tx) (*UserTokensRepository, error)

func (*UserTokensRepository) Get

func (*UserTokensRepository) Save

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL