Golang 源码分析系列之 atomic 底层实现

本文转载自:Golang 源码分析系列之 atomic 底层实现

atomic 概述

Package atomic provides low-level atomic memory primitives useful for implementing synchronization algorithms.

atomic 包提供了用于实现同步机制的底层原子内存原语。

These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don’t communicate by sharing memory.

使用这些功能需要非常小心。除了特殊的底层应用程序外,最好使用通道或 sync 包来进行同步。通过通信来共享内存;不要通过共享内存来通信

对整数类型 T 的操作

T 类型是 int32int64uint32uint64uintptr 其中一种。

1
2
3
4
5
func AddT (addr *T, delta T) (new T)
func CompareAndSwapT (addr *T, old, new T) (swapped bool)
func LoadT (addr *T) (val T)
func StoreT (addr *T, val T)
func SwapT (addr *T, new T) (old T)

对于 unsafe.Pointer 类型的操作

1
2
3
4
func CompareAndSwapPointer (addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func LoadPointer (addr *unsafe.Pointer) (val unsafe.Pointer)
func StorePointer (addr *unsafe.Pointer, val unsafe.Pointer)
func SwapPointer (addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

atomic.Value 类型提供 Load/Store 操作

atomic 提供了 atomic.Value 类型,用来原子性加载和存储类型一致的值(consistently typed value)。atomic.Value 提供了对任何类型的原则性操作。

1
2
func (v *Value) Load () (x interface {}) // 原子性返回刚刚存储的值,若没有值返回 nil
func (v *Value) Store (x interface {}) // 原子性存储值 x,x 可以是 nil,但需要每次存的值都必须是同一个具体类型。

用法

用法示例 1:原子性增加值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main () {
	var count int32
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add (1)
		go func () {
			atomic.AddInt32 (&count, 1) // 原子性增加值
			wg.Done ()
		}()
		go func () {
			fmt.Println (atomic.LoadInt32 (&count)) // 原子性加载
		}()
	}
	wg.Wait ()
	fmt.Println ("count:", count)
}

用法示例 2:简易自旋锁实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
	"sync/atomic"
)

type spin int64

func (l *spin) lock () bool {
	for {
		if atomic.CompareAndSwapInt64 ((*int64)(l), 0, 1) {
			return true
		}
		continue
	}
}

func (l *spin) unlock () bool {
	for {
		if atomic.CompareAndSwapInt64 ((*int64)(l), 1, 0) {
			return true
		}
		continue
	}
}

func main () {
	s := new (spin)

	for i := 0; i < 5; i++ {
		s.lock ()
		go func (i int) {
			println (i)
			s.unlock ()
		}(i)
	}
	for {

	}
}

用法示例 3: 无符号整数减法操作

对于 Uint32 和 Uint64 类型 Add 方法第二个参数只能接受相应的无符号整数,atomic 包没有提供减法 SubstractT 操作:

1
2
func AddUint32 (addr *uint32, delta uint32) (new uint32)
func AddUint64 (addr *uint64, delta uint64) (new uint64)

对于无符号整数 V,我们可以传递 -V 给 AddT 方法第二个参数就可以实现减法操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
	"sync/atomic"
)

func main () {
	var i uint64 = 100
	var j uint64 = 10
	var k = 5
	atomic.AddUint64 (&i, -j)
	println (i)
	atomic.AddUint64 (&i, -uint64 (k))
	println (i)
	// 下面这种操作是不可以的,会发生恐慌:constant -5 overflows uint64
	//atomic.AddUint64 (&i, -uint64 (5))
}

源码分析

atomic 包提供的三类操作的前两种都是直接通过汇编源码实现的(sync/atomic/asm.s):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include "textflag.h"

TEXTSwapInt32 (SB),NOSPLIT,$0
	JMP	runtimeinternalatomicXchg (SB)

TEXTSwapUint32 (SB),NOSPLIT,$0
	JMP	runtimeinternalatomicXchg (SB)

...

TEXTStoreUintptr (SB),NOSPLIT,$0
	JMP	runtimeinternalatomicStoreuintptr (SB)

从上面汇编代码可以看出来 atomic 操作通过 JMP 操作跳到 runtime/internal/atomic 目录下面的汇编实现。我们把目标转移到 runtime/internal/atomic 目录下面。

该目录包含针对不同平台的 atomic 汇编实现 asm_xxx.s。这里面我们只关注 amd64 平台 asm_amd64.s(runtime/internal/atomic/asm_amd64.s) 和 atomic_amd64.go(runtime/internal/atomic/atomic_amd64.go)。

函数 底层实现
SwapInt32 / SwapUint32 runtime∕internal∕atomic・Xchg
SwapInt64 / SwapUint64 / SwapUintptr runtime∕internal∕atomic・Xchg64
CompareAndSwapInt32 / CompareAndSwapUint32 runtime∕internal∕atomic・Cas
CompareAndSwapUintptr / CompareAndSwapInt64 / CompareAndSwapUint64 runtime∕internal∕atomic・Cas64
AddInt32 / AddUint32 runtime∕internal∕atomic・Xadd
AddUintptr / AddInt64 / AddUint64 runtime∕internal∕atomic・Xadd64
LoadInt32 / LoadUint32 runtime∕internal∕atomic・Load
LoadInt64 / LoadUint64 / LoadUint64/ LoadUintptr runtime∕internal∕atomic・Load64
LoadPointer runtime∕internal∕atomic・Loadp
StoreInt32 / StoreUint32 runtime∕internal∕atomic・Store
StoreInt64 / StoreUint64 / StoreUintptr runtime∕internal∕atomic・Store64

Add 操作

AddUintptrAddInt64 以及 AddUint64 都是由方法 runtime∕internal∕atomic・Xadd64 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
TEXT runtimeinternalatomicXadd64 (SB), NOSPLIT, $0-24
	MOVQ	ptr+0 (FP), BX // 第一个参数保存到 BX
	MOVQ	delta+8 (FP), AX // 第二个参数保存到 AX
	MOVQ	AX, CX  // 将第二个参数临时存到 CX 寄存器中
	LOCK			// LOCK 指令进行锁住操作,实现对共享内存独占访问
	XADDQ	AX, 0 (BX) //xaddq 指令,实现寄存器 AX 的值与 BX 指向的内存存的值互换,
	// 并将这两个值的和存在 BX 指向的内存中,此时 AX 寄存器存的是第一个参数指向的值
	ADDQ	CX, AX // 此时 AX 寄存器的值是 Add 操作之后的值,和 0 (BX) 值一样
	MOVQ	AX, ret+16 (FP) # 返回值
	RET

LOCK指令是一个指令前缀,其后是读 - 写性质的指令,在多处理器环境中,LOCK 指令能够确保在执行 LOCK 随后的指令时,处理器拥有对数据的独占使用。若对应数据已经在 cache line 里,也就不用锁定总线,仅锁住缓存行即可,否则需要锁住总线来保证独占性。

XADDQ指令用于交换加操作,会将源操作数与目的操作数互换,并将两者的和保存到源操作数中。

AddInt32AddUint32 都是由方法 runtime∕internal∕atomic・Xadd 实现,实现逻辑和 runtime∕internal∕atomic・Xadd64 一样,只是 Xadd 中相关数据操作指令后缀是 L

1
2
3
4
5
6
7
8
9
TEXT runtimeinternalatomicXadd (SB), NOSPLIT, $0-20
	MOVQ	ptr+0 (FP), BX // 注意第一个参数是一个指针类型,是 64 位,所以还是 MOVQ 指令
	MOVL	delta+8 (FP), AX // 第二个参数 32 位的,所以是 MOVL 指令
	MOVL	AX, CX
	LOCK
	XADDL	AX, 0 (BX)
	ADDL	CX, AX
	MOVL	AX, ret+16 (FP)
	RET

Store 操作

StoreInt64StoreUint64StoreUintptr 三个是 runtime∕internal∕atomic・Store64 方法实现:

1
2
3
4
5
6
TEXT runtimeinternalatomicStore64 (SB), NOSPLIT, $0-16
	MOVQ	ptr+0 (FP), BX // 第一个参数保存到 BX
	MOVQ	val+8 (FP), AX // 第二个参数保存到 AX
	XCHGQ	AX, 0 (BX) // 将 AX 寄存器与 BX 寄存指向内存的值互换,
	// 那么第一个参数指向的内存存的值为第二个参数
	RET

XCHGQ指令是交换指令,用于交换源操作数和目的操作数。

StoreInt32StoreUint32 是由 runtime∕internal∕atomic・Store 方法实现,与 runtime∕internal∕atomic・Store64 逻辑一样,这里不在赘述。

CompareAndSwap 操作

CompareAndSwapUintptrCompareAndSwapInt64CompareAndSwapUint64 都是由 runtime∕internal∕atomic・Cas64 实现:

1
2
3
4
5
6
7
8
TEXT runtimeinternalatomicCas64 (SB), NOSPLIT, $0-25
	MOVQ	ptr+0 (FP), BX // 将第一个参数保存到 BX
	MOVQ	old+8 (FP), AX // 将第二个参数保存到 AX
	MOVQ	new+16 (FP), CX // 将第三个参数保存 CX
	LOCK				 // LOCK 指令进行上锁操作
	CMPXCHGQ	CX, 0 (BX) // BX 寄存器指向的内存的值与 AX 寄存器值进行比较,若相等则把 CX 寄存器值存储到 BX 寄存器指向的内存中
	SETEQ	ret+24 (FP)
	RET

CMPXCHGQ指令是比较并交换指令,它的用法是将目的操作数和累加寄存器 AX 进行比较,若相等,则将源操作数复制到目的操作数中,否则将目的操作复制到累加寄存器中。

Swap 操作

SwapInt64SwapUint64SwapUintptr 实现的方法是 runtime∕internal∕atomic・Xchg64SwapInt32SwapUint32 底层实现是 runtime∕internal∕atomic・Xchg,这里面只分析 64 的操作:

1
2
3
4
5
6
TEXT runtimeinternalatomicXchg64 (SB), NOSPLIT, $0-24
	MOVQ	ptr+0 (FP), BX // 第一个参数保存到 BX
	MOVQ	new+8 (FP), AX // 第一个参数保存到 AX 中
	XCHGQ	AX, 0 (BX) // XCHGQ 指令交互 AX 值到 0 (BX) 中
	MOVQ	AX, ret+16 (FP) // 将旧值返回
	RET

Load 操作

LoadInt32LoadUint32LoadInt64LoadUint64LoadUint64LoadUintptrLoadPointer 实现都是 Go 实现的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
//go:linkname Load
//go:linkname Loadp
//go:linkname Load64

//go:nosplit
//go:noinline
func Load (ptr *uint32) uint32 {
	return *ptr
}

//go:nosplit
//go:noinline
func Loadp (ptr unsafe.Pointer) unsafe.Pointer {
	return *(*unsafe.Pointer)(ptr)
}

//go:nosplit
//go:noinline
func Load64 (ptr *uint64) uint64 {
	return *ptr
}

最后我们来分析 atomic.Value 类型提供 Load/Store 操作。

atomic.Value 类型的 Load/Store 操作

atomic.Value 类型定义如下:

1
2
3
4
5
6
7
8
9
type Value struct {
	v interface {}
}

 //ifaceWords 是空接口底层表示
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

atomic.Value 底层存储的是空接口类型,空接口底层结构如下:

1
2
3
4
type eface struct {
	_type *_type // 空接口持有的类型
	data  unsafe.Pointer// 指向空接口持有类型变量的指针
}

atomic.Value 内存布局如下所示:

img

从上图可以看出来 atomic.Value 内部分为两部分,第一个部分是_type 类型指针,第二个部分是 unsafe.Pointer 类型,两个部分大小都是 8 字节(64 系统下)。我们可以通过以下代码进行测试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type Value struct {
	v interface {}
}

type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

func main () {
	func main () {
	val := Value {v: 123456}
	t := (*ifaceWords)(unsafe.Pointer (&val))
	dp := (*t).data            //dp 是非安全指针类型变量
	fmt.Println (*((*int)(dp))) // 输出 123456

	var val2 Value
	t = (*ifaceWords)(unsafe.Pointer (&val2))
	fmt.Println (t.typ) // 输出 nil
}

接下来我们看下 Store 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (v *Value) Store (x interface {}) {
	if x == nil { //atomic.Value 类型变量不能是 nil
		panic ("sync/atomic: store of nil value into Value")
	}
	vp := (*ifaceWords)(unsafe.Pointer (v)) // 将指向 atomic.Value 类型指针转换成 * ifaceWords 类型
	xp := (*ifaceWords)(unsafe.Pointer (&x)) //xp 是 * faceWords 类型指针,指向传入参数 x
	for {
		typ := LoadPointer (&vp.typ) // 原子性返回 vp.typ
		if typ == nil { // 第一次调用 Store 时候,atomic.Value 底层结构体第一部分是 nil,
		// 我们可以从上面测试代码可以看出来
			runtime_procPin () //pin process 处理,防止 M 被抢占
			if !CompareAndSwapPointer (&vp.typ, nil, unsafe.Pointer (^uintptr (0))) { // 通过 cas 操作,将 atomic.Value 的第一部分存储为 unsafe.Pointer (^uintptr (0)),若没操作成功,继续操作
				runtime_procUnpin () //unpin process 处理,释放对当前 M 的锁定
				continue
			}

			//vp.data == xp.data
			//vp.typ == xp.typ
			StorePointer (&vp.data, xp.data)
			StorePointer (&vp.typ, xp.typ)
			runtime_procUnpin ()
			return
		}
		if uintptr (typ) == ^uintptr (0) { // 此时说明第一次的 Store 操作未完成,正在处理中,此时其他的 Store 等待第一次操作完成
			continue
		}

		if typ != xp.typ { // 再次 Store 操作时进行 typ 类型校验,确保每次 Store 数据对象都必须是同一类型
			panic ("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer (&vp.data, xp.data) //vp.data == xp.data
		return
	}
}

总结上面 Store 流程:

  1. 每次调用 Store 方法时候,会将传入参数转换成 interface {} 类型。当第一次调用 Store 方法时候,分两部分操作,分别将传入参数空接口类型的_typ 和 data,存储到 Value 类型中。
  2. 当再次调用 Store 类型时候,进行传入参数空接口类型的_type 和 Value 的_type 比较,若不一致直接 panic,若一致则将 data 存储到 Value 类型中

从流程 2 可以看出来,每次调用 Store 方法时传入参数都必须是同一类型的变量。当 Store 完成之后,实现了 “鸠占鹊巢”,atomic.Value 底层存储的实际上是 (interface {}) x。

最后我们看看 atomic.Value 的 Load 操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (v *Value) Load () (x interface {}) {
	vp := (*ifaceWords)(unsafe.Pointer (v)) // 将指向 v 指针转换成 * ifaceWords 类型
	typ := LoadPointer (&vp.typ)
	if typ == nil || uintptr (typ) == ^uintptr (0) { //typ == nil 说明 Store 方法未调用过
	//uintptr (typ) == ^uintptr (0) 说明第一 Store 方法调用正在进行中
		return nil
	}
	data := LoadPointer (&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer (&x))
	xp.typ = typ
	xp.data = data
	return
}
0%