Python如何处理复杂CSV文件(csv文件,python,开发技术)

时间:2024-04-29 07:38:07 作者 : 石家庄SEO 分类 : 开发技术
  • TAG :

    项目简介

    鉴于项目保密的需要,不便透露太多项目的信息,因此,简单介绍一下项目存在的难点:

    • 海量数据:项目是对CSV文件中的数据进行处理,而特点是数据量大...真的大!!!拿到的第一个CSV示例文件是110多万行(小CASE),而第二个文件就到了4500万行,等到第三个文件......好吧,一直没见到第三个完整示例文件,因为太大了,据说是第二个示例文件的40多倍,大概二十亿行......

    • 业务逻辑复杂:项目是需要对CSV文件的每一行数据的各种组合可能性进行判断,而判断的业务逻辑较为复杂,如何在解决复杂逻辑的同时保证较高的处理效率是难点之一。

    项目笔记与心得

    1.分批处理与多进程及多线程加速

    • 因为数据量太大,肯定是要分批对数据进行处理,否则,效率低不谈,大概率也没有足够的内存能够支撑,需要用到chunksize,此外,为了节约内存,以及提高处理效率,可以将文本类的数据存储为“category”格式:

    • 项目整体是计算密集型的任务,因此,需要用到多进程,充分利用CPU的多核性能;

    • 多线程进行读取与写入,其中,写入使用to_csv的增量写入方法,mode参数设置为'a';

    • 多进程与多线程开启一般为死循环,需要在合适的位置,放入结束循环的信号,以便处理完毕后退出多进程或多线程

    """鉴于项目保密需要,以下代码仅为示例"""importtimeimportpathlibasplimportpandasaspdfromthreadingimportThreadfrommultiprocessingimportQueue,Process,cpu_count#导入多线程Thread,多进程的队列Queue,多进程Process,CPU核数cpu_count#存放分段读取的数据队列,注:maxsize控制队列的最大数量,避免一次性读取到内存中的数据量太大data_queue=Queue(maxsize=cpu_count()*2)#存放等待写入磁盘的数据队列write_queue=Queue()defread_data(path:pl.Path,data_queue:Queue,size:int=10000):"""读取数据放入队列的方法:return:"""data_obj=pd.read_csv(path,sep=',',header=0,chunksize=size,dtype='category')foridx,dfinenumerate(data_obj):whiledata_queue.full():#如果队列满了,那就等待time.sleep(1)data_queue.put((idx+1,df))data_queue.put((None,None))#放入结束信号defwrite_data(out_path:pl.Path,write_queue:Queue):"""将数据增量写入CSV的方法:return:"""whileTrue:whilewrite_queue.empty():time.sleep(1)idx,df=write_queue.get()ifdfisNone:return#结束退出df.to_csv(out_path,mode='a',header=None,index=False,encoding='ansi')#输出CSVdefparse_data(data_queue:Queue,write_queue:Queue):"""从队列中取出数据,并加工的方法:return:"""whileTrue:whilewrite_queue.empty():time.sleep(1)idx,df=data_queue.get()ifdfisNone:#如果是空的结束信号,则结束退出进程,#特别注意结束前把结束信号放回队列,以便其他进程也能接收到结束信号!!!data_queue.put((idx,df))return"""处理数据的业务逻辑略过"""write_queue.put((idx,df))#将处理后的数据放入写队列#创建一个读取数据的线程read_pool=Thread(target=read_data,args=(read_data_queue,*args))read_pool.start()#开启读取线程#创建一个增量写入CSV数据的线程write_pool=Thread(target=write_data,args=(write_data_queue,*args))write_pool.start()#开启写进程pools=[]#存放解析进程的队列foriinrange(cpu_count()):#循环开启多进程,不确定开多少个进程合适的情况下,那么按CPU的核数开比较合理pool=Process(target=parse_data,args=(read_data_queue,write_data_queue,*args))pool.start()#启动进程pools.append(pool)#加入队列forpoolinpools:pool.join()#等待所有解析进程完成#所有解析进程完成后,在写队列放入结束写线程的信号write_data_queue.put((None,None))write_pool.join()#等待写线程结束print('任务完成')

    2.优化算法提高效率

    将类对象存入dataframe列

    在尝试了n种方案之后,最终使用了将类对象存到dataframe的列中,使用map方法,运行类方法,最后,将运行结果展开到多列中的方式。该方案本项目中取得了最佳的处理效率。

    """鉴于保密需要,以下代码仅为示例"""classObj:def__init__(self,ser:pd.Series):"""初始化类对象:paramser:传入series"""self.ser=ser#行数据self.attrs1=[]#属性1self.attrs2=[]#属性2self.attrs3=[]#属性3def__repr__(self):"""自定义输出"""attrs1='_'.join([str(a)forainself.attrs1])attrs2='_'.join([str(a)forainself.attrs2])attrs3='_'.join([str(a)forainself.attrs3])return'_'.join([attrs1,attrs2,attrs3])defrun(self):"""运行业务逻辑"""#创建obj列,存入类对象data['obj']=data.apply(lambdax:Obj(x),axis=1)#运行obj列中的类方法获得判断结果data['obj']=data['obj'].map(lambdax:x.run())#链式调用,1将类对象文本化->2拆分到多列->3删除空列->4转换为category格式data[['col1','col2','col3',...省略]]=data['obj'].map(str).str.split('_',expand=True).dropna(axis=1).astype('category')#删除obj列data.drop(columns='obj',inplace=True)

    减少计算次数以提高运行效率

    在整个优化过程中,对运行效率产生最大优化效果的有两项:

    • 一是改变遍历算法,采用直接对整行数据进行综合判断的方法,使原需要遍历22个组合的计算与判断大大减少

    • 二是提前计算特征组合,制作成字典,后续直接查询结果,而不再进行重复计算

    使用numpy加速计算

    numpy还是数据处理上的神器,使用numpy的方法,比自己实现的方法效率要高非常多,本项目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了运行效率,又解决了逻辑判断的问题:

    """numpy方法使用示例"""importnumpyasnp#计算数字的个数组合bincountnp.bincount([9,2,13,12,9,10,11])#输出结果:array([0,0,1,0,0,0,0,0,0,2,1,1,1,1],dtype=int64)#取得个数最多的数字argmaxnp.argmax(np.bincount([9,2,13,12,9,10,11]))#输出结果:9#将数字按照个数优先,其次大小进行排序argsortnp.argsort(np.bincount([9,2,13,12,9,10,11]))#输出结果:array([0,1,3,4,5,6,7,8,2,10,11,12,13,9],dtype=int64)#翻转列表flipudnp.flipud(np.argsort(np.bincount([9,2,13,12,9,10,11])))#输出结果:array([9,13,12,11,10,2,8,7,6,5,4,3,1,0],dtype=int64)#查找相同值in1dnp.in1d([2,3,4],[2,9,3])#输出结果:array([True,True,False])注:指2,3True,4Falsenp.all(np.in1d([2,3],[2,9,3]))#输出结果:array([True,True])#是否全是allnp.all(np.in1d([2,3,4],[2,9,3]))#判断组合1是否包含在组合2中#输出结果:Falsenp.all(np.in1d([2,3],[2,9,3]))#输出结果:True

    优化前后的效率对比

    Python如何处理复杂CSV文件

     </div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
    本文:Python如何处理复杂CSV文件的详细内容,希望对您有所帮助,信息来源于网络。
    上一篇:C#中modbus Tcp协议的数据抓取和使用方法下一篇:

    12 人围观 / 0 条评论 ↓快速评论↓

    (必须)

    (必须,保密)

    阿狸1 阿狸2 阿狸3 阿狸4 阿狸5 阿狸6 阿狸7 阿狸8 阿狸9 阿狸10 阿狸11 阿狸12 阿狸13 阿狸14 阿狸15 阿狸16 阿狸17 阿狸18