xact

package
v1.3.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2022 License: MIT Imports: 20 Imported by: 1

README

This is the top eXtended Action (xaction) directory containing much of the common functionality and interfaces used by the rest of the code.

In addition, it contains subdirectories:

  • xreg - xaction registry
  • xs - concrete named xactions, e.g. apc.ActRebalance, apc.ActPromote, apc.ActSummaryBck and other enumerated kinds.

For all supported xactions, their kinds and static properties, see xact.Table.

Xaction kinds are generally consistent with the API constants from api/apc/const.go.

Extended Actions (xactions)

Batch operations that may take many seconds (minutes, hours) to execute are called eXtended actions or xactions.

Xactions run asynchronously, have one of the enumerated kinds, start/stop times, and xaction-specific statistics. Xactions start running based on a wide variety of runtime conditions that include:

  • periodic (defined by a configured interval of time)
  • resource utilization (e.g., usable capacity falling below configured watermark)
  • certain type of workload (e.g., PUT into a mirrored or erasure-coded bucket)
  • user request (e.g., to reduce the number of local object copies in a given bucket)
  • adding or removing storage targets (the events that trigger cluster-wide rebalancing)
  • adding or removing local disks (the events that cause resilver to start moving stored content between mountpaths - see Managing mountpaths)
  • and more...

Further, to reduce congestion and minimize interference with user-generated workload, extended actions (self-)throttle themselves based on configurable watermarks. The latter include disk_util_low_wm and disk_util_high_wm (see configuration). Roughly speaking, the idea is that when local disk utilization falls below the low watermark (disk_util_low_wm) extended actions that utilize local storage can run at full throttle. And vice versa.

The amount of throttling that a given xaction imposes on itself is always defined by a combination of dynamic factors. To give concrete examples, an extended action that runs LRU evictions performs its "balancing act" by taking into account the remaining storage capacity and the current utilization of the local filesystems. The mirroring (xaction) takes into account congestion on its communication channel that callers use for posting requests to create local replicas.


NOTE (Dec 2021): rest of this document is somewhat outdated and must be revisited. For the most recently updated information on running and monitoring xactions, please see:


Supported extended actions are enumerated in the user-facing API and include:

  • cluster-wide rebalancing (denoted as ActGlobalReb in the API) that gets triggered when storage targets join or leave the cluster
  • LRU-based cache eviction (see LRU) that depends on the remaining free capacity and configuration
  • prefetching batches of objects (or arbitrary size) from the Cloud (see List/Range Operations)
  • consensus voting (when conducting new leader election)
  • erasure-encoding objects in a EC-configured bucket (see Erasure coding)
  • creating additional local replicas, and reducing number of object replicas in a given locally-mirrored bucket (see Storage Services)
  • and more...

There are different actions that may be taken upon xaction. Actions include stats, start and stop. List of supported actions can be found in the API

Xaction requests are generic for all xactions, but responses from each xaction are different. See below. The request looks as follows:

  1. Single target request:

    $ curl -i -X GET  -H 'Content-Type: application/json' -d '{"action": "actiontype", "name": "xactionname", "value":{"bucket":"bucketname"}}' 'http://T/v1/daemon?what=xaction'
    

    To simplify the logic, result is always an array, even if there's only one element in the result

  2. Proxy request, which executes a request on all targets within the cluster, and responds with list of targets' responses:

    $ curl -i -X GET  -H 'Content-Type: application/json' -d '{"action": "actiontype", "name": "xactionname", "value":{"bucket":"bucketname"}}' 'http://G/v1/cluster?what=xaction'
    

    Response of a query to proxy is a map of daemonID -> target's response. If any of targets responded with error status code, the proxy's response will result in the same error response.

Start and Stop

For a successful request, the response only contains the HTTP status code. If the request was sent to the proxy and all targets responded with a successful HTTP code, the proxy would respond with the successful HTTP code. The response body should be omitted.

For an unsuccessful request, the target's response contains the error code and error message. If the request was sent to proxy and at least one of targets responded with an error code, the proxy will respond with the same error code and error message.

As always, G above (and throughout this entire README) serves as a placeholder for the real gateway's hostname/IP address and T serves for placeholder for target's hostname/IP address. More information in notation section.

The corresponding RESTful API includes support for querying all xactions including global-rebalancing and prefetch operations.

Stats

Stats request results in list of requested xactions. Statistics of each xaction share a common base format which looks as follow:

[
   {
      "id":1,
      "kind":"ec-get",
      "bucket":"test",
      "startTime":"2019-04-15T12:40:18.721697505-07:00",
      "endTime":"0001-01-01T00:00:00Z",
      "status":"InProgress"
   },
   {
      "id":2,
      "kind":"ec-put",
      "bucket":"test",
      "startTime":"2019-04-15T12:40:18.721723865-07:00",
      "endTime":"0001-01-01T00:00:00Z",
      "status":"InProgress"
   }
]

Any xaction can have additional fields, which are included in additional field called "ext"

Example rebalance stats response:

[
    {
      "id": 3,
      "kind": "rebalance",
      "bucket": "",
      "start_time": "2019-04-15T13:38:51.556388821-07:00",
      "end_time": "0001-01-01T00:00:00Z",
      "status": "InProgress",
      "count": 0,
      "ext": {
        "tx.n": 0,
        "tx.size": 0,
        "rx.n": 0,
        "rx.size": 0
      }
    }
]

If flag --all is provided, stats command will display old, finished xactions, along with currently running ones. If --all is not set (default), only the most recent xactions will be displayed, for each bucket, kind or (bucket, kind)

References

For xaction-related CLI documentation and examples, supported multi-object (batch) operations, and more, please see:

Documentation

Overview

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Package xact provides core functionality for the AIStore eXtended Actions (xactions).

  • Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved.

Index

Constants

View Source
const (
	ScopeG   = "global"
	ScopeT   = "target"
	ScopeBck = "bucket"
	ScopeO   = "other"
)

Variables

View Source
var IncFinished func()
View Source
var Table = map[string]Descriptor{

	apc.ActLRU:          {Scope: ScopeG, Startable: true, Mountpath: true},
	apc.ActStoreCleanup: {Scope: ScopeG, Startable: true, Mountpath: true},
	apc.ActElection:     {Scope: ScopeG, Startable: false},
	apc.ActResilver:     {Scope: ScopeT, Startable: true, Mountpath: true, Resilver: true},
	apc.ActRebalance:    {Scope: ScopeG, Startable: true, Metasync: true, Owned: false, Mountpath: true, Rebalance: true},
	apc.ActDownload:     {Scope: ScopeG, Startable: false, Mountpath: true},
	apc.ActETLInline:    {Scope: ScopeG, Startable: false, Mountpath: false},

	apc.ActECGet:           {Scope: ScopeBck, Startable: false},
	apc.ActECPut:           {Scope: ScopeBck, Startable: false, Mountpath: true, RefreshCap: true},
	apc.ActECRespond:       {Scope: ScopeBck, Startable: false},
	apc.ActMakeNCopies:     {Scope: ScopeBck, Access: apc.AccessRW, Startable: true, Metasync: true, Owned: false, RefreshCap: true, Mountpath: true},
	apc.ActPutCopies:       {Scope: ScopeBck, Startable: false, Mountpath: true, RefreshCap: true},
	apc.ActArchive:         {Scope: ScopeBck, Startable: false, RefreshCap: true},
	apc.ActCopyObjects:     {Scope: ScopeBck, Startable: false, RefreshCap: true},
	apc.ActETLObjects:      {Scope: ScopeBck, Startable: false, RefreshCap: true},
	apc.ActMoveBck:         {Scope: ScopeBck, Access: apc.AceMoveBucket, Startable: false, Metasync: true, Owned: false, Mountpath: true, Rebalance: true, MassiveBck: true},
	apc.ActCopyBck:         {Scope: ScopeBck, Access: apc.AccessRW, Startable: false, Metasync: true, Owned: false, RefreshCap: true, Mountpath: true, MassiveBck: true},
	apc.ActETLBck:          {Scope: ScopeBck, Access: apc.AccessRW, Startable: false, Metasync: true, Owned: false, RefreshCap: true, Mountpath: true, MassiveBck: true},
	apc.ActECEncode:        {Scope: ScopeBck, Access: apc.AccessRW, Startable: true, Metasync: true, Owned: false, RefreshCap: true, Mountpath: true, MassiveBck: true},
	apc.ActEvictObjects:    {Scope: ScopeBck, Access: apc.AceObjDELETE, Startable: false, RefreshCap: true, Mountpath: true},
	apc.ActDeleteObjects:   {Scope: ScopeBck, Access: apc.AceObjDELETE, Startable: false, RefreshCap: true, Mountpath: true},
	apc.ActLoadLomCache:    {Scope: ScopeBck, Startable: true, Mountpath: true},
	apc.ActPrefetchObjects: {Scope: ScopeBck, Access: apc.AccessRW, RefreshCap: true, Startable: true},
	apc.ActPromote:         {Scope: ScopeBck, Access: apc.AcePromote, Startable: false, RefreshCap: true},
	apc.ActList:            {Scope: ScopeBck, Access: apc.AceObjLIST, Startable: false, Metasync: false, Owned: true},
	apc.ActInvalListCache:  {Scope: ScopeBck, Access: apc.AceObjLIST, Startable: false},

	apc.ActSummaryBck: {Scope: ScopeO, Access: apc.AceObjLIST | apc.AceBckHEAD, Startable: false, Metasync: false, Owned: true, Mountpath: true},
}

Table is a static Kind=>[Xaction Descriptor] map that contains static properties of a given xaction type (aka `kind`), such as: `Startable`, `Owned`, etc.

Functions

func CompareRebIDs

func CompareRebIDs(someID, fltID string) int

func GoRunW

func GoRunW(xctn cluster.Xact)

common helper to go-run and wait until it actually starts running

func IsBckScope

func IsBckScope(kind string) bool

func IsMountpath

func IsMountpath(kind string) bool

func IsValidKind

func IsValidKind(kind string) bool

func IsValidRebID

func IsValidRebID(id string) bool

func RebID2S

func RebID2S(id int64) string

func RefcntQuiCB

func RefcntQuiCB(refc *atomic.Int32, maxTimeout, totalSoFar time.Duration) cluster.QuiRes

common ref-counted quiescence

func S2RebID

func S2RebID(id string) (int64, error)

Types

type Base

type Base struct {
	// contains filtered or unexported fields
}

func (*Base) Abort

func (xctn *Base) Abort(err error) (ok bool)

func (*Base) AbortErr

func (xctn *Base) AbortErr() (err error)

func (*Base) AbortedAfter

func (xctn *Base) AbortedAfter(d time.Duration) (err error)

func (*Base) AddNotif

func (xctn *Base) AddNotif(n cluster.Notif)

func (*Base) Bck

func (xctn *Base) Bck() *cluster.Bck

func (*Base) Bytes

func (xctn *Base) Bytes() int64

func (*Base) ChanAbort

func (xctn *Base) ChanAbort() <-chan error

func (*Base) EndTime

func (xctn *Base) EndTime() time.Time

func (*Base) Finish

func (xctn *Base) Finish(err error)

atomically set end-time

func (*Base) Finished

func (xctn *Base) Finished() bool

func (*Base) FromTo

func (*Base) FromTo() (*cluster.Bck, *cluster.Bck)

func (*Base) GetStats

func (xctn *Base) GetStats() (stats *Stats)

func (*Base) ID

func (xctn *Base) ID() string

func (*Base) InBytes

func (xctn *Base) InBytes() int64

func (*Base) InObjs

func (xctn *Base) InObjs() int64

base stats: receive

func (*Base) InObjsAdd

func (xctn *Base) InObjsAdd(cnt int, size int64)

func (*Base) InitBase

func (xctn *Base) InitBase(id, kind string, bck *cluster.Bck)

func (*Base) IsAborted

func (xctn *Base) IsAborted() bool

func (*Base) Kind

func (xctn *Base) Kind() string

func (*Base) Name

func (xctn *Base) Name() (s string)

func (*Base) Notif

func (xctn *Base) Notif() (n cluster.Notif)

func (*Base) Objs

func (xctn *Base) Objs() int64

base stats: locally processed

func (*Base) ObjsAdd

func (xctn *Base) ObjsAdd(cnt int, size int64)

func (*Base) OutBytes

func (xctn *Base) OutBytes() int64

func (*Base) OutObjs

func (xctn *Base) OutObjs() int64

base stats: transmit

func (*Base) OutObjsAdd

func (xctn *Base) OutObjsAdd(cnt int, size int64)

func (*Base) Quiesce

func (xctn *Base) Quiesce(d time.Duration, cb cluster.QuiCB) cluster.QuiRes

count all the way to duration; reset and adjust every time activity is detected

func (*Base) Result

func (*Base) Result() (any, error)

func (*Base) Running

func (xctn *Base) Running() (yes bool)

func (*Base) Snap

func (xctn *Base) Snap() cluster.XactSnap

func (*Base) StartTime

func (xctn *Base) StartTime() time.Time

func (*Base) String

func (xctn *Base) String() string

func (*Base) ToSnap

func (xctn *Base) ToSnap(snap *Snap)

func (*Base) ToStats

func (xctn *Base) ToStats(stats *Stats)

type BaseDemandStatsExt

type BaseDemandStatsExt struct {
	IsIdle bool `json:"is_idle"`
}

type BckJog

type BckJog struct {
	Base
	// contains filtered or unexported fields
}

func (*BckJog) Init

func (r *BckJog) Init(id, kind string, bck *cluster.Bck, opts *mpather.JoggerGroupOpts)

func (*BckJog) Run

func (r *BckJog) Run()

func (*BckJog) Target

func (r *BckJog) Target() cluster.Target

func (*BckJog) Wait

func (r *BckJog) Wait() error

type Demand

type Demand interface {
	cluster.Xact
	IdleTimer() <-chan struct{}
	IncPending()
	DecPending()
	SubPending(n int)
}

xaction that self-terminates after staying idle for a while with an added capability to renew itself and ref-count its pending work

type DemandBase

type DemandBase struct {
	Base
	// contains filtered or unexported fields
}

func (*DemandBase) Abort

func (r *DemandBase) Abort(err error) (ok bool)

func (*DemandBase) DecPending

func (r *DemandBase) DecPending()

func (*DemandBase) ExtSnap

func (r *DemandBase) ExtSnap() *SnapExt

func (*DemandBase) IdleTimer

func (r *DemandBase) IdleTimer() <-chan struct{}

func (*DemandBase) IncPending

func (r *DemandBase) IncPending()

func (*DemandBase) Init

func (r *DemandBase) Init(uuid, kind string, bck *cluster.Bck, idle time.Duration) (xdb *DemandBase)

func (*DemandBase) Pending

func (r *DemandBase) Pending() (cnt int64)

func (*DemandBase) Snap

func (r *DemandBase) Snap() cluster.XactSnap

func (*DemandBase) Stop

func (r *DemandBase) Stop()

func (*DemandBase) SubPending

func (r *DemandBase) SubPending(n int)

type Descriptor

type Descriptor struct {
	Scope      string          // ScopeG (global), etc. - the enum above
	Access     apc.AccessAttrs // Access required by xctn (see: apc.Access*)
	Startable  bool            // determines if this xaction can be started via API
	Metasync   bool            // true: changes and metasyncs cluster-wide meta
	Owned      bool            // true: JTX-owned
	RefreshCap bool            // true: refresh capacity stats upon completion
	Mountpath  bool            // true: mountpath-traversing (jogger-based) xaction
	// see xreg for "limited coexistence"
	Rebalance  bool // moves data between nodes
	Resilver   bool // moves data between mountpaths
	MassiveBck bool // massive data copying (transforming, encoding) operation on a bucket
}

type Marked

type Marked struct {
	Xact        cluster.Xact
	Interrupted bool
}

type NotifXact

type NotifXact struct {
	Xact cluster.Xact
	nl.NotifBase
}

func (*NotifXact) ToNotifMsg

func (nx *NotifXact) ToNotifMsg() cluster.NotifMsg

type NotifXactListener

type NotifXactListener struct {
	nl.NotifListenerBase
}

func NewXactNL

func NewXactNL(uuid, action string, smap *cluster.Smap, srcs cluster.NodeMap, bck ...*cmn.Bck) *NotifXactListener

func (*NotifXactListener) AbortArgs

func (nxb *NotifXactListener) AbortArgs() cmn.HreqArgs

(see also downloader.AbortArgs)

func (*NotifXactListener) QueryArgs

func (nxb *NotifXactListener) QueryArgs() cmn.HreqArgs

func (*NotifXactListener) UnmarshalStats

func (*NotifXactListener) UnmarshalStats(rawMsg []byte) (stats any, finished, aborted bool, err error)

type QueryMsg

type QueryMsg struct {
	OnlyRunning *bool     `json:"show_active"`
	Ext         any       `json:"ext"`
	Bck         cmn.Bck   `json:"bck"`
	ID          string    `json:"id"`
	Kind        string    `json:"kind"`
	DaemonID    string    `json:"node,omitempty"`
	Buckets     []cmn.Bck `json:"buckets,omitempty"`
}

NOTE: see closely related `api.XactReqArgs` and comments TODO: apc package, here and elsewhere

func (*QueryMsg) String

func (msg *QueryMsg) String() (s string)

type QueryMsgLRU

type QueryMsgLRU struct {
	Force bool `json:"force"`
}

type Snap

type Snap struct {
	StartTime time.Time `json:"start-time"`
	EndTime   time.Time `json:"end-time"`
	Bck       cmn.Bck   `json:"bck"`
	ID        string    `json:"id"`
	Kind      string    `json:"kind"`
	Stats     Stats     `json:"stats"` // common stats counters (below)
	AbortedX  bool      `json:"aborted"`
}

func (*Snap) Finished

func (b *Snap) Finished() bool

func (*Snap) IsAborted

func (b *Snap) IsAborted() bool

func (*Snap) Running

func (b *Snap) Running() bool

type SnapExt

type SnapExt struct {
	Ext any `json:"ext"`
	Snap
}

func (*SnapExt) Idle

func (b *SnapExt) Idle() bool

Idle is: - stat.IsIdle for on-demand xactions - !stat.Running() for the rest of xactions MorphMarshal cannot be used to read any stats as BaseDemandStatsExt because upcasting is unsupported (uknown fields are forbidden).

type Stats

type Stats struct {
	Objs     int64 `json:"loc-objs,string"`  // locally processed
	Bytes    int64 `json:"loc-bytes,string"` //
	OutObjs  int64 `json:"out-objs,string"`  // transmit
	OutBytes int64 `json:"out-bytes,string"` //
	InObjs   int64 `json:"in-objs,string"`   // receive
	InBytes  int64 `json:"in-bytes,string"`
}

Directories

Path Synopsis
Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions).
Package xreg provides registry and (renew, find) functions for AIS eXtended Actions (xactions).
Package xs contains most of the supported eXtended actions (xactions) with some exceptions that include certain storage services (mirror, EC) and extensions (downloader, lru).
Package xs contains most of the supported eXtended actions (xactions) with some exceptions that include certain storage services (mirror, EC) and extensions (downloader, lru).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL