goroutine不立即执行的解决办法

在Go的开发中我们会经常使用goroutine,但是有的时候goroutine的也不一定按照我们的想法去执行,我手头的一个日志分析项目(等稳定下来可以开源掉)就遇到这个问题,就这个问题我给大家分享一下。

需求描述:
我是实时抓取的Nginx日志,测试环境大概是1700QPS的样子,我需要实时统计所有Path的QPS,,每秒延迟,以及这些Path返回的每种状态码的Qps

算法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (this *_hostStat)runTimer(timerService TimerService) {
go func() {
defer func() {
this.runTimer(timerService)
}()
notify := make(chan time.Time)
timerService.RegNotify(notify, time.Second, true)
for {
select {
case <-notify:
//reqPathInfos map[int64][]*PathInfo
for hostId, reqPathInfos := range this.logService.GetPathInfos() {
hostMutex := this.GetHostMutex(hostId)
for _, reqPathInfo := range reqPathInfos {
pathId := reqPathInfo.pathInfo.GetId()
pathStat := this.getPathStat(hostMutex, pathId)
access := atomic.SwapInt64(pathStat.accessSecond, 0)
delay := atomic.SwapInt64(pathStat.delaySecond, 0)
hostMutex.Lock()
statuses := pathStat.statusSecond
pathStat.statusSecond = make(map[int64]*int64)
hostMutex.Unlock()
pathStat.accessStatses.compute(float64(access))
pathStat.delayStatses.compute(float64(delay) / 1000)
for code, countP := range statuses {
accessData := pathStat.statusStatses[code]
if accessData == nil {
accessData = buildAccessData()
pathStat.statusStatses[code] = accessData
}
accessData.compute(float64(*countP))
}
for code, accessData := range pathStat.statusStatses {
if _, ok := statuses[code]; !ok {
accessData.compute(float64(0))
}
}
}
}
}
}
}()
}

首先我自己写了一个Timer(代码很简单,后续再单独贴代码出来写篇文章),通过堵塞channel来通知是否需要开始执行代码,接下来就是for select(这个写go的都知道)。

大家看到这里应该发现我的timer是一秒执行一次,开始运行的时候我的host大概是10来个,但是你知道的,线上各种攻击和嗅探不断,host很快达到了100多,Path总量也因为新上线的restAPI急剧增加(我还没有写自动归并RestAPI的模块,哭死),所以我担心以后一秒内计算不完所有的Path统计数据

补充:我的统计是用移动平均值来计算的,这个算法不懂的可以自行google,然后区间分为30秒,5分钟,30分钟,1小时,24小时,全部是浮点运算,所以可想而知计算量在Path剧增的时候有多大,那么问题来了,怎么解决这个问题呢,于是我在第14行这个份for循环外面加上了goroutine,让每个host的Path统计在单独的goroutine里面计算,毕竟不这么干的时候我的CPU使用率才20-40%,我用的可是20核的机器,CPU不用也是浪费。

现在代码变成下面这样了:

```go
func (this _hostStat)runTimer(timerService TimerService) {
go func() {
defer func() {
this.runTimer(timerService)
}()
notify := make(chan time.Time)
timerService.RegNotify(notify, time.Second, true)
for {
select {
case <-notify:
//reqPathInfos map[int64][]
PathInfo
for hostId, reqPathInfos := range this.logService.GetPathInfos() {
hostMutex := this.GetHostMutex(hostId)
go func(){
for , reqPathInfo := range reqPathInfos {
pathId := reqPathInfo.pathInfo.GetId()
pathStat := this.getPathStat(hostMutex, pathId)
access := atomic.SwapInt64(pathStat.accessSecond, 0)
delay := atomic.SwapInt64(pathStat.delaySecond, 0)
hostMutex.Lock()
statuses := pathStat.statusSecond
pathStat.statusSecond = make(map[int64]int64)
hostMutex.Unlock()
pathStat.accessStatses.compute(float64(access))
pathStat.delayStatses.compute(float64(delay) / 1000)
for code, countP := range statuses {
accessData := pathStat.statusStatses[code]
if accessData == nil {
accessData = buildAccessData()
pathStat.statusStatses[code] = accessData
}
accessData.compute(float64(
countP))
}
for code, accessData := range pathStat.statusStatses {
if
, ok := statuses[code]; !ok {
accessData.compute(float64(0))
}
}
}
}()
}
}
}
}()
}
改成这样我心里还蛮得意的,成功的利用了多核,然后还大大加强了计算结果的实时性,可接下来的事情让我傻眼了,我发现我的计算结果几乎是不变的,再看一遍代码,没有写错啊,然后我在第15行的地方打印了一条日志 run compute,然后运行程序,我发现真的是没有运行,日志打了一条就不打了,难道goroutine被吃掉了?

这个时候我回想起几年前学go的时候,我们在main方法里面写了go func(){xxxxx}(),然后经常是还没等这个goroutine运行,程序就执行完了,也就是说goroutine还没被分配到执行时间片,主线程就执行完了。那么是不是会因为这个父Goroutine还没执行完,它创建的子Goroutine的运行时间片就不会被分配,这个也只是我的推测,我没时间去看那么多goroutine的底层代码,但是平时也看看技术博客的,我记得好像从1.4开始所有的goroutine不会因为一个goroutine里面执行死循环而分配不到时间片,如果真的是这个原因的话,怎么解决呢?

我想了一个奇巧淫技,在41行后面加上一个time.Sleep(time.Nanosecond),奇迹出现了,日志每隔一秒打印出来。

基本上本次的问题可以确定为goroutine没有被分配到时间片导致的,虽然不能非常肯定,但是至少有效。分享出来,大家遇到这问题的时候,也可以这么解决。

另外,如果大家有自己的看法或者有更好的解决办法,请给我留言或者给我发邮件

坚持原创技术分享,您的支持将鼓励我继续创作!