Documentation ¶
Index ¶
- Constants
- Variables
- func MockVCS(commits []*vcsinfo.LongCommit, depsContentMap map[string]string, ...) vcsinfo.VCS
- func Register(id string, constructor Constructor)
- type BTIStore
- type Constructor
- type FileSystemSource
- type GoogleStorageSource
- type Ingester
- type IngestionStore
- type MockVCSImpl
- func (m MockVCSImpl) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error)
- func (m MockVCSImpl) Details(ctx context.Context, hash string, getBranches bool) (*vcsinfo.LongCommit, error)
- func (m MockVCSImpl) From(start time.Time) []string
- func (m MockVCSImpl) GetFile(ctx context.Context, fileName, commitHash string) (string, error)
- func (m MockVCSImpl) IndexOf(ctx context.Context, hash string) (int, error)
- func (m MockVCSImpl) LastNIndex(N int) []*vcsinfo.IndexCommit
- func (m MockVCSImpl) Range(begin, end time.Time) []*vcsinfo.IndexCommit
- func (m MockVCSImpl) ResolveCommit(ctx context.Context, commitHash string) (string, error)
- func (m MockVCSImpl) SetSecondaryRepo(secVCS vcsinfo.VCS, extractor depot_tools.DEPSExtractor)
- func (m MockVCSImpl) Update(ctx context.Context, pull, allBranches bool) error
- type Processor
- type ResultFileLocation
- type Source
Constants ¶
const (
	// TABLE_FILES_PROCESSED is the table to keep track of processed files.
	TABLE_FILES_PROCESSED = "files-processed"

	// COLFAM_FILES_PROCESSED is the column family used to keep track of processed files.
	COLFAM_FILES_PROCESSED = "fproc"

	// COL_STATE is the column used to keep track of processed files.
	COL_STATE = "st"
)
const (
	MEASUREMENT_INGESTION = "ingestion"
	TAG_INGESTION_METRIC  = "metric"
	TAG_INGESTER_ID       = "ingester"
	TAG_INGESTER_SOURCE   = "source"
)
Tag names used to collect metrics.
const (
// Limit the number of times the ingester tries to get a file before giving up.
MAX_URI_GET_TRIES = 4
)
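The retry limit can be illustrated with a minimal Go sketch. The helper `getWithRetries`, its `get` callback and `errTransient` are hypothetical and not part of this package; only the limit of four attempts comes from MAX_URI_GET_TRIES. The sketch assumes failed attempts are retried immediately, with no backoff.

```go
package main

import (
	"errors"
	"fmt"
)

// maxURIGetTries mirrors MAX_URI_GET_TRIES from this package.
const maxURIGetTries = 4

// errTransient is a stand-in error for a failed fetch (hypothetical).
var errTransient = errors.New("transient fetch error")

// getWithRetries calls get until it succeeds or until maxURIGetTries
// attempts have failed; in the latter case it wraps the last error.
// Hypothetical helper: it retries immediately, without backoff.
func getWithRetries(get func() ([]byte, error)) ([]byte, error) {
	var lastErr error
	for attempt := 1; attempt <= maxURIGetTries; attempt++ {
		data, err := get()
		if err == nil {
			return data, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("giving up after %d tries: %w", maxURIGetTries, lastErr)
}

func main() {
	calls := 0
	flaky := func() ([]byte, error) {
		calls++
		if calls < 3 {
			return nil, errTransient
		}
		return []byte("ok"), nil
	}
	data, err := getWithRetries(flaky)
	fmt.Println(string(data), err, calls) // ok <nil> 3
}
```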
const PROCESSED_FILES_BUCKET = "processed_files"
BoltDB bucket where MD5 hashes of processed files are stored.
Variables ¶
var (
	// VAL_TRUE is a true value.
	VAL_TRUE = []byte("t")

	// BigTableConfig describes the tables and column families used by this
	// package. It can be used by bt.InitBigtable to set up the tables.
	BigTableConfig = bt.TableConfig{
		TABLE_FILES_PROCESSED: {
			COLFAM_FILES_PROCESSED,
		},
	}
)
var (
	// IgnoreResultsFileErr can be returned by the Process function of a processor to
	// indicate that this file should be ignored. It is up to the processor
	// to write to the log.
	IgnoreResultsFileErr = errors.New("Ignore this file.")
)
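A sentinel error like IgnoreResultsFileErr lets a caller tell "skip this file" apart from a genuine failure. The Go sketch below assumes a hypothetical processor, `processFile`, that ignores anything not ending in `.json`, and a hypothetical `classify` helper; how the real ingester reacts to each outcome is not shown here. `errors.Is` also matches the sentinel when a processor wraps it with `%w`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// IgnoreResultsFileErr mirrors the package's sentinel error.
var IgnoreResultsFileErr = errors.New("Ignore this file.")

// processFile is a hypothetical Process implementation that skips
// anything that is not a JSON file.
func processFile(name string) error {
	if !strings.HasSuffix(name, ".json") {
		return IgnoreResultsFileErr
	}
	return nil
}

// classify shows how a caller can distinguish an ignored file from a
// real failure; errors.Is also matches a wrapped sentinel.
func classify(err error) string {
	switch {
	case err == nil:
		return "ingested"
	case errors.Is(err, IgnoreResultsFileErr):
		return "ignored"
	default:
		return "failed"
	}
}

func main() {
	for _, name := range []string{"results.json", "README.txt"} {
		fmt.Println(name, classify(processFile(name)))
	}
	// Output:
	// results.json ingested
	// README.txt ignored
}
```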
Functions ¶
func MockVCS ¶
func MockVCS(commits []*vcsinfo.LongCommit, depsContentMap map[string]string, pathContentMap map[string]string) vcsinfo.VCS
MockVCS returns an instance of VCS that returns the commits passed as arguments. To control the GetFile function, use these two parameters:
depsContentMap maps a commit hash to the content of a dependency file. pathContentMap maps file names to string content.
Currently GetFile considers either the fileName or the commit hash, never a combination of both. The fileName has priority.
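The lookup order described for GetFile can be sketched in Go. `mockGetFile` is a hypothetical stand-alone function that mirrors the documented priority (file name first, then commit hash); returning `errNotFound` when both maps miss is an assumption, since the behaviour on a miss is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is an assumed error for lookups that miss both maps.
var errNotFound = errors.New("file not found")

// mockGetFile sketches the documented lookup order: pathContentMap is
// consulted by fileName first; only if that misses is depsContentMap
// consulted by commitHash. The two keys are never combined.
func mockGetFile(depsContentMap, pathContentMap map[string]string, fileName, commitHash string) (string, error) {
	if content, ok := pathContentMap[fileName]; ok {
		return content, nil
	}
	if content, ok := depsContentMap[commitHash]; ok {
		return content, nil
	}
	return "", errNotFound
}

func main() {
	deps := map[string]string{"abc123": "DEPS at abc123"}
	paths := map[string]string{"DEPS": "DEPS by path"}

	byPath, _ := mockGetFile(deps, paths, "DEPS", "abc123")
	byHash, _ := mockGetFile(deps, paths, "other/file", "abc123")
	fmt.Println(byPath) // DEPS by path
	fmt.Println(byHash) // DEPS at abc123
}
```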
func Register ¶
func Register(id string, constructor Constructor)
Register registers the given constructor to create an instance of a Processor.
Types ¶
type BTIStore ¶
type BTIStore struct {
// contains filtered or unexported fields
}
BTIStore implements the IngestionStore interface.
func (*BTIStore) ContainsResultFileHash ¶
ContainsResultFileHash implements the IngestionStore interface.
func (*BTIStore) SetResultFileHash ¶
SetResultFileHash implements the IngestionStore interface.
type Constructor ¶
type Constructor func(vcs vcsinfo.VCS, config *sharedconfig.IngesterConfig, client *http.Client, eventBus eventbus.EventBus) (Processor, error)
Constructor is the signature that has to be implemented to register a Processor implementation to be instantiated by name from a config struct.
vcs is an instance that might be shared across multiple ingesters. config is usually parsed from a JSON5 file. client can be assumed to be ready to serve the needs of the resulting Processor. eventBus is the eventbus to be used by the ingester (optional).
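Register and Constructor together form a name-to-factory registry. The Go sketch below shows that pattern under simplifying assumptions: the Processor and Constructor types are trimmed down (the real Constructor also receives a vcsinfo.VCS, an *sharedconfig.IngesterConfig, an *http.Client and an eventbus.EventBus), and `newProcessor`, `noopProcessor` and the "noop" id are hypothetical. Whether the real Register rejects duplicate ids is not documented; this sketch simply overwrites.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Processor is a trimmed-down stand-in for the package's Processor interface.
type Processor interface {
	Process(ctx context.Context, name string) error
}

// Constructor is simplified: it only receives a string map as config.
type Constructor func(config map[string]string) (Processor, error)

var (
	mu           sync.Mutex
	constructors = map[string]Constructor{}
)

// Register stores a constructor under id (later calls overwrite earlier ones).
func Register(id string, constructor Constructor) {
	mu.Lock()
	defer mu.Unlock()
	constructors[id] = constructor
}

// newProcessor instantiates a Processor by name, as a config-driven setup would.
func newProcessor(id string, config map[string]string) (Processor, error) {
	mu.Lock()
	c, ok := constructors[id]
	mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("unknown processor id %q", id)
	}
	return c(config)
}

// noopProcessor accepts every file and stores nothing.
type noopProcessor struct{}

func (noopProcessor) Process(ctx context.Context, name string) error { return nil }

// Implementations typically register themselves at package init time.
func init() {
	Register("noop", func(config map[string]string) (Processor, error) {
		return noopProcessor{}, nil
	})
}

func main() {
	p, err := newProcessor("noop", nil)
	fmt.Println(p != nil, err) // true <nil>
	_, err = newProcessor("missing", nil)
	fmt.Println(err) // unknown processor id "missing"
}
```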
type FileSystemSource ¶
type FileSystemSource struct {
// contains filtered or unexported fields
}
FileSystemSource implements the Source interface to read from the local file system.
func (*FileSystemSource) Poll ¶
func (f *FileSystemSource) Poll(startTime, endTime int64) <-chan ResultFileLocation
See Source interface.
func (*FileSystemSource) SetEventChannel ¶
func (f *FileSystemSource) SetEventChannel(resultCh chan<- ResultFileLocation) error
SetEventChannel implements the Source interface.
type GoogleStorageSource ¶
type GoogleStorageSource struct {
// contains filtered or unexported fields
}
GoogleStorageSource implements the Source interface for Google Storage.
func (*GoogleStorageSource) Poll ¶
func (g *GoogleStorageSource) Poll(startTime, endTime int64) <-chan ResultFileLocation
See Source interface.
func (*GoogleStorageSource) SetEventChannel ¶
func (g *GoogleStorageSource) SetEventChannel(resultCh chan<- ResultFileLocation) error
SetEventChannel implements the Source interface.
type Ingester ¶
type Ingester struct {
// contains filtered or unexported fields
}
Ingester is the main type that drives ingestion for a single type.
func IngestersFromConfig ¶
func IngestersFromConfig(ctx context.Context, config *sharedconfig.Config, client *http.Client, eventBus eventbus.EventBus, ingestionStore IngestionStore) ([]*Ingester, error)
IngestersFromConfig creates a list of ingesters from a config struct. Usually the struct is created by parsing a config file. client is assumed to be suitable for the given application: if, for example, the processors of the current application require an authenticated HTTP client, then client is expected to meet that requirement.
func NewIngester ¶
func NewIngester(ingesterID string, ingesterConf *sharedconfig.IngesterConfig, vcs vcsinfo.VCS, sources []Source, processor Processor, ingestionStore IngestionStore, eventBus eventbus.EventBus) (*Ingester, error)
NewIngester creates a new ingester with the given ID and configuration around the supplied vcs (version control system), input sources and Processor instance. The ingester is driven by storage events, with a background process that also polls the storage locations. The given eventBus cannot be nil and must be shared with the sources that are passed in. For polling-only ingestion, use an in-memory eventbus (created via eventbus.New()). To drive ingestion from storage events, use a PubSub-based eventbus (created via the gevent.New(...) function).
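Combining storage events with periodic polling means the same file can arrive twice, which is why MD5 bookkeeping matters. The Go sketch below is a simplification and not this package's implementation: `ingestLoop`, `file` and the in-memory `seen` map (standing in for an IngestionStore) are hypothetical, and the loop exits when both channels are closed. A real `time.Ticker` channel is never closed, so a long-running ingester would need a separate stop signal.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// file is a minimal stand-in for ResultFileLocation.
type file struct {
	name, md5 string
}

// ingestLoop merges storage events with periodic polls. The seen map
// (standing in for an IngestionStore) keeps files that arrive through
// both paths from being processed twice.
func ingestLoop(events <-chan file, ticks <-chan time.Time, poll func() []file) []string {
	seen := map[string]bool{}
	var processed []string
	handle := func(f file) {
		if seen[f.md5] {
			return
		}
		seen[f.md5] = true
		processed = append(processed, f.name)
	}
	for events != nil || ticks != nil {
		select {
		case f, ok := <-events:
			if !ok {
				events = nil // a nil channel blocks forever, disabling this case
				continue
			}
			handle(f)
		case _, ok := <-ticks:
			if !ok {
				ticks = nil
				continue
			}
			for _, f := range poll() {
				handle(f)
			}
		}
	}
	return processed
}

func main() {
	events := make(chan file, 2)
	events <- file{"a.json", "md5-a"}
	events <- file{"b.json", "md5-b"}
	close(events)

	ticks := make(chan time.Time, 1)
	ticks <- time.Now()
	close(ticks)

	poll := func() []file {
		return []file{{"a.json", "md5-a"}, {"c.json", "md5-c"}}
	}

	got := ingestLoop(events, ticks, poll)
	sort.Strings(got) // arrival order depends on which case select picks first
	fmt.Println(got)  // [a.json b.json c.json]
}
```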
type IngestionStore ¶
type IngestionStore interface {
	// Clear completely clears the datastore. Mostly used for testing.
	Clear() error

	// SetResultFileHash sets the given md5 hash in the database.
	SetResultFileHash(md5 string) error

	// ContainsResultFileHash returns true if the provided md5 hash is in the DB.
	ContainsResultFileHash(md5 string) (bool, error)

	// Close closes the ingestion store.
	Close() error
}
IngestionStore keeps track of files being ingested based on their MD5 hashes.
func NewBTIStore ¶
func NewBTIStore(projectID, bigTableInstance, nameSpace string) (IngestionStore, error)
NewBTIStore creates a BigTable-backed implementation of IngestionStore. nameSpace is a prefix that is added to every row key to allow multitenancy.
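The IngestionStore contract is small enough that an in-memory implementation, for example for tests, fits in a few lines of Go. `memStore` and `errClosed` are hypothetical and not part of this package, and returning an error after Close is an assumption about the intended semantics rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// IngestionStore is copied from the interface above.
type IngestionStore interface {
	Clear() error
	SetResultFileHash(md5 string) error
	ContainsResultFileHash(md5 string) (bool, error)
	Close() error
}

// errClosed is returned once the store has been closed (assumed behaviour).
var errClosed = errors.New("ingestion store is closed")

// memStore is a hypothetical map-backed IngestionStore, safe for concurrent use.
type memStore struct {
	mu     sync.Mutex
	hashes map[string]bool
	closed bool
}

func newMemStore() *memStore { return &memStore{hashes: map[string]bool{}} }

func (m *memStore) Clear() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.hashes = map[string]bool{}
	return nil
}

func (m *memStore) SetResultFileHash(md5 string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errClosed
	}
	m.hashes[md5] = true
	return nil
}

func (m *memStore) ContainsResultFileHash(md5 string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return false, errClosed
	}
	return m.hashes[md5], nil
}

func (m *memStore) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

// Compile-time check that memStore satisfies the interface.
var _ IngestionStore = (*memStore)(nil)

func main() {
	s := newMemStore()
	_ = s.SetResultFileHash("d41d8cd98f00b204e9800998ecf8427e")
	ok, _ := s.ContainsResultFileHash("d41d8cd98f00b204e9800998ecf8427e")
	fmt.Println(ok) // true
}
```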
type MockVCSImpl ¶
type MockVCSImpl struct {
// contains filtered or unexported fields
}
func (MockVCSImpl) ByIndex ¶
func (m MockVCSImpl) ByIndex(ctx context.Context, N int) (*vcsinfo.LongCommit, error)
func (MockVCSImpl) Details ¶
func (m MockVCSImpl) Details(ctx context.Context, hash string, getBranches bool) (*vcsinfo.LongCommit, error)
func (MockVCSImpl) LastNIndex ¶
func (m MockVCSImpl) LastNIndex(N int) []*vcsinfo.IndexCommit
func (MockVCSImpl) Range ¶
func (m MockVCSImpl) Range(begin, end time.Time) []*vcsinfo.IndexCommit
func (MockVCSImpl) ResolveCommit ¶
func (m MockVCSImpl) ResolveCommit(ctx context.Context, commitHash string) (string, error)
func (MockVCSImpl) SetSecondaryRepo ¶
func (m MockVCSImpl) SetSecondaryRepo(secVCS vcsinfo.VCS, extractor depot_tools.DEPSExtractor)
type Processor ¶
type Processor interface {
	// Process ingests a single result file.
	Process(ctx context.Context, resultsFile ResultFileLocation) error
}
Processor is the core of an ingester. It takes instances of ResultFileLocation and ingests them. It is responsible for the storage of ingested data.
type ResultFileLocation ¶
type ResultFileLocation interface {
	// Open returns a reader for the content of the file.
	Open() (io.ReadCloser, error)

	// Name returns the full path of the file. The last segment is usually
	// the file name.
	Name() string

	// StorageIDs returns the bucket and object ID for the given location.
	StorageIDs() (string, string)

	// MD5 returns the MD5 hash of the content of the file.
	MD5() string

	// TimeStamp returns the timestamp when the file was last updated.
	TimeStamp() int64

	// Content returns the content of the file if it has been read, or nil otherwise.
	Content() []byte
}
ResultFileLocation is an abstract interface to a file-like object that contains results that need to be ingested.
func FileSystemResult ¶
func FileSystemResult(path, rootDir string) (ResultFileLocation, error)
FileSystemResult returns a ResultFileLocation for files. path is the path where the target file resides and rootDir is the root of all paths.
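For tests it can be handy to have a ResultFileLocation that lives entirely in memory. The Go sketch below is a hypothetical `memLocation`, not part of this package. It assumes MD5 is reported as a lowercase hex string, that Name joins bucket and object with "/", and that Content stays nil until Open has been called, which is one reading of "if it has been read".

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// ResultFileLocation is copied from the interface above.
type ResultFileLocation interface {
	Open() (io.ReadCloser, error)
	Name() string
	StorageIDs() (string, string)
	MD5() string
	TimeStamp() int64
	Content() []byte
}

// memLocation is a hypothetical in-memory ResultFileLocation.
type memLocation struct {
	bucket, object string
	data           []byte
	ts             int64 // last-updated time, seconds since the epoch
	read           bool
}

func (m *memLocation) Open() (io.ReadCloser, error) {
	m.read = true
	return io.NopCloser(bytes.NewReader(m.data)), nil
}

func (m *memLocation) Name() string { return m.bucket + "/" + m.object }

func (m *memLocation) StorageIDs() (string, string) { return m.bucket, m.object }

// MD5 returns the hex-encoded MD5 digest of the content (assumed encoding).
func (m *memLocation) MD5() string {
	sum := md5.Sum(m.data)
	return hex.EncodeToString(sum[:])
}

func (m *memLocation) TimeStamp() int64 { return m.ts }

// Content returns nil until Open has been called.
func (m *memLocation) Content() []byte {
	if !m.read {
		return nil
	}
	return m.data
}

var _ ResultFileLocation = (*memLocation)(nil)

func main() {
	loc := &memLocation{bucket: "my-bucket", object: "dir/results.json", data: []byte("{}"), ts: 1500000000}
	fmt.Println(loc.Name(), loc.Content() == nil) // my-bucket/dir/results.json true
	rc, _ := loc.Open()
	rc.Close()
	fmt.Println(string(loc.Content())) // {}
}
```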
type Source ¶
type Source interface {
	// ID returns a unique identifier for this source.
	ID() string

	// Poll returns a channel to read all the result files that originated between
	// the given timestamps in seconds since the epoch.
	Poll(startTime, endTime int64) <-chan ResultFileLocation

	// SetEventChannel configures storage events and sets up routines to send
	// new results to the given channel.
	SetEventChannel(resultCh chan<- ResultFileLocation) error
}
Source defines an ingestion source that returns lists of result files either through polling or in an event driven mode.
func NewFileSystemSource ¶
func NewGoogleStorageSource ¶
func NewGoogleStorageSource(baseName, bucket, rootDir string, client *http.Client, eventBus eventbus.EventBus) (Source, error)
NewGoogleStorageSource returns a new instance of GoogleStorageSource based on the bucket and directory provided. The id is used to identify the Source and is generally the same id as the ingester.
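The Source contract returns results through a channel, which lets callers range over Poll's output. The Go sketch below shows a hypothetical polling-only `memSource` over a simplified `result` type rather than ResultFileLocation. Treating both timestamp bounds as inclusive, closing the channel when done, and returning an error from SetEventChannel are assumptions, not documented behaviour of FileSystemSource or GoogleStorageSource.

```go
package main

import "fmt"

// result is a minimal stand-in for ResultFileLocation.
type result struct {
	name string
	ts   int64 // seconds since the epoch
}

// memSource is a hypothetical polling-only Source.
type memSource struct {
	id    string
	files []result
}

func (s *memSource) ID() string { return s.id }

// Poll streams every file whose timestamp falls in [startTime, endTime]
// (bounds assumed inclusive) and closes the channel when done so callers
// can range over it.
func (s *memSource) Poll(startTime, endTime int64) <-chan result {
	ch := make(chan result)
	go func() {
		defer close(ch)
		for _, f := range s.files {
			if f.ts >= startTime && f.ts <= endTime {
				ch <- f
			}
		}
	}()
	return ch
}

// SetEventChannel reports that this source has no storage events.
func (s *memSource) SetEventChannel(resultCh chan<- result) error {
	return fmt.Errorf("source %s does not support events", s.id)
}

// collect drains a Poll channel into a slice of file names.
func collect(ch <-chan result) []string {
	var names []string
	for f := range ch {
		names = append(names, f.name)
	}
	return names
}

func main() {
	src := &memSource{id: "local", files: []result{{"a", 100}, {"b", 200}, {"c", 300}}}
	fmt.Println(collect(src.Poll(150, 300))) // [b c]
}
```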