batcher

package
v0.0.0-...-011bec4 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2024 License: MIT Imports: 36 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin")
	ErrMaxFrameIndex         = errors.New("max frame index reached (uint16)")
	ErrMaxDurationReached    = errors.New("max channel duration reached")
	ErrChannelTimeoutClose   = errors.New("close to channel timeout")
	ErrSeqWindowClose        = errors.New("close to sequencer window timeout")
	ErrTerminated            = errors.New("channel terminated")
)
var ErrAlreadyStopped = errors.New("already stopped")
var ErrBatcherNotRunning = errors.New("batcher is not running")
var ErrPendingAfterClose = errors.New("pending channels remain after closing channel-manager")
var ErrReorg = errors.New("block does not extend existing chain")

Functions

func Main

func Main(version string) cliapp.LifecycleAction

Main is the entrypoint into the Batch Submitter. It returns a cliapp.LifecycleAction that creates an op-service CLI-lifecycle-managed batch-submitter.

func MaxDataSize

func MaxDataSize(numFrames int, maxFrameSize uint64) uint64

MaxDataSize returns the maximum byte size of output data that can be packed into a channel with numFrames frames and frames of max size maxFrameSize. It accounts for the constant frame overhead. It panics if the maxFrameSize is smaller than derive.FrameV0OverHeadSize.
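The calculation described above can be sketched as follows. This is a minimal, self-contained illustration rather than the package's actual source: the lower-case maxDataSize and the constant frameV0OverheadSize are hypothetical stand-ins, and the overhead value of 23 bytes is an assumption made for this example.

```go
package main

import "fmt"

// frameV0OverheadSize is a hypothetical stand-in for derive.FrameV0OverHeadSize.
// The value 23 (16-byte channel ID + 2-byte frame number + 4-byte data length
// + 1-byte is_last flag) is an assumption of this sketch.
const frameV0OverheadSize = 23

// maxDataSize mirrors the documented behavior: every one of the numFrames
// frames contributes its maximum size minus the constant per-frame overhead.
func maxDataSize(numFrames int, maxFrameSize uint64) uint64 {
	if maxFrameSize < frameV0OverheadSize {
		panic("max frame size smaller than frame overhead")
	}
	return uint64(numFrames) * (maxFrameSize - frameV0OverheadSize)
}

func main() {
	// 6 frames of at most 1000 bytes each leave 6 * (1000 - 23) bytes for data.
	fmt.Println(maxDataSize(6, 1000)) // prints 5862
}
```

Under these assumptions, a frame size exactly equal to the overhead yields a capacity of zero rather than a panic.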

func NewChannelManager

func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollupCfg *rollup.Config) *channelManager

Types

type BatchSubmitter

type BatchSubmitter struct {
	DriverSetup
	// contains filtered or unexported fields
}

BatchSubmitter encapsulates a service responsible for submitting L2 tx batches to L1 for availability.

func NewBatchSubmitter

func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter

NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup.

func (*BatchSubmitter) StartBatchSubmitting

func (l *BatchSubmitter) StartBatchSubmitting() error

func (*BatchSubmitter) StopBatchSubmitting

func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error

StopBatchSubmitting stops the batch-submitter loop, and force-kills if the provided ctx is done.

func (*BatchSubmitter) StopBatchSubmittingIfRunning

func (l *BatchSubmitter) StopBatchSubmittingIfRunning(ctx context.Context) error

type BatcherConfig

type BatcherConfig struct {
	NetworkTimeout         time.Duration
	PollInterval           time.Duration
	MaxPendingTransactions uint64

	// UseBlobs is true if the batcher should use blobs instead of calldata for posting batches
	UseBlobs bool

	// UsePlasma is true if the rollup config has a DA challenge address so the batcher
	// will post inputs to the Plasma DA server and post commitments to blobs or calldata.
	UsePlasma bool

	WaitNodeSync        bool
	CheckRecentTxsDepth int
}

type BatcherService

type BatcherService struct {
	Log              log.Logger
	Metrics          metrics.Metricer
	L1Client         *ethclient.Client
	EndpointProvider dial.L2EndpointProvider
	TxManager        txmgr.TxManager
	PlasmaDA         *plasma.DAClient

	BatcherConfig

	RollupConfig *rollup.Config

	// Channel builder parameters
	ChannelConfig ChannelConfig

	Version string

	NotSubmittingOnStart bool
	// contains filtered or unexported fields
}

BatcherService represents a full batch-submitter instance and its resources, and conforms to the op-service CLI Lifecycle interface.

func BatcherServiceFromCLIConfig

func BatcherServiceFromCLIConfig(ctx context.Context, version string, cfg *CLIConfig, log log.Logger) (*BatcherService, error)

BatcherServiceFromCLIConfig creates a new BatcherService from a CLIConfig. The service components are fully started, except for the driver, which will not be submitting batches (if it was configured to) until the Start part of the lifecycle.

func (*BatcherService) Driver

func (bs *BatcherService) Driver() rpc.BatcherDriver

Driver returns the handler on the batch-submitter driver element, to start/stop/restart the batch-submission work, for use in testing.

func (*BatcherService) Kill

func (bs *BatcherService) Kill() error

Kill is a convenience method to forcefully, non-gracefully, stop the BatcherService.

func (*BatcherService) Start

func (bs *BatcherService) Start(_ context.Context) error

Start runs once upon start of the batcher lifecycle, and starts batch-submission work if the batcher is configured to start submitting data on startup.

func (*BatcherService) Stop

func (bs *BatcherService) Stop(ctx context.Context) error

Stop fully stops the batch-submitter and all its resources gracefully. After stopping, it cannot be restarted. See driver.StopBatchSubmitting to temporarily stop the batch submitter. If the provided ctx is cancelled, the stopping is forced, i.e. the batching work is killed non-gracefully.

func (*BatcherService) Stopped

func (bs *BatcherService) Stopped() bool

Stopped reports whether the service as a whole is stopped.

type CLIConfig

type CLIConfig struct {
	// L1EthRpc is the HTTP provider URL for L1.
	L1EthRpc string

	// L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
	L2EthRpc string

	// RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided.
	RollupRpc string

	// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a
	// channel open. This allows batcher transactions to be sent more eagerly
	// during times of low L2 transaction volume. Note that the effective
	// L1-block distance between batcher transactions is then MaxChannelDuration
	// + NumConfirmations because the batcher waits for NumConfirmations blocks
	// after sending a batcher tx and only then starts a new channel.
	//
	// If 0, duration checks are disabled.
	MaxChannelDuration uint64

	// The batcher tx submission safety margin (in #L1-blocks) to subtract from
	// a channel's timeout and sequencing window, to guarantee safe inclusion of
	// a channel on L1.
	SubSafetyMargin uint64

	// PollInterval is the delay between querying L2 for more transactions
	// and creating a new batch.
	PollInterval time.Duration

	// MaxPendingTransactions is the maximum number of concurrent pending
	// transactions sent to the transaction manager (0 == no limit).
	MaxPendingTransactions uint64

	// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
	// If using blobs, this setting is ignored and the max blob size is used.
	MaxL1TxSize uint64

	// The target number of frames to create per channel. Controls number of blobs
	// per blob tx, if using Blob DA.
	TargetNumFrames int

	// ApproxComprRatio to assume (only [compressor.RatioCompressor]).
	// Should be slightly smaller than average from experiments to avoid the
	// chances of creating a small additional leftover frame.
	ApproxComprRatio float64

	// Type of compressor to use. Must be one of [compressor.KindKeys].
	Compressor string

	// Type of compression algorithm to use. Must be one of [zlib, brotli, brotli[9-11]]
	CompressionAlgo derive.CompressionAlgo

	// If Stopped is true, the batcher starts stopped and won't start batching right away.
	// Batching needs to be started via an admin RPC.
	Stopped bool

	// Whether to wait for the sequencer to sync to a recent block at startup.
	WaitNodeSync bool

	// How many blocks back to look for recent batcher transactions during node sync at startup.
	// If 0, the batcher will just use the current head.
	CheckRecentTxsDepth int

	BatchType uint

	// DataAvailabilityType is one of the values defined in op-batcher/flags/types.go and dictates
	// the data availability type to use for posting batches, e.g. blobs vs calldata.
	DataAvailabilityType flags.DataAvailabilityType

	// TestUseMaxTxSizeForBlobs allows setting the blob size with MaxL1TxSize.
	// Should only be used for testing purposes.
	TestUseMaxTxSizeForBlobs bool

	// ActiveSequencerCheckDuration is the duration between checks to determine the active sequencer endpoint.
	ActiveSequencerCheckDuration time.Duration

	TxMgrConfig   txmgr.CLIConfig
	LogConfig     oplog.CLIConfig
	MetricsConfig opmetrics.CLIConfig
	PprofConfig   oppprof.CLIConfig
	RPC           oprpc.CLIConfig
	PlasmaDA      plasma.CLIConfig
}
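The L2EthRpc and RollupRpc comments above state that comma-separated lists must contain the same number of entries. A minimal sketch of such a consistency check is shown below; checkEndpointLists is a hypothetical helper written for illustration (it is not part of this package), and the URLs are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// checkEndpointLists is a hypothetical helper (not part of the batcher
// package). It verifies that the comma-separated L2EthRpc and RollupRpc
// values contain the same number of entries.
func checkEndpointLists(l2EthRpc, rollupRpc string) error {
	l2 := strings.Split(l2EthRpc, ",")
	rollup := strings.Split(rollupRpc, ",")
	if len(l2) != len(rollup) {
		return fmt.Errorf("got %d L2EthRpc entries but %d RollupRpc entries",
			len(l2), len(rollup))
	}
	return nil
}

func main() {
	// Two execution-engine URLs but only one rollup-node URL: a mismatch.
	err := checkEndpointLists(
		"http://l2-a:8545,http://l2-b:8545", // placeholder URLs
		"http://node-a:9545",
	)
	fmt.Println(err)
}
```

Whether the real CLIConfig.Check performs this validation, or whether it happens elsewhere, is not stated in this page.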

func NewConfig

func NewConfig(ctx *cli.Context) *CLIConfig

NewConfig parses the Config from the provided flags or environment variables.

func (*CLIConfig) Check

func (c *CLIConfig) Check() error

type ChannelBuilder

type ChannelBuilder struct {
	// contains filtered or unexported fields
}

ChannelBuilder uses a ChannelOut to create a channel with output frame size approximation.

func NewChannelBuilder

func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1OriginBlockNum uint64) (*ChannelBuilder, error)

NewChannelBuilder creates a new channel builder or returns an error if the channel out could not be created. It acts as a factory for either a span or singular channel out.

func (*ChannelBuilder) AddBlock

func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, error)

AddBlock adds a block to the channel compression pipeline. IsFull should be called afterwards to test whether the channel is full. If full, a new channel must be started.

AddBlock returns a ChannelFullError if called even though the channel is already full. See description of FullErr for details.

AddBlock also returns the L1BlockInfo that got extracted from the block's first transaction for subsequent use by the caller.

Call OutputFrames() afterwards to create frames.

func (*ChannelBuilder) Blocks

func (c *ChannelBuilder) Blocks() []*types.Block

Blocks returns a backup list of all blocks that were added to the channel. It can be used in case the channel needs to be rebuilt.

func (*ChannelBuilder) CheckTimeout

func (c *ChannelBuilder) CheckTimeout(l1BlockNum uint64)

CheckTimeout checks if the channel is timed out at the given block number and in this case marks the channel as full, if it wasn't full already.

func (*ChannelBuilder) Close

func (c *ChannelBuilder) Close()

Close immediately marks the channel as full with an ErrTerminated if the channel is not already full.

func (*ChannelBuilder) FramePublished

func (c *ChannelBuilder) FramePublished(l1BlockNum uint64)

FramePublished should be called whenever a frame of this channel is published, passing the L1 block number of the block in which the frame was included.

func (*ChannelBuilder) FullErr

func (c *ChannelBuilder) FullErr() error

FullErr returns the reason why the channel is full. If not full yet, it returns nil.

It returns a ChannelFullError wrapping one of the following possible reasons for the channel being full:

  • derive.ErrCompressorFull if the compressor target has been reached,
  • derive.MaxRLPBytesPerChannel if the general maximum amount of input data would have been exceeded by the latest AddBlock call,
  • ErrMaxFrameIndex if the maximum number of frames has been generated (uint16),
  • ErrMaxDurationReached if the max channel duration got reached,
  • ErrChannelTimeoutClose if the consensus channel timeout got too close,
  • ErrSeqWindowClose if the end of the sequencer window got too close,
  • ErrTerminated if the channel was explicitly terminated.

func (*ChannelBuilder) HasFrame

func (c *ChannelBuilder) HasFrame() bool

HasFrame returns whether there's any available frame. If true, it can be popped using NextFrame().

Call OutputFrames beforehand to create new frames from the channel out compression pipeline.

func (*ChannelBuilder) ID

func (*ChannelBuilder) InputBytes

func (c *ChannelBuilder) InputBytes() int

InputBytes returns the total amount of input bytes added to the channel.

func (*ChannelBuilder) IsFull

func (c *ChannelBuilder) IsFull() bool

IsFull returns whether the channel is full. FullErr returns the reason for the channel being full.

func (*ChannelBuilder) LatestL1Origin

func (c *ChannelBuilder) LatestL1Origin() eth.BlockID

LatestL1Origin returns the latest L1 block origin from all the L2 blocks that have been added to the channel.

func (*ChannelBuilder) NextFrame

func (c *ChannelBuilder) NextFrame() frameData

NextFrame returns the next available frame. HasFrame must be called beforehand to check whether a next frame is available. Panics if called when there's no next frame.

func (*ChannelBuilder) OutputBytes

func (c *ChannelBuilder) OutputBytes() int

func (*ChannelBuilder) OutputFrames

func (c *ChannelBuilder) OutputFrames() error

OutputFrames creates new frames with the channel out. It should be called after AddBlock and before iterating over available frames with HasFrame and NextFrame.

If the channel isn't full yet, it will conservatively only pull readily available frames from the compression output. If it is full, the channel is closed and all remaining frames will be created, possibly with a small leftover frame.
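The call order described above (OutputFrames first, then a HasFrame/NextFrame loop) can be sketched with a toy type. The frameQueue below is a hypothetical stand-in that only borrows the method names; it uses strings in place of the unexported frameData type and performs no compression.

```go
package main

import "fmt"

// frameQueue is a toy stand-in for a channel builder. It is hypothetical
// and only imitates the method names documented on this page.
type frameQueue struct {
	pending []string // data waiting in the pretend compression pipeline
	frames  []string // frames that are ready to be popped
}

// OutputFrames moves everything that is pending into the frames queue.
func (q *frameQueue) OutputFrames() error {
	q.frames = append(q.frames, q.pending...)
	q.pending = nil
	return nil
}

// HasFrame reports whether at least one frame can be popped.
func (q *frameQueue) HasFrame() bool { return len(q.frames) > 0 }

// NextFrame pops the oldest frame and panics if the queue is empty.
func (q *frameQueue) NextFrame() string {
	if len(q.frames) == 0 {
		panic("no next frame")
	}
	f := q.frames[0]
	q.frames = q.frames[1:]
	return f
}

// drain creates frames once, then pops them, guarding each pop with HasFrame.
func drain(q *frameQueue) ([]string, error) {
	if err := q.OutputFrames(); err != nil {
		return nil, err
	}
	var out []string
	for q.HasFrame() {
		out = append(out, q.NextFrame())
	}
	return out, nil
}

func main() {
	q := &frameQueue{pending: []string{"frame-0", "frame-1"}}
	frames, err := drain(q)
	fmt.Println(frames, err) // prints [frame-0 frame-1] <nil>
}
```

Guarding every NextFrame call with HasFrame avoids the documented panic on an empty queue.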

func (*ChannelBuilder) PendingFrames

func (c *ChannelBuilder) PendingFrames() int

PendingFrames returns the number of pending frames in the frames queue. It is greater than zero iff HasFrame() returns true.

func (*ChannelBuilder) PushFrames

func (c *ChannelBuilder) PushFrames(frames ...frameData)

PushFrames adds the frames back to the internal frames queue. Panics if not of the same channel.

func (*ChannelBuilder) ReadyBytes

func (c *ChannelBuilder) ReadyBytes() int

ReadyBytes returns the amount of bytes ready in the compression pipeline to output into a frame.

func (*ChannelBuilder) TimedOut

func (c *ChannelBuilder) TimedOut(blockNum uint64) bool

TimedOut returns whether the passed block number is after the timeout block number. If no block timeout is set yet, it returns false.

func (*ChannelBuilder) Timeout

func (c *ChannelBuilder) Timeout() uint64

Timeout returns the block number of the channel timeout. If no timeout is set, it returns 0.

func (*ChannelBuilder) TotalFrames

func (c *ChannelBuilder) TotalFrames() int

TotalFrames returns the total number of frames that were created in this channel so far. It does not decrease when the frames queue is being emptied.

type ChannelConfig

type ChannelConfig struct {
	// Number of epochs (L1 blocks) per sequencing window, including the epoch
	// L1 origin block itself
	SeqWindowSize uint64
	// The maximum number of L1 blocks that the inclusion transactions of a
	// channel's frames can span.
	ChannelTimeout uint64

	// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep the
	// channel open. This allows control over how long a channel is kept open
	// during times of low transaction volume.
	//
	// If 0, duration checks are disabled.
	MaxChannelDuration uint64
	// The batcher tx submission safety margin (in #L1-blocks) to subtract from
	// a channel's timeout and sequencing window, to guarantee safe inclusion of
	// a channel on L1.
	SubSafetyMargin uint64
	// The maximum byte-size a frame can have.
	MaxFrameSize uint64

	// Target number of frames to create per channel.
	// For blob transactions, this controls the number of blobs to target adding
	// to each blob tx.
	TargetNumFrames int

	// CompressorConfig contains the configuration for creating new compressors.
	// It should not be set directly, but via the Init*Compressor methods after
	// creating the ChannelConfig to guarantee a consistent configuration.
	CompressorConfig compressor.Config

	// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
	BatchType uint

	// Whether to put all frames of a channel inside a single tx.
	// Should only be used for blob transactions.
	MultiFrameTxs bool
}

func (*ChannelConfig) Check

func (cc *ChannelConfig) Check() error

Check validates the ChannelConfig parameters.

func (*ChannelConfig) InitCompressorConfig

func (cc *ChannelConfig) InitCompressorConfig(approxComprRatio float64, comprKind string, compressionAlgo derive.CompressionAlgo)

InitCompressorConfig (re)initializes the channel configuration's compressor configuration using the given values. The TargetOutputSize will be set to a value consistent with cc.TargetNumFrames and cc.MaxFrameSize. comprKind can be the empty string, in which case the default compressor will be used.

func (*ChannelConfig) InitNoneCompressor

func (cc *ChannelConfig) InitNoneCompressor()

func (*ChannelConfig) InitRatioCompressor

func (cc *ChannelConfig) InitRatioCompressor(approxComprRatio float64, compressionAlgo derive.CompressionAlgo)

func (*ChannelConfig) InitShadowCompressor

func (cc *ChannelConfig) InitShadowCompressor(compressionAlgo derive.CompressionAlgo)

func (*ChannelConfig) MaxFramesPerTx

func (cc *ChannelConfig) MaxFramesPerTx() int

type ChannelFullError

type ChannelFullError struct {
	Err error
}

func (*ChannelFullError) Error

func (e *ChannelFullError) Error() string

func (*ChannelFullError) Unwrap

func (e *ChannelFullError) Unwrap() error

type DriverSetup

type DriverSetup struct {
	Log              log.Logger
	Metr             metrics.Metricer
	RollupConfig     *rollup.Config
	Config           BatcherConfig
	Txmgr            txmgr.TxManager
	L1Client         L1Client
	EndpointProvider dial.L2EndpointProvider
	ChannelConfig    ChannelConfig
	PlasmaDA         *plasma.DAClient
}

DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.

type L1Client

type L1Client interface {
	HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
	NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

type L2Client

type L2Client interface {
	BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}

type RollupClient

type RollupClient interface {
	SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
}
