pipeline

package
v0.17.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2023 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FileReceiver

type FileReceiver struct {
	// contains filtered or unexported fields
}

FileReceiver accepts an ACH file from a number of pubsub Subscriptions and finds the appropriate aggregator for the shardKey.

func Start

func Start(
	ctx context.Context,
	logger log.Logger,
	cfg *service.Config,
	consul *consul.Client,
	shardRepository shards.Repository,
	httpFiles stream.Subscription,
) (*FileReceiver, error)

func (*FileReceiver) RegisterAdminRoutes

func (fr *FileReceiver) RegisterAdminRoutes(r *admin.Server)

func (*FileReceiver) ReplaceStreamFiles added in v0.17.7

func (fr *FileReceiver) ReplaceStreamFiles(sub stream.Subscription)

func (*FileReceiver) Shutdown

func (fr *FileReceiver) Shutdown()

func (*FileReceiver) Start

func (fr *FileReceiver) Start(ctx context.Context)

type MockXferMerging

type MockXferMerging struct {
	LatestFile   *incoming.ACHFile
	LatestCancel *incoming.CancelACHFile

	Err error
	// contains filtered or unexported fields
}

func (*MockXferMerging) HandleCancel

func (merge *MockXferMerging) HandleCancel(cancel incoming.CancelACHFile) error

func (*MockXferMerging) HandleXfer

func (merge *MockXferMerging) HandleXfer(xfer incoming.ACHFile) error

func (*MockXferMerging) WithEachMerged

func (merge *MockXferMerging) WithEachMerged(f func(int, upload.Agent, *ach.File) error) (*processedFiles, error)

type XferMerging

type XferMerging interface {
	HandleXfer(xfer incoming.ACHFile) error
	HandleCancel(cancel incoming.CancelACHFile) error

	WithEachMerged(f func(int, upload.Agent, *ach.File) error) (*processedFiles, error)
}

XferMerging represents logic for accepting ACH files to be merged together.

The idea is to take Xfers and store them on a filesystem (or other durable storage) prior to a cutoff window. The specific storage could be based on the FileHeader.

On the cutoff trigger WithEachMerged is called to merge files together and offer each merged file for an upload.

func NewMerging

func NewMerging(logger log.Logger, consul *consul.Client, shard service.Shard, cfg service.UploadAgents) (XferMerging, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL