Python多段程和多过程(三) 进程同歩之标准自变

摘要:最先,标准自变量务必要相互配合互斥锁应用,由于标准自变量是一种多段程市场竞争的共享资源資源。根据标准自变量能够完成等候和通告的体制。最基本的应用方法为:cond = Condit...

最先,标准自变量务必要相互配合互斥锁应用,由于标准自变量是一种多段程市场竞争的共享资源資源。
根据标准自变量能够完成等候和通告的体制。

最基本的应用方法为:

cond = Condition()  # 建立一个标准自变量
cond.acquire()      # 给标准自变量锁上
cond.wait()         # 等候,会堵塞下边的编码实行,当别的进程启用notify的情况下才会被唤起
do_something()      
cond.notify()       # 通告和唤起别的应用了标准自变量cond的进程
cond.release()

 

在其中:
cond.acquire()  实际上便是用一个互斥锁锁上,非常于 lock.acquire()
cond.wait()     会干三件事:1.会释放出来互斥锁,让别的应用了同样标准自变量的进程能够取得锁来运作。2.进到堵塞休眠状态情况,让出CPU。3.当本进程的wait()被别的进程notify唤起后,wait()会再次取得锁。
cond.notify()    通告别的进程,唤起别的进程的wait()。
cond.release()  释放出来互斥锁

wait()和release()务必在锁内实行。假如不获得锁就立即启用wait()和release()便会出错


PS:应用 
with cond:
    if my_condition:
        cond.wait()
    do_something()
    cond.notify()
   
和 
cond.acquire()
if my_condition:
    cond.wait()
do_something()
cond.notify()
cond.release()

是一样的。

应用with就非常于在编码外边包了一层锁。


事例1:操纵多段程井然有序实行
有2个人A,B开展会话,各自说1~6这好多个数据,规定A先说,A讲完B才可以说,B讲完A才可以再聊
A:1
B:2
A:3
B:4
A:5
B:6

from threading import Condition,Thread
cond = Condition()
a_say = [1,3,5]
b_say = [2,4,6]
class A(Thread):
    def __init__(self,cond,say):
        super(A,self).__init__(name="A")
        self.cond = cond
        self.say = say
    def run(self):
        self.cond.acquire()
        for i in range(len(self.say)):
            print("%s say %d" % (self.name,self.say.pop(0)))
            self.cond.notify()  # A讲完就需要通告B,让B刚开始说
            if len(self.say):
                self.cond.wait()    # A讲完也不能在说,只是等候B讲完,等B通告到A,A才可以再次说
        self.cond.release()

        for i in range(len(self.say)):             self.cond.wait()    # 一刚开始是A先说而并不是B先说,因此一刚开始B是处在等候情况             print("%s say %d" % (self.name,self.say.pop(0)))             self.cond.notify()  # B讲完就需要通告A,让A再次说         self.cond.release() if __name__=="__main__":     a = A(cond,a_say)     b = B(cond,b_say)     b.start()       # 务必让b进程先起动,a后起动,假如a先起动,那麼a会在b沒有实行wait()的状况下实行notify(),因此这一notify()通告非常于失效。以后a实行wait().b也实行wait()彼此都处在等候,因此这一过程就卡住     a.start()


   
   

这儿应用了一个标准自变量来完成


自然,大家还可以应用好几个互斥锁来完成多段程的井然有序实行,前边互斥锁的事例中早已展现。

 


事例2:用互斥锁,标准自变量和目录完成一个进程安全性的序列:
 

# coding=utf-8
from threading import Thread
from threading import Lock,Condition
import random
class ThreadSafeQueue:
    def __init__(self,max_size=0,blocking=True,timeout=None):  # 默认设置序列沒有限定较大室内空间
        self.max_size=max_size
        self.blocking=blocking  # 默认设置等候堵塞
        self.timeout=timeout    # 默认设置等候時间无尽长
        self.lock = Lock()
        self.cond = Condition(lock=self.lock)  # 这一标准自变量所应用的锁是自定的互斥锁,而不应用Condition內部界定的重入锁
        self.queue = []
    def size(self):     # self.queue是进程共享资源資源,全部有关self.queue的应用必须加锁,包含查和改
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()
        return size
    def batch_push(self,items):
        if not isinstance(items,list):
            items=list(items)
        for item in items:
            self.push(item)
    def push(self,item):
        self.cond.acquire()
        while self.max_size 0 and len(self.queue) =self.max_size:
            if self.blocking:
                res = self.cond.wait(timeout=self.timeout)    # 假如超出timeout还没有被唤起,则回到False
                
                if not res:
                    self.cond.release()
                    return False
            else:
                self.cond.release()
                return False
        self.queue.append(item)
        self.cond.notify()
        self.cond.release()

        item = self.queue.pop()         self.cond.notify()          # 通告生产制造者能够再次生产制造         self.cond.release()         return item     def get(self,index):         self.lock.acquire()         try:             item = self.queue[index]         except:             item=None         self.lock.release()         return item # 生产制造者 def produce(q,n):     for i in range(100000):         q.push(i)         print("Thread %d push %d" % (n,i)) def consumer(q,n):     count_none = 0  # 假如q.pop()堵塞频次超过10则终止while循环系统     while True:         item = q.pop()         if item is False:             count_none+=1         else:             count_none=0             print("Thread %d pop %d" % (n,item))         if count_none =10:             break
if __name__=="__main__":     queue = ThreadSafeQueue(1000)       # 检测堵塞序列,結果是,消費者消費完全部商品后堵塞等候新品生产制造,一直处在等候情况     # queue = ThreadSafeQueue(1000,timeout=1)       # 检测堵塞序列,結果是,消費者消費完全部商品后堵塞等候新品生产制造,堵塞10次后全自动跳出来循环系统     # queue = ThreadSafeQueue(1000,blocking=False)    # 检测非堵塞序列,結果是,生产制造者因为数次被堵塞而舍弃了许多次生产制造商品,消費者消費完全部商品后立即完毕     # 建立2个生产制造者进程,一个消費者进程,促使生产制造商品的速率比消費商品的速率快,那样消費商品不容易等候,为之产商品会等候     t1 = Thread(target=produce,args=(queue,1))     t2 = Thread(target=produce,args=(queue,2))     t3 = Thread(target=consumer,args=(queue,3))     t1.start()     t2.start()     t3.start()     t1.join()     t2.join()     t3.join()  

 
   
   


下边说一下Condition的最底层是如何完成的:
1.案例化 Condition 的情况下,Condition的__init__会转化成一个 RLock 重入锁,这一锁用以维护标准自变量目标的应用。大家叫这把锁为 R

2.实行wait()前务必实行cond.acquire()对标准自变量锁上,上的锁便是 R;
  实行wait() 的情况下,wait()干了那么几个事:
  2-1. wait()会建立一个互斥锁,大家把这一互斥锁称为X,并对X启用acquire()锁上: X.acquire(),以后将锁X放进一个双重序列 Q 中。
  2-2. wait()释放出来锁 R,那样别的进程才可以获得锁R并实行一些每日任务编码
  2-3. wait()在释放出来 R以后,会再对X上一次锁,X.acquire() ; 因为持续对X上2次锁,因此会产生死链接,那样wait()进到堵塞情况。
  因此 标准自变量的wait() 是根据死链接的方法来完成堵塞等候的作用的!!
 
  2-4.wait()的锁X被别的进程的notify()释放出来后,会再次对R锁上,锁X就从此不容易被应用。下一次启用wait()的情况下会转化成一把新的X锁
 
3.别的进程获得到锁R,并实行一些每日任务编码,以后实行notify()唤起以前哪个进程的wait()
  notify()干了那么几个事:
  3-1. 从序列 Q 头顶部弹出来锁 X,释放出来锁X。根据释放出来锁X完成唤起wait()的。以后这一锁X就始终不容易被采用了
 
 
小结: Condition的完成用了两把锁:
__init__()时建立的重入锁R 和 wait()是建立的互斥锁X
R用以维护标准自变量和一些共享资源自变量的进程安全性
X无需于进程安全性,只是用以生产制造死链接达到堵塞实际效果

R会反复应用,X是一次性应用,每一次会转化成新的X


下边贴出来 Condition中__init__,wait()和notify()的源代码:
 

class Condition:
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()      # 建立一个重入锁 R。假如手动式传到 lock 则应用客户传到的lock。
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release
        
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()       #### 互斥锁 X ####
        waiter.acquire()                #### 对 X 锁上 ####
        self._waiters.append(waiter)    #### 将 X 加上到序列 Q ####
        saved_state = self._release_save()      #### 释放出来锁 R ####
        gotit = False
        try:    
            if timeout is None:
                waiter.acquire()        #### 对互斥锁 X 第二次锁上,达到死链接,完成了堵塞 ####
                gotit = True
            else:
                if timeout 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)      #### X被释放出来后,对R再次锁上 ####
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()        #### 释放出来锁 X ####
            try:
                all_waiters.remove(waiter)      #### 将锁X从序列Q中弹出来 ####
            except ValueError:
                pass
    


   

张柏沛IT技术性blog > Python多段程和多过程(三) 进程同歩之标准自变量

点一下拷贝转截该一篇文章



联系我们

全国服务热线:4000-399-000 公司邮箱:343111187@qq.com

  工作日 9:00-18:00

关注我们

官网公众号

官网公众号

Copyright?2020 广州凡科互联网科技股份有限公司 版权所有 粤ICP备10235580号 客服热线 18720358503

技术支持:如何开发小程序