Documentation ¶
Overview ¶
Package storage provides access to the Store and Range abstractions. Each Cockroach node handles one or more stores, each of which multiplexes to one or more ranges, identified by [start, end) keys. Ranges are contiguous regions of the keyspace. Each range implements an instance of the Raft consensus algorithm to synchronize participating range replicas.
Each store is represented by a single engine.Engine instance. The ranges hosted by a store all have access to the same engine, but write to only a range-limited keyspace within it. Ranges access the underlying engine via the MVCC interface, which provides historical versioned values.
Package storage is a generated protocol buffer package. It is generated from these files: cockroach/storage/status.proto. It has these top-level messages: StoreStatus.
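As a rough illustration of the [start, end) convention used for ranges in this overview, the sketch below models a range as a half-open interval over byte-string keys. The keyRange type and its contains method are hypothetical names invented for this example; they are not part of the package API.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a hypothetical stand-in for a range's [start, end) span.
type keyRange struct {
	start, end []byte
}

// contains reports whether key falls in [start, end): the start key is
// inclusive and the end key is exclusive.
func (r keyRange) contains(key []byte) bool {
	return bytes.Compare(key, r.start) >= 0 && bytes.Compare(key, r.end) < 0
}

func main() {
	r := keyRange{start: []byte("a"), end: []byte("m")}
	fmt.Println(r.contains([]byte("a"))) // the start key is included
	fmt.Println(r.contains([]byte("m"))) // the end key is excluded
}
```

Because the end key is exclusive, two adjacent ranges can share a boundary key without both claiming it.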
Example (Rebalancing) ¶
// Model a set of stores in a cluster,
// randomly adding / removing stores and adding bytes.
g := gossip.New(nil, nil)
// g.SetNodeID must be called before g.AddInfo.
g.SetNodeID(roachpb.NodeID(1))
stopper := stop.NewStopper()
defer stopper.Stop()
sp := NewStorePool(g, TestTimeUntilStoreDeadOff, stopper)
alloc := MakeAllocator(sp, AllocatorOptions{AllowRebalance: true, Deterministic: true})

var wg sync.WaitGroup
g.RegisterCallback(gossip.MakePrefixPattern(gossip.KeyStorePrefix), func(_ string, _ roachpb.Value) { wg.Done() })

const generations = 100
const nodes = 20

// Initialize testStores.
var testStores [nodes]testStore
for i := 0; i < len(testStores); i++ {
	testStores[i].StoreID = roachpb.StoreID(i)
	testStores[i].Node = roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}
	testStores[i].Capacity = roachpb.StoreCapacity{Capacity: 1 << 30, Available: 1 << 30}
}
// Initialize the cluster with a single range.
testStores[0].add(alloc.randGen.Int63n(1 << 20))

for i := 0; i < generations; i++ {
	// First loop through test stores and add data.
	wg.Add(len(testStores))
	for j := 0; j < len(testStores); j++ {
		// Add a pretend range to the testStore if there's already one.
		if testStores[j].Capacity.RangeCount > 0 {
			testStores[j].add(alloc.randGen.Int63n(1 << 20))
		}
		if err := g.AddInfoProto(gossip.MakeStoreKey(roachpb.StoreID(j)), &testStores[j].StoreDescriptor, 0); err != nil {
			panic(err)
		}
	}
	wg.Wait()

	// Next loop through test stores and maybe rebalance.
	for j := 0; j < len(testStores); j++ {
		ts := &testStores[j]
		if alloc.ShouldRebalance(ts.StoreID) {
			target := alloc.RebalanceTarget(ts.StoreID, roachpb.Attributes{}, []roachpb.ReplicaDescriptor{{NodeID: ts.Node.NodeID, StoreID: ts.StoreID}})
			if target != nil {
				testStores[j].rebalance(&testStores[int(target.StoreID)], alloc.randGen.Int63n(1<<20))
			}
		}
	}

	// Output each store's bytes used as a 3-digit decimal value, scaled
	// so that the fullest store reads 999.
	if i%(generations/50) == 0 {
		var maxBytes int64
		for j := 0; j < len(testStores); j++ {
			bytes := testStores[j].Capacity.Capacity - testStores[j].Capacity.Available
			if bytes > maxBytes {
				maxBytes = bytes
			}
		}
		if maxBytes > 0 {
			for j := 0; j < len(testStores); j++ {
				endStr := " "
				if j == len(testStores)-1 {
					endStr = ""
				}
				bytes := testStores[j].Capacity.Capacity - testStores[j].Capacity.Available
				fmt.Printf("%03d%s", (999*bytes)/maxBytes, endStr)
			}
			fmt.Printf("\n")
		}
	}
}

var totBytes int64
var totRanges int32
for i := 0; i < len(testStores); i++ {
	totBytes += testStores[i].Capacity.Capacity - testStores[i].Capacity.Available
	totRanges += testStores[i].Capacity.RangeCount
}
fmt.Printf("Total bytes=%d, ranges=%d\n", totBytes, totRanges)
Output: 999 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 976 000 284 734 000 000 000 000 000 000 999 000 000 000 000 000 000 000 000 000 336 148 196 651 000 999 000 000 000 652 628 000 000 717 000 613 422 297 507 000 462 470 391 464 999 589 428 135 211 540 681 481 000 237 306 314 291 157 165 343 411 355 666 657 863 537 999 295 588 551 796 645 461 209 872 710 422 501 528 664 603 496 683 497 970 537 983 500 739 584 861 461 624 588 999 826 572 429 595 596 640 538 646 566 822 565 899 646 666 621 823 603 558 604 999 821 511 625 581 518 636 584 824 643 856 576 898 726 832 837 842 665 605 768 999 989 637 833 619 630 695 600 897 714 867 578 954 718 821 755 963 755 646 765 999 961 764 799 637 690 838 655 967 705 908 621 953 747 874 782 973 753 734 707 999 993 741 852 691 733 823 663 903 627 826 673 999 740 832 795 899 717 673 683 944 952 743 877 709 658 866 721 909 618 836 719 999 735 870 732 896 693 697 684 939 907 823 874 695 651 929 773 954 693 798 746 999 732 936 744 873 717 718 672 857 853 838 829 710 697 894 834 985 707 823 708 999 703 938 704 874 746 721 662 831 842 853 818 717 697 875 827 999 678 843 674 918 687 890 679 882 710 677 695 859 808 836 768 725 673 855 805 999 677 893 718 991 709 922 704 925 709 729 740 856 797 876 803 712 723 882 863 989 724 924 798 999 756 962 766 935 721 739 768 887 859 891 837 713 768 958 883 970 718 965 815 999 746 978 777 976 790 815 772 867 846 890 872 764 787 892 878 928 720 920 784 999 723 926 754 922 736 752 750 808 795 828 840 752 764 868 867 937 743 906 784 999 729 904 748 952 714 759 742 841 828 855 853 756 760 853 892 900 741 902 797 999 740 882 743 927 739 736 754 844 845 842 841 752 763 861 913 898 802 902 835 999 741 911 776 940 768 737 788 897 885 883 852 780 833 884 923 898 799 931 835 999 786 957 795 987 816 758 780 909 863 879 886 793 858 875 910 873 786 947 808 999 782 951 756 967 815 740 792 919 842 864 866 799 839 865 900 884 844 926 822 999 814 963 763 952 844 796 812 923 861 864 857 
841 875 846 902 861 876 914 863 999 835 916 785 931 846 868 844 901 872 853 881 847 882 926 939 927 903 923 910 993 881 915 870 999 890 910 895 939 944 928 918 894 936 938 938 922 898 944 900 973 917 921 882 999 906 917 925 915 959 959 951 882 908 944 959 935 911 931 896 951 953 930 901 999 911 946 937 920 944 966 956 892 921 924 977 930 920 955 884 984 977 905 901 999 915 928 950 923 960 969 931 915 911 927 970 937 923 953 897 946 952 886 891 999 919 885 934 910 962 946 917 925 912 949 979 915 940 952 929 968 963 890 888 999 931 881 941 912 951 940 921 909 914 972 999 929 956 946 944 984 996 941 938 981 960 915 935 918 974 972 931 921 940 945 983 935 946 962 952 999 993 947 905 956 946 891 941 925 984 950 921 926 909 958 973 972 954 969 966 999 976 952 911 943 968 891 959 929 995 948 931 940 978 967 949 957 979 971 979 979 981 959 941 933 977 883 970 918 999 960 934 944 978 992 967 980 977 995 964 991 999 964 955 935 987 923 957 920 993 960 933 955 985 982 988 987 975 993 958 991 999 950 943 946 978 943 976 931 988 979 936 938 984 956 972 971 953 999 942 975 975 946 948 936 958 934 958 928 969 985 929 935 971 968 968 978 946 999 974 967 976 968 944 943 967 947 964 943 986 994 952 949 973 978 971 990 931 978 981 967 971 980 958 960 975 949 955 928 982 999 933 947 961 970 973 972 919 977 988 958 952 957 944 973 989 947 955 914 967 999 937 933 961 968 975 953 929 996 999 950 939 948 956 980 966 945 973 915 965 991 936 948 973 956 958 928 936 980 999 922 933 943 959 964 956 929 953 903 973 967 926 953 939 951 952 930 923 970 999 933 928 940 959 980 939 922 963 922 974 970 934 937 940 957 960 938 924 964 999 928 936 948 981 998 964 940 979 942 978 985 944 947 954 969 961 928 936 955 996 911 942 942 986 999 952 941 974 955 974 980 934 931 957 964 961 934 938 938 992 918 933 933 972 999 949 935 981 951 975 975 930 929 942 971 961 931 936 947 999 933 934 929 970 997 958 937 978 955 962 956 946 937 935 972 966 916 945 931 996 940 929 924 961 999 965 935 988 958 961 946 951 
940 928 Total bytes=990773690, ranges=1903
Index ¶
- Constants
- Variables
- func DeleteRange(txn *client.Txn, b *client.Batch, key roachpb.RKey) error
- func InsertRange(txn *client.Txn, b *client.Batch, key roachpb.RKey) error
- func ProcessStoreEvent(l StoreEventListener, event interface{})
- func SetupRangeTree(batch engine.Engine, ms *engine.MVCCStats, timestamp roachpb.Timestamp, ...) error
- type Allocator
- func (a *Allocator) AllocateTarget(required roachpb.Attributes, existing []roachpb.ReplicaDescriptor, ...) (*roachpb.StoreDescriptor, error)
- func (a *Allocator) ComputeAction(zone config.ZoneConfig, desc *roachpb.RangeDescriptor) (AllocatorAction, float64)
- func (a Allocator) RebalanceTarget(storeID roachpb.StoreID, required roachpb.Attributes, ...) *roachpb.StoreDescriptor
- func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor) (roachpb.ReplicaDescriptor, error)
- func (a Allocator) ShouldRebalance(storeID roachpb.StoreID) bool
- type AllocatorAction
- type AllocatorOptions
- type BalanceMode
- type BeginScanRangesEvent
- type CommandQueue
- type EndScanRangesEvent
- type MergeRangeEvent
- type NotBootstrappedError
- type RegisterRangeEvent
- type RemoveRangeEvent
- type Replica
- func (r *Replica) AdminMerge(args roachpb.AdminMergeRequest, origLeftDesc *roachpb.RangeDescriptor) (roachpb.AdminMergeResponse, error)
- func (r *Replica) AdminSplit(args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor) (roachpb.AdminSplitResponse, error)
- func (r *Replica) Append(entries []raftpb.Entry) error
- func (r *Replica) ApplySnapshot(snap raftpb.Snapshot) error
- func (r *Replica) BeginTransaction(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.BeginTransactionResponse, error)
- func (r *Replica) ChangeReplicas(changeType roachpb.ReplicaChangeType, replica roachpb.ReplicaDescriptor, ...) error
- func (r *Replica) ConditionalPut(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.ConditionalPutResponse, error)
- func (r *Replica) ContainsKey(key roachpb.Key) bool
- func (r *Replica) ContainsKeyRange(start, end roachpb.Key) bool
- func (r *Replica) Delete(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.DeleteResponse, error)
- func (r *Replica) DeleteRange(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.DeleteRangeResponse, error)
- func (r *Replica) Desc() *roachpb.RangeDescriptor
- func (r *Replica) Destroy(origDesc roachpb.RangeDescriptor) error
- func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.EndTransactionResponse, []roachpb.Intent, error)
- func (r *Replica) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error)
- func (r *Replica) FirstIndex() (uint64, error)
- func (r *Replica) GC(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.GCResponse, error)
- func (r *Replica) Get(batch engine.Engine, h roachpb.Header, args roachpb.GetRequest) (roachpb.GetResponse, []roachpb.Intent, error)
- func (r *Replica) GetGCMetadata() (*roachpb.GCMetadata, error)
- func (r *Replica) GetLastVerificationTimestamp() (roachpb.Timestamp, error)
- func (r *Replica) GetMVCCStats() engine.MVCCStats
- func (r *Replica) GetMaxBytes() int64
- func (r *Replica) GetReplica() *roachpb.ReplicaDescriptor
- func (r *Replica) HeartbeatTxn(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.HeartbeatTxnResponse, error)
- func (r *Replica) Increment(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.IncrementResponse, error)
- func (r *Replica) InitialState() (raftpb.HardState, raftpb.ConfState, error)
- func (r *Replica) IsFirstRange() bool
- func (r *Replica) LastIndex() (uint64, error)
- func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.LeaderLeaseResponse, error)
- func (r *Replica) Less(i btree.Item) bool
- func (r *Replica) Merge(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.MergeResponse, error)
- func (r *Replica) PushTxn(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.PushTxnResponse, error)
- func (r *Replica) Put(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.PutResponse, error)
- func (r *Replica) RangeLookup(batch engine.Engine, h roachpb.Header, args roachpb.RangeLookupRequest) (roachpb.RangeLookupResponse, []roachpb.Intent, error)
- func (r *Replica) ReplicaDescriptor(replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
- func (r *Replica) ResolveIntent(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.ResolveIntentResponse, error)
- func (r *Replica) ResolveIntentRange(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.ResolveIntentRangeResponse, error)
- func (r *Replica) ReverseScan(batch engine.Engine, h roachpb.Header, args roachpb.ReverseScanRequest) (roachpb.ReverseScanResponse, []roachpb.Intent, error)
- func (r *Replica) Scan(batch engine.Engine, h roachpb.Header, args roachpb.ScanRequest) (roachpb.ScanResponse, []roachpb.Intent, error)
- func (r *Replica) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
- func (r *Replica) SetHardState(st raftpb.HardState) error
- func (r *Replica) SetLastVerificationTimestamp(timestamp roachpb.Timestamp) error
- func (r *Replica) SetMaxBytes(maxBytes int64)
- func (r *Replica) Snapshot() (raftpb.Snapshot, error)
- func (r *Replica) String() string
- func (r *Replica) Term(i uint64) (uint64, error)
- func (r *Replica) TruncateLog(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, ...) (roachpb.TruncateLogResponse, error)
- type ReplicationStatusEvent
- type SequenceCache
- func (sc *SequenceCache) ClearData(e engine.Engine) error
- func (sc *SequenceCache) CopyFrom(e engine.Engine, originRangeID roachpb.RangeID) error
- func (sc *SequenceCache) CopyInto(e engine.Engine, destRangeID roachpb.RangeID) error
- func (sc *SequenceCache) Get(e engine.Engine, id []byte, dest *roachpb.SequenceCacheEntry) (uint32, uint32, error)
- func (sc *SequenceCache) GetAllTransactionID(e engine.Engine, id []byte) ([]roachpb.KeyValue, error)
- func (sc *SequenceCache) Iterate(e engine.Engine, f func([]byte, []byte, roachpb.SequenceCacheEntry))
- func (sc *SequenceCache) Put(e engine.Engine, id []byte, epoch, seq uint32, txnKey roachpb.Key, ...) error
- type SplitRangeEvent
- type StartStoreEvent
- type Store
- func (s *Store) AddReplicaTest(rng *Replica) error
- func (s *Store) AppliedIndex(groupID roachpb.RangeID) (uint64, error)
- func (s *Store) Attrs() roachpb.Attributes
- func (s *Store) Bootstrap(ident roachpb.StoreIdent, stopper *stop.Stopper) error
- func (s *Store) BootstrapRange(initialValues []roachpb.KeyValue) error
- func (s *Store) CanApplySnapshot(rangeID roachpb.RangeID, snap raftpb.Snapshot) bool
- func (s *Store) Capacity() (roachpb.StoreCapacity, error)
- func (s *Store) Clock() *hlc.Clock
- func (s *Store) ClusterID() string
- func (s *Store) Context(ctx context.Context) context.Context
- func (s *Store) DB() *client.DB
- func (s *Store) Descriptor() (*roachpb.StoreDescriptor, error)
- func (s *Store) DisableRaftLogQueue(disabled bool)
- func (s *Store) DisableReplicaGCQueue(disabled bool)
- func (s *Store) Engine() engine.Engine
- func (s *Store) EventFeed() StoreEventFeed
- func (s *Store) ForceRaftLogScanAndProcess(t util.Tester)
- func (s *Store) ForceReplicaGCScan(t util.Tester)
- func (s *Store) ForceReplicationScan(t util.Tester)
- func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error)
- func (s *Store) GetStatus() (*StoreStatus, error)
- func (s *Store) Gossip() *gossip.Gossip
- func (s *Store) GossipStore()
- func (s *Store) GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (multiraft.WriteableGroupStorage, error)
- func (s *Store) IsStarted() bool
- func (s *Store) LookupReplica(start, end roachpb.RKey) *Replica
- func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, ...) error
- func (s *Store) NewRangeDescriptor(start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor) (*roachpb.RangeDescriptor, error)
- func (s *Store) NewSnapshot() engine.Engine
- func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd roachpb.RaftCommand) <-chan error
- func (s *Store) PublishStatus() error
- func (s *Store) RaftLocker() sync.Locker
- func (s *Store) RaftStatus(rangeID roachpb.RangeID) *raft.Status
- func (s *Store) RemoveReplica(rep *Replica, origDesc roachpb.RangeDescriptor) error
- func (s *Store) ReplicaCount() int
- func (s *Store) ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
- func (s *Store) ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
- func (s *Store) ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)
- func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
- func (s *Store) SetRangeRetryOptions(ro retry.Options)
- func (s *Store) SplitRange(origRng, newRng *Replica) error
- func (s *Store) Start(stopper *stop.Stopper) error
- func (s *Store) StartedAt() int64
- func (s *Store) Stopper() *stop.Stopper
- func (s *Store) StoreID() roachpb.StoreID
- func (s *Store) String() string
- func (s *Store) Tracer() *tracer.Tracer
- func (s *Store) WaitForInit()
- type StoreContext
- type StoreEventFeed
- type StoreEventListener
- type StoreList
- type StorePool
- type StoreStatus
- func (m *StoreStatus) Marshal() (data []byte, err error)
- func (m *StoreStatus) MarshalTo(data []byte) (int, error)
- func (*StoreStatus) ProtoMessage()
- func (m *StoreStatus) Reset()
- func (m *StoreStatus) Size() (n int)
- func (m *StoreStatus) String() string
- func (m *StoreStatus) Unmarshal(data []byte) error
- type StoreStatusEvent
- type Stores
- func (ls *Stores) AddStore(s *Store)
- func (ls *Stores) FirstRange() (*roachpb.RangeDescriptor, error)
- func (ls *Stores) GetStore(storeID roachpb.StoreID) (*Store, error)
- func (ls *Stores) GetStoreCount() int
- func (ls *Stores) HasStore(storeID roachpb.StoreID) bool
- func (ls *Stores) RangeLookup(key roachpb.RKey, _ *roachpb.RangeDescriptor, ...) ([]roachpb.RangeDescriptor, error)
- func (ls *Stores) RemoveStore(s *Store)
- func (ls *Stores) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
- func (ls *Stores) VisitStores(visitor func(s *Store) error) error
- type TimestampCache
- func (tc *TimestampCache) Add(start, end roachpb.Key, timestamp roachpb.Timestamp, txnID []byte, ...)
- func (tc *TimestampCache) Clear(clock *hlc.Clock)
- func (tc *TimestampCache) GetMax(start, end roachpb.Key, txnID []byte) (roachpb.Timestamp, roachpb.Timestamp)
- func (tc *TimestampCache) MergeInto(dest *TimestampCache, clear bool)
- func (tc *TimestampCache) SetLowWater(lowWater roachpb.Timestamp)
- type UpdateRangeEvent
Constants ¶
const (
	// RaftLogQueueTimerDuration is the duration between checking the
	// raft logs.
	RaftLogQueueTimerDuration = time.Second
	// RaftLogQueueStaleThreshold is the minimum threshold for stale raft log
	// entries. A stale entry is one which all replicas of the range have
	// progressed past and thus is no longer needed and can be pruned.
	RaftLogQueueStaleThreshold = 1
)
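To make the notion of a stale entry more concrete, the sketch below finds the lowest log index that every replica has reached. The minMatched helper and the idea of a per-replica "matched index" slice are assumptions made for illustration; the package's actual bookkeeping is not shown in this documentation.

```go
package main

import "fmt"

// minMatched returns the smallest matched log index across replicas.
// Assumption for this sketch: a matched index of n means the replica
// holds every entry up to and including n.
func minMatched(matched []uint64) (uint64, bool) {
	if len(matched) == 0 {
		return 0, false
	}
	lowest := matched[0]
	for _, m := range matched[1:] {
		if m < lowest {
			lowest = m
		}
	}
	return lowest, true
}

func main() {
	if idx, ok := minMatched([]uint64{10, 7, 12}); ok {
		fmt.Println("every replica has reached index", idx)
	}
}
```

Under that assumption, only entries no replica still needs (those at or below the returned index) would be candidates for pruning.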
const (
	// TestTimeUntilStoreDead is the test value for TimeUntilStoreDead to
	// quickly mark stores as dead.
	TestTimeUntilStoreDead = 5 * time.Millisecond
	// TestTimeUntilStoreDeadOff is the test value for TimeUntilStoreDead that
	// prevents the store pool from marking stores as dead.
	TestTimeUntilStoreDeadOff = 24 * time.Hour
)
const (
	// DefaultHeartbeatInterval is how often heartbeats are sent from the
	// transaction coordinator to a live transaction. These keep it from
	// being preempted by other transactions writing the same keys. If a
	// transaction is not heartbeated within 2x the heartbeat interval,
	// it may be aborted by conflicting txns.
	DefaultHeartbeatInterval = 5 * time.Second
)
const (
	// DefaultLeaderLeaseDuration is the default duration of the leader lease.
	DefaultLeaderLeaseDuration = time.Second
)
raftInitialLogIndex is the starting point for the raft log. We bootstrap the raft membership by synthesizing a snapshot as if there were some discarded prefix to the log, so we must begin the log at an arbitrary index greater than 1.
const (
	// MinTSCacheWindow specifies the minimum duration to hold entries in
	// the cache before allowing eviction. After this window expires,
	// transactions writing to this node with timestamps lagging by more
	// than MinTSCacheWindow will necessarily have to advance their commit
	// timestamp.
	MinTSCacheWindow = 10 * time.Second
)
const (
	// ReplicaGCQueueInactivityThreshold is the inactivity duration after which
	// a range will be considered for garbage collection. Exported for testing.
	ReplicaGCQueueInactivityThreshold = 10 * 24 * time.Hour // 10 days
)
Variables ¶
var (
	ErrInvalidLengthStatus = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowStatus   = fmt.Errorf("proto: integer overflow")
)
var (
	// TestStoreContext has some fields initialized with values relevant
	// in tests.
	TestStoreContext = StoreContext{
		RaftTickInterval:           100 * time.Millisecond,
		RaftHeartbeatIntervalTicks: 1,
		RaftElectionTimeoutTicks:   2,
		ScanInterval:               10 * time.Minute,
	}
)
TestingCommandFilter may be set in tests to intercept the handling of commands and artificially generate errors. Return nil to continue with regular processing or non-nil to terminate processing with the returned error. Note that in a multi-replica test this filter will be run once for each replica and must produce consistent results each time. Should only be used in tests in the storage and storage_test packages.
Functions ¶
func DeleteRange ¶
func DeleteRange(txn *client.Txn, b *client.Batch, key roachpb.RKey) error
DeleteRange removes a range from the RangeTree. This should only be called from operations that remove ranges, such as AdminMerge.
func InsertRange ¶
func InsertRange(txn *client.Txn, b *client.Batch, key roachpb.RKey) error
InsertRange adds a new range to the RangeTree. This should only be called from operations that create new ranges, such as AdminSplit.
func ProcessStoreEvent ¶
func ProcessStoreEvent(l StoreEventListener, event interface{})
ProcessStoreEvent dispatches an event on the StoreEventListener.
Types ¶
type Allocator ¶
type Allocator struct {
// contains filtered or unexported fields
}
Allocator makes allocation decisions based on available capacity in other stores which match the required attributes for a desired range replica.
When choosing a new allocation target, three candidates from available stores meeting a max fraction of bytes used threshold (maxFractionUsedThreshold) are chosen at random and the least loaded of the three is selected in order to bias loading towards a more balanced cluster, while still spreading load over all available servers. "Load" is defined according to fraction of bytes used, if greater than minFractionUsedThreshold; otherwise it's defined according to range count.
When choosing a rebalance target, a random store is selected from amongst the set of stores with fraction of bytes within rebalanceFromMean from the mean.
func MakeAllocator ¶
func MakeAllocator(storePool *StorePool, options AllocatorOptions) Allocator
MakeAllocator creates a new allocator using the specified StorePool.
func (*Allocator) AllocateTarget ¶
func (a *Allocator) AllocateTarget(required roachpb.Attributes, existing []roachpb.ReplicaDescriptor, relaxConstraints bool, filter func(storeDesc *roachpb.StoreDescriptor, count, used *stat) bool) (*roachpb.StoreDescriptor, error)
AllocateTarget returns a suitable store for a new allocation with the required attributes. Nodes already accommodating existing replicas are ruled out as targets. If relaxConstraints is true, then the required attributes will be relaxed as necessary, from least specific to most specific, in order to allocate a target. If needed, a filter function can be added that further filters the results. The function will be passed the storeDesc and the used and new counts. It returns a bool indicating inclusion or exclusion from the set of stores being considered.
func (*Allocator) ComputeAction ¶
func (a *Allocator) ComputeAction(zone config.ZoneConfig, desc *roachpb.RangeDescriptor) (AllocatorAction, float64)
ComputeAction determines the exact operation needed to repair the supplied range, as governed by the supplied zone configuration. It returns the required action that should be taken and the priority of that action as a float64.
func (Allocator) RebalanceTarget ¶
func (a Allocator) RebalanceTarget(storeID roachpb.StoreID, required roachpb.Attributes, existing []roachpb.ReplicaDescriptor) *roachpb.StoreDescriptor
RebalanceTarget returns a suitable store for a rebalance target with required attributes. Rebalance targets are selected via the same mechanism as AllocateTarget(), except the chosen target must follow some additional criteria. Namely, if chosen, it must further the goal of balancing the cluster.
The supplied parameters are the StoreID of the replica being rebalanced, the required attributes for the replica being rebalanced, and a list of the existing replicas of the range (which must include the replica being rebalanced).
Simply ignoring a rebalance opportunity in the event that the target chosen by AllocateTarget() doesn't fit balancing criteria is perfectly fine, as other stores in the cluster will also be doing their probabilistic best to rebalance. This helps prevent a stampeding herd targeting an abnormally under-utilized store.
func (Allocator) RemoveTarget ¶
func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor) (roachpb.ReplicaDescriptor, error)
RemoveTarget returns a suitable replica to remove from the provided replica set. It attempts to consider which of the provided replicas would be the best candidate for removal.
TODO(mrtracy): removeTarget eventually needs to accept the attributes from the zone config associated with the provided replicas. This will allow it to make correct decisions in the case of ranges with heterogeneous replica requirements (i.e. multiple data centers).
type AllocatorAction ¶
type AllocatorAction int
AllocatorAction enumerates the various replication adjustments that may be recommended by the allocator.
const (
	AllocatorNoop AllocatorAction
	AllocatorRemove
	AllocatorAdd
	AllocatorRemoveDead
)
These are the possible allocator actions.
type AllocatorOptions ¶
type AllocatorOptions struct {
	// AllowRebalance allows this store to attempt to rebalance its own
	// replicas to other stores.
	AllowRebalance bool

	// Mode determines the strategy that will be used to locate stores for
	// allocation decisions in a way that balances load across the cluster.
	Mode BalanceMode

	// Deterministic makes allocation decisions deterministic, based on
	// current cluster statistics. If this flag is not set, allocation operations
	// will have random behavior. This flag is intended to be set for testing
	// purposes only.
	Deterministic bool
}
AllocatorOptions are configurable options which affect the way that the replicate queue will handle rebalancing opportunities.
type BalanceMode ¶
type BalanceMode int
A BalanceMode is a configurable mode which affects how the allocator makes rebalancing decisions.
const (
	// BalanceModeUsage balances ranges between stores by primarily
	// considering disk space usage, but also considers range counts in nascent
	// clusters.
	BalanceModeUsage BalanceMode = iota
	// BalanceModeRangeCount balances ranges by considering the total range
	// count of each node.
	BalanceModeRangeCount
)
func (*BalanceMode) Set ¶
func (r *BalanceMode) Set(value string) error
Set configures the given BalanceMode from a string provided from the command line. It returns an error if the provided string value is not recognized. Needed to implement pflag.Value.
func (*BalanceMode) String ¶
func (r *BalanceMode) String() string
String is needed to implement the pflag.Value interface, allowing this to be set from the command line.
func (*BalanceMode) Type ¶
func (r *BalanceMode) Type() string
Type is needed by the pflag.Value interface.
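The three methods above (Set, String, Type) are what the pflag.Value interface requires. The sketch below shows the same shape on a hypothetical mode type; the local value interface mirrors pflag.Value so the example compiles without the external dependency, and the flag spellings "usage" and "rangecount" are assumptions, not the strings BalanceMode actually accepts.

```go
package main

import (
	"errors"
	"fmt"
)

// value mirrors the three methods of the pflag.Value interface.
type value interface {
	String() string
	Set(string) error
	Type() string
}

// mode is a hypothetical analogue of BalanceMode.
type mode int

const (
	modeUsage mode = iota
	modeRangeCount
)

// String returns the flag spelling of the mode (spellings are assumed).
func (m *mode) String() string {
	switch *m {
	case modeUsage:
		return "usage"
	case modeRangeCount:
		return "rangecount"
	}
	return "unknown"
}

// Set parses a command-line string, leaving m unchanged on error.
func (m *mode) Set(s string) error {
	switch s {
	case "usage":
		*m = modeUsage
	case "rangecount":
		*m = modeRangeCount
	default:
		return errors.New("unrecognized balance mode: " + s)
	}
	return nil
}

// Type names the flag's value type for help output.
func (m *mode) Type() string { return "mode" }

func main() {
	var m mode
	var v value = &m // compile-time check that *mode satisfies the interface
	if err := v.Set("rangecount"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(v.String())
}
```

Pointer receivers matter here: Set must mutate the flag's backing variable, so the interface is satisfied by *mode rather than mode.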
type BeginScanRangesEvent ¶
BeginScanRangesEvent occurs when the store is about to scan over all ranges. During such a scan, each existing range will be published to the feed as a RegisterRangeEvent with the Scan flag set. This is used because downstream consumers may be tracking statistics via the Deltas in UpdateRangeEvent; this event informs subscribers to clear currently cached values.
type CommandQueue ¶
type CommandQueue struct {
// contains filtered or unexported fields
}
A CommandQueue maintains an interval tree of keys or key ranges for executing commands. New commands affecting keys or key ranges must wait on already-executing commands which overlap their key range.
Before executing, a command invokes GetWait() to initialize a WaitGroup with the number of overlapping commands which are already running. The wait group is waited on by the caller for confirmation that all overlapping, pending commands have completed and the pending command can proceed.
After waiting, a command is added to the queue's already-executing set via Add(). Add accepts a parameter indicating whether the command is read-only. Read-only commands don't need to wait on other read-only commands, so the wait group returned via GetWait() doesn't include read-only on read-only overlapping commands as an optimization.
Once commands complete, Remove() is invoked to remove the executing command and decrement the counts on any pending WaitGroups, possibly signaling waiting commands that were gated by the executing command's affected key(s).
CommandQueue is not thread safe.
func NewCommandQueue ¶
func NewCommandQueue() *CommandQueue
NewCommandQueue returns a new command queue.
func (*CommandQueue) Add ¶
func (cq *CommandQueue) Add(readOnly bool, spans ...roachpb.Span) []interface{}
Add adds commands to the queue which affect the specified key ranges. Ranges without an end key affect only the start key. The returned slice is the key for the command queue and must be re-supplied on the subsequent invocation of Remove().
Add should be invoked after waiting on already-executing, overlapping commands via the WaitGroup initialized through GetWait().
func (*CommandQueue) Clear ¶
func (cq *CommandQueue) Clear()
Clear removes all executing commands, signaling any waiting commands.
func (*CommandQueue) GetWait ¶
GetWait initializes the supplied wait group with the number of executing commands which overlap the specified key ranges. If an end key is empty, it only affects the start key. The caller should call wg.Wait() to wait for confirmation that all gating commands have completed or failed, and then call Add() to add the keys to the command queue. readOnly is true if the requester is a read-only command; false for read-write.
func (*CommandQueue) Remove ¶
func (cq *CommandQueue) Remove(keys []interface{})
Remove is invoked to signal that the command associated with the specified key has completed and should be removed. Any pending commands waiting on this command will be signaled if this is the only command upon which they are still waiting.
Remove is invoked after a mutating command has been committed to the Raft log and applied to the underlying state machine. Similarly, Remove is invoked after a read-only command has been executed against the underlying state machine.
type EndScanRangesEvent ¶
EndScanRangesEvent occurs when the store has finished scanning all ranges. Every BeginScanRangesEvent will eventually be followed by an EndScanRangesEvent.
type MergeRangeEvent ¶
type MergeRangeEvent struct {
	StoreID roachpb.StoreID
	Merged  UpdateRangeEvent
	Removed RemoveRangeEvent
}
MergeRangeEvent occurs whenever a range is merged into another. This Event contains two component events: an UpdateRangeEvent for the range which subsumed the other, and a RemoveRangeEvent for the range that was subsumed.
type NotBootstrappedError ¶
type NotBootstrappedError struct{}
A NotBootstrappedError indicates that an engine has not yet been bootstrapped due to a store identifier not being present.
func (*NotBootstrappedError) Error ¶
func (e *NotBootstrappedError) Error() string
Error formats error.
type RegisterRangeEvent ¶
type RegisterRangeEvent struct {
	StoreID roachpb.StoreID
	Desc    *roachpb.RangeDescriptor
	Stats   engine.MVCCStats
	Scan    bool
}
RegisterRangeEvent occurs in two scenarios. Firstly, while a store broadcasts its list of ranges to initialize one or more new accumulators (with Scan set to true), or secondly, when a new range is initialized on the store (for example through replication), with Scan set to false. This event includes the Range's RangeDescriptor and current MVCCStats.
type RemoveRangeEvent ¶
type RemoveRangeEvent struct { StoreID roachpb.StoreID Desc *roachpb.RangeDescriptor Stats engine.MVCCStats }
RemoveRangeEvent occurs whenever a Range is removed from a store. This structure includes the Range's RangeDescriptor and the Range's previous MVCCStats before it was removed.
type Replica ¶
type Replica struct { sync.RWMutex // Protects the following fields: // contains filtered or unexported fields }
A Replica is a contiguous keyspace with writes managed via an instance of the Raft consensus algorithm. Many ranges may exist in a store and they are unlikely to be contiguous. Ranges are independent units and are responsible for maintaining their own integrity by replacing failed replicas, splitting and merging as appropriate.
func NewReplica ¶
func NewReplica(desc *roachpb.RangeDescriptor, rm *Store) (*Replica, error)
NewReplica initializes the replica using the given metadata.
func (*Replica) AdminMerge ¶
func (r *Replica) AdminMerge(args roachpb.AdminMergeRequest, origLeftDesc *roachpb.RangeDescriptor) (roachpb.AdminMergeResponse, error)
AdminMerge extends this range to subsume the range that comes next in the key space. The merge is performed inside of a distributed transaction which writes the updated range descriptor for the subsuming range and deletes the range descriptor for the subsumed one. It also updates the range addressing metadata. The handover of responsibility for the reassigned key range is carried out seamlessly through a merge trigger carried out as part of the commit of that transaction. A merge requires that the two ranges are collocated on the same set of replicas.
The supplied RangeDescriptor is used as a form of optimistic lock. See the comment of "AdminSplit" for more information on this pattern.
func (*Replica) AdminSplit ¶
func (r *Replica) AdminSplit(args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor) (roachpb.AdminSplitResponse, error)
AdminSplit divides the range into two ranges, using either args.SplitKey (if provided) or an internally computed key that aims to roughly equipartition the range by size. The split is done inside of a distributed txn which writes updated and new range descriptors, and updates the range addressing metadata. The handover of responsibility for the reassigned key range is carried out seamlessly through a split trigger carried out as part of the commit of that transaction.
The supplied RangeDescriptor is used as a form of optimistic lock. An operation which might split a range should obtain a copy of the range's current descriptor before making the decision to split. If the decision is affirmative the descriptor is passed to AdminSplit, which performs a Conditional Put on the RangeDescriptor to ensure that no other operation has modified the range in the time the decision was being made. TODO(tschottdorf): should assert that split key is not a local key.
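The optimistic-lock pattern can be illustrated with a minimal in-memory sketch. Everything here is hypothetical (`descStore`, `conditionalPut`, the string-encoded descriptors); it only shows how a stale descriptor snapshot causes the conditional write to be rejected, not how AdminSplit itself is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConditionFailed is returned when the stored value no longer
// matches the caller's snapshot.
var errConditionFailed = errors.New("condition failed: descriptor changed")

// descStore is a hypothetical key-value store holding descriptors
// as plain strings.
type descStore struct {
	mu   sync.Mutex
	data map[string]string
}

// conditionalPut writes newVal only if the current value equals
// expected. On a mismatch it returns the actual value and an error.
func (s *descStore) conditionalPut(key, expected, newVal string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur := s.data[key]; cur != expected {
		return cur, errConditionFailed
	}
	s.data[key] = newVal
	return newVal, nil
}

func main() {
	s := &descStore{data: map[string]string{"desc/r1": "[a,z) v1"}}

	// Take a snapshot of the descriptor before deciding to split.
	snapshot := s.data["desc/r1"]

	// Meanwhile, another operation modifies the range.
	if _, err := s.conditionalPut("desc/r1", snapshot, "[a,z) v2"); err != nil {
		panic(err)
	}

	// The split, still holding the stale snapshot, is rejected.
	actual, err := s.conditionalPut("desc/r1", snapshot, "[a,m) v2")
	fmt.Println("actual:", actual, "err:", err)
}
```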
func (*Replica) ApplySnapshot ¶
ApplySnapshot implements the multiraft.WriteableGroupStorage interface.
func (*Replica) BeginTransaction ¶
func (r *Replica) BeginTransaction(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.BeginTransactionRequest) (roachpb.BeginTransactionResponse, error)
BeginTransaction writes the initial transaction record. It fails if a transaction record has already been written. This may occur if a transaction is started with a batch containing writes to different ranges, and a heartbeat or txn push reaches the range containing the txn record before the write batch does and aborts the transaction.
func (*Replica) ChangeReplicas ¶
func (r *Replica) ChangeReplicas(changeType roachpb.ReplicaChangeType, replica roachpb.ReplicaDescriptor, desc *roachpb.RangeDescriptor) error
ChangeReplicas adds or removes a replica of a range. The change is performed in a distributed transaction and takes effect when that transaction is committed. When removing a replica, only the NodeID and StoreID fields of the Replica are used.
The supplied RangeDescriptor is used as a form of optimistic lock. See the comment of "AdminSplit" for more information on this pattern.
func (*Replica) ConditionalPut ¶
func (r *Replica) ConditionalPut(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.ConditionalPutRequest) (roachpb.ConditionalPutResponse, error)
ConditionalPut sets the value for a specified key only if the expected value matches. If not, the return value contains the actual value.
func (*Replica) ContainsKey ¶
ContainsKey returns whether this range contains the specified key.
func (*Replica) ContainsKeyRange ¶
ContainsKeyRange returns whether this range contains the specified key range from start to end.
func (*Replica) Delete ¶
func (r *Replica) Delete(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.DeleteRequest) (roachpb.DeleteResponse, error)
Delete deletes the key and value specified by key.
func (*Replica) DeleteRange ¶
func (r *Replica) DeleteRange(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.DeleteRangeRequest) (roachpb.DeleteRangeResponse, error)
DeleteRange deletes the range of key/value pairs specified by start and end keys.
func (*Replica) Desc ¶
func (r *Replica) Desc() *roachpb.RangeDescriptor
Desc atomically returns the range's descriptor.
func (*Replica) Destroy ¶
func (r *Replica) Destroy(origDesc roachpb.RangeDescriptor) error
Destroy cleans up all data associated with this range, leaving a tombstone.
func (*Replica) EndTransaction ¶
func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.EndTransactionRequest) (roachpb.EndTransactionResponse, []roachpb.Intent, error)
EndTransaction either commits or aborts (rolls back) an extant transaction according to the args.Commit parameter. TODO(tschottdorf): return nil reply on any error. The error itself must be the authoritative source of information.
func (*Replica) Entries ¶
Entries implements the raft.Storage interface. Note that maxBytes is advisory and this method will always return at least one entry even if it exceeds maxBytes. Passing maxBytes equal to zero disables size checking. TODO(bdarnell): consider caching for recent entries, if rocksdb's builtin caching is insufficient.
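The advisory maxBytes behavior can be sketched as follows. `limitEntries` is a hypothetical helper operating on raw byte slices rather than raft entries; it only illustrates the two stated rules: at least one entry is always returned, and zero disables the size check.

```go
package main

import "fmt"

// limitEntries returns the longest prefix of entries whose combined
// size stays within maxBytes. It always returns at least one entry
// when any exist, and maxBytes == 0 disables the size check.
func limitEntries(entries [][]byte, maxBytes uint64) [][]byte {
	if maxBytes == 0 || len(entries) == 0 {
		return entries
	}
	var size uint64
	for i, e := range entries {
		size += uint64(len(e))
		if size > maxBytes && i > 0 {
			return entries[:i]
		}
	}
	return entries
}

func main() {
	entries := [][]byte{[]byte("aaaaa"), []byte("bbbbb"), []byte("ccccc")}
	fmt.Println(len(limitEntries(entries, 3)))  // first entry alone exceeds the limit
	fmt.Println(len(limitEntries(entries, 10))) // two entries fit
	fmt.Println(len(limitEntries(entries, 0)))  // size checking disabled
}
```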
func (*Replica) FirstIndex ¶
FirstIndex implements the raft.Storage interface.
func (*Replica) GC ¶
func (r *Replica) GC(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.GCRequest) (roachpb.GCResponse, error)
GC iterates through the list of keys to garbage collect specified in the arguments. MVCCGarbageCollect is invoked on each listed key along with the expiration timestamp. The GC metadata specified in the args is persisted after GC.
func (*Replica) Get ¶
func (r *Replica) Get(batch engine.Engine, h roachpb.Header, args roachpb.GetRequest) (roachpb.GetResponse, []roachpb.Intent, error)
Get returns the value for a specified key.
func (*Replica) GetGCMetadata ¶
func (r *Replica) GetGCMetadata() (*roachpb.GCMetadata, error)
GetGCMetadata reads the latest GC metadata for this range.
func (*Replica) GetLastVerificationTimestamp ¶
GetLastVerificationTimestamp reads the timestamp at which the range's data was last verified.
func (*Replica) GetMVCCStats ¶
GetMVCCStats returns a copy of the MVCC stats object for this range.
func (*Replica) GetMaxBytes ¶
GetMaxBytes atomically gets the range maximum byte limit.
func (*Replica) GetReplica ¶
func (r *Replica) GetReplica() *roachpb.ReplicaDescriptor
GetReplica returns the replica for this range from the range descriptor. Returns nil if the replica is not found.
func (*Replica) HeartbeatTxn ¶
func (r *Replica) HeartbeatTxn(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.HeartbeatTxnRequest) (roachpb.HeartbeatTxnResponse, error)
HeartbeatTxn updates the transaction status and heartbeat timestamp after receiving transaction heartbeat messages from coordinator. Returns the updated transaction.
func (*Replica) Increment ¶
func (r *Replica) Increment(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.IncrementRequest) (roachpb.IncrementResponse, error)
Increment increments the value (interpreted as varint64 encoded) and returns the newly incremented value (encoded as varint64). If no value exists for the key, zero is incremented.
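As a rough illustration of these semantics, the sketch below increments a varint-encoded counter in a map. It assumes the standard library's `encoding/binary` signed varint format, which may differ from the encoding the package actually uses; `increment` and the map-backed store are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// increment decodes the value at key as a signed varint, adds delta,
// stores the re-encoded result, and returns the new value. A missing
// key is treated as zero.
func increment(store map[string][]byte, key string, delta int64) (int64, error) {
	var cur int64
	if raw, ok := store[key]; ok {
		v, n := binary.Varint(raw)
		if n <= 0 {
			return 0, errors.New("existing value is not a valid varint")
		}
		cur = v
	}
	cur += delta
	buf := make([]byte, binary.MaxVarintLen64)
	store[key] = buf[:binary.PutVarint(buf, cur)]
	return cur, nil
}

func main() {
	store := map[string][]byte{}
	first, _ := increment(store, "counter", 5)
	second, _ := increment(store, "counter", 3)
	fmt.Println(first, second)
}
```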
func (*Replica) InitialState ¶
InitialState implements the raft.Storage interface.
func (*Replica) IsFirstRange ¶
IsFirstRange returns true if this is the first range.
func (*Replica) LeaderLease ¶
func (r *Replica) LeaderLease(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.LeaderLeaseRequest) (roachpb.LeaderLeaseResponse, error)
LeaderLease sets the leader lease for this range. The command fails only if the desired start timestamp collides with a previous lease. Otherwise, the start timestamp is wound back to right after the expiration of the previous lease (or zero). If this range replica is already the lease holder, the expiration will be extended or shortened as indicated. For a new lease, all duties required of the range leader are commenced, including clearing the command queue and timestamp cache.
func (*Replica) Merge ¶
func (r *Replica) Merge(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.MergeRequest) (roachpb.MergeResponse, error)
Merge is used to merge a value into an existing key. Merge is an efficient accumulation operation which is exposed by RocksDB, used by Cockroach for the efficient accumulation of certain values. Due to the difficulty of making these operations transactional, merges are not currently exposed directly to clients. Merged values are explicitly not MVCC data.
func (*Replica) PushTxn ¶
func (r *Replica) PushTxn(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.PushTxnRequest) (roachpb.PushTxnResponse, error)
PushTxn resolves conflicts between concurrent txns (or between a non-transactional reader or writer and a txn) in several ways depending on the statuses and priorities of the conflicting transactions. The PushTxn operation is invoked by a "pusher" (the writer trying to abort a conflicting txn or the reader trying to push a conflicting txn's commit timestamp forward), who attempts to resolve a conflict with a "pushee" (args.PushTxn -- the pushee txn whose intent(s) caused the conflict). A pusher is either transactional, in which case PushTxn is completely initialized, or not, in which case the PushTxn has only the priority set.
Txn already committed/aborted: If pushee txn is committed or aborted return success.
Txn Timeout: If pushee txn entry isn't present or its LastHeartbeat timestamp isn't set, use its as LastHeartbeat. If current time - LastHeartbeat > 2 * DefaultHeartbeatInterval, then the pushee txn should be either pushed forward, aborted, or confirmed not pending, depending on value of Request.PushType.
Old Txn Epoch: If persisted pushee txn entry has a newer Epoch than PushTxn.Epoch, return success, as older epoch may be removed.
Lower Txn Priority: If pushee txn has a lower priority than pusher, adjust pushee's persisted txn depending on value of args.PushType. If args.PushType is ABORT_TXN, set txn.Status to ABORTED, and priority to one less than the pusher's priority and return success. If args.PushType is PUSH_TIMESTAMP, set txn.Timestamp to just after PushTo.
Higher Txn Priority: If pushee txn has a higher priority than pusher, return TransactionPushError. Transaction will be retried with priority one less than the pushee's higher priority.
func (*Replica) Put ¶
func (r *Replica) Put(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.PutRequest) (roachpb.PutResponse, error)
Put sets the value for a specified key.
func (*Replica) RangeLookup ¶
func (r *Replica) RangeLookup(batch engine.Engine, h roachpb.Header, args roachpb.RangeLookupRequest) (roachpb.RangeLookupResponse, []roachpb.Intent, error)
RangeLookup is used to look up RangeDescriptors - a RangeDescriptor is a metadata structure which describes the key range and replica locations of a distinct range in the cluster.
RangeDescriptors are stored as values in the cockroach cluster's key-value store. However, they are always stored using special "Range Metadata keys", which are "ordinary" keys with a special prefix prepended. The Range Metadata Key for an ordinary key can be generated with the `keys.RangeMetaKey(key)` function. The RangeDescriptor for the range which contains a given key can be retrieved by generating its Range Metadata Key and dispatching it to RangeLookup.
Note that the Range Metadata Key sent to RangeLookup is NOT the key at which the desired RangeDescriptor is stored. Instead, this method returns the RangeDescriptor stored at the _lowest_ existing key which is _greater_ than the given key. The returned RangeDescriptor will thus contain the ordinary key which was originally used to generate the Range Metadata Key sent to RangeLookup.
The "Range Metadata Key" for a range is built by appending the end key of the range to the respective meta prefix.
Lookups for range metadata keys usually want to read inconsistently, but some callers need a consistent result; both are supported.
This method has an important optimization in the inconsistent case: instead of just returning the requested RangeDescriptor, it also returns a slice of additional range descriptors immediately consecutive to the desired RangeDescriptor. This is intended to serve as a sort of caching pre-fetch, so that the requesting nodes can aggressively cache RangeDescriptors which are likely to be desired by their current workload. The Reverse flag specifies whether descriptors are prefetched in descending or ascending order.
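The lookup rule (return the descriptor stored at the lowest key greater than the lookup key, plus a few consecutive descriptors) can be sketched over a sorted slice. This is a simplified model: `rangeDesc` and `lookup` are hypothetical, the meta prefix is omitted so descriptors are indexed directly by end key, and only ascending prefetch is shown.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeDesc is a hypothetical descriptor covering [start, end).
type rangeDesc struct{ start, end string }

// lookup expects descs sorted by end key, mirroring meta keys that are
// built from each range's end key. It returns the descriptor at the
// lowest end key greater than key, followed by up to prefetch
// consecutive descriptors.
func lookup(descs []rangeDesc, key string, prefetch int) []rangeDesc {
	i := sort.Search(len(descs), func(i int) bool { return descs[i].end > key })
	if i == len(descs) {
		return nil
	}
	j := i + 1 + prefetch
	if j > len(descs) {
		j = len(descs)
	}
	return descs[i:j]
}

func main() {
	descs := []rangeDesc{{"a", "g"}, {"g", "m"}, {"m", "z"}}
	// Key "g" equals the end key of the first range, so the match is
	// the next descriptor, whose range actually contains "g".
	fmt.Println(lookup(descs, "g", 1))
}
```

Because the search is strictly greater-than, a key equal to a range's exclusive end key resolves to the following range, which is why indexing by end key yields the containing range.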
func (*Replica) ReplicaDescriptor ¶
ReplicaDescriptor returns information about the given member of this replica's range.
func (*Replica) ResolveIntent ¶
func (r *Replica) ResolveIntent(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.ResolveIntentRequest) (roachpb.ResolveIntentResponse, error)
ResolveIntent resolves a write intent from the specified key according to the status of the transaction which created it.
func (*Replica) ResolveIntentRange ¶
func (r *Replica) ResolveIntentRange(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.ResolveIntentRangeRequest) (roachpb.ResolveIntentRangeResponse, error)
ResolveIntentRange resolves write intents in the specified key range according to the status of the transaction which created it.
func (*Replica) ReverseScan ¶
func (r *Replica) ReverseScan(batch engine.Engine, h roachpb.Header, args roachpb.ReverseScanRequest) (roachpb.ReverseScanResponse, []roachpb.Intent, error)
ReverseScan scans the key range specified by start key through end key in descending order up to some maximum number of results.
func (*Replica) Scan ¶
func (r *Replica) Scan(batch engine.Engine, h roachpb.Header, args roachpb.ScanRequest) (roachpb.ScanResponse, []roachpb.Intent, error)
Scan scans the key range specified by start key through end key in ascending order up to some maximum number of results.
func (*Replica) Send ¶
func (r *Replica) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
Send adds a command for execution on this range. The command's affected keys are verified to be contained within the range and the range's leadership is confirmed. The command is then dispatched either along the read-only execution path or the read-write Raft command queue. TODO(tschottdorf): use BatchRequest w/o pointer receiver.
func (*Replica) SetHardState ¶
SetHardState implements the multiraft.WriteableGroupStorage interface.
func (*Replica) SetLastVerificationTimestamp ¶
SetLastVerificationTimestamp writes the timestamp at which the range's data was last verified.
func (*Replica) SetMaxBytes ¶
SetMaxBytes atomically sets the maximum byte limit before split. This value is cached by the range for efficiency.
func (*Replica) TruncateLog ¶
func (r *Replica) TruncateLog(batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.TruncateLogRequest) (roachpb.TruncateLogResponse, error)
TruncateLog discards a prefix of the raft log. Truncating part of a log that has already been truncated has no effect. If this range is not the one specified within the request body, the request will also be ignored.
type ReplicationStatusEvent ¶
type ReplicationStatusEvent struct { StoreID roachpb.StoreID // Per-range availability information, which is currently computed by // periodically polling the ranges of each store. // TODO(mrtracy): See if this information could be computed incrementally // from other events. LeaderRangeCount int32 ReplicatedRangeCount int32 AvailableRangeCount int32 }
ReplicationStatusEvent contains statistics on the replication status of the ranges in the store.
Because these statistics cannot currently be computed from other events, this event should be periodically broadcast by the store independently of other operations.
type SequenceCache ¶
type SequenceCache struct {
// contains filtered or unexported fields
}
The SequenceCache provides idempotence for request retries. Each transactional request to a range specifies a transaction ID and sequence number which uniquely identify a client command. After commands have been replicated via Raft, they are executed against the state machine and the results are stored in the SequenceCache.
The SequenceCache stores responses in the underlying engine, using keys derived from Range ID, txn ID and sequence number.
A SequenceCache is not thread safe. Access to it is serialized through Raft.
func NewSequenceCache ¶
func NewSequenceCache(rangeID roachpb.RangeID) *SequenceCache
NewSequenceCache returns a new sequence cache. Every range replica maintains a sequence cache, not just the leader.
func (*SequenceCache) ClearData ¶
func (sc *SequenceCache) ClearData(e engine.Engine) error
ClearData removes all persisted items stored in the cache.
func (*SequenceCache) CopyFrom ¶
CopyFrom copies all the persisted results from the originRangeID sequence cache into this one. Note that the cache will not be locked while copying is in progress. Failures decoding individual entries return an error. The copy is done directly using the engine instead of interpreting values through MVCC for efficiency.
func (*SequenceCache) CopyInto ¶
CopyInto copies all the results from this sequence cache into the destRangeID sequence cache. Failures decoding individual cache entries return an error.
func (*SequenceCache) Get ¶
func (sc *SequenceCache) Get(e engine.Engine, id []byte, dest *roachpb.SequenceCacheEntry) (uint32, uint32, error)
Get looks up the latest sequence number recorded for this transaction ID. The latest entry is that with the highest epoch (and then, highest sequence). On a miss, zero is returned for both. If an entry is found and a SequenceCacheEntry is provided, it is populated from the found value.
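The "highest epoch, then highest sequence" ordering can be shown with a short comparison loop. `cacheEntry` and `latest` are hypothetical names; the sketch works on an in-memory slice instead of reading entries from the engine.

```go
package main

import "fmt"

// cacheEntry is a hypothetical (epoch, sequence) pair recorded for one
// transaction ID.
type cacheEntry struct{ epoch, seq uint32 }

// latest returns the entry with the highest epoch, breaking ties by the
// highest sequence number. With no entries it returns zero for both,
// matching the documented miss behavior.
func latest(entries []cacheEntry) (uint32, uint32) {
	var best cacheEntry
	for _, e := range entries {
		if e.epoch > best.epoch || (e.epoch == best.epoch && e.seq > best.seq) {
			best = e
		}
	}
	return best.epoch, best.seq
}

func main() {
	entries := []cacheEntry{{1, 9}, {2, 3}, {2, 1}}
	epoch, seq := latest(entries)
	fmt.Println(epoch, seq)
}
```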
func (*SequenceCache) GetAllTransactionID ¶
func (sc *SequenceCache) GetAllTransactionID(e engine.Engine, id []byte) ([]roachpb.KeyValue, error)
GetAllTransactionID returns all the key-value pairs for the given transaction ID from the engine.
func (*SequenceCache) Iterate ¶
func (sc *SequenceCache) Iterate(e engine.Engine, f func([]byte, []byte, roachpb.SequenceCacheEntry))
Iterate walks through the sequence cache, invoking the given callback for each unmarshaled entry with the key, the transaction ID and the decoded entry.
type SplitRangeEvent ¶
type SplitRangeEvent struct { StoreID roachpb.StoreID Original UpdateRangeEvent New RegisterRangeEvent }
SplitRangeEvent occurs whenever a range is split in two. This Event actually contains two other events: an UpdateRangeEvent for the Range which originally existed, and a RegisterRangeEvent for the range created via the split.
type StartStoreEvent ¶
StartStoreEvent occurs whenever a store is initially started.
type Store ¶
type Store struct { Ident roachpb.StoreIdent // contains filtered or unexported fields }
A Store maintains a map of ranges by start key. A Store corresponds to one physical device.
func NewStore ¶
func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescriptor) *Store
NewStore returns a new instance of a store.
func (*Store) AddReplicaTest ¶
AddReplicaTest adds the replica to the store's replica map and to the sorted replicasByKey slice. To be used only by unit tests.
func (*Store) AppliedIndex ¶
AppliedIndex implements the multiraft.Storage interface. The caller must hold the store's lock.
func (*Store) Attrs ¶
func (s *Store) Attrs() roachpb.Attributes
Attrs returns the attributes of the underlying store.
func (*Store) Bootstrap ¶
Bootstrap writes a new store ident to the underlying engine. To ensure that no crufty data already exists in the engine, it scans the engine contents before writing the new store ident. The engine should be completely empty. It returns an error if called on a non-empty engine.
func (*Store) BootstrapRange ¶
BootstrapRange creates the first range in the cluster and manually writes it to the store. Default range addressing records are created for meta1 and meta2. Default configurations for zones are created. All configs are specified for the empty key prefix, meaning they apply to the entire database. The zone requires three replicas with no other specifications. It also adds the range tree and the root node, the first range, to it. The 'initialValues' are written as well after each value's checksum is initialized.
func (*Store) CanApplySnapshot ¶
CanApplySnapshot implements the multiraft.Storage interface. The caller must hold the store's lock.
func (*Store) Capacity ¶
func (s *Store) Capacity() (roachpb.StoreCapacity, error)
Capacity returns the capacity of the underlying storage engine.
func (*Store) Context ¶
Context returns a base context to pass along with commands being executed, derived from the supplied context (which is allowed to be nil).
func (*Store) Descriptor ¶
func (s *Store) Descriptor() (*roachpb.StoreDescriptor, error)
Descriptor returns a StoreDescriptor including current store capacity information.
func (*Store) DisableRaftLogQueue ¶
DisableRaftLogQueue disables or enables the raft log queue. Exposed only for testing.
func (*Store) DisableReplicaGCQueue ¶
DisableReplicaGCQueue disables or enables the replica GC queue. Exposed only for testing.
func (*Store) ForceRaftLogScanAndProcess ¶
ForceRaftLogScanAndProcess iterates over all ranges, enqueues any that need their raft logs truncated, and then processes each of them. Exposed only for testing.
func (*Store) ForceReplicaGCScan ¶
ForceReplicaGCScan iterates over all ranges and enqueues any that may need to be GC'd. Exposed only for testing.
func (*Store) ForceReplicationScan ¶
ForceReplicationScan iterates over all ranges and enqueues any that need to be replicated. Exposed only for testing.
func (*Store) GetReplica ¶
GetReplica fetches a replica by Range ID. Returns an error if no replica is found.
func (*Store) GetStatus ¶
func (s *Store) GetStatus() (*StoreStatus, error)
GetStatus fetches the latest store status from the stored value on the cluster. Returns nil if the scanner has not yet run. The scanner runs once every ctx.ScanInterval.
func (*Store) GossipStore ¶
func (s *Store) GossipStore()
GossipStore broadcasts the store on the gossip network.
func (*Store) GroupStorage ¶
func (s *Store) GroupStorage(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (multiraft.WriteableGroupStorage, error)
GroupStorage implements the multiraft.Storage interface. The caller must hold the store's lock.
func (*Store) LookupReplica ¶
LookupReplica looks up a replica via binary search over the "replicasByKey" btree. Returns nil if no replica is found for specified key range. Note that the specified keys are transformed using Key.Address() to ensure we lookup replicas correctly for local keys. When end is nil, a replica that contains start is looked up.
func (*Store) MergeRange ¶
func (s *Store) MergeRange(subsumingRng *Replica, updatedEndKey roachpb.RKey, subsumedRangeID roachpb.RangeID) error
MergeRange expands the subsuming range to absorb the subsumed range. This merge operation will fail if the two ranges are not collocated on the same store. Must be called from the processRaft goroutine.
func (*Store) NewRangeDescriptor ¶
func (s *Store) NewRangeDescriptor(start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor) (*roachpb.RangeDescriptor, error)
NewRangeDescriptor creates a new descriptor based on start and end keys and the supplied roachpb.Replicas slice. It allocates new replica IDs to fill out the supplied replicas.
func (*Store) NewSnapshot ¶
NewSnapshot creates a new snapshot engine.
func (*Store) ProposeRaftCommand ¶
func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd roachpb.RaftCommand) <-chan error
ProposeRaftCommand submits a command to raft. The command is processed asynchronously and an error or nil will be written to the returned channel when it is committed or aborted (but note that committed does not mean that it has been applied to the range yet).
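The channel-based calling convention can be sketched as below. `propose` is a hypothetical stand-in that runs a callback in a goroutine; it only demonstrates how a caller consumes a `<-chan error` result, optionally with a timeout, and does not involve raft.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errAborted is a hypothetical error used to model an aborted command.
var errAborted = errors.New("command aborted")

// propose runs process asynchronously and delivers its result on the
// returned channel. The channel is buffered so the goroutine never
// blocks if the caller stops listening.
func propose(process func() error) <-chan error {
	ch := make(chan error, 1)
	go func() { ch <- process() }()
	return ch
}

func main() {
	committed := propose(func() error { return nil })
	select {
	case err := <-committed:
		fmt.Println("result:", err)
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for the command")
	}

	fmt.Println("result:", <-propose(func() error { return errAborted }))
}
```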
func (*Store) PublishStatus ¶
PublishStatus publishes periodically computed status events to the store's events feed. This method itself should be periodically called by some external mechanism.
func (*Store) RaftLocker ¶
RaftLocker implements the multiraft.Storage interface.
func (*Store) RaftStatus ¶
RaftStatus returns the current raft status of the given range.
func (*Store) RemoveReplica ¶
func (s *Store) RemoveReplica(rep *Replica, origDesc roachpb.RangeDescriptor) error
RemoveReplica removes the replica from the store's replica map and from the sorted replicasByKey btree. The version of the replica descriptor that was used to make the removal decision is passed in, and the removal is aborted if the replica ID has changed since then.
func (*Store) ReplicaCount ¶
ReplicaCount returns the number of replicas contained by this store.
func (*Store) ReplicaDescriptor ¶
func (s *Store) ReplicaDescriptor(groupID roachpb.RangeID, replicaID roachpb.ReplicaID) (roachpb.ReplicaDescriptor, error)
ReplicaDescriptor implements the multiraft.Storage interface. The caller must hold the store's lock.
func (*Store) ReplicaIDForStore ¶
func (s *Store) ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
ReplicaIDForStore implements the multiraft.Storage interface. The caller must hold the store's lock.
func (*Store) ReplicasFromSnapshot ¶
ReplicasFromSnapshot implements the multiraft.Storage interface.
func (*Store) Send ¶
func (s *Store) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
Send fetches a range based on the header's replica, assembles method, args & reply into a Raft Cmd struct and executes the command using the fetched range.
func (*Store) SetRangeRetryOptions ¶
SetRangeRetryOptions sets the retry options used for this store. For unit tests only.
func (*Store) SplitRange ¶
SplitRange shortens the original range to accommodate the new range. The new range is added to the ranges map and the rangesByKey btree.
func (*Store) StartedAt ¶
StartedAt returns the timestamp at which the store was most recently started.
func (*Store) WaitForInit ¶
func (s *Store) WaitForInit()
WaitForInit waits for any asynchronous processes begun in Start() to complete their initialization. In particular, this includes gossiping. In some cases this may block until the range GC queue has completed its scan. Only for testing.
type StoreContext ¶
type StoreContext struct { Clock *hlc.Clock DB *client.DB Gossip *gossip.Gossip StorePool *StorePool Transport multiraft.Transport // RangeRetryOptions are the retry options when retryable errors are // encountered sending commands to ranges. RangeRetryOptions retry.Options // RaftTickInterval is the resolution of the Raft timer; other raft timeouts // are defined in terms of multiples of this value. RaftTickInterval time.Duration // RaftHeartbeatIntervalTicks is the number of ticks that pass between heartbeats. RaftHeartbeatIntervalTicks int // RaftElectionTimeoutTicks is the number of ticks that must pass before a follower // considers a leader to have failed and calls a new election. Should be significantly // higher than RaftHeartbeatIntervalTicks. The raft paper recommends a value of 150ms // for local networks. RaftElectionTimeoutTicks int // ScanInterval is the default value for the scan interval ScanInterval time.Duration // ScanMaxIdleTime is the maximum time the scanner will be idle between ranges. // If enabled (> 0), the scanner may complete in less than ScanInterval for small // stores. ScanMaxIdleTime time.Duration // TimeUntilStoreDead is the time after which if there is no new gossiped // information about a store, it can be considered dead. TimeUntilStoreDead time.Duration // AllocatorOptions configures how the store will attempt to rebalance its // replicas to other stores. AllocatorOptions AllocatorOptions // EventFeed is a feed to which this store will publish events. EventFeed *util.Feed // Tracer is a request tracer. Tracer *tracer.Tracer // ScannerStopper is used to shut down the background scanner (for tests). // If nil, defaults to the store's own stopper. ScannerStopper *stop.Stopper }
A StoreContext encompasses the auxiliary objects and configuration required to create a store. All fields holding a pointer or an interface are required to create a store; the rest will have sane defaults set if omitted.
func (*StoreContext) Valid ¶
func (sc *StoreContext) Valid() bool
Valid returns true if the StoreContext is populated correctly. We don't check for Gossip and DB since some of our tests pass them as nil.
type StoreEventFeed ¶
type StoreEventFeed struct {
// contains filtered or unexported fields
}
StoreEventFeed is a helper structure which publishes store-specific events to a util.Feed. The target feed may be shared by multiple StoreEventFeeds. If the target feed is nil, event methods become no-ops.
func NewStoreEventFeed ¶
func NewStoreEventFeed(id roachpb.StoreID, feed *util.Feed) StoreEventFeed
NewStoreEventFeed creates a new StoreEventFeed which publishes events for a specific store to the supplied feed.
type StoreEventListener ¶
type StoreEventListener interface { OnRegisterRange(event *RegisterRangeEvent) OnUpdateRange(event *UpdateRangeEvent) OnRemoveRange(event *RemoveRangeEvent) OnSplitRange(event *SplitRangeEvent) OnMergeRange(event *MergeRangeEvent) OnStartStore(event *StartStoreEvent) OnBeginScanRanges(event *BeginScanRangesEvent) OnEndScanRanges(event *EndScanRangesEvent) OnStoreStatus(event *StoreStatusEvent) OnReplicationStatus(event *ReplicationStatusEvent) }
StoreEventListener is an interface that can be implemented by objects which listen for events published by stores.
type StoreList ¶
type StoreList struct {
// contains filtered or unexported fields
}
StoreList holds a list of store descriptors and associated count and used stats for those stores.
type StorePool ¶
type StorePool struct {
// contains filtered or unexported fields
}
StorePool maintains a list of all known stores in the cluster and information on their health.
type StoreStatus ¶
type StoreStatus struct { Desc cockroach_roachpb.StoreDescriptor `protobuf:"bytes,1,opt,name=desc" json:"desc"` NodeID github_com_cockroachdb_cockroach_roachpb.NodeID `protobuf:"varint,2,opt,name=node_id,casttype=github.com/cockroachdb/cockroach/roachpb.NodeID" json:"node_id"` RangeCount int32 `protobuf:"varint,3,opt,name=range_count" json:"range_count"` StartedAt int64 `protobuf:"varint,4,opt,name=started_at" json:"started_at"` UpdatedAt int64 `protobuf:"varint,5,opt,name=updated_at" json:"updated_at"` Stats cockroach_storage_engine.MVCCStats `protobuf:"bytes,6,opt,name=stats" json:"stats"` LeaderRangeCount int32 `protobuf:"varint,7,opt,name=leader_range_count" json:"leader_range_count"` ReplicatedRangeCount int32 `protobuf:"varint,8,opt,name=replicated_range_count" json:"replicated_range_count"` AvailableRangeCount int32 `protobuf:"varint,9,opt,name=available_range_count" json:"available_range_count"` }
StoreStatus contains the stats needed to calculate the current status of a store.
func (*StoreStatus) Marshal ¶
func (m *StoreStatus) Marshal() (data []byte, err error)
func (*StoreStatus) ProtoMessage ¶
func (*StoreStatus) ProtoMessage()
func (*StoreStatus) Reset ¶
func (m *StoreStatus) Reset()
func (*StoreStatus) Size ¶
func (m *StoreStatus) Size() (n int)
func (*StoreStatus) String ¶
func (m *StoreStatus) String() string
func (*StoreStatus) Unmarshal ¶
func (m *StoreStatus) Unmarshal(data []byte) error
type StoreStatusEvent ¶
type StoreStatusEvent struct {
Desc *roachpb.StoreDescriptor
}
StoreStatusEvent contains the current descriptor for the given store.
Because the descriptor contains information that cannot currently be computed from other events, this event should be periodically broadcast by the store independently of other operations.
type Stores ¶
type Stores struct {
// contains filtered or unexported fields
}
A Stores provides methods to access a collection of local stores.
func NewStores ¶
func NewStores() *Stores
NewStores returns a local-only sender which directly accesses a collection of stores.
func (*Stores) FirstRange ¶
func (ls *Stores) FirstRange() (*roachpb.RangeDescriptor, error)
FirstRange implements the RangeDescriptorDB interface. It returns the range descriptor which contains KeyMin.
func (*Stores) GetStoreCount ¶
GetStoreCount returns the number of stores this node is exporting.
func (*Stores) RangeLookup ¶
func (ls *Stores) RangeLookup(key roachpb.RKey, _ *roachpb.RangeDescriptor, considerIntents, useReverseScan bool) ([]roachpb.RangeDescriptor, error)
RangeLookup implements the RangeDescriptorDB interface. It looks up the descriptors for the given (meta) key.
func (*Stores) RemoveStore ¶
RemoveStore removes the specified store from the store map.
func (*Stores) Send ¶
func (ls *Stores) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
Send implements the client.Sender interface. The store is looked up from the store map if specified by the request; otherwise, the command is being executed locally, and the replica is determined via lookup through each store's LookupRange method. The latter path is taken only by unit tests.
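The two routing paths described for Send can be illustrated with a toy model. This is a sketch under stated assumptions, not the package's implementation: `toyStore`, `toyStores`, and `route` are hypothetical names, each store holds a single [start, end) range, and a store ID of 0 is assumed to mean "not specified".

```go
package main

import (
	"errors"
	"fmt"
)

// toyStore is a hypothetical stand-in holding one [start, end) range.
type toyStore struct {
	id         int
	start, end string
}

// lookupRange reports whether key falls inside the store's single range.
func (s *toyStore) lookupRange(key string) bool {
	return key >= s.start && key < s.end
}

// toyStores is a hypothetical stand-in for a collection of local stores.
type toyStores struct {
	storeMap map[int]*toyStore
}

// route picks the store for a request. A non-zero storeID is looked up
// directly in the store map; otherwise every store is asked whether it
// holds the key (assumption: 0 means "no store specified").
func (ls *toyStores) route(storeID int, key string) (*toyStore, error) {
	if storeID != 0 {
		s, ok := ls.storeMap[storeID]
		if !ok {
			return nil, fmt.Errorf("store %d not found", storeID)
		}
		return s, nil
	}
	for _, s := range ls.storeMap {
		if s.lookupRange(key) {
			return s, nil
		}
	}
	return nil, errors.New("no store contains key " + key)
}

func main() {
	ls := &toyStores{storeMap: map[int]*toyStore{
		1: {id: 1, start: "a", end: "m"},
		2: {id: 2, start: "m", end: "z"},
	}}
	s, _ := ls.route(2, "b") // explicit store ID: map lookup only
	fmt.Println("explicit:", s.id)
	s, _ = ls.route(0, "b") // no store ID: found by scanning ranges
	fmt.Println("by key:", s.id)
}
```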
type TimestampCache ¶
type TimestampCache struct {
// contains filtered or unexported fields
}
A TimestampCache maintains an interval tree FIFO cache of keys or key ranges and the timestamps at which they were most recently read or written. If a key was read or written by a transaction, the txn ID is stored with the timestamp to avoid advancing timestamps on successive requests from the same transaction.
The cache also maintains a low water mark, which is the timestamp of the most recently evicted entry. This value only ever increases. The low water mark is initialized to the current system time plus the maximum clock offset.
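The ratcheting behavior of the low water mark can be sketched with a toy FIFO cache. This is an illustrative model only: `fifoCache`, `newFIFOCache`, and the size-based eviction policy are assumptions, and timestamps are plain integers rather than hybrid logical clock values.

```go
package main

import "fmt"

// fifoEntry is a hypothetical cached key with its timestamp.
type fifoEntry struct {
	key string
	ts  int64
}

// fifoCache is a toy model: a bounded FIFO plus a low water mark.
type fifoCache struct {
	lowWater int64
	maxSize  int
	entries  []fifoEntry // oldest first
}

// newFIFOCache starts the low water mark at now + maxOffset, as the text
// above describes for the real cache.
func newFIFOCache(now, maxOffset int64, maxSize int) *fifoCache {
	return &fifoCache{lowWater: now + maxOffset, maxSize: maxSize}
}

// add appends an entry and evicts the oldest ones once the cache is over
// capacity (assumption: eviction is purely size-based here). The low water
// mark only moves forward, never backward.
func (c *fifoCache) add(key string, ts int64) {
	c.entries = append(c.entries, fifoEntry{key, ts})
	for len(c.entries) > c.maxSize {
		evicted := c.entries[0]
		c.entries = c.entries[1:]
		if evicted.ts > c.lowWater {
			c.lowWater = evicted.ts
		}
	}
}

func main() {
	c := newFIFOCache(100, 5, 2) // low water mark starts at 105
	c.add("a", 110)
	c.add("b", 108)
	c.add("c", 120) // evicts "a": low water mark rises to 110
	c.add("d", 130) // evicts "b" (108): low water mark stays at 110
	fmt.Println("low water:", c.lowWater)
}
```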
func NewTimestampCache ¶
func NewTimestampCache(clock *hlc.Clock) *TimestampCache
NewTimestampCache returns a new timestamp cache with supplied hybrid clock.
func (*TimestampCache) Add ¶
func (tc *TimestampCache) Add(start, end roachpb.Key, timestamp roachpb.Timestamp, txnID []byte, readOnly bool)
Add the specified timestamp to the cache as covering the range of keys from start to end. If end is nil, the range covers the start key only. txnID is nil for no transaction. readOnly specifies whether the command adding this timestamp was read-only or not.
func (*TimestampCache) Clear ¶
func (tc *TimestampCache) Clear(clock *hlc.Clock)
Clear clears the cache and resets the low water mark to the current time plus the maximum clock offset.
func (*TimestampCache) GetMax ¶
func (tc *TimestampCache) GetMax(start, end roachpb.Key, txnID []byte) (roachpb.Timestamp, roachpb.Timestamp)
GetMax returns the maximum read and write timestamps which overlap the interval spanning from start to end. Cached timestamps matching the specified txnID are not considered. If no part of the specified range is overlapped by timestamps in the cache, the low water timestamp is returned for both read and write timestamps.
The txn ID prevents restarts with a pattern like: read("a"), write("a"). The read adds a timestamp for "a". Without the txn ID, the write (for the same transaction) would get that as the max timestamp and be forced to increment its own timestamp. Passing the txn ID allows timestamps from the same txn to be ignored.
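The read("a"), write("a") pattern can be traced through a simplified model of GetMax. This sketch replaces the interval tree with a linear scan and uses integer timestamps; `cachedSpan`, `toyTSCache`, and `getMax` are hypothetical names and do not reflect the package's internals.

```go
package main

import (
	"bytes"
	"fmt"
)

// cachedSpan is a hypothetical cache entry covering [start, end).
// An empty end means the span covers only the start key.
type cachedSpan struct {
	start, end string
	ts         int64
	txnID      []byte
	readOnly   bool
}

// toyTSCache is a toy model backed by a slice instead of an interval tree.
type toyTSCache struct {
	lowWater int64
	spans    []cachedSpan
}

// overlaps reports whether the span intersects the query interval.
func (s cachedSpan) overlaps(start, end string) bool {
	sEnd, qEnd := s.end, end
	if sEnd == "" {
		sEnd = s.start + "\x00" // next key after s.start
	}
	if qEnd == "" {
		qEnd = start + "\x00"
	}
	return s.start < qEnd && start < sEnd
}

// getMax returns the maximum read and write timestamps overlapping the
// query, skipping entries recorded by the same transaction. Both results
// start at the low water mark, so they never fall below it.
func (c *toyTSCache) getMax(start, end string, txnID []byte) (readTS, writeTS int64) {
	readTS, writeTS = c.lowWater, c.lowWater
	for _, s := range c.spans {
		if !s.overlaps(start, end) {
			continue
		}
		if len(txnID) > 0 && bytes.Equal(s.txnID, txnID) {
			continue // same transaction: ignore its own timestamps
		}
		if s.readOnly && s.ts > readTS {
			readTS = s.ts
		} else if !s.readOnly && s.ts > writeTS {
			writeTS = s.ts
		}
	}
	return readTS, writeTS
}

func main() {
	c := &toyTSCache{lowWater: 10}
	// Transaction t1 reads "a" at timestamp 20.
	c.spans = append(c.spans, cachedSpan{start: "a", ts: 20, txnID: []byte("t1"), readOnly: true})

	own, _ := c.getMax("a", "", []byte("t1"))   // t1's own read is ignored
	other, _ := c.getMax("a", "", []byte("t2")) // another txn sees the read
	fmt.Println(own, other)
}
```

In this model, the write from t1 sees only the low water mark for "a", while a write from t2 sees t1's read timestamp.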
func (*TimestampCache) MergeInto ¶
func (tc *TimestampCache) MergeInto(dest *TimestampCache, clear bool)
MergeInto merges all entries from this timestamp cache into the dest timestamp cache. The clear parameter, if true, copies the values of lowWater and latest and clears the destination cache before merging in the source.
func (*TimestampCache) SetLowWater ¶
func (tc *TimestampCache) SetLowWater(lowWater roachpb.Timestamp)
SetLowWater sets the cache's low water mark, which is the minimum value the cache will return from calls to GetMax().
type UpdateRangeEvent ¶
type UpdateRangeEvent struct {
    StoreID roachpb.StoreID
    Desc    *roachpb.RangeDescriptor
    Stats   engine.MVCCStats
    Method  roachpb.Method
    Delta   engine.MVCCStats
}
UpdateRangeEvent occurs whenever a Range is modified. This structure includes the basic range information, but also includes a second set of MVCCStats containing the delta from the Range's previous stats. If the update did not modify any statistics, this delta may be empty (all fields zero).
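The relationship between the two sets of stats can be sketched as follows. The field names `LiveBytes` and `KeyCount` are illustrative stand-ins and are not claimed to match engine.MVCCStats; `toyStats` and `toyUpdate` are hypothetical types.

```go
package main

import "fmt"

// toyStats is a hypothetical, reduced stand-in for a stats struct.
type toyStats struct {
	LiveBytes int64
	KeyCount  int64
}

// subtract returns s - o field by field, that is, the delta from o to s.
func (s toyStats) subtract(o toyStats) toyStats {
	return toyStats{s.LiveBytes - o.LiveBytes, s.KeyCount - o.KeyCount}
}

// add accumulates a delta into s.
func (s *toyStats) add(d toyStats) {
	s.LiveBytes += d.LiveBytes
	s.KeyCount += d.KeyCount
}

// toyUpdate is a hypothetical event carrying both the new totals and the
// change relative to the previous totals.
type toyUpdate struct {
	Stats toyStats
	Delta toyStats
}

func main() {
	before := toyStats{LiveBytes: 1000, KeyCount: 10}
	after := toyStats{LiveBytes: 1300, KeyCount: 12}
	ev := toyUpdate{Stats: after, Delta: after.subtract(before)}

	// A consumer that tracks running totals can apply only the delta.
	total := before
	total.add(ev.Delta)
	fmt.Println(ev.Delta, total == ev.Stats)
}
```

Carrying the delta lets a consumer maintain aggregate totals across many ranges without remembering each range's previous stats.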
Source Files ¶
- addressing.go
- allocator.go
- balancer.go
- command_queue.go
- doc.go
- feed.go
- gc_queue.go
- id_alloc.go
- queue.go
- raft_log_queue.go
- range_tree.go
- replica.go
- replica_command.go
- replica_data_iter.go
- replica_gc_queue.go
- replica_raftstorage.go
- replicate_queue.go
- scanner.go
- sequence_cache.go
- split_queue.go
- stats.go
- status.pb.go
- store.go
- store_pool.go
- stores.go
- timestamp_cache.go
- verify_queue.go
Directories ¶
Path | Synopsis |
---|---|
Package engine provides low-level storage. |