暂停两个Python线程,而第三个线程填充(带锁?)
问题内容:
我是并发编程的新手。
我想重复执行三个任务。前两个应该一直运行,第三个应该每小时运行一次。前两个任务可以并行运行,但是我总是想在第三个任务运行时暂停它们。
这是我尝试过的内容的骨架:
import threading
import time
flock = threading.Lock()
glock = threading.Lock()
def f():
while True:
with flock:
print 'f'
time.sleep(1)
def g():
while True:
with glock:
print 'g'
time.sleep(1)
def h():
while True:
with flock:
with glock:
print 'h'
time.sleep(5)
threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
我希望这段代码每秒打印一次f和ag,大约每五秒钟打印一次h。但是,当我运行它时,大约需要12 f和12
g才能开始看到一些h。看起来前两个线程不断释放并重新获取它们的锁,而第三个线程则被排除在循环之外。
- 这是为什么?当第三个线程尝试获取当前持有的锁,然后释放该锁时,不是应该立即获得成功,而不是第一个/第二个线程立即再次获取它?我可能误会了一些东西。
- 什么是实现我想要的好方法?
注意:time.sleep(1)
对于这个简单的示例,将调用移出with flock /
glock块是可行的,但显然不适用于我的实际应用程序,在该应用程序中,线程将大部分时间用于实际操作。当循环主体每次执行后前两个线程休眠一秒钟,并且释放了锁时,第三个任务仍然永远不会执行。
问题答案:
如何使用threading.Events:
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(resume,is_waiting,name):
while True:
if not resume.is_set():
is_waiting.set()
logger.debug('{n} pausing...'.format(n=name))
resume.wait()
is_waiting.clear()
logger.info(name)
time.sleep(1)
def h(resume,waiters):
while True:
logger.debug('halt')
resume.clear()
for i,w in enumerate(waiters):
logger.debug('{i}: wait for worker to pause'.format(i=i))
w.wait()
logger.info('h begin')
time.sleep(2)
logger.info('h end')
logger.debug('resume')
resume.set()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
# set means resume; clear means halt
resume = threading.Event()
resume.set()
waiters=[]
for name in 'fg':
is_waiting=threading.Event()
waiters.append(is_waiting)
threading.Thread(target=f,args=(resume,is_waiting,name)).start()
threading.Thread(target=h,args=(resume,waiters)).start()
产量
[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt
(针对评论中的问题)此代码尝试测量h
-thread从其他工作线程获取每个锁所花费的时间。
似乎表明,即使h
正在等待获取锁,另一个工作线程也可能以相当高的概率释放并重新获取锁。没有优先权h
只是因为它等待了更长的时间。
David
Beazley在PyCon上介绍了有关线程和GIL的问题。这是幻灯片的pdf格式。这是一本引人入胜的书,也可能有助于解释这一点。
import threading
import time
import logging
logger=logging.getLogger(__name__)
def f(lock,n):
while True:
with lock:
logger.info(n)
time.sleep(1)
def h(locks):
while True:
t=time.time()
for n,lock in enumerate(locks):
lock.acquire()
t2=time.time()
logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
t=t2
t2=time.time()
logger.info('h {d}'.format(d=t2-t))
t=t2
for lock in locks:
lock.release()
time.sleep(5)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s %(threadName)s] %(message)s',
datefmt='%H:%M:%S')
locks=[]
N=5
for n in range(N):
lock=threading.Lock()
locks.append(lock)
t=threading.Thread(target=f,args=(lock,n))
t.start()
threading.Thread(target=h,args=(locks,)).start()