Documentation ¶
Index ¶
- Constants
- Variables
- func NewBrokerLookup() *brokerLookup
- func NewReplicaLookup() *replicaLookup
- func TestJoin(t testing.T, s1 *Server, other ...*Server)
- func WaitForLeader(t testing.T, servers ...*Server) (*Server, []*Server)
- type Broker
- type CommitLog
- type Conn
- func (c *Conn) APIVersions(req *protocol.APIVersionsRequest) (*protocol.APIVersionsResponse, error)
- func (c *Conn) AlterConfigs(req *protocol.AlterConfigsRequest) (*protocol.AlterConfigsResponse, error)
- func (c *Conn) Close() error
- func (c *Conn) ControlledShutdown(req *protocol.ControlledShutdownRequest) (*protocol.ControlledShutdownResponse, error)
- func (c *Conn) CreateTopics(req *protocol.CreateTopicRequests) (*protocol.CreateTopicsResponse, error)
- func (c *Conn) DeleteTopics(req *protocol.DeleteTopicsRequest) (*protocol.DeleteTopicsResponse, error)
- func (c *Conn) DescribeConfigs(req *protocol.DescribeConfigsRequest) (*protocol.DescribeConfigsResponse, error)
- func (c *Conn) DescribeGroups(req *protocol.DescribeGroupsRequest) (*protocol.DescribeGroupsResponse, error)
- func (c *Conn) Fetch(req *protocol.FetchRequest) (*protocol.FetchResponse, error)
- func (c *Conn) FindCoordinator(req *protocol.FindCoordinatorRequest) (*protocol.FindCoordinatorResponse, error)
- func (c *Conn) Heartbeat(req *protocol.HeartbeatRequest) (*protocol.HeartbeatResponse, error)
- func (c *Conn) JoinGroup(req *protocol.JoinGroupRequest) (*protocol.JoinGroupResponse, error)
- func (c *Conn) LeaderAndISR(req *protocol.LeaderAndISRRequest) (*protocol.LeaderAndISRResponse, error)
- func (c *Conn) LeaveGroup(req *protocol.LeaveGroupRequest) (*protocol.LeaveGroupResponse, error)
- func (c *Conn) ListGroups(req *protocol.ListGroupsRequest) (*protocol.ListGroupsResponse, error)
- func (c *Conn) LocalAddr() net.Addr
- func (c *Conn) Metadata(req *protocol.MetadataRequest) (*protocol.MetadataResponse, error)
- func (c *Conn) OffsetCommit(req *protocol.OffsetCommitRequest) (*protocol.OffsetCommitResponse, error)
- func (c *Conn) OffsetFetch(req *protocol.OffsetFetchRequest) (*protocol.OffsetFetchResponse, error)
- func (c *Conn) Offsets(req *protocol.OffsetsRequest) (*protocol.OffsetsResponse, error)
- func (c *Conn) Produce(req *protocol.ProduceRequest) (*protocol.ProduceResponse, error)
- func (c *Conn) Read(b []byte) (int, error)
- func (c *Conn) RemoteAddr() net.Addr
- func (c *Conn) SaslHandshake(req *protocol.SaslHandshakeRequest) (*protocol.SaslHandshakeResponse, error)
- func (c *Conn) SetDeadline(t time.Time) error
- func (c *Conn) SetReadDeadline(t time.Time) error
- func (c *Conn) SetWriteDeadline(t time.Time) error
- func (c *Conn) StopReplica(req *protocol.StopReplicaRequest) (*protocol.StopReplicaResponse, error)
- func (c *Conn) SyncGroup(req *protocol.SyncGroupRequest) (*protocol.SyncGroupResponse, error)
- func (c *Conn) UpdateMetadata(req *protocol.UpdateMetadataRequest) (*protocol.UpdateMetadataResponse, error)
- func (c *Conn) Write(b []byte) (int, error)
- type Context
- func (ctx *Context) Deadline() (deadline time.Time, ok bool)
- func (ctx *Context) Done() <-chan struct{}
- func (ctx *Context) Err() error
- func (c *Context) Header() *protocol.RequestHeader
- func (ctx *Context) Request() interface{}
- func (ctx *Context) Response() interface{}
- func (ctx *Context) String() string
- func (ctx *Context) Value(key interface{}) interface{}
- type Counter
- type Dialer
- type Handler
- type Metrics
- type Replica
- type Replicator
- type ReplicatorConfig
- type Resolver
- type SASL
- type Server
Constants ¶
const ( // StatusReap is used to update the status of a node if we // are handling a EventMemberReap StatusReap = serf.MemberStatus(-1) )
Variables ¶
var ( ErrTopicExists = errors.New("topic exists already") ErrInvalidArgument = errors.New("no logger set") OffsetsTopicName = "__consumer_offsets" OffsetsTopicNumPartitions = 50 )
Functions ¶
func NewBrokerLookup ¶
func NewBrokerLookup() *brokerLookup
func NewReplicaLookup ¶
func NewReplicaLookup() *replicaLookup
func WaitForLeader ¶
WaitForLeader waits for one of the servers to be leader, failing the test if no one is the leader. Returns the leader (if there is one) and non-leaders.
Types ¶
type Broker ¶
Broker represents a broker in a Jocko cluster, like a broker in a Kafka cluster.
func (*Broker) JoinLAN ¶
Join is used to have the broker join the gossip ring. The given address should be another broker listening on the Serf address.
func (*Broker) LANMembers ¶
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn implemenets net.Conn for connections to Jocko brokers. It's used as an internal client for replication fetches and leader and ISR requests.
func Dial ¶
Dial creates a connection to the broker on the given network and address on the default dialer.
func (*Conn) APIVersions ¶
func (c *Conn) APIVersions(req *protocol.APIVersionsRequest) (*protocol.APIVersionsResponse, error)
APIVersions sends an api version request and returns the response.
func (*Conn) AlterConfigs ¶
func (c *Conn) AlterConfigs(req *protocol.AlterConfigsRequest) (*protocol.AlterConfigsResponse, error)
AlterConfigs sends an alter configs request and returns the response.
func (*Conn) ControlledShutdown ¶
func (c *Conn) ControlledShutdown(req *protocol.ControlledShutdownRequest) (*protocol.ControlledShutdownResponse, error)
ControlledShutdown sends a controlled shutdown request and returns the response.
func (*Conn) CreateTopics ¶
func (c *Conn) CreateTopics(req *protocol.CreateTopicRequests) (*protocol.CreateTopicsResponse, error)
CreateTopics sends a create topics request and returns the response.
func (*Conn) DeleteTopics ¶
func (c *Conn) DeleteTopics(req *protocol.DeleteTopicsRequest) (*protocol.DeleteTopicsResponse, error)
DeleteTopics sends a delete topic request and returns the response.
func (*Conn) DescribeConfigs ¶
func (c *Conn) DescribeConfigs(req *protocol.DescribeConfigsRequest) (*protocol.DescribeConfigsResponse, error)
DescribeConfigs sends an describe configs request and returns the response.
func (*Conn) DescribeGroups ¶
func (c *Conn) DescribeGroups(req *protocol.DescribeGroupsRequest) (*protocol.DescribeGroupsResponse, error)
DescribeGroups sends a describe groups request and returns the response.
func (*Conn) Fetch ¶
func (c *Conn) Fetch(req *protocol.FetchRequest) (*protocol.FetchResponse, error)
Fetch sends a fetch request and returns the response.
func (*Conn) FindCoordinator ¶
func (c *Conn) FindCoordinator(req *protocol.FindCoordinatorRequest) (*protocol.FindCoordinatorResponse, error)
FindCoordinator sends a find coordinator request and returns the response.
func (*Conn) Heartbeat ¶
func (c *Conn) Heartbeat(req *protocol.HeartbeatRequest) (*protocol.HeartbeatResponse, error)
Heartbeat sends a heartbeat request and returns the response.
func (*Conn) JoinGroup ¶
func (c *Conn) JoinGroup(req *protocol.JoinGroupRequest) (*protocol.JoinGroupResponse, error)
JoinGroup sends a join group request and returns the response.
func (*Conn) LeaderAndISR ¶
func (c *Conn) LeaderAndISR(req *protocol.LeaderAndISRRequest) (*protocol.LeaderAndISRResponse, error)
LeaderAndISR sends a leader and ISR request and returns the response.
func (*Conn) LeaveGroup ¶
func (c *Conn) LeaveGroup(req *protocol.LeaveGroupRequest) (*protocol.LeaveGroupResponse, error)
LeaveGroup sends a leave group request and returns the response.
func (*Conn) ListGroups ¶
func (c *Conn) ListGroups(req *protocol.ListGroupsRequest) (*protocol.ListGroupsResponse, error)
ListGroups sends a list groups request and returns the response.
func (*Conn) Metadata ¶
func (c *Conn) Metadata(req *protocol.MetadataRequest) (*protocol.MetadataResponse, error)
Metadata sends a metadata request and returns the response.
func (*Conn) OffsetCommit ¶
func (c *Conn) OffsetCommit(req *protocol.OffsetCommitRequest) (*protocol.OffsetCommitResponse, error)
OffsetCommit sends an offset commit and returns the response.
func (*Conn) OffsetFetch ¶
func (c *Conn) OffsetFetch(req *protocol.OffsetFetchRequest) (*protocol.OffsetFetchResponse, error)
OffsetFetch sends an offset fetch and returns the response.
func (*Conn) Offsets ¶
func (c *Conn) Offsets(req *protocol.OffsetsRequest) (*protocol.OffsetsResponse, error)
Offsets sends an offsets request and returns the response.
func (*Conn) Produce ¶
func (c *Conn) Produce(req *protocol.ProduceRequest) (*protocol.ProduceResponse, error)
Produce sends a produce request and returns the response.
func (*Conn) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*Conn) SaslHandshake ¶
func (c *Conn) SaslHandshake(req *protocol.SaslHandshakeRequest) (*protocol.SaslHandshakeResponse, error)
SaslHandshake sends a sasl handshake request and returns the response.
func (*Conn) SetDeadline ¶
SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline. See net.Conn SetDeadline.
func (*Conn) SetReadDeadline ¶
SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.
func (*Conn) SetWriteDeadline ¶
SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.
func (*Conn) StopReplica ¶
func (c *Conn) StopReplica(req *protocol.StopReplicaRequest) (*protocol.StopReplicaResponse, error)
StopReplica sends a stop replica request and returns the response.
func (*Conn) SyncGroup ¶
func (c *Conn) SyncGroup(req *protocol.SyncGroupRequest) (*protocol.SyncGroupResponse, error)
SyncGroup sends a sync group request and returns the response.
func (*Conn) UpdateMetadata ¶
func (c *Conn) UpdateMetadata(req *protocol.UpdateMetadataRequest) (*protocol.UpdateMetadataResponse, error)
UpdateMetadata sends an update metadata request and returns the response.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) Header ¶
func (c *Context) Header() *protocol.RequestHeader
type Counter ¶
type Counter = prometheus.Counter
Alias prometheus' counter, probably only need to use Inc() though.
type Dialer ¶
type Dialer struct { // Unique ID for client connections established by this Dialer. ClientID string // Timeout is the max duration a dial will wait for a connect to complete. Timeout time.Duration // Deadline is the absolute time after which the dial will fail. Zero is no deadline. Deadline time.Time // LocalAddr is the local address to dial. LocalAddr net.Addr // RemoteAddr is the remote address to dial. RemoteAddr net.Addr // KeepAlive is the keep-alive period for a network connection. KeepAlive time.Duration // FallbackDelay is the duration to wait before spawning a fallback connection. If 0, default duration is 300ms. FallbackDelay time.Duration // Resolver species an alternative resolver to use. Resolver Resolver // TLS enables the Dialer to secure connections. If nil, standard net.Conn is used. TLS *tls.Config // DualStack enables RFC 6555-compliant "happy eyeballs" dialing. DualStack bool // SASL enables SASL plain authentication. SASL *SASL }
Dialer is like the net.Dialer API but for opening connections to Jocko brokers.
type Handler ¶
type Handler interface { Run(context.Context, <-chan *Context, chan<- *Context) Leave() error Shutdown() error }
Broker is the interface that wraps the Broker's methods.
type Metrics ¶
type Metrics struct {
RequestsHandled Counter
}
Metrics is used for tracking metrics.
type Replica ¶
type Replica struct { BrokerID int32 Partition structs.Partition IsLocal bool Log CommitLog Hw int64 Leo int64 Replicator *Replicator sync.Mutex }
Replica
type Replicator ¶
type Replicator struct {
// contains filtered or unexported fields
}
Replicator fetches from the partition's leader producing to itself the follower, thereby replicating the partition.
func NewReplicator ¶
func NewReplicator(config ReplicatorConfig, replica *Replica, leader client) *Replicator
NewReplicator returns a new replicator instance.
func (*Replicator) Close ¶
func (r *Replicator) Close() error
Close the replicator object when we are no longer following
func (*Replicator) Replicate ¶
func (r *Replicator) Replicate()
Replicate start fetching messages from the leader and appending them to the local commit log.
type ReplicatorConfig ¶
type Resolver ¶
type Resolver interface { // LookupHost looks up the given host using the local resolver. LookupHost(ctx context.Context, host string) ([]string, error) }
Resolver provides service discovery of the hosts of a kafka cluster.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is used to handle the TCP connections, decode requests, defer to the broker, and encode the responses.