cloudant

Version: v0.0.0-...-0c94297
Published: Oct 12, 2020 License: Apache-2.0 Imports: 20 Imported by: 3

README

This repository is now archived.

A Go SDK for IBM Cloudant is available at https://github.com/ibm/cloudant-go-sdk

go-cloudant

A Cloudant library for Golang.

This library is not complete, may change in incompatible ways in future versions, and comes with no support whatsoever.

Installation

go get github.com/cloudant-labs/go-cloudant

Current Features

  • Session authentication
  • Keep-Alive & Connection Pooling
  • Configurable request retrying
  • Hard limit on request concurrency
  • Stream /_all_docs & /_changes
  • Manage /_bulk_docs uploads
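The hard limit on request concurrency can be pictured as a fixed pool of workers pulling jobs from a queue. The sketch below is an illustration under that assumption only; `runLimited` is a hypothetical name, it uses nothing from go-cloudant, and the library's internal worker implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited executes n short tasks on a pool of `limit` workers. It reports
// the peak number of tasks observed running at once and how many completed.
func runLimited(limit, n int) (peak, done int32) {
	var running int32
	queue := make(chan func())
	var wg sync.WaitGroup

	for i := 0; i < limit; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range queue {
				cur := atomic.AddInt32(&running, 1)
				// Record a new peak if this worker pushed the count higher.
				for {
					p := atomic.LoadInt32(&peak)
					if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
						break
					}
				}
				task()
				atomic.AddInt32(&running, -1)
			}
		}()
	}

	for i := 0; i < n; i++ {
		queue <- func() {
			time.Sleep(2 * time.Millisecond) // stand-in for an HTTP request
			atomic.AddInt32(&done, 1)
		}
	}
	close(queue)
	wg.Wait()
	return peak, done
}

func main() {
	peak, done := runLimited(5, 20)
	fmt.Printf("peak concurrency: %d, completed: %d\n", peak, done)
}
```

Because only `limit` workers ever read from the queue, no more than `limit` tasks can be in flight, however many are submitted.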

Getting Started

Running the Tests

To run the tests, you need a Cloudant (or CouchDB) database to talk to. The tests expect the following environment variables to be available:

COUCH_USER
COUCH_PASS
COUCH_HOST_URL # optional

If COUCH_HOST_URL isn't set, the host URL is assumed to be https://$COUCH_USER.cloudant.com.
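The fallback rule for the host URL can be sketched as follows. `resolveHostURL` is a hypothetical helper written for this illustration; it is not part of go-cloudant, and the test suite may implement the rule differently.

```go
package main

import (
	"fmt"
	"os"
)

// resolveHostURL is a hypothetical helper, not part of go-cloudant.
// It returns hostURL when set, otherwise the default Cloudant account URL.
func resolveHostURL(user, hostURL string) string {
	if hostURL != "" {
		return hostURL
	}
	return "https://" + user + ".cloudant.com"
}

func main() {
	user := os.Getenv("COUCH_USER")
	fmt.Println(resolveHostURL(user, os.Getenv("COUCH_HOST_URL")))
}
```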

If you want to run against a local CouchDB in Docker, try:

docker run -d -p 5984:5984 --rm --name couchdb couchdb:1.6
curl -XPUT 'http://127.0.0.1:5984/_config/admins/mrblobby' -d '"blobbypassword"'
export COUCH_USER="mrblobby"
export COUCH_PASS="blobbypassword"
export COUCH_HOST_URL="http://127.0.0.1:5984"
go test

Note: this library does not allow unauthenticated connections, so you can't run the tests against a CouchDB node in admin party mode. This is a good thing.

Creating a Cloudant client
// create a Cloudant client (max. request concurrency 5) with default retry configuration:
//   - maximum retries per request:     3
//   - random retry delay minimum:      5  seconds
//   - random retry delay maximum:      30 seconds
client1, err1 := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)

// create a Cloudant client (max. request concurrency 20) with _custom_ retry configuration:
//   - maximum retries per request:     5
//   - random retry delay minimum:      10  seconds
//   - random retry delay maximum:      60 seconds
client2, err2 := cloudant.CreateClientWithRetry("user123", "pa55w0rd01", "https://user123.cloudant.com", 20, 5, 10, 60)
Get a document
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

type Doc struct {
    Id     string    `json:"_id"`
    Rev    string    `json:"_rev"`
    Foo    string    `json:"foo"`
}

doc := new(Doc)
err = db.Get("my_doc", cloudant.NewGetQuery().Build(), doc)

fmt.Println(doc.Foo)  // prints the value of the 'foo' field
Set a document
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

myDoc := &Doc{
        Id:     "my_doc_id",
        Rev:    "2-xxxxxxx",
        Foo:    "bar",
}

meta, err := db.Set(myDoc)

fmt.Println(meta.Rev)  // prints '_rev' of new document revision
Delete a document
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

err = db.Delete("my_doc_id", "2-xxxxxxx")
Using /_bulk_docs
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

myDoc1 := Doc{
        Id:     "doc1",
        Rev:    "1-xxxxxxx",
        Foo:    "bar",
}

myDoc2 := Doc{
        Id:     "doc2",
        Rev:    "2-xxxxxxx",
        Foo:    "bar",
}

myDoc3 := Doc{
        Id:     "doc3",
        Rev:    "3-xxxxxxx",
        Foo:    "bar",
}

uploader := db.Bulk(50, 1048576, 60) // new uploader using batch size 50, max batch size 1MB, flushing documents to server every 60 seconds

// Note: workers only flush their document batch to the server:
//  1)  periodically (set to -1 to disable).
//  2)  when the maximum number of documents per batch is reached.
//  3)  when the maximum batch size (in bytes) is reached (set to -1 to disable).
//  4)  if a document is uploaded using `.UploadNow(doc)`.
//  5)  if a client calls `.Flush()` or `.Stop()`.

uploader.FireAndForget(myDoc1)

uploader.Flush() // blocks until all received documents have been uploaded

r2 := uploader.UploadNow(myDoc2) // uploaded as soon as it's received by a worker

r2.Wait()
if r2.Error == nil {
    fmt.Println(r2.Response.ID, r2.Response.Rev) // prints new document 'id' and 'rev'
}

r3 := uploader.Upload(myDoc3) // queues until the worker creates a full batch of 50 documents

uploader.AsyncFlush() // asynchronously uploads all received documents

uploader.Stop() // blocks until all documents have been uploaded and workers have stopped

r3.Wait()
if r3.Error == nil {
    fmt.Println(r3.Response.ID, r3.Response.Rev) // prints new document 'id' and 'rev'
}
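The flush rules listed in the comments above can be modelled with a small standalone type. This is a minimal sketch, not the library's uploader: `batcher`, `add`, and `flush` are hypothetical names, and the periodic trigger and explicit `.Flush()` or `.Stop()` calls are represented only by calling `flush` directly.

```go
package main

import "fmt"

// batcher is a hypothetical model of one upload worker's pending batch.
type batcher struct {
	maxDocs  int // flush when this many documents are queued
	maxBytes int // flush when this many bytes are queued; -1 disables
	docs     [][]byte
	size     int
}

// add queues a document. It returns the batch to upload if a trigger fired
// (priority document, document count, or byte size), otherwise nil.
func (b *batcher) add(doc []byte, now bool) [][]byte {
	b.docs = append(b.docs, doc)
	b.size += len(doc)
	full := len(b.docs) >= b.maxDocs
	big := b.maxBytes >= 0 && b.size >= b.maxBytes
	if now || full || big {
		return b.flush()
	}
	return nil
}

// flush hands back everything queued so far and resets the batch. A timer
// or an explicit flush request would call this directly.
func (b *batcher) flush() [][]byte {
	out := b.docs
	b.docs, b.size = nil, 0
	return out
}

func main() {
	b := &batcher{maxDocs: 3, maxBytes: -1}
	fmt.Println(len(b.add([]byte(`{"_id":"doc1"}`), false))) // below the limit, nothing flushed
	fmt.Println(len(b.add([]byte(`{"_id":"doc2"}`), true)))  // priority document flushes both
}
```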
Using /_all_docs
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

rows, err := db.All(cloudant.NewAllDocsQuery().Build())

// OR include some query options...
//
// q := cloudant.NewAllDocsQuery().
//        Limit(123).
//        StartKey("foo1").
//        EndKey("foo2").
//        Build()
//
//    rows, err := db.All(q)

for {
    row, more := <-rows
    if more {
        fmt.Println(row.ID, row.Value.Rev)  // prints document 'id' and 'rev'
    } else {
        break
    }
}
Using /_changes
// create a Cloudant client (max. request concurrency 5)
client, err := cloudant.CreateClient("user123", "pa55w0rd01", "https://user123.cloudant.com", 5)
db, err := client.GetOrCreate("my_database")

query := cloudant.NewChangesQuery().IncludeDocs().Build()

changes, err := db.Changes(query)

for {
    change, more := <-changes
    if more {
        fmt.Println(change.Seq, change.ID, change.Rev)  // prints change 'seq', 'id' and 'rev'

        // Doc body
        str, _ := json.MarshalIndent(change.Doc, "", "  ")
        fmt.Printf("%s\n", str)
    } else {
        break
    }
}
Using Follower

Follower is a robust changes feed follower that runs in continuous mode, emitting events from the changes feed on a channel. Its aim is to stay running until told to terminate.

client, err := cloudant.CreateClient(...)

db, err := client.Get(DATABASE)
if err != nil {
    fmt.Printf("error\n")
    return
}

// Only generate a Seq ID every 100 changes
follower := cloudant.NewFollower(db, 100)
changes, err := follower.Follow()
if err != nil {
    fmt.Println(err)
    return
}

for {
    changeEvent := <-changes

    switch changeEvent.EventType {
    case cloudant.ChangesHeartbeat:
        fmt.Println("tick")
    case cloudant.ChangesError:
        fmt.Println(changeEvent.Err)
    case cloudant.ChangesTerminated:
        fmt.Println("terminated; resuming from last known sequence id")
        changes, err = follower.Follow()
        if err != nil {
            fmt.Println("resumption error ", err)
            return
        }
    case cloudant.ChangesInsert:
        fmt.Printf("INSERT %s\n", changeEvent.Meta.ID)
    case cloudant.ChangesDelete:
        fmt.Printf("DELETE %s\n", changeEvent.Meta.ID)
    default:
        fmt.Printf("UPDATE %s\n", changeEvent.Meta.ID)
    }
}

Documentation

Constants

const (
	// ChangesInsert is a new document, with _rev starting with "1-"
	ChangesInsert = iota
	// ChangesUpdate is a new revision of an existing document
	ChangesUpdate
	// ChangesDelete is a document deletion
	ChangesDelete
	// ChangesHeartbeat is an empty line sent to keep the connection open
	ChangesHeartbeat
	// ChangesTerminated means far end closed the connection
	ChangesTerminated
	ChangesError
)

Constants defining the possible event types in a changes feed

const (
	VERSION = "0.1.0"
)

Variables

var LogFunc = log.Printf

LogFunc is a function that logs the provided message with optional fmt.Sprintf-style arguments. By default, logs to the default log.Logger.

Functions

func Endpoint

func Endpoint(base url.URL, pathStr string, params url.Values) (string, error)

Endpoint is a convenience function to build url-strings
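To illustrate what a URL-building helper with this signature might do, here is a sketch using only `net/url` and `path`. The `endpoint` function below is a stand-in written for this example and `buildURL` is a hypothetical wrapper; the behaviour of the real `cloudant.Endpoint` (for instance how it joins paths or reports errors) is an assumption and may differ.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// endpoint is a stand-in with the same signature as cloudant.Endpoint.
// It joins pathStr onto the base path and appends the encoded parameters.
func endpoint(base url.URL, pathStr string, params url.Values) (string, error) {
	base.Path = path.Join("/", base.Path, pathStr) // base is a copy, so the caller's URL is untouched
	base.RawQuery = params.Encode()
	return base.String(), nil
}

// buildURL is a hypothetical wrapper that parses the base URL and converts
// a plain map into url.Values before calling endpoint.
func buildURL(rawBase, pathStr string, query map[string]string) (string, error) {
	base, err := url.Parse(rawBase)
	if err != nil {
		return "", err
	}
	params := url.Values{}
	for k, v := range query {
		params.Set(k, v)
	}
	return endpoint(*base, pathStr, params)
}

func main() {
	u, err := buildURL("https://user123.cloudant.com", "my_database/_all_docs",
		map[string]string{"limit": "10"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u)
}
```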

Types

type AllDBsQueryBuilder

type AllDBsQueryBuilder interface {
	EndKey(string) AllDBsQueryBuilder
	InclusiveEnd() AllDBsQueryBuilder
	Limit(int) AllDBsQueryBuilder
	Skip(int) AllDBsQueryBuilder
	StartKey(string) AllDBsQueryBuilder
	Build() *allDBsQuery
}

AllDBsQueryBuilder defines the available parameter-setting functions.

func NewAllDBsQuery

func NewAllDBsQuery() AllDBsQueryBuilder

NewAllDBsQuery is the entry point.

type AllDocsQueryBuilder

type AllDocsQueryBuilder interface {
	Conflicts() AllDocsQueryBuilder
	DeletedConflicts() AllDocsQueryBuilder
	Descending() AllDocsQueryBuilder
	EndKey(string) AllDocsQueryBuilder
	IncludeDocs() AllDocsQueryBuilder
	InclusiveEnd() AllDocsQueryBuilder
	Key(string) AllDocsQueryBuilder
	Keys([]string) AllDocsQueryBuilder
	Limit(int) AllDocsQueryBuilder
	Meta() AllDocsQueryBuilder
	R(int) AllDocsQueryBuilder
	RevsInfo() AllDocsQueryBuilder
	Skip(int) AllDocsQueryBuilder
	StartKey(string) AllDocsQueryBuilder
	Build() *allDocsQuery
}

AllDocsQueryBuilder defines the available parameter-setting functions.

func NewAllDocsQuery

func NewAllDocsQuery() AllDocsQueryBuilder

NewAllDocsQuery is the entry point.

type AllRow

type AllRow struct {
	ID    string      `json:"id"`
	Value AllRowValue `json:"value"`
	Doc   interface{} `json:"doc"`
}

AllRow represents a row in the json array returned by all_docs

type AllRowValue

type AllRowValue struct {
	Rev string `json:"rev"`
}

AllRowValue represents a part returned by _all_docs

type BulkDocsRequest

type BulkDocsRequest struct {
	Docs     []interface{} `json:"docs"`
	NewEdits bool          `json:"new_edits"`
}

BulkDocsRequest is the JSON body of a request to the _bulk_docs endpoint

type BulkDocsResponse

type BulkDocsResponse struct {
	Error  string `json:"error,omitempty"`
	ID     string `json:"id"`
	Reason string `json:"reason,omitempty"`
	Rev    string `json:"rev,omitempty"`
}

BulkDocsResponse is the JSON body of the response from the _bulk_docs endpoint

type BulkJob

type BulkJob struct {
	Error error

	Response *BulkDocsResponse
	// contains filtered or unexported fields
}

BulkJob represents the state of a single document to be uploaded as part of a batch

func (*BulkJob) Wait

func (j *BulkJob) Wait()

Wait blocks while the job is being executed.

type BulkJobI

type BulkJobI interface {
	Wait()
	// contains filtered or unexported methods
}

BulkJobI ...

type Change

type Change struct {
	ID      string
	Rev     string
	Seq     string
	Deleted bool
	Doc     map[string]interface{} // Only present if Changes() called with include_docs=true
}

Change represents a part returned by _changes

type ChangeEvent

type ChangeEvent struct {
	EventType int
	Meta      *DocumentMeta
	Seq       string
	Doc       map[string]interface{}
	Err       error
}

ChangeEvent is the message structure delivered by the Read function

type ChangeRow

type ChangeRow struct {
	ID      string                 `json:"id"`
	Seq     string                 `json:"seq"` // If using CouchDB1.6, this is a number
	Changes []ChangeRowChanges     `json:"changes"`
	Deleted bool                   `json:"deleted"`
	Doc     map[string]interface{} `json:"doc"`
}

ChangeRow represents a part returned by _changes

func (*ChangeRow) UnmarshalJSON

func (c *ChangeRow) UnmarshalJSON(data []byte) error

UnmarshalJSON is here for coping with CouchDB1.6's sequence IDs being numbers, not strings as in Cloudant and CouchDB2.X.

See https://play.golang.org/p/BytXCeHMvt
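One way to accept a sequence ID that is either a JSON string or a JSON number is to capture the raw value first and decide afterwards. The sketch below shows that technique on a cut-down, hypothetical `changeRow` type; it is not the library's implementation, which may take a different approach.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changeRow is a reduced, hypothetical version of ChangeRow for illustration.
type changeRow struct {
	ID  string
	Seq string
}

// UnmarshalJSON stores seq as a string whether the server sent a JSON
// string (Cloudant, CouchDB 2.x) or a JSON number (CouchDB 1.6).
func (c *changeRow) UnmarshalJSON(data []byte) error {
	var raw struct {
		ID  string          `json:"id"`
		Seq json.RawMessage `json:"seq"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	c.ID = raw.ID
	c.Seq = ""
	if len(raw.Seq) == 0 {
		return nil // no seq field present
	}
	var s string
	if err := json.Unmarshal(raw.Seq, &s); err == nil {
		c.Seq = s
		return nil
	}
	var n json.Number
	if err := json.Unmarshal(raw.Seq, &n); err != nil {
		return err
	}
	c.Seq = n.String()
	return nil
}

// decodeSeq is a small helper for the example: it decodes one row and
// returns its sequence ID.
func decodeSeq(row string) (string, error) {
	var c changeRow
	err := json.Unmarshal([]byte(row), &c)
	return c.Seq, err
}

func main() {
	for _, row := range []string{
		`{"id":"doc1","seq":42}`,
		`{"id":"doc1","seq":"3-g1AAAA"}`,
	} {
		seq, err := decodeSeq(row)
		fmt.Println(seq, err)
	}
}
```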

type ChangeRowChanges

type ChangeRowChanges struct {
	Rev string `json:"rev"`
}

ChangeRowChanges represents a part returned by _changes

type ChangesQueryBuilder

type ChangesQueryBuilder interface {
	Conflicts() ChangesQueryBuilder
	Descending() ChangesQueryBuilder
	DocIDs([]string) ChangesQueryBuilder
	Feed(string) ChangesQueryBuilder
	Filter(string) ChangesQueryBuilder
	Heartbeat(int) ChangesQueryBuilder
	IncludeDocs() ChangesQueryBuilder
	Limit(int) ChangesQueryBuilder
	SeqInterval(int) ChangesQueryBuilder
	Since(string) ChangesQueryBuilder
	Style(string) ChangesQueryBuilder
	Timeout(int) ChangesQueryBuilder
	Build() *changesQuery
}

ChangesQueryBuilder defines the available parameter-setting functions.

func NewChangesQuery

func NewChangesQuery() ChangesQueryBuilder

NewChangesQuery is the entry point.

type CouchClient

type CouchClient struct {
	// contains filtered or unexported fields
}

CouchClient is the representation of a client connection

func CreateClient

func CreateClient(username, password, rootStrURL string, concurrency int) (*CouchClient, error)

CreateClient returns a new client (with max. retry 3 using a random 5-30 secs delay).
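The retry policy described here (up to 3 retries, each after a random 5 to 30 second delay) can be sketched as below. `randomDelay`, `withRetry`, and `simulate` are hypothetical names for this illustration; which failures the library actually retries, and how it picks the delay, are not stated in this documentation and are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// randomDelay picks a uniformly random delay in [lo, hi].
func randomDelay(lo, hi time.Duration) time.Duration {
	if hi <= lo {
		return lo
	}
	return lo + time.Duration(rand.Int63n(int64(hi-lo)+1))
}

// withRetry calls op once and then up to maxRetries more times while it
// keeps failing, waiting a random delay between attempts. It returns the
// number of attempts made and the final error.
func withRetry(maxRetries int, lo, hi time.Duration, sleep func(time.Duration), op func() error) (int, error) {
	for attempt := 1; ; attempt++ {
		err := op()
		if err == nil || attempt > maxRetries {
			return attempt, err
		}
		sleep(randomDelay(lo, hi))
	}
}

// simulate runs an operation that fails `failures` times before succeeding,
// using the default policy (3 retries, 5-30 s). Delays are checked rather
// than slept so the example finishes immediately.
func simulate(failures int) (attempts int, ok bool, delaysInRange bool) {
	lo, hi := 5*time.Second, 30*time.Second
	delaysInRange = true
	calls := 0
	op := func() error {
		calls++
		if calls <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
	sleep := func(d time.Duration) {
		if d < lo || d > hi {
			delaysInRange = false
		}
	}
	attempts, err := withRetry(3, lo, hi, sleep, op)
	return attempts, err == nil, delaysInRange
}

func main() {
	attempts, ok, _ := simulate(2)
	fmt.Printf("attempts=%d ok=%v\n", attempts, ok)
}
```

Passing `time.Sleep` as the `sleep` argument would give real waiting behaviour.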

func CreateClientWithRetry

func CreateClientWithRetry(username, password, rootStrURL string, concurrency, retryCountMax,
	retryDelayMin, retryDelayMax int) (*CouchClient, error)

CreateClientWithRetry returns a new client with configurable retry parameters

func (*CouchClient) AllDBs

func (c *CouchClient) AllDBs(args *allDBsQuery) (*[]string, error)

AllDBs returns a list of all DBs

func (*CouchClient) Delete

func (c *CouchClient) Delete(databaseName string) error

Delete deletes a specified database.

func (*CouchClient) Execute

func (c *CouchClient) Execute(job *Job)

Execute submits a job for execution. The client must call `job.Wait()` before attempting to access the response attribute. Always call `job.Close()` to ensure the underlying connection is terminated.

func (*CouchClient) Exists

func (c *CouchClient) Exists(databaseName string) (bool, error)

Exists checks the existence of a specified database. Returns true if the database exists, else false.

func (*CouchClient) Get

func (c *CouchClient) Get(databaseName string) (*Database, error)

Get returns a database. It is assumed to exist.

func (*CouchClient) GetOrCreate

func (c *CouchClient) GetOrCreate(databaseName string) (*Database, error)

GetOrCreate returns a database. If the database doesn't exist on the server then it will be created.

func (*CouchClient) LogIn

func (c *CouchClient) LogIn() error

LogIn creates a session.

func (*CouchClient) LogOut

func (c *CouchClient) LogOut()

LogOut deletes the current session.

func (*CouchClient) Ping

func (c *CouchClient) Ping() (err error)

Ping can be used to check whether a server is alive. It sends an HTTP HEAD request to the server's URL.
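A liveness check based on an HTTP HEAD request looks roughly like this with the standard library alone. `ping` and `demoPing` are hypothetical functions for this sketch; treating any status of 400 or above as a failure is an assumption, since the documentation only says that a HEAD request is sent.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// ping sends a HEAD request to rawURL and reports an error if the request
// fails or the server answers with a 4xx/5xx status (an assumption).
func ping(client *http.Client, rawURL string) error {
	resp, err := client.Head(rawURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	return nil
}

// demoPing starts a local test server, pings it, and returns the HTTP
// method the server saw together with ping's result.
func demoPing() (method string, err error) {
	seen := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen <- r.Method
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	err = ping(srv.Client(), srv.URL)
	select {
	case method = <-seen:
	default: // the request never reached the handler
	}
	return method, err
}

func main() {
	method, err := demoPing()
	fmt.Println(method, err)
}
```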

func (*CouchClient) Stop

func (c *CouchClient) Stop()

Stop kills all running workers. Once called the client is no longer able to execute new jobs.

type CouchError

type CouchError struct {
	Err        string `json:"error"`
	Reason     string `json:"reason"`
	StatusCode int
}

CouchError is a server error response

func (*CouchError) Error

func (e *CouchError) Error() string

Error() implements the error interface
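The sketch below shows how a struct of this shape can be filled from a CouchDB error body and then inspected through Go's `error` interface. `couchError` mirrors the fields above, but the message format, `parseError`, and `isNotFound` are hypothetical and are not taken from the library.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// couchError mirrors the fields of CouchError for illustration.
type couchError struct {
	Err        string `json:"error"`
	Reason     string `json:"reason"`
	StatusCode int
}

// Error implements the error interface. The format is an assumption.
func (e *couchError) Error() string {
	return fmt.Sprintf("%d %s: %s", e.StatusCode, e.Err, e.Reason)
}

// parseError builds an error from an HTTP status and a JSON error body.
func parseError(status int, body string) error {
	ce := &couchError{StatusCode: status}
	if err := json.Unmarshal([]byte(body), ce); err != nil {
		return fmt.Errorf("status %d with unreadable body: %w", status, err)
	}
	return ce
}

// isNotFound reports whether err is a couchError carrying a 404 status.
func isNotFound(err error) bool {
	var ce *couchError
	return errors.As(err, &ce) && ce.StatusCode == 404
}

func main() {
	err := parseError(404, `{"error":"not_found","reason":"missing"}`)
	fmt.Println(err)
	fmt.Println(isNotFound(err))
}
```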

type CredentialsExpiredResponse

type CredentialsExpiredResponse struct {
	Error string `json:"error"`
}

type Database

type Database struct {
	Name string
	URL  *url.URL
	// contains filtered or unexported fields
}

Database holds a reference to an authenticated client connection and the name of a remote database

func (*Database) All

func (d *Database) All(args *allDocsQuery) (<-chan *AllRow, error)

All returns a channel in which AllRow types can be received.

func (*Database) Bulk

func (d *Database) Bulk(batchSize int, batchMaxBytes int, flushSecs int) *Uploader

Bulk returns a new bulk document uploader.

func (*Database) Changes

func (d *Database) Changes(args *changesQuery) (<-chan *Change, error)

Changes returns a channel in which Change types can be received. See: https://console.bluemix.net/docs/services/Cloudant/api/database.html#get-changes

func (*Database) Delete

func (d *Database) Delete(documentID, rev string) error

Delete a document with a specified revision.

func (*Database) Get

func (d *Database) Get(documentID string, args *getQuery, target interface{}) error

Get a document from the database. See: https://console.bluemix.net/docs/services/Cloudant/api/document.html#read

func (*Database) Info

func (d *Database) Info() (*Info, error)

Info returns database information. See https://console.bluemix.net/docs/services/Cloudant/api/database.html#getting-database-details

func (*Database) Set

func (d *Database) Set(document interface{}) (*DocumentMeta, error)

Set a document. The specified type may have the JSON attributes '_id' and '_rev'. If no '_id' is given, the database will generate one for you.

type DocumentMeta

type DocumentMeta struct {
	ID  string `json:"id"`
	Rev string `json:"rev"`
}

DocumentMeta is a CouchDB id/rev pair

type Follower

type Follower struct {
	// contains filtered or unexported fields
}

Follower is the orchestrator

func NewFollower

func NewFollower(database *Database, interval int) *Follower

NewFollower creates a Follower on database's changes

func (*Follower) Close

func (f *Follower) Close()

Close will terminate the Follower

func (*Follower) Follow

func (f *Follower) Follow() (<-chan *ChangeEvent, error)

Follow starts listening to the changes feed

type GetQueryBuilder

type GetQueryBuilder interface {
	Attachments() GetQueryBuilder
	AttEncodingInfo() GetQueryBuilder
	AttsSince([]string) GetQueryBuilder
	Conflicts() GetQueryBuilder
	DeletedConflicts() GetQueryBuilder
	Latest() GetQueryBuilder
	LocalSeq() GetQueryBuilder
	Meta() GetQueryBuilder
	OpenRevs([]string) GetQueryBuilder
	Rev(string) GetQueryBuilder
	Revs() GetQueryBuilder
	RevsInfo() GetQueryBuilder
	Build() *getQuery
}

GetQueryBuilder defines the available parameter-setting functions.

func NewGetQuery

func NewGetQuery() GetQueryBuilder

NewGetQuery is the entry point.

type Info

type Info struct {
	IsCompactRunning bool   `json:"compact_running"`
	DataSize         int    `json:"data_size"`
	DocDelCount      int    `json:"doc_del_count"`
	DocCount         int    `json:"doc_count"`
	DiskSize         int    `json:"disk_size"`
	UpdateSeq        string `json:"update_seq"`
}

Info represents the database meta-data

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job wraps all requests

func CreateJob

func CreateJob(request *http.Request) *Job

CreateJob makes a new Job from a HTTP request.

func UploadBulkDocs

func UploadBulkDocs(bulkDocs *BulkDocsRequest, database *Database) (result *Job, err error)

UploadBulkDocs performs a synchronous _bulk_docs POST

func (*Job) Close

func (j *Job) Close()

Close closes the response body reader to prevent a memory leak, even if not used

func (*Job) Response

func (j *Job) Response() *http.Response

Response returns the http response

func (*Job) Wait

func (j *Job) Wait()

Wait blocks while the job is being executed.

type QueryBuilder

type QueryBuilder interface {
	GetQuery() (url.Values, error)
}

QueryBuilder is used by functions implementing Cloudant API calls that have many optional parameters
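The builder types in this package follow a common pattern: chainable setters collect optional parameters, `Build()` returns a query value, and `GetQuery()` turns it into `url.Values`. The sketch below reproduces that pattern with hypothetical lowercase types; it is not the library's code, and JSON-encoding the start key is an assumption based on how CouchDB expects key parameters.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
	"strconv"
)

// queryBuilder has the same method set as QueryBuilder.
type queryBuilder interface {
	GetQuery() (url.Values, error)
}

// docsQuery holds optional parameters; nil pointers mean "not set".
type docsQuery struct {
	limit       *int
	startKey    *string
	includeDocs bool
}

// GetQuery converts the parameters that were set into URL query values.
func (q *docsQuery) GetQuery() (url.Values, error) {
	vals := url.Values{}
	if q.includeDocs {
		vals.Set("include_docs", "true")
	}
	if q.limit != nil {
		vals.Set("limit", strconv.Itoa(*q.limit))
	}
	if q.startKey != nil {
		key, err := json.Marshal(*q.startKey) // keys are sent as JSON (assumption)
		if err != nil {
			return nil, err
		}
		vals.Set("startkey", string(key))
	}
	return vals, nil
}

// docsQueryBuilder offers chainable setters and a final Build step.
type docsQueryBuilder struct{ q docsQuery }

func newDocsQuery() *docsQueryBuilder                           { return &docsQueryBuilder{} }
func (b *docsQueryBuilder) Limit(n int) *docsQueryBuilder       { b.q.limit = &n; return b }
func (b *docsQueryBuilder) StartKey(k string) *docsQueryBuilder { b.q.startKey = &k; return b }
func (b *docsQueryBuilder) IncludeDocs() *docsQueryBuilder      { b.q.includeDocs = true; return b }
func (b *docsQueryBuilder) Build() *docsQuery                   { q := b.q; return &q }

// encode renders any queryBuilder as a URL query string.
func encode(q queryBuilder) (string, error) {
	vals, err := q.GetQuery()
	if err != nil {
		return "", err
	}
	return vals.Encode(), nil
}

func main() {
	s, err := encode(newDocsQuery().Limit(123).StartKey("foo1").IncludeDocs().Build())
	fmt.Println(s, err)
}
```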

type Uploader

type Uploader struct {
	NewEdits bool
	// contains filtered or unexported fields
}

Uploader manages batched document uploads to the _bulk_docs endpoint.

func (*Uploader) AsyncFlush

func (u *Uploader) AsyncFlush()

AsyncFlush asynchronously uploads all received documents.

func (*Uploader) BulkUploadSimple

func (u *Uploader) BulkUploadSimple(docs []interface{}) ([]BulkDocsResponse, error)

BulkUploadSimple does a one-shot synchronous bulk upload

func (*Uploader) FireAndForget

func (u *Uploader) FireAndForget(doc interface{})

FireAndForget adds a document to the upload queue ready for processing by the upload worker(s).

func (*Uploader) Flush

func (u *Uploader) Flush()

Flush blocks until all received documents have been uploaded.

func (*Uploader) Stop

func (u *Uploader) Stop()

Stop uploads all received documents and then terminates the upload worker(s)

func (*Uploader) Upload

func (u *Uploader) Upload(doc interface{}) *BulkJob

Upload adds a document to the upload queue ready for processing by the upload worker(s). A BulkJob type is returned to the client so that progress can be monitored.

func (*Uploader) UploadNow

func (u *Uploader) UploadNow(doc interface{}) *BulkJob

UploadNow adds a priority document to the upload queue ready for processing by the upload worker(s). Once received by a worker it triggers the upload of the entire batch (regardless of the current batch size). A BulkJob type is returned to the client so that progress can be monitored.

Directories

Path Synopsis
examples
