怎么使用python3线程池ThreadPoolExecutor处理csv文件数据(csv,python3,threadpoolexecutor,开发技术)

时间:2024-05-05 11:19:35 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

背景

由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过 Python 脚本进行修改

知识点

Python3、线程池、pymysql、CSV 文件操作、requests

拓展

当我们程序在使用到线程、进程或协程的时候,以下三个知识点可以先做个基本认知

CPU 密集型、IO 密集型、GIL 全局解释器锁

pip3 install requests

pip3 install pymysql

流程

怎么使用python3线程池ThreadPoolExecutor处理csv文件数据

实现代码

#-*-coding:utf-8-*-#@FileName:grade_update.py#@Desc:在一台超级计算机上运行过的牛逼Python代码importtimefromconcurrent.futuresimportThreadPoolExecutor,FIRST_COMPLETED,waitimportrequestsimportpymysqlfromprojectPathimportpathgradeId=[4303,4304,1000926,1000927]defwrit_mysql():"""数据库连接"""returnpymysql.connect(host="localhost",port=3306,user="admin",password="admin",database="test")defoprationdb(grade_id,member_id):"""操作数据库"""db=writ_mysql()try:cursor=db.cursor()sql=f"UPDATE`t_m_member_grade`SET`current_grade_id`={grade_id},`modified`=now()WHERE`member_id`={member_id};"cursor.execute(sql)db.commit()print(f"提交的SQL->{sql}")exceptpymysql.Errorase:db.rollback()print("DB数据库异常:",e)db.close()returnTruedefinterface(rows,thead):"""调用第三方接口"""print(f"处理数据行数--->{thead}----数据--->{rows}")try:url="http://xxxx/api/xxx-data/Tmall/bindQuery"body={"nickname":str(rows[0]),"seller_name":"test","mobile":"111"}heade={"Content-Type":"application/x-www-form-urlencoded"}res=requests.post(url=url,data=body,headers=heade)result=res.json()ifresult["data"]["status"]in[1,2]:grade=result["data"]["member"]["level"]grade_id=gradeId[grade]oprationdb(grade_id=grade_id,member_id=rows[1])returnTruereturnTrueexceptExceptionase:print(f"调用异常:{e}")defread_csv():importcsv#db=writ_mysql()#线程数MAX_WORKERS=5withThreadPoolExecutor(MAX_WORKERS)aspool:withopen(path+'/file/result2_colu.csv','r',newline='',encoding='utf-8')asf:#set()函数创建无序不重复元素集seq_notdone=set()seq_done=set()#使用csv的reader()方法,创建一个reader对象reader=csv.reader(f)n=0forrowinreader:n+=1#遍历reader对象的每一行try:seq_notdone.add(pool.submit(interface,rows=row,thead=n))iflen(seq_notdone)>=MAX_WORKERS:#FIRST_COMPLETED文档说明--Returnwhenanyfuturefinishesoriscancelled.done,seq_notdone=wait(seq_notdone,return_when=FIRST_COMPLETED)seq_done.update(done)exceptExceptionase:print(f"解析结果出错:{e}")#db.close()return"完成"if__name__=='__main__':read_csv()

解释

引入线程池库

from concurrent.futures import ThreadPoolExecutor,FIRST_COMPLETED,wait

pool.submit(interface, rows=row, thead=n)

提交任务,interface 调用的函数,rows、thead 为 interface() 函数的入参

任务持续提交,线程池通过 MAX_WORKERS 定义的线程数持续消费

说明像这种 I/O 密集型的操作脚本适合使用多线程,如果是 CPU 密集型建议使用进行,根据机器核数进行配置

 </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
本文:怎么使用python3线程池ThreadPoolExecutor处理csv文件数据的详细内容,希望对您有所帮助,信息来源于网络。
上一篇:css3中rotate3d方法如何用下一篇:

8 人围观 / 0 条评论 ↓快速评论↓

(必须)

(必须,保密)

阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18