jocko

package module
v0.0.0-...-d6c1418 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 10, 2017 License: MIT Imports: 5 Imported by: 0

README

JOCKO

Kafka/distributed commit log service in Go.

Goals of this project:

  • Implement Kafka in Go
  • Protocol compatible with Kafka so Kafka clients and services work with Jocko
  • Make operating simpler
  • Distribute a single binary
  • Use Serf for discovery, Raft for consensus (and remove the need to run ZooKeeper)
  • Smarter configuration settings
    • Able to use percentages of disk space for retention policies rather than only bytes and time kept
    • Handling size configs when you change the number of partitions or add topics
  • Learn a lot and have fun

TODO

  • Producing
  • Fetching
  • Partition consensus and distribution
  • Protocol
    • Produce
    • Fetch
    • Metadata
    • Create Topics
    • Delete Topics
    • Consumer group
  • Discovery
  • Replication [current task]

Reading

License

MIT


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	ID() int32
	IsController() bool
	CreateTopic(topic string, partitions int32) error
	StartReplica(*Partition) error
	DeleteTopic(topic string) error
	Partition(topic string, id int32) (*Partition, error)
	ClusterMember(brokerID int32) *ClusterMember
	BecomeLeader(topic string, id int32, command *protocol.PartitionState) error
	BecomeFollower(topic string, id int32, command *protocol.PartitionState) error
	Join(addr ...string) (int, error)
	Cluster() []*ClusterMember
	TopicPartitions(topic string) ([]*Partition, error)
	IsLeaderOfPartition(topic string, id int32, leaderID int32) bool
}

Broker is the interface that wraps the Broker's methods.

type ClusterMember

type ClusterMember struct {
	ID   int32  `json:"id"`
	Port int    `json:"port"`
	IP   string `json:"addr"`

	SerfPort int          `json:"-"`
	RaftPort int          `json:"-"`
	Status   MemberStatus `json:"-"`
	// contains filtered or unexported fields
}

ClusterMember is used as a wrapper around a broker's info and a connection to it.
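The struct tags above determine which fields appear when a member is serialized as JSON: ID, Port and IP are emitted under the keys id, port and addr, while SerfPort, RaftPort and Status are skipped. The following is a minimal sketch of that behavior, assuming a local copy of the exported fields (the unexported fields are omitted) and a hypothetical helper encodeMember that is not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MemberStatus and ClusterMember are local copies of the exported parts of
// the documented types, so the sketch compiles without importing Jocko.
type MemberStatus int

type ClusterMember struct {
	ID   int32  `json:"id"`
	Port int    `json:"port"`
	IP   string `json:"addr"`

	SerfPort int          `json:"-"`
	RaftPort int          `json:"-"`
	Status   MemberStatus `json:"-"`
}

// encodeMember is a hypothetical helper (not part of Jocko) that returns
// the JSON form of a member.
func encodeMember(m ClusterMember) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// The port numbers are illustrative values only.
	m := ClusterMember{ID: 1, Port: 9092, IP: "127.0.0.1", SerfPort: 7946, RaftPort: 7000}
	s, err := encodeMember(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"id":1,"port":9092,"addr":"127.0.0.1"}
}
```

Note that the Go field IP is stored under the JSON key addr, so consumers of the encoded form should look for addr rather than ip.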

func (*ClusterMember) Addr

func (b *ClusterMember) Addr() *net.TCPAddr

Addr is used to get the address of the member.

func (*ClusterMember) Read

func (b *ClusterMember) Read(p []byte) (int, error)

Read is used to read from the member.

func (*ClusterMember) Write

func (b *ClusterMember) Write(p []byte) (int, error)

Write is used to write to the member.

type CommitLog

type CommitLog interface {
	DeleteAll() error
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	TruncateTo(int64) error
	NewestOffset() int64
	OldestOffset() int64
	Append([]byte) (int64, error)
}

CommitLog is the interface that wraps the commit log's methods and is used to manage a partition's data.
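To make the contract more concrete, here is a minimal in-memory sketch that satisfies the interface. The type memLog is hypothetical and is not Jocko's on-disk implementation. Several semantics are assumptions, since the documentation does not spell them out: offsets start at 0, NewestOffset is the offset the next Append would receive, NewReader returns the records from the given offset onward capped at maxBytes, and TruncateTo drops everything before the given offset.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// CommitLog is copied from the interface documented above.
type CommitLog interface {
	DeleteAll() error
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	TruncateTo(int64) error
	NewestOffset() int64
	OldestOffset() int64
	Append([]byte) (int64, error)
}

// memLog is a hypothetical in-memory CommitLog; entries[i] holds the
// record stored at offset base+i.
type memLog struct {
	base    int64
	entries [][]byte
}

// Compile-time check that memLog satisfies the interface.
var _ CommitLog = (*memLog)(nil)

// Append stores a copy of b and returns the offset assigned to it.
func (l *memLog) Append(b []byte) (int64, error) {
	off := l.base + int64(len(l.entries))
	l.entries = append(l.entries, append([]byte(nil), b...))
	return off, nil
}

func (l *memLog) OldestOffset() int64 { return l.base }

// NewestOffset is assumed to be the offset the next Append would receive.
func (l *memLog) NewestOffset() int64 { return l.base + int64(len(l.entries)) }

// NewReader is assumed to return the records from offset onward,
// capped at maxBytes.
func (l *memLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
	if offset < l.base || offset > l.NewestOffset() {
		return nil, errors.New("offset out of range")
	}
	var buf bytes.Buffer
	for _, e := range l.entries[offset-l.base:] {
		buf.Write(e)
	}
	return io.LimitReader(&buf, int64(maxBytes)), nil
}

// TruncateTo is assumed to drop every record before the given offset.
func (l *memLog) TruncateTo(offset int64) error {
	if offset < l.base || offset > l.NewestOffset() {
		return errors.New("offset out of range")
	}
	l.entries = l.entries[offset-l.base:]
	l.base = offset
	return nil
}

// DeleteAll discards every record and resets the offsets.
func (l *memLog) DeleteAll() error {
	l.base, l.entries = 0, nil
	return nil
}

func main() {
	var cl CommitLog = &memLog{}
	if _, err := cl.Append([]byte("a")); err != nil {
		panic(err)
	}
	off, err := cl.Append([]byte("b"))
	if err != nil {
		panic(err)
	}
	r, err := cl.NewReader(off, 16)
	if err != nil {
		panic(err)
	}
	data, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(off, string(data), cl.OldestOffset(), cl.NewestOffset())
}
```

A stand-in like this can be handy in unit tests for code that only depends on the interface, as long as the assumed semantics are checked against the real commit log.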

type MemberStatus

type MemberStatus int

MemberStatus is the state that a member is in.

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
	StatusReap
)

The possible states of a Serf member.
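Because the constants are declared with iota, they take the values 0 through 5 in declaration order. The sketch below shows one way to turn a status into a readable label for logging. The helper statusName and its label strings are hypothetical; the package documentation does not list a String method for MemberStatus.

```go
package main

import "fmt"

// MemberStatus and its constants are copied from the documentation above.
type MemberStatus int

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
	StatusReap
)

// statusName is a hypothetical helper (not part of Jocko) that maps a
// status to an illustrative label.
func statusName(s MemberStatus) string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusLeaving:
		return "leaving"
	case StatusLeft:
		return "left"
	case StatusFailed:
		return "failed"
	case StatusReap:
		return "reap"
	default:
		return fmt.Sprintf("unknown(%d)", int(s))
	}
}

func main() {
	for s := StatusNone; s <= StatusReap; s++ {
		fmt.Printf("%d %s\n", int(s), statusName(s))
	}
}
```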

type Partition

type Partition struct {
	Topic           string  `json:"topic"`
	ID              int32   `json:"id"`
	Replicas        []int32 `json:"replicas"`
	ISR             []int32 `json:"isr"`
	Leader          int32   `json:"leader"`
	PreferredLeader int32   `json:"preferred_leader"`

	LeaderAndISRVersionInZK int32     `json:"-"`
	CommitLog               CommitLog `json:"-"`

	Conn io.ReadWriter `json:"-"`
}

Partition is the unit of storage in Jocko.

func NewPartition

func NewPartition(topic string, id int32) *Partition

NewPartition is used to create a new partition.

func (*Partition) Append

func (p *Partition) Append(ms []byte) (int64, error)

Append is used to append message sets to the partition.

func (*Partition) Delete

func (p *Partition) Delete() error

Delete is used to delete the partition's data/commitlog.

func (*Partition) HighWatermark

func (p *Partition) HighWatermark() int64

HighWatermark is used to get the newest offset of the partition.

func (*Partition) IsFollowing

func (r *Partition) IsFollowing(id int32) bool

IsFollowing is used to check whether the broker with the given ID should follow/replicate the leader.

func (*Partition) IsLeader

func (r *Partition) IsLeader(id int32) bool

IsLeader is used to check whether the broker with the given ID is the partition's leader.
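The two role checks can be pictured with a small sketch. It uses a local partition type with only the Replicas and Leader fields, and its rules are assumptions rather than Jocko's actual implementation: a broker leads when its ID equals Leader, and it follows when it appears in Replicas without being the leader.

```go
package main

import "fmt"

// partition mirrors only the Partition fields this sketch needs.
type partition struct {
	Replicas []int32
	Leader   int32
}

// isLeader reports whether the broker with the given ID leads the partition.
func (p *partition) isLeader(id int32) bool {
	return p.Leader == id
}

// isFollowing reports whether the broker with the given ID is a replica
// other than the leader. This rule is an assumption made for illustration.
func (p *partition) isFollowing(id int32) bool {
	if p.isLeader(id) {
		return false
	}
	for _, r := range p.Replicas {
		if r == id {
			return true
		}
	}
	return false
}

func main() {
	p := &partition{Replicas: []int32{1, 2, 3}, Leader: 1}
	for _, id := range []int32{1, 2, 4} {
		fmt.Printf("broker %d: leader=%t following=%t\n", id, p.isLeader(id), p.isFollowing(id))
	}
}
```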

func (*Partition) IsOpen

func (r *Partition) IsOpen() bool

IsOpen is used to check whether the partition's commit log has been initialized.

func (*Partition) LeaderID

func (p *Partition) LeaderID() int32

LeaderID is used to get the partition's leader broker ID.

func (*Partition) LowWatermark

func (p *Partition) LowWatermark() int64

LowWatermark is used to get the oldest offset of the partition.
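The watermark descriptions line up with the NewestOffset and OldestOffset methods of CommitLog, which suggests simple delegation to the partition's commit log. The sketch below assumes exactly that; the types offsetLog, fixedLog and partition are hypothetical stand-ins and the real methods may do more.

```go
package main

import "fmt"

// offsetLog is the subset of the CommitLog interface this sketch needs.
type offsetLog interface {
	NewestOffset() int64
	OldestOffset() int64
}

// fixedLog is a hypothetical log that reports fixed offsets.
type fixedLog struct {
	oldest, newest int64
}

func (l fixedLog) NewestOffset() int64 { return l.newest }
func (l fixedLog) OldestOffset() int64 { return l.oldest }

// partition holds only the commit log, mirroring Partition.CommitLog.
type partition struct {
	CommitLog offsetLog
}

// highWatermark is assumed to delegate to the log's newest offset.
func (p *partition) highWatermark() int64 { return p.CommitLog.NewestOffset() }

// lowWatermark is assumed to delegate to the log's oldest offset.
func (p *partition) lowWatermark() int64 { return p.CommitLog.OldestOffset() }

func main() {
	p := &partition{CommitLog: fixedLog{oldest: 5, newest: 42}}
	fmt.Println("low:", p.lowWatermark(), "high:", p.highWatermark())
}
```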

func (*Partition) NewReader

func (p *Partition) NewReader(offset int64, maxBytes int32) (io.Reader, error)

NewReader is used to create a reader at the given offset and will read up to maxBytes.

func (*Partition) Read

func (p *Partition) Read(b []byte) (int, error)

Read is used to read bytes directly from the partition's leader into the given buffer.

func (*Partition) String

func (r *Partition) String() string

String returns the topic/Partition as a string.

func (*Partition) TruncateTo

func (p *Partition) TruncateTo(offset int64) error

TruncateTo is used to truncate the partition's logs before the given offset.

func (*Partition) Write

func (p *Partition) Write(b []byte) (int, error)

Write is used to directly write the given bytes to the partition's leader.

type Proxy

type Proxy interface {
	FetchMessages(clientID string, fetchRequest *protocol.FetchRequest) (*protocol.FetchResponses, error)
	CreateTopic(clientID string, createRequest *protocol.CreateTopicRequest) (*protocol.CreateTopicsResponse, error)
}

Proxy is the interface that wraps the Proxy methods for forwarding requests to an existing Jocko server and returning the server's response to the caller.

type Raft

type Raft interface {
	Bootstrap(peers []*ClusterMember, commandCh chan<- RaftCommand, leaderCh chan<- bool) (err error)
	Apply(cmd RaftCommand) error
	IsLeader() bool
	LeaderID() string
	WaitForBarrier() error
	AddPeer(addr string) error
	RemovePeer(addr string) error
	Shutdown() error
	Addr() string
}

Raft is the interface that wraps Raft's methods and is used to manage consensus for the Jocko cluster.

type RaftCmdType

type RaftCmdType int

type RaftCommand

type RaftCommand struct {
	Cmd  RaftCmdType      `json:"type"`
	Data *json.RawMessage `json:"data"`
}
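RaftCommand carries a command type plus a payload held as raw JSON, so the payload can be decoded later once the type is known. The sketch below shows that round trip with encoding/json. The constant cmdAddPartition, the partitionPayload type and both helper functions are hypothetical: the command types and payloads Jocko actually uses are not listed in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RaftCmdType and RaftCommand are copied from the documentation above.
type RaftCmdType int

type RaftCommand struct {
	Cmd  RaftCmdType      `json:"type"`
	Data *json.RawMessage `json:"data"`
}

// cmdAddPartition is a hypothetical command type used for illustration.
const cmdAddPartition RaftCmdType = 1

// partitionPayload is a hypothetical payload for that command.
type partitionPayload struct {
	Topic string `json:"topic"`
	ID    int32  `json:"id"`
}

// encodeCommand marshals the payload first, then wraps the raw bytes in a
// RaftCommand so the envelope stays independent of the payload type.
func encodeCommand(typ RaftCmdType, payload interface{}) ([]byte, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	raw := json.RawMessage(b)
	return json.Marshal(RaftCommand{Cmd: typ, Data: &raw})
}

// decodePartition unwraps the envelope and decodes the deferred payload.
func decodePartition(b []byte) (RaftCmdType, partitionPayload, error) {
	var cmd RaftCommand
	var p partitionPayload
	if err := json.Unmarshal(b, &cmd); err != nil {
		return 0, p, err
	}
	if cmd.Data == nil {
		return cmd.Cmd, p, nil
	}
	err := json.Unmarshal(*cmd.Data, &p)
	return cmd.Cmd, p, err
}

func main() {
	b, err := encodeCommand(cmdAddPartition, partitionPayload{Topic: "events", ID: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	typ, p, err := decodePartition(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(typ, p.Topic, p.ID)
}
```

Keeping Data as a raw message lets a receiver switch on Cmd before choosing which payload type to decode into.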

type Serf

type Serf interface {
	Bootstrap(node *ClusterMember, reconcileCh chan<- *ClusterMember) error
	Cluster() []*ClusterMember
	Member(memberID int32) *ClusterMember
	Join(addrs ...string) (int, error)
	Shutdown() error
	Addr() string
}

Serf is the interface that wraps Serf methods and is used to manage the cluster membership for Jocko nodes.

Directories

Path Synopsis
cmd
examples
