borges

borges collects and stores Git repositories.
"I have always imagined that Paradise will be a kind of library." (Jorge Luis Borges)
Borges is a set of tools for collection and storage of Git repositories at large scale. It is a distributed system, similar to a search engine, that uses a custom repository storage file format and is optimized for saving storage space and keeping repositories up-to-date.
Key concepts

- Borges producer: a standalone process that reads repository URLs (from RabbitMQ or a file) and schedules jobs to fetch them.
- Borges consumer: a standalone process that takes URLs from RabbitMQ, clones the remote repository and pushes it to the appropriate Rooted Repository in the storage (local filesystem or HDFS).
- Borges packer: a standalone process that takes repository paths (or URLs) from a file and packs them into siva files (as Rooted Repositories) in the given output directory.
- Rooted Repository: a standard Git repository that stores all objects from all repositories that share common history, identified by the same initial commit. It is stored in the siva file format.
Consumer and producer run independently, communicating through a RabbitMQ instance and storing repository metadata in PostgreSQL.
Packer needs neither a RabbitMQ nor a PostgreSQL instance and is not meant to be used as a pipeline; that is what consumer and producer are for.
Read the borges package godoc for further details on how borges archives the repositories.
CLI
Both producer and consumer are shipped as a single binary; see borges --help for details about the main commands and their options.
Setting up borges
Borges needs a database and a message broker to do its job. It works with a PostgreSQL database by default and uses RabbitMQ. You can use the following environment variables to configure those:
- CONFIG_DBUSER, by default: testing
- CONFIG_DBPASS, by default: testing
- CONFIG_DBHOST, by default: 0.0.0.0
- CONFIG_DBPORT, by default: 5432
- CONFIG_DBNAME, by default: testing
- CONFIG_DBSSLMODE, by default: disable
- CONFIG_DBAPPNAME, by default: "" (empty)
- CONFIG_DBTIMEOUT, by default: 30s
Other important settings are:
- CONFIG_TEMP_DIR: local path to store temporary files needed by the borges consumer, by default: /tmp/sourced
- CONFIG_CLEAN_TEMP_DIR: delete the temporary directory before starting, by default: false
- CONFIG_BROKER: by default: amqp://localhost:5672
- CONFIG_ROOT_REPOSITORIES_DIR: .siva file storage. If no HDFS connection URL is provided, this is a path in the local filesystem; otherwise, it is an HDFS directory. By default: /tmp/root-repositories
- CONFIG_ROOT_REPOSITORIES_TEMP_DIR: where siva.copy files are temporarily placed. Only needed for HDFS.
- CONFIG_LOCKING, by default: local:, other options: etcd:
- CONFIG_HDFS: (host:port) if this property is not provided, all root repositories are stored in the local filesystem, by default: ""
- CONFIG_BUCKETSIZE, by default: 0, the number of characters from the siva file name used to create bucket directories. The value 0 means that all files are saved at the same level.
Producer
The producer runs as a service. It determines which repositories should be updated next and enqueues new jobs for them.
To launch the producer you just have to run it with the default configuration:
borges producer
The producer reads mentions from rovers' RabbitMQ queue, but it can also read URLs directly from a file with a special CLI option:
borges producer --source=file --file /path/to/file
The file must contain one URL per line, for example:
https://github.com/a/repo1
https://github.com/b/repo2.git
http://github.com/c/repo3
http://github.com/d/repo4.git
You can change the priority of the produced jobs with the --priority option. It is a number from 0 to 8, where 0 is the lowest priority:
borges producer --source=file --file /path/to/file --priority 8
When jobs fail they are sent to the buried queue. If you want to requeue them, you can pass the --republish-buried flag (this only works for the mentions source). For example:
borges producer --republish-buried
So a possible command to launch the producer could be:
$ CONFIG_DBUSER="user" \
CONFIG_DBPASS="pass" \
CONFIG_DBHOST="postgres" \
CONFIG_DBNAME="borges-db" \
CONFIG_BROKER="amqp://guest:guest@rabbitmq:5672" \
borges producer --loglevel=debug
For more details, use borges producer -h.
Consumer
The consumer runs as a service. It gets jobs from the queue and dispatches them to a pool of goroutine workers.
Each job is a request to update a repository. It can be a new or an existing one. The remote repository is fetched (incrementally when possible) and each reference is then pushed to a specific Rooted Repository, dedicated to storing all references from repositories that share the same initial commit.
Note that borges should be the only process creating and writing to the repository storage.
To launch a consumer instance from the command line with default configuration:
borges consumer
You can select the number of workers to use; by default it uses 8:
borges consumer --workers=20
For example, you could run it like this:
$ CONFIG_TEMP_DIR="/borges/tmp" \
CONFIG_ROOT_REPOSITORIES_DIR="/borges/root-repositories" \
borges consumer --workers=20 --loglevel=debug
For more details, use borges consumer -h.
Packer
The packer runs as a one-time command. It reads jobs from a file containing one repository path (or URL) per line and distributes them across many workers, which group the repositories into Rooted Repositories and pack them as siva files.
This command needs neither a PostgreSQL nor a RabbitMQ connection, and it can be used locally without an internet connection if all the repositories to pack are local.
Imagine we have the following file, repos.txt, with the repositories we want to pack:
git://github.com/yada/yada.git
https://github.com/foo/bar
file:///home/me/some-repo
/home/me/another-repo
If no protocol is specified, the line is treated as an absolute path to a repository, which can be a bare repository or a regular Git repository.
You can pack the previous repos by running this command:
borges pack --file=repos.txt --to=/home/me/packed-repos
With the --to argument you can specify where the siva files are stored. If the directory does not exist, it will be created. If you omit this argument, siva files are stored in $PWD/repositories by default.
For more details, use borges pack -h.
Administration Notes
Both the producer and consumer services will run even if they cannot connect to the queue, or even if the queue is malfunctioning. If the queue does not work, they will just retry until it does.
Quickstart using Docker containers
Download the images
Download the latest borges image
docker pull quay.io/srcd/borges
And then the PostgreSQL and RabbitMQ images (you can skip this step if you already have that setup for rovers).
docker pull postgres:9.6-alpine
docker pull rabbitmq:3-management
Running everything
Start RabbitMQ and PostgreSQL (you can skip this step if you already have that setup for rovers).
docker run -d --name postgres -e POSTGRES_PASSWORD=testing -p 5432:5432 -e POSTGRES_USER=testing postgres
docker run -d --hostname rabbitmq --name rabbitmq -p 8081:15672 -p 5672:5672 rabbitmq:3-management
Now, you can start the borges consumer, the component that will be listening for jobs and processing repositories.
docker run --name borges_consumer --link rabbitmq --link postgres \
-v /path/to/store/repos/locally:/borges/root-repositories \
-e CONFIG_DBUSER=testing -e CONFIG_DBPASS=testing \
-e CONFIG_DBHOST=postgres -e CONFIG_DBNAME=testing \
-e CONFIG_BROKER=amqp://guest:guest@rabbitmq:5672/ \
-e CONFIG_ROOT_REPOSITORIES_DIR=/borges/root-repositories \
quay.io/srcd/borges /bin/sh -c "borges init; borges consumer --loglevel=debug --workers=8"
Be sure to replace /path/to/store/repos/locally
with the path on your hard drive where you want your root repositories (as siva files) stored.
Finally, you need to send jobs to the borges consumer using the borges producer. If you have rovers set up already, you may want to use the rovers' mentions as the source.
docker run --name borges_producer --link rabbitmq --link postgres \
-e CONFIG_DBUSER=testing -e CONFIG_DBPASS=testing \
-e CONFIG_DBHOST=postgres -e CONFIG_DBNAME=testing \
-e CONFIG_BROKER=amqp://guest:guest@rabbitmq:5672/ \
quay.io/srcd/borges borges producer --loglevel=debug
However, you can also process just a specific list of repositories without having to set up rovers on your own. Write the repository URLs in a file, one repository per line, and feed it to the borges producer with the file source (this example assumes you have a repos.txt in the current directory):
docker run --name borges_producer_file --link rabbitmq --link postgres \
-v $(pwd):/opt/borges \
-e CONFIG_DBUSER=testing -e CONFIG_DBPASS=testing \
-e CONFIG_DBHOST=postgres -e CONFIG_DBNAME=testing \
-e CONFIG_BROKER=amqp://guest:guest@rabbitmq:5672/ \
quay.io/srcd/borges borges producer --loglevel=debug --source=file --file=/opt/borges/repos.txt
Congratulations, now you have a fully working repository processing pipeline!
Running Borges in Kubernetes
You can use the official Helm chart to deploy borges in your Kubernetes cluster.
Development
Build

- rm Makefile.main; rm -rf .ci to make sure you have the latest Makefile changes.
- make dependencies to download vendor dependencies using Glide.
- make packages to generate binaries for several platforms.

You will find the built binaries in borges_linux_amd64/borges and borges_darwin_amd64/borges.
If you're running borges for the first time, make sure you initialize the schema of the database first. You can do so by running the following command:
borges init
Test
make test

Borges has two runtime dependencies and has tests that depend on them:
- RabbitMQ: consumers and producers interact through a queue. You can run one in Docker with the following command:

docker run -d --hostname rabbit --name rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Note: a hostname needs to be provided because RabbitMQ stores data according to the hostname.
- PostgreSQL: consumers create siva files with Rooted Repositories, but all repository metadata is stored in PostgreSQL. You can run one in Docker with the following command:

docker run --name postgres -e POSTGRES_DB=testing -e POSTGRES_USER=testing -e POSTGRES_PASSWORD=testing -p 5432:5432 -d postgres

To check it manually, use: docker exec -ti postgres psql -U testing
Use make test-coverage to run all tests and produce a coverage report.
License
GPLv3, see LICENSE
Documentation

Overview
borges archives repositories in a universal git library.
The goal of borges is to fetch repositories and keep them up to date. Repositories are arranged in a repository storage that contains one rooted repository per init commit found.
We define root commit as any commit with no parents (the first commit of a repository). Note that a repository can contain multiple root commits.
For each reference, we define its init commit as the root commit that is reached by following the first parent of each commit in the history. This is the commit that would be obtained with:
$ git rev-list --first-parent <ref> | tail -n 1
When borges fetches a repository, it groups all references by init commit and pushes each group of references to a repository for its init commit.
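As an illustration, here is a minimal sketch of that first-parent walk using go-git. This is not borges's internal implementation; the helper name initCommit is ours.

import (
	git "gopkg.in/src-d/go-git.v4"
	"gopkg.in/src-d/go-git.v4/plumbing"
	"gopkg.in/src-d/go-git.v4/plumbing/object"
)

// initCommit walks the first-parent chain from the given commit hash and
// returns the root commit it reaches, mirroring
// `git rev-list --first-parent <ref> | tail -n 1`.
func initCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error) {
	c, err := r.CommitObject(h)
	if err != nil {
		return nil, err
	}
	for c.NumParents() > 0 {
		// Follow only the first parent of each commit.
		if c, err = c.Parent(0); err != nil {
			return nil, err
		}
	}
	return c, nil
}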
Index
- Constants
- Variables
- func RepositoryID(endpoints []string, isFork *bool, storer RepositoryStore) (uuid.UUID, error)
- func ResolveCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error)
- func StoreConfig(r *git.Repository, mr *model.Repository) error
- type Action
- type Archiver
- type Changes
- type Command
- type Consumer
- type Executor
- type Job
- type JobIter
- type Producer
- type Referencer
- type RepositoryStore
- type TemporaryCloner
- type TemporaryRepository
- type Worker
- type WorkerFunc
- type WorkerJob
- type WorkerPool
Constants

const (
	Create  Action = "create"
	Update         = "update"
	Delete         = "delete"
	Invalid        = "invalid"
)

const (
	FetchRefSpec = config.RefSpec("refs/*:refs/*")
	FetchHEAD    = config.RefSpec("HEAD:refs/heads/HEAD")
)

const TemporaryError = "temporary"
Variables

var (
	ErrCleanRepositoryDir     = errors.NewKind("cleaning up local repo dir failed")
	ErrClone                  = errors.NewKind("cloning %s failed")
	ErrPushToRootedRepository = errors.NewKind("push to rooted repo %s failed")
	ErrArchivingRoots         = errors.NewKind("archiving %d out of %d roots failed: %s")
	ErrEndpointsEmpty         = errors.NewKind("endpoints is empty")
	ErrRepositoryIDNotFound   = errors.NewKind("repository id not found: %s")
	ErrChanges                = errors.NewKind("error computing changes")
	ErrAlreadyFetching        = errors.NewKind("repository %s was already in a fetching status")
	ErrSetStatus              = errors.NewKind("unable to set repository to status: %s")
	ErrFatal                  = errors.NewKind("fatal, %v: stacktrace: %s")
)

var (
	// ErrObjectTypeNotSupported returned by ResolveCommit when the referenced
	// object isn't a Commit nor a Tag.
	ErrObjectTypeNotSupported = errors.NewKind("object type %q not supported")
)
Functions

func RepositoryID

func RepositoryID(endpoints []string, isFork *bool, storer RepositoryStore) (uuid.UUID, error)

RepositoryID tries to find a repository in the database by its endpoints. If no repository is found, it creates a new one and returns its ID.
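A hypothetical usage sketch (the helper ensureRepo and the nil fork flag are our own illustration; store can be any RepositoryStore implementation):

import (
	uuid "github.com/satori/go.uuid"

	"github.com/src-d/borges"
)

// ensureRepo returns the ID of the repository with the given endpoint,
// creating it in the store when it does not exist yet. Passing nil as the
// fork flag means the fork status is unknown.
func ensureRepo(store borges.RepositoryStore, url string) (uuid.UUID, error) {
	return borges.RepositoryID([]string{url}, nil, store)
}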
func ResolveCommit

func ResolveCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error)

ResolveCommit gets the commit referenced by the given hash, which may point to a commit directly or through a tag, for example. The only resolvable objects are Tags and Commits; if the object is not one of them, this method will return ErrObjectTypeNotSupported. The output will always be a Commit.
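For example, a small sketch resolving an annotated tag down to its commit (the helper tagCommit is ours):

import (
	git "gopkg.in/src-d/go-git.v4"
	"gopkg.in/src-d/go-git.v4/plumbing"
	"gopkg.in/src-d/go-git.v4/plumbing/object"

	"github.com/src-d/borges"
)

// tagCommit resolves a tag name to the commit it ultimately points to,
// dereferencing annotated tag objects through ResolveCommit.
func tagCommit(r *git.Repository, tag string) (*object.Commit, error) {
	ref, err := r.Reference(plumbing.ReferenceName("refs/tags/"+tag), true)
	if err != nil {
		return nil, err
	}
	return borges.ResolveCommit(r, ref.Hash())
}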
func StoreConfig added in v0.6.0
func StoreConfig(r *git.Repository, mr *model.Repository) error
Types

type Archiver

type Archiver struct {
	// TemporaryCloner is used to clone repositories into temporary storage.
	TemporaryCloner TemporaryCloner
	// Timeout is the deadline to cancel a job.
	Timeout time.Duration
	// Store is the component where repository models are stored.
	Store RepositoryStore
	// RootedTransactioner is used to push new references to our repository
	// storage.
	RootedTransactioner repository.RootedTransactioner
	// LockSession is a locker service to prevent concurrent access to the same
	// rooted repositories.
	LockSession lock.Session
	// contains filtered or unexported fields
}
Archiver archives repositories. Archiver instances are thread-safe and can be reused.
See borges documentation for more details about the archiving rules.
func NewArchiver
func NewArchiver(log *logrus.Entry, r RepositoryStore, tx repository.RootedTransactioner, tc TemporaryCloner, ls lock.Session, to time.Duration) *Archiver
type Changes

Changes represents a set of actions to apply to our root repositories. The map key is the hash of an init commit, and the value is a slice of Command, each of which can add a new reference, delete a reference, or update the hash a reference points to.
func NewChanges
func NewChanges(old, new Referencer) (Changes, error)
NewChanges returns the Changes needed to obtain the current state of the repository from a set of old references. The Changes could be create, update or delete. If an old reference has the same name as a new one, but the init commit is different, then the changes will contain a delete command and a create command. If a new reference has more than one init commit, at least one create command per init commit will be created.
Here are all possible cases for up to one reference. We use the notation a<11,01> to refer to reference 'a', pointing to hash '11' with initial commit '01'.
Old       New       Changes
---       ---       -------
Ø         Ø         Ø
Ø         a<11,01>  01 -> c<a,11>
a<11,01>  Ø         01 -> d<a,11>
a<11,01>  a<12,01>  01 -> u<a,11,12>
a<11,01>  a<11,02>  01 -> d<a,11> | 02 -> c<a,11> (invalid)
a<11,01>  a<12,02>  01 -> d<a,11> | 02 -> c<a,12>
type Command

Command is the way to represent a change to a reference. It could be:
- Create: a new reference is created
- Update: a previous reference is updated. This means its head changes.
- Delete: a previous reference does not exist now.
type Consumer

type Consumer struct {
	Notifiers struct {
		QueueError func(error)
	}
	WorkerPool *WorkerPool
	Queue      queue.Queue
	// contains filtered or unexported fields
}
Consumer consumes jobs from a queue and uses multiple workers to process them.
func NewConsumer
func NewConsumer(queue queue.Queue, pool *WorkerPool) *Consumer
NewConsumer creates a new consumer.
type Executor added in v0.8.0
type Executor struct {
// contains filtered or unexported fields
}
Executor retrieves jobs from a job iterator and passes them to a worker pool to be executed. Executor acts as a producer-consumer in a single component.
func NewExecutor added in v0.8.0

func NewExecutor(
	log *logrus.Entry,
	q queue.Queue,
	pool *WorkerPool,
	store RepositoryStore,
	iter JobIter,
) *Executor
NewExecutor creates a new job executor.
type Job
type Job struct {
RepositoryID uuid.UUID
}
Job represents a borges job to fetch and archive a repository.
type JobIter

type JobIter interface {
	io.Closer

	// Next returns the next job. It returns io.EOF if there are no more jobs.
	Next() (*Job, error)
}
JobIter is an iterator of Job.
func NewLineJobIter
func NewLineJobIter(r io.ReadCloser, storer RepositoryStore) JobIter
NewLineJobIter returns a JobIter that returns jobs generated from a reader with a list of repository URLs, one per line.
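A small usage sketch draining such an iterator, assuming a file with one URL per line (the helper drainJobs is ours; store can be any RepositoryStore implementation):

import (
	"fmt"
	"io"
	"os"

	"github.com/src-d/borges"
)

// drainJobs reads jobs from a file with one repository URL per line and
// prints each job's repository ID, stopping at io.EOF.
func drainJobs(f *os.File, store borges.RepositoryStore) error {
	iter := borges.NewLineJobIter(f, store)
	defer iter.Close()

	for {
		job, err := iter.Next()
		if err == io.EOF {
			return nil // no more jobs
		}
		if err != nil {
			return err
		}
		fmt.Println(job.RepositoryID)
	}
}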
func NewMentionJobIter
func NewMentionJobIter(q queue.Queue, storer RepositoryStore) JobIter
NewMentionJobIter returns a JobIter that returns jobs generated from mentions received from a queue (e.g. from rovers).
type Producer
type Producer struct {
// contains filtered or unexported fields
}
Producer is a service to generate jobs and put them into the queue.
func NewProducer

func NewProducer(
	log *logrus.Entry,
	jobIter JobIter,
	queue queue.Queue,
	priority queue.Priority,
	jobRetries int,
) *Producer
NewProducer creates a new producer.
type Referencer

type Referencer interface {
	// References retrieves a slice of *model.Reference or an error.
	References() ([]*model.Reference, error)
}
Referencer can retrieve reference models (*model.Reference).
func NewGitReferencer
func NewGitReferencer(r *git.Repository) Referencer
NewGitReferencer takes a *git.Repository and returns a Referencer that retrieves any valid reference from it. Symbolic references and references that do not point to commits (possibly through a tag) are silently ignored. It might return an error if any operation fails in the underlying repository.
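A small sketch listing those references (the helper listRefs is ours; we assume the returned *model.Reference values expose a Name field):

import (
	"fmt"

	git "gopkg.in/src-d/go-git.v4"

	"github.com/src-d/borges"
)

// listRefs prints the name of every reference that borges would archive
// from the given repository.
func listRefs(r *git.Repository) error {
	refs, err := borges.NewGitReferencer(r).References()
	if err != nil {
		return err
	}
	for _, ref := range refs {
		fmt.Println(ref.Name)
	}
	return nil
}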
func NewModelReferencer
func NewModelReferencer(r *model.Repository) Referencer
NewModelReferencer takes a *model.Repository and returns a Referencer that accesses its references. The resulting Referencer never returns an error.
type RepositoryStore added in v0.14.0

type RepositoryStore interface {
	// Create inserts a new Repository in the store.
	Create(repo *model.Repository) error
	// Get returns a Repository given its ID.
	Get(id kallax.ULID) (*model.Repository, error)
	// GetByEndpoints returns the Repositories that have common endpoints with
	// the list of endpoints passed.
	GetByEndpoints(endpoints ...string) ([]*model.Repository, error)
	// SetStatus changes the status of the given repository.
	SetStatus(repo *model.Repository, status model.FetchStatus) error
	// SetEndpoints updates the endpoints of the repository.
	SetEndpoints(repo *model.Repository, endpoints ...string) error
	// UpdateFailed updates the given repository as failed with the given
	// status. No modifications are performed to the repository itself
	// other than setting its status; all modifications to the repo
	// fields must be done before calling this method. That is, changing
	// FetchErrorAt and so on should be done manually before. Refer to the
	// concrete implementation to know what is being updated.
	UpdateFailed(repo *model.Repository, status model.FetchStatus) error
	// UpdateFetched updates the given repository as successfully fetched.
	// No modifications are performed to the repository other than setting
	// the Fetched status and the time when it was fetched; all other changes
	// should be done to the repo before calling this method. Refer to the
	// concrete implementation to know what is being updated.
	UpdateFetched(repo *model.Repository, fetchedAt time.Time) error
}
RepositoryStore is the access layer to the storage of repositories.
type TemporaryCloner
type TemporaryCloner interface {
Clone(ctx context.Context, id, url string) (TemporaryRepository, error)
}
func NewTemporaryCloner
func NewTemporaryCloner(tmpFs billy.Filesystem) TemporaryCloner
type TemporaryRepository

type Worker
type Worker struct {
// contains filtered or unexported fields
}
Worker is a worker that processes jobs from a channel.
func NewWorker
func NewWorker(logentry *logrus.Entry, do WorkerFunc, ch chan *WorkerJob) *Worker
NewWorker creates a new Worker. The first parameter is the log entry the worker will use, the second is the processing function that will be called for every job, and the third is the channel the worker will consume jobs from.
type WorkerFunc added in v0.13.0
WorkerFunc is the function the workers will execute.
type WorkerJob

type WorkerJob struct {
	*Job
	// contains filtered or unexported fields
}
A WorkerJob is a job to be passed to the worker. It contains the Job itself and an acknowledger that the worker uses to signal that it finished the job.
type WorkerPool
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of workers that can process jobs.
func NewArchiverWorkerPool

func NewArchiverWorkerPool(
	log *logrus.Entry,
	r RepositoryStore,
	tx repository.RootedTransactioner,
	tc TemporaryCloner,
	ls lock.Service,
	to time.Duration,
) *WorkerPool
NewArchiverWorkerPool creates a new WorkerPool that uses an Archiver to process jobs. It takes optional start, stop and warn notifier functions that are equal to the Archiver notifiers but with additional WorkerContext.
func NewWorkerPool
func NewWorkerPool(log *logrus.Entry, f WorkerFunc) *WorkerPool
NewWorkerPool creates a new empty worker pool. It takes a function to be used by workers to process jobs. The pool is started with no workers. SetWorkerCount must be called to start them.
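A rough sketch of the pool lifecycle, using only the methods documented below; process stands for any valid WorkerFunc (its signature is not reproduced in this document), and log and job are assumed to be a *logrus.Entry and a *WorkerJob obtained elsewhere:

// Sketch of a WorkerPool lifecycle. `process` is assumed to be a valid
// WorkerFunc; `log` and `job` are defined elsewhere.
pool := borges.NewWorkerPool(log, process)
pool.SetWorkerCount(8) // the pool starts empty; this starts eight workers

pool.Do(job) // blocks until a worker takes the job, then returns

// Close waits for in-flight jobs to finish and stops all the workers.
if err := pool.Close(); err != nil {
	// handle the error
}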
func (*WorkerPool) Close
func (wp *WorkerPool) Close() error
Close stops all the workers in the pool and frees the resources used by it, waiting until all the current jobs finish.
func (*WorkerPool) Do
func (wp *WorkerPool) Do(j *WorkerJob)
Do executes a job. It blocks until a worker is assigned to process the job and then it returns, with the worker processing the job asynchronously.
func (*WorkerPool) Len
func (wp *WorkerPool) Len() int
Len returns the number of workers currently in the pool.
func (*WorkerPool) SetWorkerCount
func (wp *WorkerPool) SetWorkerCount(workers int)
SetWorkerCount changes the number of running workers. Workers will be started or stopped as necessary to satisfy the new worker count. It blocks until all the required workers are started or stopped. Each worker, if busy, will finish its current job before stopping.
func (*WorkerPool) Stop added in v0.13.0
func (wp *WorkerPool) Stop() error
Stop stops all the workers in the pool, stopping their current jobs as well, and frees the resources used by the pool.