简单的可暂停单工作线程模型

写完标题发现又是一篇《简单xxx》的文章。写个啥都是简单版,我好菜啊。

我好菜啊

这篇文章涉及到的线程模型主要的使用场景是爬虫线程。作为一个高大上的爬虫,有一个问题一定要处理好的:突然断网怎么办。这也许就是一个每天晚上宿舍熄灯的人说不出的痛。我倒不是很在乎速度,使用阻塞式的网络请求也没关系;也不太在意CPU利用率,所以哪怕是主线程等待我也不着急。但问题就是我希望我的爬虫是“可暂停”的:不仅仅可以冻结当前状态,而且还能把部分成果拿出来。因此,一个单工作线程的“线程池”模型横空出世。【虽然后来也不是线程池了

使用单线程工作的好处是省了很多锁,而且并没有损失拓展性:反正以后要是等不及了再在这个线程底下开线程池也没关系是不是。

之前在写的时候通过查询大量Stackoverflow的问题以及官方资料(这玩意好像没有官方文档),成功地绕了一个弯:为单个的工作线程开了一个multiprocessing.pool.ThreadPool,据说这样的好处是可以方便的拿到返回值:

1
2
3
4
5
6
7
8
9
10
def create_worker(something):
# ...
pool = ThreadPool(processes=1)
async_result = pool.apply_async(real_worker)
# ...
return {
'pool': pool,
'result': async_result,
# ...
}

注意:写的时候有点欠考虑,这样引用代码也许不太便于理解。可以参考这次修改的commit,谢谢配合。

然后在工作结束的时候结束进程并拿返回值:

1
2
3
4
5
def wait_for_worker_finish(worker):
# ... here is some code tells worker there's no more work.
result = worker['result'].get()
# ... some code update worker's state
return result

但是这个返回值并不是worker真正的产品(虽然这是ThreadPool的正确用法),因为这个返回值只有等全部的工作都做完了才能拿到,中途暂停(等会再说我是怎么实现暂停的)的时候是拿不到的。所以又开了两个multiprocessing.Queue,分别用来往worker里传任务和从worker往外传产品:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def create_worker(worker_method):
tasks_queue = Queue()
products_queue = Queue()
# ...
def real_worker():
while True:
# ...
task = tasks_queue.get()
# ...
product = worker_method(task)
# ...
products_queue.put(product)
# ...
worker = {
# ...
'in_queue': tasks_queue,
'out_queue': products_queue,
# ...
}

希望你还能把这个函数和上面那个合在一起。这样一来,真正传进来的worker_method所要实现的逻辑就很简单了:接收一个参数作为任务,返回产品。它还可以任性地抛异常——real_worker里的主循环当中省略的大部分代码就是为了处理这个的。这样一来随时增派新任务和获取已经完成部分的产品就变得非常容易了:只要传worker作为参数,取得对应的队列调用库的接口即可。具体可以参考官方文档。

real_worker的主循环中做的另一件事就是实现暂停功能,这是通过一对扭在一起的multiprocessing.Event实现的。具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def create_worker():
# ...
working = Event()
working.set()
pausing = Event()
def real_worker():
while True:
pausing.set()
working.wait()
pausing.clear()
# ...
return {
# ...
'work_event': working,
'pause_event': pausing,
# ...
}
def pause_worker(worker: dict) -> bool:
# ...
worker['work_event'].clear()
# ...
# Wait for worker finishing current task.
worker['pause_event'].wait()
return True

之所以还需要一个pause_event主要是因为一个BUG:最后一个任务可能会在worker被暂停以后被推进产品队列当中去。有了这个事件,pause_worker会一直等到最后一个产品被做完以后才返回,就不会出现莫名丢产品的情况了。

这里省略的代码主要是用来确定当前worker状态的两个布尔值:is_workingis_dead。这两个值不仅可以用来作安全检查,还可以对外提供一个友善的接口。


我跟你讲,以上是背景资料……整整写了1024个字……我也深表歉意。希望你能对接下来的正文依然充满了兴趣。

今天原本是要在上述的基础接口之上建立一层面向对象的封装,并写一篇类似于《半藏与守望先锋不得不说的故事》之类的文章【误。但是在我点了单元测试之后(单元测试爽过吸大麻),有一个API接口的测试挂了。我当即大吃一惊,难道这么几天哔哩哔哩还改了接口?仔细一看更吃惊了,只见这是用来读取多P视频各分P名字的接口,然而前两P的名字顺序反了!这是什么情况?于是我来到B站上一看:

截图

是的你没有看错,就是大名鼎鼎的保加利亚巫妖王……前两P的名字居然交换了次序!这怎么可能呢?由于我在几个星期前确确实实跑了几十遍单元测试通过了,所以才如此的确定。反正我是百思不得其解并兴奋地发了一条动态:

由于我的爬虫单元测试fail了,我可能是世界上第一个发现av170001前两P名字对调了的人。

好吧我要讲的不是这个。我要说的是,我手贱点了一下“Concurrency Diagram”的按钮以后,看到了一幅恐怖的景象:

并发测试图#1

怎么有线程没有停止?经过反复测试,我发现当我把单线程的线程池里的线程数改成4的时候,没有停止的线程数变成了四倍:

并发测试图#2

我的心好痛

我明明在等待停止worker的函数里调用了wait,甚至还调用了close啊,为什么会这样呢?然后我又发现,所有没有停止的线程全都有一个同名且正常停止的线程。就像前两张图里画的那样。

What the f*ck

经过一圈Google无果之后(这个情况复杂到我不知道如何描述),我尝试着在单元测试里把各个节点的活跃线程数打印出来,然后发现,它每次运行的结果不一样……

我的心好痛*2

这我又能怎么办呢?我只好决定把线程池改成一个单独的线程。还记得我为什么要用线程池吗?“可以方便地拿到返回值”。于是我尝试在worker里加一个字段用来存返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def create_worker(something):
# ...
result = None
def real_worker():
nonlocal result
# ... decide to return
result = True, None
return
return {
# ...
'result': result,
# ...
}

这里result的结构是按照ok, exception的约定。然后我就发现,对result的修改好像不起作用……冥冥中的直觉让我把它改成了这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def create_worker(something):
# ...
result = {'value': None}
def real_worker():
nonlocal result
# ... decide to return
result['value'] = True, None
return
return {
# ...
'result': result,
# ...
}

是的你没有猜错,好了。也许你会说“稍有常识……”,没错我就是没有常识。


下一个坑还要深一点点。在我把线程池改成线程以后,图表变成了这样:

并发测试图#3

看起来好多了,只不过两条队列并没有正常地终止,每一次都要等到下一个测试用例之后才结束,最后一个测试甚至延伸了好久。经过一番操作我把它们替换成了multiproccessing.Pipe,也就是一对Connection。如果你有经验应该已经猜到了,我在worker结束以后取结果的代码抛了EOFError。原因是当worker线程结束的时候它所对应的Connection被回收掉了。于是我只好把worker改成了这样:

1
2
3
4
5
6
7
8
9
10
11
worker = {
'worker': worker,
'result': result,
'connect': host_conn,
'work_event': working,
'pause_event': pausing,
'is_working': True,
'is_dead': False,
# this prevents auto-close
'_worker_connection': worker_conn,
}

顺便加上了前面提到过的所有东西。于是,我的努力最终达到了效果:

并发测试图#4

没有多余线程啦!

鼓掌

最后,感谢每一个看完这篇不知所云文章的人。这里是这次commit的地址,你可以看一看完整版的代码。下一篇文章应该可以写我是如何构建上层抽象的了……大概吧。