atomic

package
v1.96.0
Published: Aug 11, 2022 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Example (IntervalRunner)
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/opencost/opencost/pkg/util/atomic"
)

// IntervalRunner is an example of a type that uses AtomicRunState.
type IntervalRunner struct {
	runState atomic.AtomicRunState
	action   func()
	interval time.Duration
}

// NewIntervalRunner creates a new interval runner that executes the provided
// function on the given interval until explicitly stopped.
func NewIntervalRunner(action func(), interval time.Duration) *IntervalRunner {
	return &IntervalRunner{
		action:   action,
		interval: interval,
	}
}

// Start begins the interval execution. It returns true if the interval execution successfully starts.
// It returns false if the interval execution is already running.
func (ir *IntervalRunner) Start() bool {
	// Before we attempt to start, we must ensure we are not in a stopping state. This is a
	// common pattern that should be used with AtomicRunState.
	ir.runState.WaitForReset()

	// This will atomically check the current state to ensure we can run, then advances the state.
	// If the state is already started, it will return false.
	if !ir.runState.Start() {
		return false
	}

	// our run state is advanced, let's execute our action on the interval
	// spawn a new goroutine which will loop and wait the interval each iteration
	go func() {
		for {
			// use a select statement to receive whichever channel receives data first
			select {
			// if our stop channel receives data, it means we have explicitly called
			// Stop(), and must reset our AtomicRunState to its initial idle state
			case <-ir.runState.OnStop():
				ir.runState.Reset()
				return // exit goroutine

			// After our interval elapses, fall through
			case <-time.After(ir.interval):
			}

			// Execute the function
			ir.action()

			// Loop back to the select where we will wait for the interval to elapse
			// or an explicit stop to be called
		}
	}()

	return true
}

// Stop explicitly stops the execution of the interval runner. If an action is already executing, the stop
// is processed only after the action completes. Any attempt to start during the stopping phase blocks until
// it is possible to Start() again.
func (ir *IntervalRunner) Stop() bool {
	return ir.runState.Stop()
}

func main() {
	count := 0

	// As a general test, we'll use a goroutine which waits for a specific number of
	// ticks before calling stop, then issues a signal back to the main thread
	var wg sync.WaitGroup
	wg.Add(4)

	// Create a new IntervalRunner instance to execute our print action every second
	ir := NewIntervalRunner(
		func() {
			fmt.Printf("Tick[%d]\n", count)
			count++
			// advance the wait group count
			wg.Done()
		},
		time.Second,
	)

	// Start the runner, panic on failure
	if !ir.Start() {
		panic("Failed to start interval runner!")
	}

	// spin up a second goroutine which will wait for a specific number of
	// ticks before calling Stop(). This is a bit contrived, but demonstrates
	// multiple goroutines controlling the same interval runner.
	complete := make(chan bool)
	go func() {
		wg.Wait()

		// Stop the interval runner, notify main thread
		ir.Stop()
		complete <- true
	}()

	<-complete

	// Start immediately again using a different total tick count
	count = 0
	wg.Add(2)

	// Start the runner, panic on failure
	if !ir.Start() {
		panic("Failed to start interval runner!")
	}

	// Create a new Stop waiter
	go func() {
		wg.Wait()

		// Stop the interval runner, notify main thread
		ir.Stop()
		complete <- true
	}()

	<-complete

}
Output:

Tick[0]
Tick[1]
Tick[2]
Tick[3]
Tick[0]
Tick[1]

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AtomicBool

type AtomicBool int32

AtomicBool is a boolean type backed by a 32-bit integer, which allows it to be updated with an atomic compare-and-swap (CAS).

func NewAtomicBool

func NewAtomicBool(value bool) *AtomicBool

NewAtomicBool creates an AtomicBool with the given initial value.

func (*AtomicBool) CompareAndSet

func (ab *AtomicBool) CompareAndSet(current, new bool) bool

CompareAndSet sets the value to new only if the stored value equals current. It returns true if the new value was set.

func (*AtomicBool) Get

func (ab *AtomicBool) Get() bool

Get loads the bool value atomically.

func (*AtomicBool) Set

func (ab *AtomicBool) Set(value bool)

Set stores the bool value atomically.
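
The following is a minimal sketch of how a boolean backed by a 32-bit integer can support these operations. It uses only the standard sync/atomic package and is not this package's source: the flag type, the toInt32 and newFlag helpers, the claimOnce function, and the encoding of true as 1 are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// flag is a hypothetical stand-in for AtomicBool: an int32 in which
// 1 means true and 0 means false (an assumed encoding).
type flag int32

// toInt32 maps a bool onto the assumed integer encoding.
func toInt32(b bool) int32 {
	if b {
		return 1
	}
	return 0
}

func newFlag(value bool) *flag {
	f := flag(toInt32(value))
	return &f
}

func (f *flag) Get() bool { return atomic.LoadInt32((*int32)(f)) == 1 }

func (f *flag) Set(value bool) { atomic.StoreInt32((*int32)(f), toInt32(value)) }

// CompareAndSet swaps in next only if the stored value equals current.
func (f *flag) CompareAndSet(current, next bool) bool {
	return atomic.CompareAndSwapInt32((*int32)(f), toInt32(current), toInt32(next))
}

// claimOnce races n goroutines to flip the flag from false to true and
// returns how many of them succeeded.
func claimOnce(f *flag, n int) int32 {
	var winners int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if f.CompareAndSet(false, true) {
				atomic.AddInt32(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	f := newFlag(false)
	fmt.Println(claimOnce(f, 8), f.Get()) // prints "1 true"
}
```

Because the compare-and-swap succeeds for exactly one caller, this pattern is a common way to guard one-time work without a mutex.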

type AtomicInt32

type AtomicInt32 int32

func NewAtomicInt32

func NewAtomicInt32(value int32) *AtomicInt32

NewAtomicInt32 creates a new atomic int32 instance.

func (*AtomicInt32) CompareAndSet

func (ai *AtomicInt32) CompareAndSet(current, new int32) bool

CompareAndSet sets the value to new only if the stored value equals current.

func (*AtomicInt32) Decrement

func (ai *AtomicInt32) Decrement() int32

Decrement decrements the atomic int and returns the new value.

func (*AtomicInt32) Get

func (ai *AtomicInt32) Get() int32

Get loads the int32 value atomically.

func (*AtomicInt32) Increment

func (ai *AtomicInt32) Increment() int32

Increment increments the atomic int and returns the new value.

func (*AtomicInt32) Set

func (ai *AtomicInt32) Set(value int32)

Set stores the int32 value atomically.
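
As a rough illustration of the increment and decrement semantics described above, the sketch below builds a counter on the standard sync/atomic package. The counter type and the countConcurrently helper are hypothetical names and are not taken from this package's source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a hypothetical stand-in for AtomicInt32.
type counter int32

// Increment adds one and returns the new value.
func (c *counter) Increment() int32 { return atomic.AddInt32((*int32)(c), 1) }

// Decrement subtracts one and returns the new value.
func (c *counter) Decrement() int32 { return atomic.AddInt32((*int32)(c), -1) }

// Get loads the current value.
func (c *counter) Get() int32 { return atomic.LoadInt32((*int32)(c)) }

// countConcurrently increments c once from each of n goroutines and
// returns the value observed after all of them finish.
func countConcurrently(c *counter, n int) int32 {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Increment()
		}()
	}
	wg.Wait()
	return c.Get()
}

func main() {
	var c counter
	fmt.Println(countConcurrently(&c, 100)) // prints 100
	fmt.Println(c.Decrement())              // prints 99
}
```

No increments are lost even though the goroutines run concurrently, which is the property a plain int32 with c++ would not guarantee.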

type AtomicRunState

type AtomicRunState struct {
	// contains filtered or unexported fields
}

AtomicRunState can be used to provide thread-safe start/stop functionality to internal run-loops inside a goroutine.

func (*AtomicRunState) IsRunning

func (ars *AtomicRunState) IsRunning() bool

IsRunning returns true if the state is running or in the process of stopping.

func (*AtomicRunState) IsStopping

func (ars *AtomicRunState) IsStopping() bool

IsStopping returns true if the run state has been stopped, but not yet reset.

func (*AtomicRunState) OnStop

func (ars *AtomicRunState) OnStop() <-chan struct{}

OnStop returns a channel intended for use in a select statement inside a goroutine's run loop. The channel is signaled whenever Stop() is executed. Once the channel is signaled, Reset() should be called if the run state is to be used again.

func (*AtomicRunState) Reset

func (ars *AtomicRunState) Reset()

Reset should be called in the select case for OnStop(). Note that calling Reset() before receiving from OnStop() will cause the Stop signal to be missed.

func (*AtomicRunState) Start

func (ars *AtomicRunState) Start() bool

Start checks for an existing run state and returns false if the run state has already started. If the run state has not started, then it will advance to the started state and return true.

func (*AtomicRunState) Stop

func (ars *AtomicRunState) Stop() bool

Stop closes the stop channel, triggering any selects waiting on OnStop().

func (*AtomicRunState) WaitForReset

func (ars *AtomicRunState) WaitForReset()

WaitForReset waits for the run state to be reset if and only if the run state is in the process of stopping.
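
To make the start, stop, and reset life cycle concrete, here is a simplified sketch of a run state with the same method names. It is an assumption-laden illustration, not this package's implementation: the runState type is hypothetical, it uses a mutex rather than atomics, Stop returning false when nothing is running is assumed, and Reset is a no-op unless a stop is pending.

```go
package main

import (
	"fmt"
	"sync"
)

// runState is a hypothetical, mutex-based stand-in for AtomicRunState.
type runState struct {
	mu       sync.Mutex
	running  bool
	stopping bool
	stop     chan struct{} // closed by Stop
	reset    chan struct{} // closed by Reset
}

// Start advances to the running state, or returns false if already started.
func (rs *runState) Start() bool {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	if rs.running {
		return false
	}
	rs.running = true
	rs.stop = make(chan struct{})
	rs.reset = make(chan struct{})
	return true
}

// OnStop returns the channel that is closed when Stop is called.
func (rs *runState) OnStop() <-chan struct{} {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	return rs.stop
}

// Stop signals the run loop. Returning false when not running or when
// already stopping is an assumption of this sketch.
func (rs *runState) Stop() bool {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	if !rs.running || rs.stopping {
		return false
	}
	rs.stopping = true
	close(rs.stop)
	return true
}

// Reset returns to the idle state and releases any WaitForReset callers.
func (rs *runState) Reset() {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	if !rs.stopping {
		return
	}
	rs.running, rs.stopping = false, false
	close(rs.reset)
}

// WaitForReset blocks only while a stop is pending.
func (rs *runState) WaitForReset() {
	rs.mu.Lock()
	stopping, reset := rs.stopping, rs.reset
	rs.mu.Unlock()
	if stopping {
		<-reset
	}
}

func main() {
	var rs runState
	fmt.Println(rs.Start()) // prints true
	fmt.Println(rs.Start()) // prints false: already running

	// The run loop: wait for the stop signal, then reset.
	go func() {
		<-rs.OnStop()
		rs.Reset()
	}()

	rs.Stop()
	rs.WaitForReset()
	fmt.Println(rs.Start()) // prints true: the state was reset
}
```

Calling WaitForReset before Start, as the IntervalRunner example does, is what lets a restart issued during shutdown block until the run loop has called Reset.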
