对于顺序任务可控执行的合理封装

对于Bitabase这个项目,我并没有计划把它做得很复杂。首先分为两个大模块:Bitabase部分承担真正的爬取工作部分,web(还没有想好名字)会渲染一个简单的本地网站用于把数据展示出来,以及提供操作接口以控制爬虫的工作。虽然项目迁移到了macOS/PyCharm的开发环境,但是依然保留着Visual Studio的结构,因为……懒。开玩笑啦,要不是VS的补全实在太慢,也许到现在这还是一个Windows底下开发的项目。

Bitabase底下没有更加细分的模块。interface.py是面向标准库和第三方库的一个封装,理论上其上方的代码就不用再直接引用任何其他库了,全部需求可以由它本身满足;base.py是代码真正的基础,将interface.py里提供的原始的、面向过程的接口改造成为需求量身定制的形状,所有应用层的代码(比如web模块)都不应该越过base而直接引用interface。今天要讲的就是base模块当中最开始的一部分。

根据上一篇文章中所描述的,interface为我提供了一个简单的单工作线程,其实还有一个用来提起网络请求的接口,长成这个样子:

1
2
3
def get_request(url: str, session: requests.Session = None, json: bool = False,
text: bool = True, **kwargs) -> (bool, dict or str or bytes):
pass

嗯,看来高亮不能正确处理换行,编辑器也不能正确处理Tab,真是一个充满恶意的世界呀。

有了以上两种东西以后,问题就变成了如何把它们组装起来。经过考虑我决定把“请求”作为一个任务和产品。这个概念过于抽象,所以我用类比的方式给接口命名,以及写这篇博客。因此,下面的提示非常重要:

注意:本文作者默认读者对于《守望先锋》有基本的了解(世界观、英雄等),为了不影响观看本文的体验,请不满足此假设的读者点击前面的链接提高自身姿势水平,谢谢配合。

好了,这回我们从顶至下地进行游览。一下代码摘自单元测试,可以算是最简单的应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def test_hanzo(self):
after = None
def callback(hanzo):
nonlocal after
after = hanzo
baidu_hanzo = Hanzo('https://baidu.com', callback)
overwatch = Overwatch()
overwatch.conscript([baidu_hanzo])
overwatch.dismiss()
self.assertIsInstance(after, Hanzo)
self.assertIsInstance(after.data, str)
self.assertEqual(after.data[:15], '<!DOCTYPE html>')

可以看到两个基础的类:半藏Hanzo和守望先锋Overwatch。这里面的守望先锋特制那支队伍(“We are overwatch!”),而非游戏本身。每个半藏承载着一个请求,调用一个半藏实例的fire方法(射箭)就会把这个请求发送出去——下层当时是调用了get_request方法。每个半藏还会接受一个回调函数作为构造参数,这个回调函数接受一个半藏实例作为参数,而这个实例就是射完了箭的半藏。在fire当中半藏会用请求返回的数据更新自身,从而供回调函数使用。半藏不是老说“为你效命”吗?这个回调函数就叫servicing,没毛病。

Overwatch本质上就是把worker的接口统一到了一个类里,但是区别在于守望先锋的团队对于它的队员有要求:目前来说只能是半藏。而worker并不在意你把什么东西当做任务推进去了——它只是原封不动地把任务送进worker_method当中去了而已。守望先锋的conscript(应征入伍)接口接受一列半藏作为参数,把它们加入到守望先锋的队伍当中去,并按照顺序调用每一个半藏的fire。而dismiss(解散)则对应着wait_for_worker_finish,阻塞等待半藏们各自射完箭,并将自身的状态更新为dismissed,不再接受更多半藏的入伍请求。retire(退役)接口用于将已经射完箭的半藏送回它所效命的主子那里去,具体的就是调用每一个产品队列里的半藏携带的servicing回调函数。你可以看到上面的例子当中没有出现这个接口,因为在解散守望先锋的时候这个接口会自动被调用——挺合理的。另外这个接口也会在运行过程中周期性的被调用到。

先来看一段具体的代码:

1
2
3
4
5
6
7
8
9
10
11
def __init__(self, max_team_member: int = 0, max_retry: int = 3):
self.worker = create_worker(self._duty)
self.member_count = 0
self.total_hanzo_count = 0
self.max_team_member = max_team_member if max_team_member > 0 else \
Overwatch.max_team_member
self.max_retry = max_retry
self.hanzoes = {}
self.retry_count = {}
self.dismissed = False
self.result = None

这是守望先锋的构造函数。可以看到用来当做worker的回调函数是self._duty,接下来会详细研究它,而其他的一些量中,头三个比较重要。一个守望先锋队伍是有人数上限的,不能无限地让射完了箭的半藏堆在队伍里,这样不利于程序的健壮性。每当半藏的人数达到上限的时候,就要先让完成了工作的半藏们退休,然后再接着射箭:

1
2
3
4
5
6
7
def _duty(self, hanzo_id: int) -> int:
if self.member_count == self.max_team_member:
self.retire()
self.total_hanzo_count += 1
self.hanzoes[hanzo_id].fire()
self.member_count += 1
return hanzo_id

上述逻辑在此表达的比较清楚。这里你可能会有疑问了,说好的把半藏作为任务呢?怎么是传的ID,而把半藏本人给扣在队伍里了?这里就要涉及到实现过程中遇到的第一个坑了。

如前一篇文章中所记述,worker与主线程之间的通信(交换任务和产品)是用过Pipe来实现的。而Pipe在传输对象前要先对其进行序列化。我的理解是为了防止已经进入管道的对象被别的线程修改,造成读取时与原始的状态不一致。而序列化是通过调用pickle来实现的。而pickle……在一篇Stack Overflow的回答当中直接被用了“broken”来形容,我觉得不算夸张。由于半藏实例要包含一个回调函数,因此无法被序列化。经过若干企图“绕过”的尝试失败以后,我只好采取如上的迂回办法。由此对应的入伍方法如下:

1
2
3
def conscript(self, hanzo_list: Iterable[Hanzo]) -> None:
self.hanzoes.update({id(hanzo): hanzo for hanzo in hanzo_list})
append_tasks(self.worker, map(id, hanzo_list))

id的返回值作键,应该是高枕无忧了吧。


先给出退役接口的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def retire(self) -> None:
retry_list = []
for hanzo_id in get_products(self.worker):
if hanzo_id is None:
break
hanzo = self.hanzoes[hanzo_id]
if hanzo.ok:
hanzo.refine()
elif self.dismissed:
# TODO: Log here.
pass
elif self.retry_count.get(hanzo, -1) == self.max_retry:
# TODO: Log here.
pass
else:
self.retry_count[hanzo] = 1 if hanzo not in self.retry_count \
else self.retry_count[hanzo] + 1
hanzo.ok = hanzo.exception = None
retry_list.append(hanzo)
continue
hanzo.servicing(hanzo)
del self.hanzoes[hanzo_id]
self.member_count = 0
self.conscript(retry_list)

是不是比想象中的要复杂?这是因为我写了另一个功能:出错重试。拿到半藏实例之后的大条件分支里一共有一下四种情形:

  • 如果箭射得没问题,在fire当中ok会被设置为True。此时执行refine(精炼)函数,将拿到的纯文本/json对象数据转化成更特异化的形式,以方便主子处理。在Hanzo当中refine就是一个空方法,在子类当中会被覆写。
  • 如果箭没射好但队伍已经解散了,那么也就没有重试的机会了(下层的worker已经关闭,不能接收更多的任务了),所以不重试。
  • 如果已经达到了最大重试上限,那也不重试。
  • 否则,将两个标志着错误的状态(ok == Falseexception != None)清除掉,这样就把半藏洗干净了,可以放回队列里继续用。由于条件分支之后的代码会把半藏送回老家,而进入这条分支的半藏还是个……傻半藏,所以跳过。

最后把队伍人数清零(注意人数是指射完箭的人数哦),把洗完了傻半藏重新推回服役列表当中去。退役仪式圆满结束。

这里可以看到,如果不是队伍的最后几名半藏,那么什么时候被送回主人那里是不确定的。这个模型和思想有点类似于node的异步的感觉,算是玩了几年WEB积累的一点直觉吧。


最后就是退役接口的实现了:

1
2
3
4
5
def dismiss(self) -> None:
self.dismissed = True
self.result = wait_for_worker_finish(self.worker)
# The last group of hanzoes have not been retired.
self.retire()

这里一定要注意更新dismissed必须放在最前面,因为wait_for_worker_finish一上来就会给任务队列推一个None代码没有新任务了,如果之后再通过任何途径调用了retire时又推了新任务,那么这个任务就不可能被执行到。事实上应该是完全推不进去才对……感觉代码又一次不健壮了……


最后分享一个有可能是史上最简陋的mock:

1
2
3
class FakeSession(Session):
def get(self, url, **kwargs):
raise RequestException('This is a test exception.')

我的每一层的网络接口都可以接受一个requests.Session对象来代替直接从requests.get或者类似接口发送请求。最初这么做只是因为这样效率高,没想到有奇效……只要把这个类的实例作为session传进去就可以模拟断网了……于是我的单元测试覆盖率终于达到了100%。

可把我牛逼坏了