atomic 的使用
# atomic
# 原子操作
原子操作(atomic operation)是指不会被线程调度机制(不会被其他操作)打断的操作,并且这样的操作一旦开始,就一直运行到结束,之间不会有任何的中断。通俗地说,原子操作就如同一个原子一样,不可分割。
就拿数值求和这个例子,并发环境下对同一个变量进行加1,得出的结果会是怎么样?
func TestNoAtomic(t *testing.T) {
res := 0
target := 1000
var wg sync.WaitGroup
for i := 1; i <= target; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
res += i
}(i)
}
wg.Wait()
t.Log("预期结果:", sum(target))
t.Log("实际结果:", res)
}
func sum(t int) int {
return (1 + t) * t / 2
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
输出的结果(实际结果每次可能都不一样):
预期结果: 500500
实际结果: 498819
2
每次执行这个测试,得出的实际结果都是不一样的,而且不是正确的结果。这是因为并发环境下,没有对资源(这里的变量res
)进行保护,会出现以下情况:G1 下res=1
,G2 下res=2
,此时 G1 和 G2 都对res
加 1 ,G1下res=2
,G2 下res=3
;然后 G3 拿到res
,res
的在 G3 的值可能是 2,有可能是 3,数据出现了混乱,最终导致结果跟预期的不一致。
现在将 goroutine 的数量设置为 1,即同一时刻只能由一个 goroutine 运行:
func TestNoAtomic_OneGoroutine(t *testing.T) {
runtime.GOMAXPROCS(1)
res := 0
target := 1000
var wg sync.WaitGroup
for i := 1; i <= target; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
res += i
}(i)
}
wg.Wait()
t.Log("预期结果:", sum(target))
t.Log("实际结果:", res)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
输出的结果:
预期结果: 500500
实际结果: 500500
2
如果要在并发环境下保证数据的一致性和准确性,就需要原子操作或者说是串行操作。
串行操作,无论多少次执行都是同样的结果:
func TestSerial(t *testing.T) { res := 0 target := 1000 for i := 1; i <= target; i++ { res = res + i } t.Log("预期结果:", sum(target)) //预期结果: 500500 t.Log("实际结果:", res) //实际结果: 500500 }
1
2
3
4
5
6
7
8
9缺点是丧失了并发的优势。
原子操作
func TestAtomic(t *testing.T) { var res int32 target := 1000 var wg sync.WaitGroup for i := 1; i <= target; i++ { wg.Add(1) go func(i int) { defer wg.Done() atomic.AddInt32(&res, int32(i)) }(i) } wg.Wait() t.Log("预期结果:", sum(target)) //预期结果: 500500 t.Log("实际结果:", res) //实际结果: 500500 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15适合于较小和短事务的操作。
加锁操作
func TestMutex(t *testing.T) { res := 0 target := 1000 var wg sync.WaitGroup var mu sync.Mutex for i := 1; i <= target; i++ { wg.Add(1) go func(i int) { defer wg.Done() mu.Lock() res += i mu.Unlock() }(i) } wg.Wait() t.Log("预期结果:", sum(target)) //预期结果: 500500 t.Log("实际结果:", res) //实际结果: 500500 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
加锁和原子操作都常用于并发使处理数据竞争问题,通俗的说,其实加锁和原子操作就是将被保护的数据的操作由并发执行变成串行执行了。
# Go 语言的原子操作
Go语言的sync/atomic
提供了对原子操作的支持,用于同步访问整数和指针。
原子操作主要是两类:
- 修改:即重新赋值。
- 存储:即读写。
sync/atomic
提供AddXXX
、CompareAndSwapXXX
、SwapXXX
、LoadXXX
、StoreXXX
等方法。原子操作支持的类型包括int32、int64、uint32、uint64、uintptr、unsafe.Pointer
。
竞争条件是由于异步的访问共享资源,并试图同时读写该资源而导致的,使用互斥锁和通道的思路都是在线程获得到访问权后阻塞其他线程对共享内存的访问,而使用原子操作解决数据竞争问题则是利用了其不可被打断的特性。
# Add 方法
Add 方法很好理解,就是对addr
指向的值加delta
。delta
可以为整数,也可以为负数。
// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)
// AddUint32 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint32(&x, ^uint32(c-1)).
// In particular, to decrement x, do AddUint32(&x, ^uint32(0)).
func AddUint32(addr *uint32, delta uint32) (new uint32)
// AddInt64 atomically adds delta to *addr and returns the new value.
func AddInt64(addr *int64, delta int64) (new int64)
// AddUint64 atomically adds delta to *addr and returns the new value.
// To subtract a signed positive constant value c from x, do AddUint64(&x, ^uint64(c-1)).
// In particular, to decrement x, do AddUint64(&x, ^uint64(0)).
func AddUint64(addr *uint64, delta uint64) (new uint64)
// AddUintptr atomically adds delta to *addr and returns the new value.
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
从注释中可以看出,对于无符号的 uint
整型,如果要做减法,可以利用计算机补码的规则,把减法变成加法:
AddUint32(&x, ^uint32(c-1)) // 相当于 x - c
AddUint64(&x, ^uint64(c-1)) // 相当于 x - c
2
# CAS(CompareAndSwap) 方法
go 中的 CAS 操作,是借用了CPU提供的原子性指令来实现。CAS 操作修改共享变量时候不需要对共享变量加锁,而是通过类似乐观锁的方式进行检查,本质还是不断的占用 CPU 资源换取加锁带来的开销(比如上下文切换开销)。CAS 支持的方法如下:
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
// CompareAndSwapInt64 executes the compare-and-swap operation for an int64 value.
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
// CompareAndSwapUint32 executes the compare-and-swap operation for a uint32 value.
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
// CompareAndSwapUint64 executes the compare-and-swap operation for a uint64 value.
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
// CompareAndSwapUintptr executes the compare-and-swap operation for a uintptr value.
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
// CompareAndSwapPointer executes the compare-and-swap operation for a unsafe.Pointer value.
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompareAndSwapX
函数会先判断参数addr
指向的操作值与参数old
的值是否相等,仅当此判断得到的结果是true
之后,才会用参数new
代表的新值替换掉原先的旧值,否则操作就会被忽略。
如下是一个比较和交换变量a
和b
的例子:
func TestCAS(t *testing.T) {
var a int64 = 9
t.Log("a before:", a)
t.Log("a交换是否成功", atomic.CompareAndSwapInt64(&a, 10, 11))
t.Log("a after:", a)
var b int64 = 10
t.Log("b before:", b)
t.Log("b交换是否成功", atomic.CompareAndSwapInt64(&b, 10, 11))
t.Log("b after:", b)
}
2
3
4
5
6
7
8
9
10
11
输出:
a before: 9
a交换是否成功 false
a after: 9
b before: 10
b交换是否成功 true
b after: 11
2
3
4
5
6
sync.Mutext
的Lock
使用CompareAndSwapInt32
实现自旋锁:
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // 自旋锁
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
2
3
4
5
6
7
8
9
10
11
# Swap 方法
Swap 与 CompareAndSwap 方法相比,少了 Compare,即不需要进行比较就交换的原子操作。支持的方法:
// SwapInt32 atomically stores new into *addr and returns the previous *addr value.
func SwapInt32(addr *int32, new int32) (old int32)
// SwapInt64 atomically stores new into *addr and returns the previous *addr value.
func SwapInt64(addr *int64, new int64) (old int64)
// SwapUint32 atomically stores new into *addr and returns the previous *addr value.
func SwapUint32(addr *uint32, new uint32) (old uint32)
// SwapUint64 atomically stores new into *addr and returns the previous *addr value.
func SwapUint64(addr *uint64, new uint64) (old uint64)
// SwapUintptr atomically stores new into *addr and returns the previous *addr value.
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
// SwapPointer atomically stores new into *addr and returns the previous *addr value.
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Load(原子读取)和Store(原子写入)
Load 和 Store 方法可以说是成对使用的。Store 方法将一个值存到指定的addr
地址中,Load 方法从指定的addr
地址读取数据。
// LoadInt32 atomically loads *addr.
func LoadInt32(addr *int32) (val int32)
// LoadInt64 atomically loads *addr.
func LoadInt64(addr *int64) (val int64)
// LoadUint32 atomically loads *addr.
func LoadUint32(addr *uint32) (val uint32)
// LoadUint64 atomically loads *addr.
func LoadUint64(addr *uint64) (val uint64)
// LoadUintptr atomically loads *addr.
func LoadUintptr(addr *uintptr) (val uintptr)
// LoadPointer atomically loads *addr.
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
// StoreInt32 atomically stores val into *addr.
func StoreInt32(addr *int32, val int32)
// StoreInt64 atomically stores val into *addr.
func StoreInt64(addr *int64, val int64)
// StoreUint32 atomically stores val into *addr.
func StoreUint32(addr *uint32, val uint32)
// StoreUint64 atomically stores val into *addr.
func StoreUint64(addr *uint64, val uint64)
// StoreUintptr atomically stores val into *addr.
func StoreUintptr(addr *uintptr, val uintptr)
// StorePointer atomically stores val into *addr.
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
下面的例子是往变量a
存储一个值,然后再从a
读取值:
func TestStoreAndLoad(t *testing.T) {
var a int32
atomic.StoreInt32(&a, 10)
res := atomic.LoadInt32(&a)
t.Log(res) // 10
}
2
3
4
5
6
7
# atomic.Value
atomic.Value
实现了Load() (val any)
、Store(val any)
、Swap(new any) (old any)
、CompareAndSwap(old any, new any) (swapped bool)
四个方法,用于简化上面的集中操作,但操作效率会低一些。
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v any
}
2
3
4
5
6
7
8
atomic.Value
的使用示例:
func TestValue(t *testing.T) {
var a atomic.Value
a.Store(int32(10))
res := a.Load().(int32)
t.Log(res) // 10
t.Log(a.CompareAndSwap(int32(10), int32(11))) // true
res = a.Load().(int32)
t.Log(res) // 11
}
2
3
4
5
6
7
8
9
10
# 原子操作与互斥锁的区别
互斥锁是一种数据结构,使你可以执行一系列互斥操作。而原子操作是互斥的单个操作,这意味着没有其他线程可以打断它。
原子锁的优缺点:
- 优势:更轻量。比如 CAS 可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作。这可以大大减少同步对程序性能的损耗。
- 劣势:使用 CAS 操作的做法趋于乐观,总是假设被操作值未曾被改变(即与旧值相等),并一旦确认这个假设的真实性就立即进行值替换,那么在被操作值被频繁变更的情况下,CAS 操作并不那么容易成功。而使用互斥锁的做法则趋于悲观,我们总假设会有并发的操作要修改被操作的值,并使用锁将相关操作放入临界区中加以保护。
原子操作与互斥锁的区别:
- 互斥锁是一种数据结构,用来让一个线程(或 goroutine)执行程序的关键部分,完成互斥的多个操作。
- 原子操作是针对某个值的单个互斥操作。
- 可以把互斥锁理解为悲观锁,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。
atomic
包提供了底层的原子性内存原语,这对于同步算法的实现很有用。这些函数一定要非常小心地使用,使用不当反而会增加系统资源的开销,对于应用层来说,最好使用通道或sync
包中提供的功能来完成同步操作。