Post
Go Commons Pool发布以及Golang多线程编程问题总结
把 Apache Commons Pool 改写到 Go 的过程中,顺手总结了几类典型的并发编程问题和取舍。
趁着元旦放假,整理了一下最近学习Golang时,『翻译』的一个Golang的通用对象池,放到 github Go Commons Pool开源出来。之所以叫做『翻译』,是因为这个库的核心算法以及逻辑都是基于 Apache Commons Pool 的,只是把原来的Java『翻译』成了Golang。
前一段时间阅读 kubernetes 源码的时候,整体上学习了下 Golang,但语言这种东西,学了不用,几个星期就忘差不多了。一次 Golang 实践群里聊天,有人问到 Golang 是否有通用的对象池,搜索了下,貌似没有比较完备的。当前 Golang 的 pool 有以下解决方案:
-
sync.Pool
sync.Pool使用很简单,只需要传递一个创建对象的func即可。var objPool = sync.Pool{ New: func() interface{} { return NewObject() }, } p := objPool.Get().(*Object)但
sync.Pool只解决对象复用的问题,pool 中对象的生命周期是两次 gc 之间,gc 后 pool 中的对象会被回收,使用方不能控制对象的生命周期,所以不适合用在连接池等场景。 -
通过
container/list来实现自定义的 pool,比如 redigo 就使用这种办法。
但这些自定义的 pool 大多都不是通用的,功能也不完备。比如 redigo 当前没有获取连接池的超时机制,参看这个 issue Blocking with timeout when Get PooledConn。
而 Java 中的 commons pool,功能比较完备,算法和逻辑也经过验证,使用也比较广泛,所以就直接『翻译』过来,顺便练习 Golang 的语法。
作为一个通用的对象池,需要包含以下主要功能:
- 对象的生命周期可以精确控制,Pool 提供机制允许使用方自定义对象的创建、销毁、校验逻辑。
- 对象的存活数量可以精确控制,Pool 提供设置存活数量以及时长的配置。
- 获取对象有超时机制避免死锁,方便使用方实现 failover。以前也遇到过许多线上故障,就是因为连接池的设置或者实现机制有缺陷导致的。
Apache Commons Pool 的核心是基于 LinkedBlockingDeque,idle 对象都放在 deque 中。之所以是 deque,而不是 queue,是因为它支持 LIFO(last in, first out) / FIFO(first in, first out) 两种策略获取对象。然后有个包含所有对象的 Map,key 是用户自定义对象,value 是 PooledObject,用于校验 Return Object 的合法性,后台定时 abandoned 时遍历,计算活跃对象数等。超时是通过 Java 锁的 wait timeout 机制实现的。
下面总结下将 Java 翻译成 Golang 的时候遇到的多线程问题。
递归锁或者叫可重入锁(Recursive Lock)
Java 中的 synchronized 关键词以及 LinkedBlockingDeque 中用到的 ReentrantLock,都是可重入的。而 Golang 中的 sync.Mutex 是不可重入的。表现出来就是:
ReentrantLock lock;
public void a() {
lock.lock();
// do some thing
lock.unlock();
}
public void b() {
lock.lock();
// do some thing
lock.unlock();
}
public void all() {
lock.lock();
// do some thing
a();
// do some thing
b();
// do some thing
lock.unlock();
}
上例 all 方法中嵌套调用 a 方法,虽然调用 a 方法的时候也需要锁,但因为 all 已经申请锁,并且该锁可重入,所以不会导致死锁。而同样的代码在 Golang 中是会导致死锁的:
var lock sync.Mutex
func a() {
lock.Lock()
// do some thing
lock.Unlock()
}
func b() {
lock.Lock()
// do some thing
lock.Unlock()
}
func all() {
lock.Lock()
// do some thing
a()
// do some thing
b()
// do some thing
lock.Unlock()
}
只能重构为下面这样的:
var lock sync.Mutex
func a() {
lock.Lock()
a1()
lock.Unlock()
}
func a1() {
// do some thing
}
func b() {
lock.Lock()
b1()
lock.Unlock()
}
func b1() {
// do some thing
}
func all() {
lock.Lock()
// do some thing
a1()
// do some thing
b1()
// do some thing
lock.Unlock()
}
Golang 的核心开发者认为可重入锁是不好的设计,所以不提供,参看 Recursive (aka reentrant) mutexes are a bad idea。于是我们使用锁的时候就需要多注意嵌套以及递归调用。
锁等待超时机制
Golang 的 sync.Cond 只有 Wait,没有如 Java 中 Condition 的超时等待方法 await(long time, TimeUnit unit)。这样就没法实现 LinkedBlockingDeque 的 pollFirst(long timeout, TimeUnit unit) 这样的方法。有人提了 issue,但被拒绝了 sync: add WaitTimeout method to Cond。所以只能通过 channel 的机制模拟了一个超时等待的 Cond。完整源码参看 go-commons-pool/concurrent/cond.go。
type TimeoutCond struct {
L sync.Locker
signal chan int
}
func NewTimeoutCond(l sync.Locker) *TimeoutCond {
cond := TimeoutCond{L: l, signal: make(chan int, 0)}
return &cond
}
// return remain wait time, and is interrupt
func (this *TimeoutCond) WaitWithTimeout(timeout time.Duration) (time.Duration, bool) {
// wait should unlock mutex, if not will cause deadlock
this.L.Unlock()
defer this.L.Lock()
begin := time.Now().Nanosecond()
select {
case _, ok := <-this.signal:
end := time.Now().Nanosecond()
return time.Duration(end - begin), !ok
case <-time.After(timeout):
return 0, false
}
}
Map 机制的问题
这个问题严格地说不属于多线程问题。虽然 Golang 的 map 不是线程安全的,但通过 mutex 封装一下也很容易实现。关键问题在于我们前面提到的,pool 中用于维护全部对象的 map,key 是用户自定义对象,value 是 PooledObject。而 Golang 对 map 的 key 有约束:go spec: Map types
The comparison operators == and != must be fully defined for operands of the key type; thus the key type must not be a function, map, or slice. If the key type is an interface type, these comparison operators must be defined for the dynamic key values; failure will cause a run-time panic.
也就是说 key 中不能包含不可比较的值,比如 slice、map、function。而我们的 key 是用户自定义的对象,没办法进行约束。于是借鉴 Java 的 IdentityHashMap 思路,将 key 转换成对象的指针地址,实际上 map 中保存的是 key 对象的指针地址。
type SyncIdentityMap struct {
sync.RWMutex
m map[uintptr]interface{}
}
func (this *SyncIdentityMap) Get(key interface{}) interface{} {
this.RLock()
keyPtr := genKey(key)
value := this.m[keyPtr]
this.RUnlock()
return value
}
func genKey(key interface{}) uintptr {
keyValue := reflect.ValueOf(key)
return keyValue.Pointer()
}
同时,这样做的缺点是 Pool 中存的对象必须是指针,不能是值对象。比如 string、int 等对象是不能保存到 Pool 中的。
其他关于多线程的题外话
Golang 的 test -race 参数非常好用,通过这个参数,发现了几个 data race 的 bug,参看 commit fix data race test error。
Go Commons Pool 后续工作
- 继续完善测试用例,测试用例当前已经完成了大约一半多,覆盖率 88%。『翻译』的时候,主体代码相对来说写起来很快,但测试用例就比较麻烦多了,多线程情况下调试也比较复杂。一般基础库的测试用例代码是核心逻辑代码的 2-3 倍。
- 做下 benchmark。核心算法上应该没啥问题,都是经过验证的。但用 channel 模拟 timeout 的机制上可能有瓶颈,这块要考虑 timer 的复用机制。参看 Terry-Mao/goim。
- 上两项完成了,就可以准备发布个正式版本,可以通过这个 pool 改进下 redigo。