borges

Version: v0.6.1
Published: Aug 11, 2017 License: GPL-3.0 Imports: 31 Imported by: 9

README

borges

borges archives repositories in a universal git library.

I have always imagined that Paradise will be a kind of library.

borges uses the producer/consumer pattern, where a producer generates jobs and one or more consumers get the jobs and execute them. Each borges job triggers an update of a repository.

Read the borges package godoc for further details on how borges archives repositories.

CLI

Run borges --help to get help about the main commands (producer and consumer) and their options.

Setting up borges

Borges needs a database and a message broker to do its job. By default it connects to a PostgreSQL database and uses RabbitMQ as the broker. The database connection can be configured through the following environment variables:

  • CONFIG_DBUSER, by default: testing
  • CONFIG_DBPASS, by default: testing
  • CONFIG_DBHOST, by default: 0.0.0.0
  • CONFIG_DBPORT, by default: 5432
  • CONFIG_DBNAME, by default: testing
  • CONFIG_DBSSLMODE, by default: disable
  • CONFIG_DBAPPNAME, by default: ``
  • CONFIG_DBTIMEOUT, by default: 30s

To configure other important settings, use:

  • CONFIG_TEMP_DIR: local path to store temporary files needed by the borges consumer, by default: /tmp/sourced
  • CONFIG_BROKER: by default: amqp://localhost:5672
  • CONFIG_ROOT_REPOSITORIES_DIR: where to store siva files. If no HDFS connection URL is provided, this is a local path; otherwise it is an HDFS folder. By default: /tmp/root-repositories
  • CONFIG_LOCKING, by default: local:, other options: etcd:
  • CONFIG_HDFS: (host:port) If this property is not provided, all root repositories are stored in the local filesystem, by default: ""

Producer

The producer runs as a service. It determines which repositories should be updated next and enqueues new jobs for them.

To launch the producer you just have to run it with the default configuration:

borges producer

By default the producer reads mentions from the rovers RabbitMQ queue, but it can also read URLs directly from a file with the following CLI options:

borges producer --source=file --file /path/to/file

The file must contain one URL per line, for example:

https://github.com/a/repo1
https://github.com/b/repo2.git
http://github.com/c/repo3
http://github.com/d/repo4.git

So a possible command to launch the producer could be:

$ CONFIG_DBUSER="user" \
CONFIG_DBPASS="pass" \
CONFIG_DBHOST="postgres" \
CONFIG_DBNAME="borges-db"  \
CONFIG_BROKER="amqp://guest:guest@rabbitmq:5672" \
borges producer --loglevel=debug

If you need some help just type borges producer -h

Consumer

The consumer runs as a service. It gets jobs from the queue and dispatches them to a worker pool.

Each job is a request to update a repository. It can be a new or an existing one. The repository is fetched (incrementally when possible) and each reference is pushed to a local repository dedicated to all references from all repositories that share the same init commit.

Note that borges should be the only one creating and writing to our repository storage.

To run a consumer instance from the command line with default configuration:

borges consumer

You can select the number of workers to use. The default is 8:

borges consumer --workers=20

A possible command to run it:

$ CONFIG_TEMP_DIR="/borges/tmp"  \
CONFIG_ROOT_REPOSITORIES_DIR="/borges/root-repositories"  \
borges consumer --workers=20 --loglevel=debug

To get help run borges consumer -h

Administration Notes

Both the producer and consumer services will run even if they cannot connect to the queue, or even if the queue is malfunctioning. If the queue does not work, they will just retry until it does.

Development

Build

  • rm Makefile.main; rm -rf .ci to make sure you have the latest Makefile changes.
  • make dependencies to download vendor dependencies using Glide.
  • make packages to generate binaries for several platforms.

You will find the built binaries in borges_linux_amd64/borges and borges_darwin_amd64/borges.

If you're running borges for the first time, make sure you initialize the schema of the database first. You can do so by running the following command:

borges init

Test

make test

Borges has two runtime dependencies, and its tests depend on them:

  • RabbitMQ

    Consumers and Producers interact through a Queue. You can run one in Docker with the following command:

    docker run -d --hostname rabbit --name rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
    

    Note: a hostname must be provided because RabbitMQ stores data according to the host name.

  • PostgreSQL

    Consumers make SIVA files with RootedRepositories, but all repository metadata is stored in PostgreSQL. You can run one in Docker with the following command:

    docker run --name postgres -e POSTGRES_DB=testing -e POSTGRES_USER=testing -e POSTGRES_PASSWORD=testing -p 5432:5432 -d postgres
    # to check it manually, use the container name given above
    docker exec -ti postgres psql -U testing
    

Use make test-coverage to run all tests and produce a coverage report.

Documentation

Overview

borges archives repositories in a universal git library.

The goal of borges is to fetch repositories and keep them updated. Repositories are arranged in a repository storage that contains one repository per init commit found.

We define root commit as any commit with no parents (the first commit of a repository). Note that a repository can contain multiple root commits.

For each reference, we define its init commit as the root commit that is reached by following the first parent of each commit in the history. This is the commit that would be obtained with:

$ git rev-list --first-parent <ref> | tail -n 1

When borges fetches a repository, it groups all references by init commit and pushes each group of references to a repository for its init commit.
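
The init commit rule and the grouping step can be sketched on a toy commit graph. The code below is only an illustration under stated assumptions: the parents map, initCommit, and groupByInit are hypothetical names, hashes are short strings, and no git library is involved.

```go
package main

import "fmt"

// parents maps a commit hash to its parent hashes, first parent first.
// A commit with no parents is a root commit.
type parents map[string][]string

// initCommit follows the first parent from h until it reaches a root commit.
func initCommit(g parents, h string) string {
	for len(g[h]) > 0 {
		h = g[h][0]
	}
	return h
}

// groupByInit groups reference names by the init commit of the hash each
// reference points to.
func groupByInit(g parents, refs map[string]string) map[string][]string {
	groups := make(map[string][]string)
	for name, hash := range refs {
		ic := initCommit(g, hash)
		groups[ic] = append(groups[ic], name)
	}
	return groups
}

func main() {
	g := parents{
		"01": nil, // root commit
		"02": nil, // second root commit
		"11": {"01"},
		"12": {"11", "02"}, // merge commit: its first parent is 11
		"21": {"02"},
	}
	refs := map[string]string{"master": "12", "other": "21"}
	for ic, names := range groupByInit(g, refs) {
		fmt.Println(ic, names)
	}
}
```

In this graph the merge commit 12 can reach both root commits, but its init commit is 01 because only the first parent is followed. The reference pointing at 12 is therefore grouped under 01, while the reference pointing at 21 is grouped under 02.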

Constants

const (
	Create  Action = "create"
	Update         = "update"
	Delete         = "delete"
	Invalid        = "invalid"
)
const (
	FetchRefSpec = config.RefSpec("refs/*:refs/*")
	FetchHEAD    = config.RefSpec("HEAD:refs/heads/HEAD")
)

Variables

var (
	ErrCleanRepositoryDir     = errors.NewKind("cleaning up local repo dir failed")
	ErrClone                  = errors.NewKind("cloning %s failed")
	ErrPushToRootedRepository = errors.NewKind("push to rooted repo %s failed")
	ErrArchivingRoots         = errors.NewKind("archiving %d out of %d roots failed: %s")
	ErrEndpointsEmpty         = errors.NewKind("endpoints is empty")
	ErrRepositoryIDNotFound   = errors.NewKind("repository id not found: %s")
	ErrChanges                = errors.NewKind("error computing changes")
	ErrAlreadyFetching        = errors.NewKind("repository %s was already in a fetching status")
	ErrSetStatus              = errors.NewKind("unable to set repository to status: %s")
)
var (
	// ErrAlreadyStopped signals that an operation cannot be done because
	// the entity is already stopped.
	ErrAlreadyStopped = errors.NewKind("already stopped: %s")

	ErrWaitForJobs = errors.NewKind("no more jobs at the moment")

	ErrReferencedObjectTypeNotSupported error = stderrors.New("referenced object type not supported")
)

Functions

func RepositoryID

func RepositoryID(endpoints []string, isFork *bool, storer *model.RepositoryStore) (uuid.UUID, error)

RepositoryID tries to find a repository in the database by its endpoints. If no repository is found, it creates a new one and returns its ID.

func ResolveCommit

func ResolveCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error)

ResolveCommit resolves the commit referenced by the given hash, for example through a tag. The only resolvable objects are tags and commits. If the object is neither, this function returns ErrReferencedObjectTypeNotSupported. The result is always a commit.

func StoreConfig added in v0.6.0

func StoreConfig(r *git.Repository, mr *model.Repository) error

func WithInProcRepository

func WithInProcRepository(r *git.Repository, f func(string) error) error

Types

type Action

type Action string

type Archiver

type Archiver struct {

	// TemporaryCloner is used to clone repositories into temporary storage.
	TemporaryCloner TemporaryCloner

	// Timeout is the deadline to cancel a job.
	Timeout time.Duration

	// RepositoryStorage is the database where repository models are stored.
	RepositoryStorage *model.RepositoryStore

	// RootedTransactioner is used to push new references to our repository
	// storage.
	RootedTransactioner repository.RootedTransactioner

	// LockSession is a locker service to prevent concurrent access to the same
	// rooted repositories.
	LockSession lock.Session
	// contains filtered or unexported fields
}

Archiver archives repositories. Archiver instances are thread-safe and can be reused.

See borges documentation for more details about the archiving rules.

func (*Archiver) Do

func (a *Archiver) Do(j *Job) error

Do archives a repository according to a job.

type Changes

type Changes map[model.SHA1][]*Command

Changes represents the actions to apply to our root repositories. The map key is the hash of an init commit, and the value is a slice of Commands, each of which adds a new reference, deletes a reference, or updates the hash a reference points to.

func NewChanges

func NewChanges(old, new Referencer) (Changes, error)

NewChanges returns the Changes needed to obtain the current state of the repository from a set of old references. Each change is a create, an update, or a delete. If an old reference has the same name as a new one but a different init commit, the changes will contain a delete command and a create command. If a new reference has more than one init commit, at least one create command per init commit will be created.

Here are all possible cases for up to one reference. We use the notation a<11,01> to refer to reference 'a', pointing to hash '11' with initial commit '01'.

Old		New		Changes
---		---		-------
Ø		Ø		Ø
Ø		a<11,01>	01 -> c<a,11>
a<11,01>	Ø		01 -> d<a,11>
a<11,01>	a<12,01>	01 -> u<a,11,12>
a<11,01>	a<11,02>	01 -> d<a,11> | 02 -> c<a,11> (invalid)
a<11,01>	a<12,02>	01 -> d<a,11> | 02 -> c<a,12>
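
The table can be reproduced with a small sketch for the single-reference case. This is not the NewChanges implementation: the ref type, the diffOne function, and the string encoding of commands are hypothetical, chosen only to mirror the notation above. The case where neither the hash nor the init commit changes is not in the table; this sketch assumes it produces no changes.

```go
package main

import "fmt"

// ref is a hypothetical stand-in for a reference: its name, the hash it
// points to, and its init commit.
type ref struct{ Name, Hash, Init string }

// diffOne returns the commands, keyed by init commit, that turn prev into
// cur. A nil pointer means the reference does not exist on that side.
func diffOne(prev, cur *ref) map[string][]string {
	changes := make(map[string][]string)
	add := func(init, cmd string) { changes[init] = append(changes[init], cmd) }
	switch {
	case prev == nil && cur == nil:
		// Nothing to do.
	case prev == nil:
		add(cur.Init, fmt.Sprintf("c<%s,%s>", cur.Name, cur.Hash))
	case cur == nil:
		add(prev.Init, fmt.Sprintf("d<%s,%s>", prev.Name, prev.Hash))
	case prev.Init != cur.Init:
		// The reference moved to another rooted repository:
		// delete it from the old one and create it in the new one.
		add(prev.Init, fmt.Sprintf("d<%s,%s>", prev.Name, prev.Hash))
		add(cur.Init, fmt.Sprintf("c<%s,%s>", cur.Name, cur.Hash))
	case prev.Hash != cur.Hash:
		add(cur.Init, fmt.Sprintf("u<%s,%s,%s>", cur.Name, prev.Hash, cur.Hash))
	}
	return changes
}

func main() {
	prev := &ref{Name: "a", Hash: "11", Init: "01"}
	cur := &ref{Name: "a", Hash: "12", Init: "02"}
	for init, cmds := range diffOne(prev, cur) {
		fmt.Println(init, "->", cmds)
	}
}
```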

func (Changes) Add

func (c Changes) Add(new *model.Reference)

func (Changes) Delete

func (c Changes) Delete(old *model.Reference)

func (Changes) Update

func (c Changes) Update(old, new *model.Reference)

type Command

type Command struct {
	Old *model.Reference
	New *model.Reference
}

Command represents a change to a reference. It can be one of:

  • Create: a new reference is created.
  • Update: an existing reference is updated, meaning its head changes.
  • Delete: a previous reference no longer exists.

func (*Command) Action

func (c *Command) Action() Action

Action returns the action for this command, depending on its content.

type Consumer

type Consumer struct {
	Notifiers struct {
		QueueError func(error)
	}
	WorkerPool *WorkerPool
	Queue      queue.Queue
	// contains filtered or unexported fields
}

Consumer consumes jobs from a queue and uses multiple workers to process them.

func NewConsumer

func NewConsumer(queue queue.Queue, pool *WorkerPool) *Consumer

NewConsumer creates a new consumer.

func (*Consumer) Start

func (c *Consumer) Start()

Start initializes the consumer and starts it, blocking until it is stopped.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop stops the consumer. Note that it does not close the underlying queue and worker pool. It blocks until the consumer has actually stopped.

type Job

type Job struct {
	RepositoryID uuid.UUID
}

Job represents a borges job to fetch and archive a repository.

type JobIter

type JobIter interface {
	io.Closer
	// Next returns the next job. It returns io.EOF if there are no more
	// jobs. If there are no more jobs at the moment, but there can be
	// in the future, it returns an error of kind ErrWaitForJobs.
	Next() (*Job, error)
}

JobIter is an iterator of Job.

func NewLineJobIter

func NewLineJobIter(r io.ReadCloser, storer *model.RepositoryStore) JobIter

NewLineJobIter returns a JobIter that returns jobs generated from a reader with a list of repository URLs, one per line.

func NewMentionJobIter

func NewMentionJobIter(q queue.Queue, storer *model.RepositoryStore) JobIter

NewMentionJobIter returns a JobIter that returns jobs generated from mentions received from a queue (e.g. from rovers).

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a service that generates jobs and puts them on the queue.

func NewProducer

func NewProducer(log log15.Logger, jobIter JobIter, queue queue.Queue) *Producer

NewProducer creates a new producer.

func (*Producer) Start

func (p *Producer) Start()

Start starts the producer services. It blocks until Stop is called.

func (*Producer) Stop

func (p *Producer) Stop()

Stop stops the producer.

type Referencer

type Referencer interface {
	// References retrieves a slice of *model.Reference or an error.
	References() ([]*model.Reference, error)
}

Referencer can retrieve reference models (*model.Reference).

func NewGitReferencer

func NewGitReferencer(r *git.Repository) Referencer

NewGitReferencer takes a *git.Repository and returns a Referencer that retrieves any valid reference from it. Symbolic references and references that do not point to commits (possibly through a tag) are silently ignored. It might return an error if any operation fails in the underlying repository.

func NewModelReferencer

func NewModelReferencer(r *model.Repository) Referencer

NewModelReferencer takes a *model.Repository and returns a Referencer that accesses its references. The resulting Referencer never returns an error.

type TemporaryCloner

type TemporaryCloner interface {
	Clone(ctx context.Context, id, url string) (TemporaryRepository, error)
}

func NewTemporaryCloner

func NewTemporaryCloner(tmpFs billy.Filesystem) TemporaryCloner

type TemporaryRepository

type TemporaryRepository interface {
	io.Closer
	Referencer
	Push(ctx context.Context, url string, refspecs []config.RefSpec) error
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a worker that processes jobs from a channel.

func NewWorker

func NewWorker(log log15.Logger, do func(log15.Logger, *Job) error, ch chan *WorkerJob) *Worker

NewWorker creates a new Worker. The first parameter is a logger that will be passed to the processing function on every call. The second parameter is the processing function itself, which will be called for every job. The third parameter is a channel that the worker will consume jobs from.

func (*Worker) IsRunning

func (w *Worker) IsRunning() bool

IsRunning returns true if the worker is running.

func (*Worker) Start

func (w *Worker) Start()

Start processes jobs from the input channel until it is stopped. Start blocks until the worker is stopped or the channel is closed.

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker. It blocks until it is actually stopped. If it is currently processing a job, it will finish before stopping.

type WorkerJob

type WorkerJob struct {
	*Job
	queue.Acknowledger
}

A WorkerJob is a job to be passed to the worker. It contains the Job itself and an acknowledger that the worker uses to signal that it finished the job.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of workers that can process jobs.

func NewArchiverWorkerPool

NewArchiverWorkerPool creates a new WorkerPool that uses an Archiver to process jobs. It takes optional start, stop and warn notifier functions that are equal to the Archiver notifiers but with additional WorkerContext.

func NewWorkerPool

func NewWorkerPool(log log15.Logger, f func(log15.Logger, *Job) error) *WorkerPool

NewWorkerPool creates a new empty worker pool. It takes a function to be used by workers to process jobs. The pool is started with no workers. SetWorkerCount must be called to start them.

func (*WorkerPool) Close

func (wp *WorkerPool) Close() error

Close stops all the workers in the pool and frees the resources it uses. It blocks until it finishes.

func (*WorkerPool) Do

func (wp *WorkerPool) Do(j *WorkerJob)

Do executes a job. It blocks until a worker is assigned to process the job and then it returns, with the worker processing the job asynchronously.

func (*WorkerPool) Len

func (wp *WorkerPool) Len() int

Len returns the number of workers currently in the pool.

func (*WorkerPool) SetWorkerCount

func (wp *WorkerPool) SetWorkerCount(workers int)

SetWorkerCount changes the number of running workers. Workers will be started or stopped as necessary to satisfy the new worker count. It blocks until all required workers are started or stopped. Each worker, if busy, will finish its current job before stopping.

Directories

Path Synopsis
cli
