journal

package
v2.0.212+incompatible
Published: May 15, 2019 License: MIT Imports: 26 Imported by: 60

Documentation

Overview

Package journal contains all runtime components for Gazette journals,
including Fragment & Spool for journal content, Head (serving
replications), Tail (reads), and Broker (for brokering new writes).

Code generated by mockery v1.0.0

Constants

const (
	AppendOpBufferSize = 100
)
const ReplicateOpBufferSize = 10

Variables

var (
	ErrExists            = errors.New("journal exists")
	ErrNotBroker         = errors.New("not journal broker")
	ErrNotFound          = errors.New("journal not found")
	ErrNotReplica        = errors.New("not journal replica")
	ErrNotYetAvailable   = errors.New("offset not yet available")
	ErrReplicationFailed = errors.New("replication failed")
	ErrWrongRouteToken   = errors.New("wrong route token")
	ErrWrongWriteHead    = errors.New("wrong write head")
)
var ErrInvalidDelta = errors.New("invalid delta")

Functions

func ErrorFromResponse

func ErrorFromResponse(response *http.Response) error

ErrorFromResponse maps an HTTP status code to the corresponding Journal protocol error, or nil. Unknown status codes are converted into an error.

func NewWalkFuncAdapter

func NewWalkFuncAdapter(callback func(Fragment) error, rewrites ...string) filepath.WalkFunc

NewWalkFuncAdapter returns a filepath.WalkFunc which parses encountered files as Fragments, and passes each to the provided |callback|. Prefix |rewrites| may be included, as pairs of "from", "to" prefixes which are applied in order. For example, NewWalkFuncAdapter(cb, "/from/", "/foo/to/", "/foo/", "/") would rewrite path "/from/bar" => "/to/bar".
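
The in-order application of prefix rewrites can be sketched as follows. This is a minimal illustration of the documented example only, not the package's implementation; rewritePath is a hypothetical helper that does not exist in this package.

```go
package main

import (
	"fmt"
	"strings"
)

// rewritePath is a hypothetical helper. It applies prefix rewrites, given as
// ("from", "to") pairs, in order: each matching pair rewrites the path before
// the next pair is considered.
func rewritePath(path string, rewrites ...string) string {
	for i := 0; i+1 < len(rewrites); i += 2 {
		from, to := rewrites[i], rewrites[i+1]
		if strings.HasPrefix(path, from) {
			path = to + strings.TrimPrefix(path, from)
		}
	}
	return path
}

func main() {
	// "/from/bar" first becomes "/foo/to/bar", and the second pair then
	// rewrites the "/foo/" prefix to "/".
	fmt.Println(rewritePath("/from/bar", "/from/", "/foo/to/", "/foo/", "/")) // prints "/to/bar"
}
```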

func StatusCodeForError

func StatusCodeForError(err error) int

StatusCodeForError maps each Journal protocol error to a distinct HTTP status code. Other errors are mapped to http.StatusInternalServerError.

Types

type AppendArgs

type AppendArgs struct {
	Journal Name
	// Content to be appended to |Journal|. The append will consume |Content|
	// until io.EOF, and abort the append (without committing any content)
	// if any other error is returned by |Content.Read()|.
	Content io.Reader
	// Context which may trace, cancel or supply a deadline for the operation.
	Context context.Context
}

func (AppendArgs) String

func (a AppendArgs) String() string

type AppendOp

type AppendOp struct {
	AppendArgs

	// Channel by which broker returns operation status.
	Result chan AppendResult `json:"-"`
}

type AppendResult

type AppendResult struct {
	// Any error that occurred during the append operation (PUT request.)
	Error error
	// Write head at the completion of the operation.
	WriteHead int64
	// RouteToken of the Journal. Set on ErrNotBroker.
	RouteToken
}

func (AppendResult) String

func (a AppendResult) String() string

type AsyncAppend

type AsyncAppend struct {
	// Read-only, and valid only after Ready is signaled.
	AppendResult
	// Signaled when the AppendOp has completed.
	Ready chan struct{}
}

Represents an AppendOp which is being asynchronously executed.
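
A caller would typically wait on Ready before reading the embedded result. The sketch below uses simplified local stand-ins (appendResult, asyncAppend, startAppend) rather than the package's types, and it assumes Ready is signaled by closing the channel, which this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// appendResult and asyncAppend are simplified, hypothetical stand-ins for
// AppendResult and AsyncAppend.
type appendResult struct {
	Error     error
	WriteHead int64
}

type asyncAppend struct {
	appendResult               // Valid only after Ready is signaled.
	Ready        chan struct{} // Assumed here to be closed on completion.
}

// startAppend simulates an asynchronous append of n bytes at write head |head|.
func startAppend(head, n int64) *asyncAppend {
	a := &asyncAppend{Ready: make(chan struct{})}
	go func() {
		if n < 0 {
			a.Error = errors.New("invalid length")
		} else {
			a.WriteHead = head + n
		}
		close(a.Ready) // Closing the channel publishes the result to waiters.
	}()
	return a
}

func main() {
	a := startAppend(100, 20)
	<-a.Ready // The result must not be read before this point.
	fmt.Println(a.WriteHead, a.Error)
}
```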

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is responsible for scattering journal writes to each replica, i.e., brokering transactions.

func NewBroker

func NewBroker(journal Name) *Broker

func (*Broker) Append

func (b *Broker) Append(op AppendOp)

func (*Broker) StartServingOps

func (b *Broker) StartServingOps(writeHead int64) *Broker

StartServingOps starts a loop to consume config updates and serves appends. Updates are always handled before appends.
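
One common Go idiom for "updates before appends" is a non-blocking check of the higher-priority channel ahead of a blocking select over both. The sketch below shows that idiom with string payloads; it is an assumption about how such a loop could be written, not the Broker's actual loop, and serve is a hypothetical name.

```go
package main

import "fmt"

// serve drains |updates| and |appends| until both are closed, always handling
// a pending update before a pending append. It returns the handling order.
func serve(updates, appends <-chan string) []string {
	var handled []string
	for updates != nil || appends != nil {
		// First, a non-blocking check of the high-priority channel alone.
		select {
		case u, ok := <-updates:
			if ok {
				handled = append(handled, "update:"+u)
			} else {
				updates = nil // A nil channel is never selected again.
			}
			continue
		default:
		}
		// No update is ready: block until either kind of operation arrives.
		select {
		case u, ok := <-updates:
			if ok {
				handled = append(handled, "update:"+u)
			} else {
				updates = nil
			}
		case a, ok := <-appends:
			if ok {
				handled = append(handled, "append:"+a)
			} else {
				appends = nil
			}
		}
	}
	return handled
}

func main() {
	updates := make(chan string, 2)
	appends := make(chan string, 2)
	appends <- "a1"
	appends <- "a2"
	updates <- "u1"
	updates <- "u2"
	close(updates)
	close(appends)
	fmt.Println(serve(updates, appends)) // prints "[update:u1 update:u2 append:a1 append:a2]"
}
```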

func (*Broker) Stop

func (b *Broker) Stop()

Stop shuts down the broker. It blocks until all pending config updates and appends are handled.

func (*Broker) UpdateConfig

func (b *Broker) UpdateConfig(config BrokerConfig)

type BrokerConfig

type BrokerConfig struct {
	// Replica instances which should be involved in brokered transactions.
	Replicas []Replicator
	// Token representing the Broker's view of the current replication topology.
	// Sent with replication requests and verified for consensus by each remote
	// replica: for a transaction to succeed, all replicas must agree on the
	// current |RouteToken|.
	RouteToken
	// Offset of the next brokered write transaction. Also sent with
	// replication requests and verified for consensus by each remote replica:
	// for a transaction to succeed, all replicas must agree on the |WriteHead|.
	WriteHead int64
	// contains filtered or unexported fields
}

BrokerConfig is used to periodically update Broker with updated cluster topology and replication configuration.

type Client

type Client interface {
	Creator
	Getter
	Header
	Writer
}

Composes Creator, Getter, Header, and Writer.

type Creator

type Creator interface {
	Create(journal Name) error
}

Performs a Gazette POST operation.

type Doer

type Doer interface {
	Do(*http.Request) (*http.Response, error)
}

Provides low-level routing and access to a Gazette service, suitable for proxying requests and modeled on http.Client. The client will perform journal-based routing to the appropriate Gazette instance. See gazette.Client.

type Fragment

type Fragment struct {
	Journal    Name
	Begin, End int64
	Sum        [sha1.Size]byte

	// Backing file of the fragment, if present locally.
	File FragmentFile
	// If fragment is remote, the time of last modification.
	// NOTE(joshk): Does not get set in Client use.
	// TODO(johnny): Is this the appropriate factoring?
	RemoteModTime time.Time
}

func LocalFragments

func LocalFragments(directory string, journal Name) []Fragment

LocalFragments returns fragments of |journal| under the local |directory|.

TODO(johnny): Collapse with NewWalkFuncAdapter above, or deprecate as part of a larger local-fragment change (Issues #30 & #31).

func ParseFragment

func ParseFragment(journal Name, contentName string) (Fragment, error)

func (Fragment) AsDirectURL

func (f Fragment) AsDirectURL(cfs cloudstore.FileSystem, duration time.Duration) (*url.URL, error)

func (Fragment) ContentName

func (f Fragment) ContentName() string

func (*Fragment) ContentPath

func (f *Fragment) ContentPath() string

func (Fragment) IsLocal

func (f Fragment) IsLocal() bool

func (Fragment) ReaderFromOffset

func (f Fragment) ReaderFromOffset(offset int64,
	cfs cloudstore.FileSystem) (io.ReadCloser, error)

func (Fragment) Size

func (f Fragment) Size() int64

type FragmentFile

type FragmentFile interface {
	Close() error
	Read(p []byte) (n int, err error)
	ReadAt(p []byte, off int64) (n int, err error)
	Seek(offset int64, whence int) (int64, error)
	Fd() uintptr
	Write(p []byte) (n int, err error)
}

Portions of os.File interface used by Fragment. An interface is used (rather than directly using *os.File) in support of test mocks.

type FragmentPersister

type FragmentPersister interface {
	Persist(Fragment)
}

FragmentPersister accepts completed local fragment spools, which should be persisted to long-term storage. See |gazette.Persister|.

type FragmentSet

type FragmentSet []Fragment

Maintains fragments ordered on |Begin| and |End|, with the invariant that no fragment is fully overlapped by another fragment in the set (though it may be overlapped by a combination of other fragments). Larger fragments are preferred (and will replace spans of overlapped smaller fragments). An implication of this invariant is that no two fragments have the same |Begin| or |End| (as that would imply an overlap). Both are monotonically increasing in the set: set[0].Begin represents the minimum offset, and set[len(set)-1].End represents the maximum offset.

func (*FragmentSet) Add

func (s *FragmentSet) Add(fragment Fragment) bool

func (*FragmentSet) BeginOffset

func (s *FragmentSet) BeginOffset() int64

func (*FragmentSet) EndOffset

func (s *FragmentSet) EndOffset() int64

func (*FragmentSet) LongestOverlappingFragment

func (s *FragmentSet) LongestOverlappingFragment(offset int64) int

Finds and returns the index of the fragment covering |offset| which has the most content after |offset|. If no fragment covers |offset|, the index of the first fragment beginning after |offset| is returned.
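
Because both |Begin| and |End| increase monotonically across the set, this lookup reduces to a binary search. The sketch below is one way to do it under two assumptions not stated on this page: |End| is exclusive, and len(set) is returned when nothing covers or follows |offset|. The fragment type and longestOverlapping function are local stand-ins.

```go
package main

import (
	"fmt"
	"sort"
)

// fragment is a reduced, hypothetical stand-in for Fragment.
type fragment struct{ Begin, End int64 }

// longestOverlapping returns the index of the fragment covering |offset| with
// the most content after it, or else the index of the first fragment beginning
// after |offset| (which is len(set) if there is none).
func longestOverlapping(set []fragment, offset int64) int {
	// Index of the first fragment that begins strictly after |offset|.
	i := sort.Search(len(set), func(i int) bool { return set[i].Begin > offset })

	// Among fragments with Begin <= offset, the last has the largest End, so
	// it is the only candidate that needs to be checked.
	if i > 0 && set[i-1].End > offset {
		return i - 1
	}
	return i
}

func main() {
	set := []fragment{{0, 100}, {50, 200}, {150, 300}, {400, 500}}
	fmt.Println(longestOverlapping(set, 60))  // prints "1"
	fmt.Println(longestOverlapping(set, 350)) // prints "3"
	fmt.Println(longestOverlapping(set, 600)) // prints "4"
}
```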

type Getter

type Getter interface {
	Get(args ReadArgs) (ReadResult, io.ReadCloser)
}

Performs a Gazette GET operation.

type Head

type Head struct {
	// contains filtered or unexported fields
}

func NewHead

func NewHead(journal Name, directory string, persister FragmentPersister,
	updates chan<- Fragment) *Head

func (*Head) Replicate

func (h *Head) Replicate(op ReplicateOp)

func (*Head) StartServingOps

func (h *Head) StartServingOps(writeHead int64) *Head

func (*Head) Stop

func (h *Head) Stop()

type Header

type Header interface {
	Head(args ReadArgs) (result ReadResult, fragmentLocation *url.URL)
}

Performs a Gazette HEAD operation.

type IndexWatcher

type IndexWatcher struct {
	// contains filtered or unexported fields
}

IndexWatcher monitors a journal's storage location in the cloud filesystem for new fragments, by performing periodic directory listings. When new fragment metadata arrives, it's published to the journal Tail via a shared channel, which indexes the fragment and makes it available for read requests.
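
The core of each polling round, publishing only fragments not seen in an earlier listing, might look like the sketch below. It is illustrative only: publishNew and the fragment names are hypothetical, and plain strings stand in for the Fragment values which the real watcher obtains by listing a cloudstore.FileSystem on a timer.

```go
package main

import "fmt"

// publishNew sends each name in |listing| that is not already in |seen| to
// |updates|, records it as seen, and returns how many names were published.
func publishNew(seen map[string]struct{}, listing []string, updates chan<- string) int {
	var n int
	for _, name := range listing {
		if _, ok := seen[name]; ok {
			continue // Already published by an earlier listing.
		}
		seen[name] = struct{}{}
		updates <- name
		n++
	}
	return n
}

func main() {
	seen := make(map[string]struct{})
	updates := make(chan string, 8)

	// Two successive listings: the second repeats one fragment and adds one.
	fmt.Println(publishNew(seen, []string{"frag-a", "frag-b"}, updates)) // prints "2"
	fmt.Println(publishNew(seen, []string{"frag-b", "frag-c"}, updates)) // prints "1"
}
```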

func NewIndexWatcher

func NewIndexWatcher(journal Name, cfs cloudstore.FileSystem,
	updates chan<- Fragment) *IndexWatcher

func (*IndexWatcher) StartWatchingIndex

func (w *IndexWatcher) StartWatchingIndex() *IndexWatcher

func (*IndexWatcher) Stop

func (w *IndexWatcher) Stop()

func (*IndexWatcher) WaitForInitialLoad

func (w *IndexWatcher) WaitForInitialLoad()

type Mark

type Mark struct {
	Journal Name
	Offset  int64
}

A Mark references a specific |Offset| within a |Journal|.

func NewMark

func NewMark(name Name, offset int64) Mark

type MarkedReader

type MarkedReader struct {
	Mark Mark
	io.ReadCloser
}

A MarkedReader delegates reads to an underlying reader, and maintains |Mark| such that it always points to the next byte to be read.

func NewMarkedReader

func NewMarkedReader(mark Mark, r io.ReadCloser) *MarkedReader

func (*MarkedReader) AdjustedMark

func (r *MarkedReader) AdjustedMark(br *bufio.Reader) Mark

AdjustedMark returns the current Mark adjusted for content read by |br| (which must wrap this MarkedReader) but unconsumed from |br|'s buffer.

func (*MarkedReader) Close

func (r *MarkedReader) Close() error

func (*MarkedReader) Read

func (r *MarkedReader) Read(p []byte) (int, error)

type MemoryBroker

type MemoryBroker struct {
	// DelayWrites indicates that writes should queue (and their promises not resolve) until:
	//   * The next explicit Flush, or
	//   * DelayWrites is set to false and another Write occurs.
	DelayWrites bool
	// Content written to each journal.
	Content map[Name]*bytes.Buffer
	// Pending content which will be written on the next Flush (or write, if !DelayWrites).
	Pending map[Name]*bytes.Buffer
	// contains filtered or unexported fields
}

MemoryBroker provides an in-memory implementation of the Client interface. The intended use is within unit tests which exercise components coordinating through the Client interface.

func NewMemoryBroker

func NewMemoryBroker() *MemoryBroker

NewMemoryBroker returns an initialized, zero-value MemoryBroker.

func (*MemoryBroker) Create

func (j *MemoryBroker) Create(journal Name) error

func (*MemoryBroker) Flush

func (j *MemoryBroker) Flush()

Flush resolves all pending writes and wakes any blocked read operations.
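
The interaction between DelayWrites and Flush can be sketched for a single journal as below. memJournal is a hypothetical, reduced model based only on the field comments above; it omits promises, blocked readers and locking, all of which the real MemoryBroker would need.

```go
package main

import (
	"bytes"
	"fmt"
)

// memJournal is a hypothetical single-journal model of the delayed-write
// behavior described by the MemoryBroker field comments.
type memJournal struct {
	DelayWrites bool
	Content     bytes.Buffer // Written content.
	Pending     bytes.Buffer // Queued content awaiting a flush.
}

// Write queues |b|, and flushes immediately unless writes are delayed.
func (j *memJournal) Write(b []byte) {
	j.Pending.Write(b)
	if !j.DelayWrites {
		j.Flush()
	}
}

// Flush moves all pending content into Content.
func (j *memJournal) Flush() {
	j.Content.Write(j.Pending.Bytes())
	j.Pending.Reset()
}

func main() {
	j := &memJournal{DelayWrites: true}
	j.Write([]byte("a"))
	fmt.Printf("%q %q\n", j.Content.String(), j.Pending.String()) // prints `"" "a"`

	j.DelayWrites = false
	j.Write([]byte("b")) // This write also resolves the queued "a".
	fmt.Printf("%q %q\n", j.Content.String(), j.Pending.String()) // prints `"ab" ""`
}
```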

func (*MemoryBroker) Get

func (j *MemoryBroker) Get(args ReadArgs) (ReadResult, io.ReadCloser)

func (*MemoryBroker) Head

func (j *MemoryBroker) Head(args ReadArgs) (ReadResult, *url.URL)

func (*MemoryBroker) ReadFrom

func (j *MemoryBroker) ReadFrom(name Name, r io.Reader) (*AsyncAppend, error)

func (*MemoryBroker) Write

func (j *MemoryBroker) Write(name Name, b []byte) (*AsyncAppend, error)

type MockDoer

type MockDoer struct {
	mock.Mock
}

MockDoer is an autogenerated mock type for the Doer type

func (*MockDoer) Do

func (_m *MockDoer) Do(_a0 *http.Request) (*http.Response, error)

Do provides a mock function with given fields: _a0

type MockFragmentFile

type MockFragmentFile struct {
	mock.Mock
}

MockFragmentFile is an autogenerated mock type for the FragmentFile type

func (*MockFragmentFile) Close

func (_m *MockFragmentFile) Close() error

Close provides a mock function with given fields:

func (*MockFragmentFile) Fd

func (_m *MockFragmentFile) Fd() uintptr

Fd provides a mock function with given fields:

func (*MockFragmentFile) Read

func (_m *MockFragmentFile) Read(p []byte) (int, error)

Read provides a mock function with given fields: p

func (*MockFragmentFile) ReadAt

func (_m *MockFragmentFile) ReadAt(p []byte, off int64) (int, error)

ReadAt provides a mock function with given fields: p, off

func (*MockFragmentFile) Seek

func (_m *MockFragmentFile) Seek(offset int64, whence int) (int64, error)

Seek provides a mock function with given fields: offset, whence

func (*MockFragmentFile) Write

func (_m *MockFragmentFile) Write(p []byte) (int, error)

Write provides a mock function with given fields: p

type MockGetter

type MockGetter struct {
	mock.Mock
}

MockGetter is an autogenerated mock type for the Getter type

func (*MockGetter) Get

func (_m *MockGetter) Get(args ReadArgs) (ReadResult, io.ReadCloser)

Get provides a mock function with given fields: args

type MockHeader

type MockHeader struct {
	mock.Mock
}

MockHeader is an autogenerated mock type for the Header type

func (*MockHeader) Head

func (_m *MockHeader) Head(args ReadArgs) (ReadResult, *url.URL)

Head provides a mock function with given fields: args

type MockWriter

type MockWriter struct {
	mock.Mock
}

MockWriter is an autogenerated mock type for the Writer type

func (*MockWriter) ReadFrom

func (_m *MockWriter) ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)

ReadFrom provides a mock function with given fields: journal, r

func (*MockWriter) Write

func (_m *MockWriter) Write(journal Name, buffer []byte) (*AsyncAppend, error)

Write provides a mock function with given fields: journal, buffer

type Name

type Name string

A typed journal name. By convention, journals are named using a forward-slash notation which captures their hierarchical relationships into organizations, topics and partitions. For example, a complete Name might be: "company-journals/interesting-topic/part-1234"

func (Name) String

func (n Name) String() string

type ReadArgs

type ReadArgs struct {
	Journal Name
	// Desired offset to begin reading from. Value -1 has special handling, where
	// the read is performed from the current write head. Non-negative values
	// specify a desired exact byte offset to read from. If the offset is
	// not available (e.g., because it represents a portion of Journal which has
	// been permanently deleted), the broker will return the next available
	// offset. Callers should therefore always inspect the ReadResult Offset.
	Offset int64
	// Whether the operation should block until content becomes available.
	// ErrNotYetAvailable is returned if a non-blocking read has no ready content.
	Blocking bool
	// Context which may trace, cancel or supply a deadline for the operation.
	Context context.Context

	// Deprecated: Server-side support for deadlines will be removed. Use
	// context.WithDeadline instead.
	// The time at which blocking will expire
	Deadline time.Time
}

func (ReadArgs) String

func (a ReadArgs) String() string

type ReadOp

type ReadOp struct {
	ReadArgs

	// Channel by which replica returns a ReadResult.
	Result chan ReadResult `json:"-"`
}

type ReadResult

type ReadResult struct {
	Error error
	// The effective offset of the operation.
	Offset int64
	// Write head at the completion of the operation.
	WriteHead int64
	// RouteToken of the Journal. Set on ErrNotReplica.
	RouteToken
	// Result fragment, set iff |Error| is nil.
	Fragment Fragment
}

func (ReadResult) String

func (a ReadResult) String() string

type Replica

type Replica struct {
	// contains filtered or unexported fields
}

Replica manages journal components required to serve brokered writes, replications, and reads. A Replica instance is capable of switching roles at any time (and multiple times), from a pure replica which may serve replication requests only, to a broker of the journal.

func NewReplica

func NewReplica(journal Name, localDir string, persister FragmentPersister,
	cfs cloudstore.FileSystem) *Replica

func (*Replica) Append

func (r *Replica) Append(op AppendOp)

func (*Replica) Read

func (r *Replica) Read(op ReadOp)

func (*Replica) Replicate

func (r *Replica) Replicate(op ReplicateOp)

func (*Replica) Shutdown

func (r *Replica) Shutdown()

func (*Replica) StartBrokeringWithPeers

func (r *Replica) StartBrokeringWithPeers(routeToken RouteToken, peers []Replicator)

Switch the Replica into broker mode. Appends will be brokered to |peers| with the topology captured by |routeToken|.

func (*Replica) StartReplicating

func (r *Replica) StartReplicating(routeToken RouteToken)

Switch the Replica into pure-replica mode.

type ReplicateArgs

type ReplicateArgs struct {
	Journal Name
	// WriteHead (eg, first byte) of the replicated transaction.
	// Already known and verified by all journal replicas.
	WriteHead int64
	// RouteToken of the transaction, also known and verified by all replicas.
	RouteToken
	// Flags whether replicas should begin a new spool for this transaction.
	NewSpool bool
	// Context which may trace, cancel or supply a deadline for the operation.
	Context context.Context
}

func (ReplicateArgs) String

func (a ReplicateArgs) String() string

type ReplicateOp

type ReplicateOp struct {
	ReplicateArgs

	// Channel by which replica returns a ReplicateResult.
	Result chan ReplicateResult `json:"-"`
}

type ReplicateResult

type ReplicateResult struct {
	Error error
	// Iff |Error| is ErrWrongWriteHead, then |ErrorWriteHead| is the replica's
	// own, strictly greater write head.
	ErrorWriteHead int64
	// Set iff |Error| is nil.
	Writer WriteCommitter
}

func (ReplicateResult) String

func (a ReplicateResult) String() string

type Replicator

type Replicator interface {
	Replicate(op ReplicateOp)
}

A Replicator is able to serve a ReplicateOp. It may be backed by a local Spool, or by a remote Gazette process.

type RetryReader

type RetryReader struct {
	// MarkedReader manages the current reader and offset tracking.
	MarkedReader
	// Whether read operations should block (the default). If Blocking is false,
	// then Read operations may return ErrNotYetAvailable.
	Blocking bool
	// LastResult retains the result of the last journal read operation.
	// Callers may access it to inspect metadata returned by the broker.
	// It may be invalidated on every Read call.
	LastResult ReadResult
	// Getter against which to perform read operations.
	Getter Getter
	// Context to use in read operations issued to Getter.
	Context context.Context
}

RetryReader wraps a Getter and MarkedReader to provide callers with a long-lived journal reader. RetryReader transparently handles and retries errors, and will block as needed to await new journal content.

func NewRetryReader deprecated

func NewRetryReader(mark Mark, getter Getter) *RetryReader

Deprecated: Use NewRetryReaderContext instead. NewRetryReader returns a RetryReader at |mark|, using the provided |getter| for all operations.

func NewRetryReaderContext

func NewRetryReaderContext(ctx context.Context, mark Mark, getter Getter) *RetryReader

NewRetryReaderContext returns a RetryReader at |mark|, using the provided |getter| and |ctx| for all operations.

func (*RetryReader) AdjustedSeek

func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)

AdjustedSeek sets the offset for the next Read, accounting for buffered data and updating the buffer as needed.

func (*RetryReader) Read

func (rr *RetryReader) Read(p []byte) (n int, err error)

Read returns the next available bytes of journal content, retrying as required to handle errors or to await content being written. Read will return a non-nil error in the following cases:

  • If the RetryReader context is cancelled.
  • If Blocking is false, and ErrNotYetAvailable is returned by the broker.

All other errors are retried.

func (*RetryReader) Seek

func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next Read. It returns an error if (and only if) |whence| is io.SeekEnd, which is not supported.

type RouteToken

type RouteToken string

Token which describes the ordered set of responsible servers for a Journal: the first acts as broker, and the rest serve replications and reads (only). Structured as '|'-separated URLs rooting the server's Journal hierarchy. Ex: "http://srv-2/a/root|https://srv-1|http://12.34.56.7:8080/other/root".
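
A token in this format can be split into its broker and replica URLs with the standard library. The helper below is a hypothetical sketch for illustration; this page does not show whether the package exposes any such parsing function.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseRouteToken is a hypothetical helper which splits a '|'-separated token
// into the broker URL (the first entry) and the remaining replica URLs.
func parseRouteToken(token string) (*url.URL, []*url.URL, error) {
	var broker *url.URL
	var replicas []*url.URL

	for i, part := range strings.Split(token, "|") {
		u, err := url.Parse(part)
		if err != nil {
			return nil, nil, err
		}
		if i == 0 {
			broker = u
		} else {
			replicas = append(replicas, u)
		}
	}
	return broker, replicas, nil
}

func main() {
	broker, replicas, err := parseRouteToken(
		"http://srv-2/a/root|https://srv-1|http://12.34.56.7:8080/other/root")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("broker:", broker.Host, broker.Path) // prints "broker: srv-2 /a/root"
	for _, r := range replicas {
		fmt.Println("replica:", r.Host, r.Path)
	}
}
```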

type Spool

type Spool struct {
	Fragment
	// contains filtered or unexported fields
}

func NewSpool

func NewSpool(directory string, at Mark) (*Spool, error)

func (*Spool) Commit

func (s *Spool) Commit(delta int64) error

func (*Spool) LocalPath

func (s *Spool) LocalPath() string

func (*Spool) Write

func (s *Spool) Write(buf []byte) (n int, err error)

type Tail

type Tail struct {
	// contains filtered or unexported fields
}

func NewTail

func NewTail(journal Name, updates <-chan Fragment) *Tail

func (*Tail) EndOffset

func (t *Tail) EndOffset() int64

func (*Tail) Read

func (t *Tail) Read(op ReadOp)

func (*Tail) StartServingOps

func (t *Tail) StartServingOps() *Tail

func (*Tail) Stop

func (t *Tail) Stop()

type WriteCommitter

type WriteCommitter interface {
	io.Writer
	// Commits the first |count| bytes of previous Write([]byte) content.
	Commit(count int64) error
}

A WriteCommitter extends io.Writer with a protocol for committing those writes.
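
An in-memory sketch of this write-then-commit protocol follows. memCommitter is hypothetical, and two behaviors are assumptions rather than documented facts: uncommitted bytes beyond |count| are discarded, and an out-of-range |count| is reported with a local errInvalidDelta that merely mirrors the name of the package's ErrInvalidDelta.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

var errInvalidDelta = errors.New("invalid delta")

// memCommitter is a hypothetical in-memory WriteCommitter. Writes accumulate
// in |pending| and only become durable content on Commit.
type memCommitter struct {
	committed bytes.Buffer
	pending   []byte
}

func (m *memCommitter) Write(p []byte) (int, error) {
	m.pending = append(m.pending, p...)
	return len(p), nil
}

// Commit appends the first |count| pending bytes to the committed content and
// discards the remainder (an assumption of this sketch).
func (m *memCommitter) Commit(count int64) error {
	if count < 0 || count > int64(len(m.pending)) {
		return errInvalidDelta
	}
	m.committed.Write(m.pending[:count])
	m.pending = m.pending[:0]
	return nil
}

func main() {
	var m memCommitter
	m.Write([]byte("hello"))
	m.Write([]byte(" world"))

	fmt.Println(m.Commit(5), m.committed.String()) // prints "<nil> hello"
	fmt.Println(m.Commit(1))                       // prints "invalid delta"
}
```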

type Writer

type Writer interface {
	// Appends |buffer| to |journal|. Either all of |buffer| is written, or none
	// of it is. Returns a Promise which is resolved when the write has been
	// fully committed.
	Write(journal Name, buffer []byte) (*AsyncAppend, error)

	// Appends |r|'s content to |journal|, by reading until io.EOF. Either all of
	// |r| is written, or none of it is. Returns a Promise which is resolved when
	// the write has been fully committed.
	ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)
}

A Writer allows for append-only writes to a named journal.
