infra

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnknownMessageType = errors.New("unknown message type")

ErrUnknownMessageType is returned when the message type is not recognized.

Functions

func NewAlloraClient

func NewAlloraClient(rpcURL string, timeout time.Duration) (domain.AlloraClientInterface, error)

func NewFranzClient

func NewFranzClient(seeds []string, user, password string) (*kgo.Client, error)

func NewKafkaClient

func NewKafkaClient(client KafkaClient, router domain.TopicRouter) (domain.StreamingClient, error)

Update the constructor to accept a TopicRouter.

func NewPgProcessedBlock

func NewPgProcessedBlock(db DBPool) (domain.ProcessedBlockRepositoryInterface, error)

NewPgProcessedBlock creates a new instance of pgProcessedBlock

Types

type AlloraClient

type AlloraClient struct {
	// contains filtered or unexported fields
}

func (*AlloraClient) GetBlockByHeight

func (a *AlloraClient) GetBlockByHeight(ctx context.Context, height int64) (*coretypes.ResultBlock, error)

GetBlockByHeight implements domain.AlloraClientInterface.

func (*AlloraClient) GetBlockResults

func (a *AlloraClient) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error)

GetBlockResults implements domain.AlloraClientInterface.

func (*AlloraClient) GetHeader

func (a *AlloraClient) GetHeader(ctx context.Context, height int64) (*coretypes.ResultHeader, error)

func (*AlloraClient) GetLatestBlockHeight

func (a *AlloraClient) GetLatestBlockHeight(ctx context.Context) (int64, error)

GetLatestBlockHeight implements domain.AlloraClientInterface.

type DBPool

type DBPool interface {
	QueryRow(ctx context.Context, sql string, args ...interface{}) RowInterface
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}

DBPool defines the interface for the database pool. This allows us to mock the database interactions in tests.

type KafkaClient

type KafkaClient interface {
	Produce(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error))
	Flush(ctx context.Context) error
	Close()
}

type RPCClient

type RPCClient interface {
	Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
	BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
	Status(ctx context.Context) (*coretypes.ResultStatus, error)
	Header(ctx context.Context, height *int64) (*coretypes.ResultHeader, error)
}

type RowInterface

type RowInterface = pgx.Row

type TopicRouterImpl

type TopicRouterImpl struct {
	// contains filtered or unexported fields
}

TopicRouterImpl determines the Kafka topic for a given message type.

func NewTopicRouter

func NewTopicRouter(mapping map[string]string) *TopicRouterImpl

NewTopicRouter initializes a new TopicRouter with the provided mappings.

func (*TopicRouterImpl) GetTopic

func (tr *TopicRouterImpl) GetTopic(msgType string) (string, error)

GetTopic returns the Kafka topic for the given message type.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL