Documentation ¶
Index ¶
- func NewRepo(db *sql.DB) *sqlRepo
- func NewSubscription(cfg *config.Config) (*pubsub.Subscription, error)
- func PublishFiles(pub XferPublisher, xfer *client.Transfer, files []*ach.File) error
- type CanceledTransfer
- type CutoffCallback
- type MockPublisher
- type MockXferMerging
- type Repository
- type Xfer
- type XferAggregator
- type XferMerging
- type XferPublisher
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSubscription ¶
func NewSubscription(cfg *config.Config) (*pubsub.Subscription, error)
func PublishFiles ¶
func PublishFiles(pub XferPublisher, xfer *client.Transfer, files []*ach.File) error
PublishFiles attempts to upload all files to the Pipeline and returns any failures collected as a base.ErrorList.
Every file is attempted because downstream processors are expected to de-duplicate files.
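A minimal usage sketch, assuming the publisher comes from NewPublisher, that cfg.Pipeline holds the pipeline configuration, and that the import paths mirror PayGate's layout (all assumptions for illustration):

package pipeline

import (
    "context"
    "log"

    "github.com/moov-io/ach"
    "github.com/moov-io/paygate/pkg/client"
    "github.com/moov-io/paygate/pkg/config"
)

// publishTransferFiles is a hypothetical helper that publishes every ACH file
// built for a Transfer and logs the aggregated error list, if any.
func publishTransferFiles(cfg *config.Config, xfer *client.Transfer, files []*ach.File) error {
    pub, err := NewPublisher(cfg.Pipeline) // assumes cfg.Pipeline is the pipeline config
    if err != nil {
        return err
    }
    defer pub.Shutdown(context.Background())

    // Every file is offered for publishing; per-file failures come back
    // collected in one base.ErrorList rather than stopping at the first error.
    if err := PublishFiles(pub, xfer, files); err != nil {
        log.Printf("publishing files: %v", err)
        return err
    }
    return nil
}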
Types ¶
type CanceledTransfer ¶
type CanceledTransfer struct {
TransferID string `json:"transferID"`
}
type CutoffCallback ¶
type CutoffCallback func() error
CutoffCallback is a function called before cutoff processing is performed.
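A small sketch of a callback that could be registered with NewAggregator; the body is illustrative only:

package pipeline

// notifyBeforeCutoff is a hypothetical CutoffCallback. Returning a non-nil
// error surfaces to the aggregator before cutoff processing runs.
func notifyBeforeCutoff() error {
    // e.g. flush metrics or verify upstream connectivity before files are merged
    return nil
}

// compile-time check that the function satisfies the callback type
var _ CutoffCallback = notifyBeforeCutoff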
type MockPublisher ¶
type MockPublisher struct {
    Xfers   map[string]Xfer
    Cancels map[string]CanceledTransfer
    Err     error
}
func NewMockPublisher ¶
func NewMockPublisher() *MockPublisher
func (*MockPublisher) Cancel ¶
func (p *MockPublisher) Cancel(msg CanceledTransfer) error
func (*MockPublisher) Shutdown ¶
func (p *MockPublisher) Shutdown(ctx context.Context)
func (*MockPublisher) Upload ¶
func (p *MockPublisher) Upload(xfer Xfer) error
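A test sketch showing how the mock records calls in place of a real XferPublisher; that Cancels is keyed by TransferID is an assumption for illustration:

package pipeline

import "testing"

// TestCancelIsRecorded sketches using MockPublisher in tests. It assumes the
// Cancels map is keyed by the TransferID of the canceled transfer.
func TestCancelIsRecorded(t *testing.T) {
    pub := NewMockPublisher()

    cancel := CanceledTransfer{TransferID: "xfer-123"}
    if err := pub.Cancel(cancel); err != nil {
        t.Fatal(err)
    }

    if _, exists := pub.Cancels[cancel.TransferID]; !exists {
        t.Errorf("expected cancel for %s to be recorded", cancel.TransferID)
    }
}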
type MockXferMerging ¶
type MockXferMerging struct {
    LatestXfer   *Xfer
    LatestCancel *CanceledTransfer
    Err          error
    // contains filtered or unexported fields
}
func (*MockXferMerging) HandleCancel ¶
func (merge *MockXferMerging) HandleCancel(cancel CanceledTransfer) error
func (*MockXferMerging) HandleXfer ¶
func (merge *MockXferMerging) HandleXfer(xfer Xfer) error
func (*MockXferMerging) WithEachMerged ¶
func (merge *MockXferMerging) WithEachMerged(func(*ach.File) error) (*processedTransfers, error)
type Repository ¶
type XferAggregator ¶
type XferAggregator struct {
// contains filtered or unexported fields
}
XferAggregator runs a loop that is triggered on a cutoff warning, e.g. 10 minutes before Moov's cutoff window and 30 minutes before the ODFI's cutoff. On each trigger it consumes as many transfers as possible and then uploads them.
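A hedged wiring sketch, assuming Start blocks until the context is canceled and that the dependency import paths follow PayGate's layout; constructing each dependency is out of scope here:

package pipeline

import (
    "context"

    "github.com/moov-io/paygate/pkg/config"
    "github.com/moov-io/paygate/pkg/transfers/pipeline/schedule"
    "github.com/moov-io/paygate/pkg/upload"
    "gocloud.dev/pubsub"
)

// runAggregator is a hypothetical helper that builds the aggregator from its
// dependencies, runs the cutoff loop, and shuts it down afterwards.
func runAggregator(
    ctx context.Context,
    cfg *config.Config,
    agent upload.Agent,
    repo Repository,
    merger XferMerging,
    sub *pubsub.Subscription,
    cutoffs *schedule.CutoffTimes,
) error {
    agg, err := NewAggregator(cfg, agent, repo, merger, sub, nil) // nil: no extra CutoffCallbacks
    if err != nil {
        return err
    }
    defer agg.Shutdown()

    // Start consumes transfers from sub and uploads merged files on each
    // cutoff trigger; it is assumed here to run until ctx is canceled.
    agg.Start(ctx, cutoffs)
    return nil
}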
func NewAggregator ¶
func NewAggregator(
    cfg *config.Config,
    agent upload.Agent,
    repo Repository,
    merger XferMerging,
    sub *pubsub.Subscription,
    cutoffCallbacks []CutoffCallback,
) (*XferAggregator, error)
func (*XferAggregator) RegisterRoutes ¶
func (xfagg *XferAggregator) RegisterRoutes(svc *admin.Server)
func (*XferAggregator) Shutdown ¶
func (xfagg *XferAggregator) Shutdown()
func (*XferAggregator) Start ¶
func (xfagg *XferAggregator) Start(ctx context.Context, cutoffs *schedule.CutoffTimes)
type XferMerging ¶
type XferMerging interface {
    HandleXfer(xfer Xfer) error
    HandleCancel(cancel CanceledTransfer) error
    WithEachMerged(func(*ach.File) error) (*processedTransfers, error)
}
XferMerging represents logic for accepting ACH files to be merged together.
The idea is to take Xfers and store them on a filesystem (or other durable storage) prior to a cutoff window. The specific storage could be based on the FileHeader.
On the cutoff trigger, WithEachMerged is called to merge files together and offer each merged file for upload.
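A sketch of that cutoff-time flow, where the upload function is a stand-in for handing each merged file to an upload.Agent (the validation step is an illustrative choice, not part of the interface):

package pipeline

import (
    "fmt"

    "github.com/moov-io/ach"
)

// uploadMergedFiles is a hypothetical helper: at cutoff it walks every merged
// ACH file and hands each one to the provided upload function.
func uploadMergedFiles(merger XferMerging, upload func(*ach.File) error) error {
    _, err := merger.WithEachMerged(func(file *ach.File) error {
        if err := file.Validate(); err != nil {
            return fmt.Errorf("merged file is invalid: %v", err)
        }
        return upload(file)
    })
    return err
}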
func NewMerging ¶
type XferPublisher ¶
type XferPublisher interface {
    Upload(xfer Xfer) error
    Cancel(msg CanceledTransfer) error
    Shutdown(ctx context.Context)
}
XferPublisher is an interface for pushing Transfers (and their ACH files) to be uploaded to an ODFI. Implementations can push Transfers onto streams (e.g. Kafka, RabbitMQ) or hold them in-memory (the default in our OSS PayGate).
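A minimal sketch of an implementation that only logs, to show the interface's shape; a real implementation would push onto a stream or an in-memory queue:

package pipeline

import (
    "context"
    "log"
)

// logPublisher is a hypothetical XferPublisher that logs each call instead of
// publishing anywhere.
type logPublisher struct{}

func (p *logPublisher) Upload(xfer Xfer) error {
    log.Println("upload requested")
    return nil
}

func (p *logPublisher) Cancel(msg CanceledTransfer) error {
    log.Printf("cancel requested for transfer %s", msg.TransferID)
    return nil
}

func (p *logPublisher) Shutdown(ctx context.Context) {}

// compile-time interface check
var _ XferPublisher = (*logPublisher)(nil)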
func NewPublisher ¶
func NewPublisher(cfg config.Pipeline) (XferPublisher, error)