staging_importer

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2021 License: MIT Imports: 15 Imported by: 0

README

Vivo Scramjet

module github.com/vivo-community/vivo scramjet ?

A persistent cache of arbitrary json objects associated with an id, that can be validated against a service and, when valid, examined for changes.

This makes it possible to gather some entities for ingest into a store of some sort - and be able to send only adds, updates or deletes to that store.

as library (API)

  • Staging Table

import (
	sj "gitlab.oit.duke.edu/scholars/staging_importer"
)
...

	typeName := "person"
  // 1) add data
	person1 := TestPerson{Id: "per0000001", Name: "Test1"}
	person2 := TestPerson{Id: "per0000002", Name: "Test2"}
	// must use anything of interface 'Storeable'
	pass1 := sj.Packet{Id: sj.Identifier{Id: person1.Id, Type: typeName}, Obj: person1}
	pass2 := sj.Packet{Id: sj.Identifier{Id: person2.Id, Type: typeName}, Obj: person2}

	people := []sj.Storeable{pass1, pass2}
	err := sj.BulkAddStaging(people...)

  // 2) run through a 'validator' function - would likely
  //    be a json schema validator
	alwaysOkay := func(json string) bool { return true }
	list, rejects, _ := sj.FilterTypeStaging("person", alwaysOkay)

  sj.BatchMarkValidInStaging(list)
  sj.BatchMarkInValidInStaging(rejects)

  // 3) Now the valid ones are marked and ready to go into
  //    'resource' table
    ...


    
  • Resources Table

import (
	sj "gitlab.oit.duke.edu/scholars/staging_importer"
)

...

	typeName := "person"
	list := sj.RetrieveValidStaging(typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)

    ...

as executable (CLI)

General Idea

two tables, staging and resources

  • staging: [id+type=uid]

    actions:

    • stash ->
    • validate ->
    • stash and validate ->
  • resources: [uri(type)=uid]

    actions:

    • move over valid (could be updates)
    • get actual updates (only)

Basic structure

image of basic structure

Tables

image of tables

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DBPool *pgxpool.Pool
View Source
var Name string

Functions

func AddStagingResource

func AddStagingResource(obj interface{}, id string, typeName string) error

only add (presumed existence already checked)

func BatchDeleteResourcesFromResources

func BatchDeleteResourcesFromResources(resources ...Identifiable) error

func BatchDeleteStagingFromResources

func BatchDeleteStagingFromResources(resources ...Identifiable) error

func BatchMarkInvalidInStaging

func BatchMarkInvalidInStaging(resources []Identifiable) error

func BatchMarkValidInStaging

func BatchMarkValidInStaging(resources []Identifiable) error

func BulkAddStaging

func BulkAddStaging(items ...Storeable) error

func BulkAddStagingForDelete

func BulkAddStagingForDelete(items ...Identifiable) error

func BulkAddStagingResources

func BulkAddStagingResources(resources ...StagingResource) error

func BulkMoveStagingTypeToResources

func BulkMoveStagingTypeToResources(typeName string, items ...StagingResource) error

NOTE: only need 'typeName' param for clearing out from staging

func BulkRemoveResources

func BulkRemoveResources(items ...Identifiable) error

func BulkRemoveStagingDeletedFromResources

func BulkRemoveStagingDeletedFromResources(typeName string) error

func ClearAllResources

func ClearAllResources() error

func ClearAllStaging

func ClearAllStaging() error

func ClearDeletedFromStaging

func ClearDeletedFromStaging(id string, typeName string) error

func ClearResourceType

func ClearResourceType(typeName string) error

func ClearStagingType

func ClearStagingType(typeName string) error

call where valid = true? (after transfering to resources)

func ClearStagingTypeDeletes

func ClearStagingTypeDeletes(typeName string) error

func ClearStagingTypeValid

func ClearStagingTypeValid(typeName string) error

leave the is_valid = false for investigation

func DeleteFromStaging

func DeleteFromStaging(res StagingResource) error

func DropResources

func DropResources() error

func DropStaging

func DropStaging() error

func FilterTypeStaging

func FilterTypeStaging(typeName string, validator ValidatorFunc) ([]Identifiable, []Identifiable)

NOTE: this needs a 'typeName' param because it assumes validator is different per type

func GetDbName

func GetDbName() string

func GetMaxUpdatedAt

func GetMaxUpdatedAt(typeName string) time.Time

func GetPool

func GetPool() *pgxpool.Pool

func IntakeInChunks

func IntakeInChunks(ins ChunkableIntakeConfig) error

func MakeConnectionPool

func MakeConnectionPool(conf Config) error

NOTE: Prepared statements can be manually created with the Prepare method. However, this is rarely necessary because pgx includes an automatic statement cache by default

func MakeResourceSchema

func MakeResourceSchema()

NOTE: this calls Fatalf with errors

func MakeStagingSchema

func MakeStagingSchema()

func MarkInvalidInStaging

func MarkInvalidInStaging(res Storeable) error

TODO: should probably batch these when validating and mark valid, invalid in groups of 500 or something

func MarkValidInStaging

func MarkValidInStaging(res StagingResource) error

func ProcessOutake

func ProcessOutake(proc OutakeProcessConfig) error

func ProcessSingleStaging

func ProcessSingleStaging(item Identifiable, validator ValidatorFunc) error

func ProcessTypeStaging

func ProcessTypeStaging(typeName string, validator ValidatorFunc) error

func RemoveStagingDeletedFromResources

func RemoveStagingDeletedFromResources(id string, typeName string) error

func ResourceCount

func ResourceCount(typeName string) int

func ResourceTableExists

func ResourceTableExists() bool

TODO: the 'table_catalog' changes

func SaveResource

func SaveResource(obj Storeable) error

only does one at a time (not typically used)

func SaveStagingResource

func SaveStagingResource(obj Storeable) error

is there a need for this function?

func SaveStagingResourceDirect

func SaveStagingResourceDirect(res StagingResource, typeName string) error

func StagingDeleteCount

func StagingDeleteCount(typeName string) int

NOTE: only used in test - for verification

func StagingResourceExists

func StagingResourceExists(id string, typeName string) bool

returns false if error - maybe should not

func StagingTableExists

func StagingTableExists() bool

NOTE: could call Fatalf

func StashStaging

func StashStaging(docs ...Storeable) error

Types

type ChunkableIntakeConfig

type ChunkableIntakeConfig struct {
	Count     int
	ChunkSize int
	JustTest  bool
	TypeName  string
	ListMaker IntakeListMaker
	Progress  ProgressChecker
	Inspector JustTestingInspector
}

type Config

type Config struct {
	Database DatabaseInfo
}

more flexible?

type DatabaseInfo

type DatabaseInfo struct {
	Server         string
	Port           int
	Database       string
	User           string
	Password       string
	MaxConnections int
	AcquireTimeout int
}

type DeleteChecker

type DeleteChecker func([]string)

type DiffProcessConfig

type DiffProcessConfig struct {
	TypeName          string
	ExistingListMaker ExistingListMaker
	ListMaker         OutakeListMaker
	JustTest          bool
}

to look for diffs for duid (for instance) both lists have to be sent in

type ExistingListMaker

type ExistingListMaker func() (error, []Resource)

type Identifiable

type Identifiable interface {
	Identifier() Identifier
}

func RetrieveDeletedStaging

func RetrieveDeletedStaging(typeName string) ([]Identifiable, error)

type Identifier

type Identifier struct {
	Id, Type string
}

type IntakeListMaker

type IntakeListMaker func(int) ([]Storeable, error)

type JustTestingInspector

type JustTestingInspector func(...interface{})

type OutakeListMaker

type OutakeListMaker func() ([]string, error)

maybe interface instead of func type in struct?

type OutakeProcessConfig

type OutakeProcessConfig struct {
	TypeName  string
	ListMaker OutakeListMaker
	JustTest  bool
	Checker   DeleteChecker
	Inspector JustTestingInspector
}

type Packet

type Packet struct {
	Id  Identifier
	Obj interface{} // this will be serialized
}

func (Packet) Identifier

func (p Packet) Identifier() Identifier

func (Packet) Object

func (ps Packet) Object() interface{}

type ProgressChecker

type ProgressChecker func(int)

type Resource

type Resource struct {
	Id        string       `db:"id"`
	Type      string       `db:"type"`
	Hash      string       `db:"hash"`
	Data      pgtype.JSON  `db:"data"`
	DataB     pgtype.JSONB `db:"data_b"`
	CreatedAt time.Time    `db:"created_at"`
	UpdatedAt time.Time    `db:"updated_at"`
}

this is the raw structure in the database two json columms: * 'data' can be used for change comparison with hash * 'data_b' can be used for searches

func RetrieveTypeResources

func RetrieveTypeResources(typeName string) ([]Resource, error)

TODO: could just send in date - leave it up to library user to determine how it's figured out

func RetrieveTypeResourcesLimited

func RetrieveTypeResourcesLimited(typeName string, limit int) ([]Resource, error)

func (Resource) Identifier

func (res Resource) Identifier() Identifier

type StagingResource

type StagingResource struct {
	Id       string       `db:"id"`
	Type     string       `db:"type"`
	Data     []byte       `db:"data"`
	IsValid  sql.NullBool `db:"is_valid"`
	ToDelete sql.NullBool `db:"to_delete"`
}

NOTE: just making json []byte instead of pgtype.JSON

func RetrieveInvalidStaging

func RetrieveInvalidStaging(typeName string) []StagingResource

func RetrieveSingleStaging

func RetrieveSingleStaging(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingDelete

func RetrieveSingleStagingDelete(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingValid

func RetrieveSingleStagingValid(id string, typeName string) (StagingResource, error)

func RetrieveTypeStaging

func RetrieveTypeStaging(typeName string) []StagingResource

Staging ...

func RetrieveValidStaging

func RetrieveValidStaging(typeName string) []StagingResource

func (StagingResource) Identifier

func (res StagingResource) Identifier() Identifier

kind of like dual primary key

type Storeable

type Storeable interface {
	Identifier() Identifier
	Object() interface{}
}

type Stub

type Stub struct {
	Id Identifier
}

func (Stub) Identifier

func (s Stub) Identifier() Identifier

type ValidatorFunc

type ValidatorFunc func(json string) bool

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL