python 多线程知识全面解析
Python编程学堂 4天前
非阻塞启动线程
import threadingimport timedef one_thread(name,id):print("start....")print(name)print(id)time.sleep(5)print("end...")print("start thread")threading.Thread(target=one_thread, args=(), kwargs={"name": 111, "id": 222}).start()# args是一个list# kwargs是一个字典,需要对应函数的keyprint("end thread")
得到值如下,线程启动函数后,非阻塞执行
start threadstart....111222end threadend...
多线程并发处理
import threadingimport timeclass myThread(threading.Thread):def __init__(self, threadID, name):threading.Thread.__init__(self)self.threadID = threadIDself.name = namedef run(self):print_time(self.threadID, self.name)num = 0def print_time(threadID, name):global num# 每一个线程循环10次,最终总循环次数为30次for i in range(10):print("start run")time.sleep(2)print(i)num += 1print("thread_id=%s:name=%s" % (threadID, name))if __name__ == '__main__':threads = []# 新增三个线程for i in range(3):name = "Thread-%d" % it = myThread(i, name)t.start()threads.append(t)for t in threads:t.join()print("所有线程执行完毕")print("总循环次数为:%s" % num)
打印结果:每次运行三个线程,每个线程循环打印10次
start runstart runstart run00...thread_id=1:name=Thread-1所有线程执行完毕总循环次数为:30
多线程共享资源,可以使用全局变量
global
多线程加锁
对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间
# -*- coding: utf-8 -*-import timeimport threading# 创建锁对象lock = threading.Lock()num = 0def run(n):global numfor i in range(10):# 加锁 为了确保下面代码只能由一个线程从头到尾的执行# 会阻止多线程的并发执行,所以效率会大大降低"""lock.acquire()try:num = num - nnum = num + nfinally:# 解锁lock.release()"""with lock:time.sleep(2)print("start")num = num + 1print("==============")if __name__ == '__main__':t1 = threading.Thread(target=run,args=(6,))t2 = threading.Thread(target=run,args=(9,))t1.start()t2.start()t1.join()t2.join()print("num = %s"%(num))
打印结果是每次只能运行一个线程
start==============...num = 20
多线程与队列
我们经常会遇到这样的一个问题,这里有成千上万条数据,每次需要取出其中的一条数据进行处理,那么引入多线程该怎么进行任务分配?
我们可以将数据进行分割然后交给多个线程去跑,可是这并不是一个明智的做法。在这里我们可以使用队列与线程相结合的方式进行任务分配。
队列线程的思想:首先创建一个全局共享的队列,队列中只存在有限个元素,并将所有的数据逐条加入到队列中,并调用队列的join函数进行等待。之后便可以开启若干线程,线程的任务就是不断的从队列中取数据进行处理就可以了。
import threadingimport timeimport queueq = queue.Queue(10)threadLock = threading.Lock()class myThread(threading.Thread):def __init__(self, threadID, name):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.exitFlag = 0def run(self):while not self.exitFlag:threadLock.acquire()if not q.empty():id = q.get()print_time(self.name, id)threadLock.release()else:threadLock.release()def print_time(threadName, id):print ("%s:%s:%s"%(threadName,time.ctime(time.time()),id))# pass# 创建3个线程threads = []for i in range(3):name = "Thread-%d" % it = myThread(i, name)t.start()threads.append(t)print(threads)# 新增队列数据for i in range(10000):q_name = "Queue:%d" % iq.put(q_name)# 等待队列清空while not q.empty():pass# 也可以join方法,与上同效# q.join()# 通知线程,处理完之后关闭for t in threads:t.exitFlag = 1# 等待所有线程结束之后才退出for t in threads:t.join()print("Exiting Main Thread")
这里必须要在判断
q.empty()前加上线程锁,因为可能会出现这样的一种情况。某一时刻,队列中还有一个元素,该元素正在被线程A取出,而与此同时线程B正在判断队列q是否为空,而此时线程B中队列q不为空进入后面的操作,但是待B去取元素时,最后一个元素已经被A取出,造成线程等待,显示出被挂起的状态。
我们也可以通过加入
q.get(timeout=10)超时操作来弥补这一问题。打印的结果
[<myThread(Thread-0, started 6568)>, <myThread(Thread-1, started 7724)>, <myThread(Thread-2, started 7796)>]Thread-1:Sat Aug 22 11:36:29 2020:Queue:0Thread-1:Sat Aug 22 11:36:29 2020:Queue:1...Thread-1:Sat Aug 22 11:36:30 2020:Queue:9998Thread-1:Sat Aug 22 11:36:30 2020:Queue:9999Exiting Main Thread
ThreadPoolExecutor线程池的使用
锁依然可以运用到线程池
map的使用,接受一个List的数据,会循环调用
from concurrent.futures.thread import ThreadPoolExecutorimport timenum = 0def print_time(data): global num num += 1 time.sleep(2) print("start_%s" % data) print("============")data = []for i in range(50): data.append(i)with ThreadPoolExecutor(10) as pool: result = pool.map(print_time, data)# 等待所有线程执行完毕for i in result: passprint("循环次数=%s" % num)
打印结果为:每次启动10个线程,启动了5次
============start_46start_49========================循环次数=50
submit接受list的数据,也可以接受字典
from concurrent.futures.thread import ThreadPoolExecutorfrom concurrent.futures import as_completedimport timedef print_time(data):time.sleep(2)print("start_%s" % data)print("============")data = []for i in range(50):data.append(i)with ThreadPoolExecutor(10) as executor:future_list = []for i in range(10):# future = executor.submit(print_time,data)future = executor.submit(print_time, {"name": 111, "id": 222})future_list.append(future)for res in as_completed(future_list): # 这个futrure_list是你future对象的列表print(res.result())
作者:望月成三人
链接:https://www.jianshu.com/p/74d3119903fc
来源:简书
本文来源于简书,著作权归作者所有。仅用于学习交流,不进行商业用途,如需引用或转载,请注明原出处。如有侵权,请联系删除。
不喜欢不看的原因确定内容质量低不看此公众号
赞 (0)
