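This error can occur when you use a process pool (Pool) from Python's multiprocessing module inside a class. Suppose you store the pool as an instance attribute (self.pool = Pool()) and then pass a bound method of self to the pool, which means self itself must be sent to the worker processes. You get the error NotImplementedError: pool objects cannot be passed between processes or pickled.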

```python
from multiprocessing import Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)

    def run(self):
        self.pool = Pool()
        self.pool.map(self.printTest, self.numList)
        self.pool.close()
        self.pool.join()


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

Error:

```text
Traceback (most recent call last):
  File "E:/3.PythonProject/myTool/Source/networkScan/tcpScan.py", line 162, in <module>
    Tests(numList).run()
  File "E:/3.PythonProject/myTool/Source/networkScan/tcpScan.py", line 145, in run
    self.pool.map(self.printTest, self.numList)
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 431, in _handle_tasks
    put(task)
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Users\13714\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 535, in __reduce__
    'pool objects cannot be passed between processes or pickled'
NotImplementedError: pool objects cannot be passed between processes or pickled
```

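The message says that a Pool object cannot be passed between processes or pickled. pickle is Python's object serialization module. As in other languages, serialization turns an object into bytes so it can be stored or, as here, sent to another process, and deserialization rebuilds the object from those bytes. Unlike a text format such as JSON, pickle produces a binary format that is hard for humans to read. However, it is not a privacy or security mechanism: anyone can decode it (for example with pickletools), and unpickling untrusted data can execute arbitrary code.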
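A quick round trip makes both points concrete: pickle turns an object into bytes and back, and those bytes are not encrypted. This is a minimal sketch that uses only the standard library. The dict below is an assumed stand-in that mimics the numList attribute from the example above.

```python
import pickle
import pickletools

# Stand-in for an object's state (same shape as Tests.__dict__ above)
state = {"numList": [1, 2, 3, 4, 5]}

data = pickle.dumps(state)      # serialize to bytes
restored = pickle.loads(data)   # deserialize back into an equal dict
print(type(data).__name__, restored == state)  # bytes True

# The format is binary but not secret: the key appears verbatim in the bytes,
# and pickletools can disassemble the entire stream.
print(b"numList" in data)  # True
pickletools.dis(data)
```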

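To pickle a bound method, Python has to pickle the object it is bound to, including all of that object's instance attributes. One of those attributes is self.pool, the Pool object itself. Pool objects cannot be pickled, hence the error. The traceback also shows why a Pool cannot be pickled: Pool.__reduce__ raises this exception on purpose. A pool owns worker processes, inter-process queues and background handler threads that belong to the process that created it, so there is no meaningful way to rebuild it in another process.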
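The mechanism can be reproduced without starting any processes. In the sketch below, FakePool is a hypothetical stand-in that refuses to be pickled the same way Pool does: its __reduce__ raises NotImplementedError. Pickling the bound method t.printTest succeeds until such an object is stored on t.

```python
import pickle


class FakePool:
    """Hypothetical stand-in for multiprocessing.Pool: refuses to be pickled."""

    def __reduce__(self):
        raise NotImplementedError(
            "pool objects cannot be passed between processes or pickled")


class Tests:
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)


t = Tests([1, 2, 3])

# Pickling a bound method also pickles the instance it is bound to.
payload = pickle.dumps(t.printTest)
restored = pickle.loads(payload)
restored(42)                      # prints 42, called on a copy of t
print(restored.__self__.numList)  # [1, 2, 3]

# Once an unpicklable object is an attribute of t, the same call fails.
t.pool = FakePool()
error = None
try:
    pickle.dumps(t.printTest)
except NotImplementedError as exc:
    error = exc
print("pickling failed:", error)
```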

After some searching and experimenting, I found roughly the following solutions.

Method 1: Exclude the Pool object from pickling

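When an instance method is pickled, Python pickles the whole object. If the Pool object is stored as an instance attribute, it is included too. You can fix this by implementing the __getstate__ magic method on the object and using it to remove the Pool object from the instance's state before pickling.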

Two pickle-related magic methods are involved here:

  • __getstate__
  • __setstate__(state)

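__getstate__ is called when the object is pickled and lets you specify which part of the object's state should actually be pickled. When the object is unpickled, __setstate__(state) is called with that state if the class implements it, as our example does. If the class does not implement it, the dict returned by __getstate__ is used as the unpickled instance's __dict__. In the example below, __setstate__ simply updates __dict__ with the dict returned by __getstate__, so we could have left out __setstate__ and got the same effect.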
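The fallback behaviour described above is easy to check: with no __setstate__, the returned dict becomes the new instance's __dict__. In this sketch a threading.Lock plays the role of the unpicklable Pool. The class names are made up for illustration.

```python
import pickle
import threading


class DropLock:
    def __init__(self):
        self.numList = [1, 2, 3]
        self.lock = threading.Lock()   # unpicklable, like a Pool

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["lock"]              # leave the lock out of the pickled state
        return state
    # No __setstate__: pickle puts the returned dict into the new instance's __dict__


class DropLockWithSetstate(DropLock):
    def __setstate__(self, state):
        self.__dict__.update(state)    # explicit version of the default behaviour


a = pickle.loads(pickle.dumps(DropLock()))
b = pickle.loads(pickle.dumps(DropLockWithSetstate()))
print(a.__dict__)   # {'numList': [1, 2, 3]}
print(b.__dict__)   # {'numList': [1, 2, 3]}
print(hasattr(a, "lock"), hasattr(b, "lock"))  # False False
```

Note that the restored objects have no lock attribute at all. Likewise, in Method 1 the copy of Tests received by each worker has no pool attribute, which is fine because the workers never use it.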

Adding __getstate__ and __setstate__:

```python
from multiprocessing import Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)

    def run(self):
        self.pool = Pool()
        self.pool.map(self.printTest, self.numList)
        self.pool.close()
        self.pool.join()

    def __getstate__(self):
        # Copy the instance state and drop the Pool before pickling
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        # Restore the remaining attributes on the unpickled copy
        self.__dict__.update(state)


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

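It runs successfully and prints the following (the order may vary between runs, since the workers print concurrently):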

```text
1
2
3
4
5
```

Add print calls to the two magic methods to see what gets pickled each time:

```python
def __getstate__(self):
    self_dict = self.__dict__.copy()
    print(self.__dict__)
    del self_dict['pool']
    return self_dict

def __setstate__(self, state):
    self.__dict__.update(state)
    print(self.__dict__)
```

[Screenshot: output of the two print calls]

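The printed output shows that del self_dict['pool'] removes the Pool object from the copied dict. That dict is what __setstate__(self, state) receives, so the pickled data no longer contains the Pool object.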

Method 2: Use the pathos module instead of multiprocessing's Pool

Install the pathos parallel computing package with pip (pip install pathos) and use its ProcessPool in place of multiprocessing's Pool:

```python
from pathos.pools import ProcessPool as Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)

    def run(self):
        self.pool = Pool()
        self.pool.map(self.printTest, self.numList)


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

It runs successfully and prints:

```text
1
2
3
4
5
```

Method 3: Don't store the Pool object as an instance attribute

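As mentioned above, when an instance method is pickled, Python pickles the whole object, so a Pool stored as an instance attribute comes along with it. The fix is to create the Pool as a local variable inside run() instead of storing it on self: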

```python
from multiprocessing import Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)

    def run(self):
        pool = Pool()  # local variable, not stored on self
        pool.map(self.printTest, self.numList)
        pool.close()
        pool.join()


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

It runs successfully and prints:

```text
1
2
3
4
5
```

Method 4: Use the @staticmethod decorator

This comes from an expert's answer on Stack Overflow. The "Dano's answer" it mentions is essentially Method 1 above.

[Screenshot: the Stack Overflow answer]

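@staticmethod is the static method decorator. A regular method is called on an instance of the class and receives that instance as self. A static method can be called without creating an instance, directly as ClassName.method(). This works because a static method is essentially a plain function stored in the class namespace: it receives neither self (the instance) nor cls (the class). As a result, self.printTest is just a function, and pickling it does not pull in self or self.pool.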
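The key point for Method 4 can be checked without starting any processes. Accessed through an instance, a static method is a plain function with no bound instance, so pickle stores it by its qualified name and never touches the instance's attributes. A minimal sketch, again using a hypothetical FakePool as an unpicklable stand-in for Pool:

```python
import pickle


class FakePool:
    """Hypothetical stand-in for multiprocessing.Pool: refuses to be pickled."""

    def __reduce__(self):
        raise NotImplementedError(
            "pool objects cannot be passed between processes or pickled")


class Tests:
    def __init__(self, numList):
        self.numList = numList
        self.pool = FakePool()   # unpicklable attribute, as in Method 4

    @staticmethod
    def printTest(num):
        print(num)


t = Tests([1, 2, 3])
func = t.printTest
print(hasattr(func, "__self__"))   # False: a plain function, no bound instance

# Pickled by reference (module + qualified name), so t.pool is never visited
data = pickle.dumps(func)
print(pickle.loads(data) is Tests.printTest)  # True
```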

Add the decorator to printTest() and create the Pool attribute in run() as usual:

```python
from multiprocessing import Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    @staticmethod
    def printTest(num):
        # No self parameter, so sending this function to the pool does not pickle the instance
        print(num)

    def run(self):
        self.pool = Pool()
        self.pool.map(self.printTest, self.numList)
        self.pool.close()
        self.pool.join()


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

It runs successfully and prints:

```text
1
2
3
4
5
```

Method 5: Declare a global pool = Pool()

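In the same spirit as Method 3, you can create pool = Pool() outside the class as a global variable. This also works, but it is less modular and feels like a departure from object-oriented design.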

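```python
from multiprocessing import Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, dstPort):
        print(dstPort)

    def run(self):
        # Uses the module-level pool; it is not an attribute of self, so it is not pickled
        pool.map(self.printTest, self.numList)
        pool.close()
        pool.join()


if __name__ == "__main__":
    global pool  # optional here: names assigned at module level are already global
    pool = Pool()
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```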

It runs successfully and prints:

```text
1
2
3
4
5
```

Method 6: Use ThreadPool instead of multiprocessing.Pool

Use ThreadPool from multiprocessing.pool in place of multiprocessing.Pool:

```python
from multiprocessing.pool import ThreadPool as Pool


class Tests():
    def __init__(self, numList):
        self.numList = numList

    def printTest(self, num):
        print(num)

    def run(self):
        self.pool = Pool()
        self.pool.map(self.printTest, self.numList)
        self.pool.close()   # no more tasks will be submitted
        self.pool.join()    # wait for the worker threads to exit


if __name__ == "__main__":
    numList = [1, 2, 3, 4, 5]
    Tests(numList).run()
```

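ThreadPool runs tasks in threads that share memory with the main thread instead of starting new processes, so the function and its arguments never need to be pickled.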

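However, these threads are still bound by the GIL, so this approach cannot use multiple CPU cores for CPU-bound work. It is better suited to I/O-bound tasks such as network scanning.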
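To see that ThreadPool really skips pickling, the sketch below gives it a lambda, which pickle cannot serialize. It also checks that every worker thread receives the very same self object rather than a copy. whoAmI is a hypothetical helper added only for this check.

```python
from multiprocessing.pool import ThreadPool


class Tests:
    def __init__(self, numList):
        self.numList = numList

    def whoAmI(self, num):
        # Hypothetical helper: report which object this worker thread sees
        return id(self)

    def run(self):
        self.pool = ThreadPool(2)  # a pool attribute is harmless: nothing is pickled
        try:
            # A lambda cannot be pickled, but ThreadPool accepts it
            squares = self.pool.map(lambda n: n * n, self.numList)
            ids = self.pool.map(self.whoAmI, self.numList)
        finally:
            self.pool.close()
            self.pool.join()
        return squares, ids


t = Tests([1, 2, 3, 4, 5])
squares, ids = t.run()
print(squares)                       # [1, 4, 9, 16, 25]
print(all(i == id(t) for i in ids))  # True: every thread saw the same object
```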

References

https://stackoverflow.com/questions/25382455/python-notimplementederror-pool-objects-cannot-be-passed-between-processes

pickle: https://www.cnblogs.com/cobbliu/archive/2012/09/04/2670178.html