TL;DR: We're running into problems parallelizing Pandas code with Dask that reads from and writes to the same HDF file.

I'm working on a project that generally involves three steps: reading, translating (or combining), and writing data. For context, we work with medical records: we receive claims in various formats, translate them into a standardized format, and then write them back out to disk. Ideally, I'd like to save the intermediate datasets in some form that I can later access through Python/Pandas.

At the moment I've chosen HDF as the data storage format, but I'm running into runtime problems. Right now, across most of our member population, my code can take several days to run. That led me to look into Dask, but I'm not positive I've applied Dask correctly to my situation.

Below is a working example of my workflow, hopefully with enough sample data to give a sense of the runtime problem.

Read (in this case, create) the data

import pandas as pd
import numpy as np
import dask
from dask import delayed
from dask import dataframe as dd
import random
from datetime import timedelta
from pandas.io.pytables import HDFStore

member_id = range(1, 10000)
window_start_date = pd.to_datetime('2015-01-01')
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id]

# Eligibility records
eligibility = pd.DataFrame({'member_id': member_id,
                            'start_date': start_date_col})
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365)
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6])
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4])
(eligibility.set_index('member_id')
 .to_hdf('test_data.h5',
         key='eligibility',
         format='table'))

# Inpatient records
inpatient_record_number = range(1, 20000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number]
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number,
                          'service_date': service_date})
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number))
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number))
(inpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='inpatient',
         format='table'))

# Outpatient records
outpatient_record_number = range(1, 30000)
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number]
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number,
                           'service_date': service_date})
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number))
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number))
(outpatient.set_index('member_id')
 .to_hdf('test_data.h5',
         key='outpatient',
         format='table'))


Translate / write the data

Sequential approach

def pull_member_data(member_i):
    inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i))
    outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i))
    return inpatient_slice, outpatient_slice


def create_visits(inpatient_slice, outpatient_slice):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits'
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date')
    visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1))
    return visits_stacked


def save_visits_to_hdf(visits_slice):
    with HDFStore('test_data.h5', mode='a') as store:
        store.append('visits', visits_slice)


# Read in the data by member_id, perform some operation
def translate_by_member(member_i):
    inpatient_slice, outpatient_slice = pull_member_data(member_i)
    visits_slice = create_visits(inpatient_slice, outpatient_slice)
    save_visits_to_hdf(visits_slice)


def run_translate_sequential():
    # Simple approach: Loop through each member sequentially
    for member_i in member_id:
        translate_by_member(member_i)

run_translate_sequential()


The code above takes roughly 9 minutes to run on my machine.

Dask approach

def create_visits_dask_version(visits_stacked):
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier
    len_of_visits = visits_stacked.shape[0]
    visits_stacked_1 = (visits_stacked
                        .sort_values('service_date')
                        .assign(visit_id=range(1, len_of_visits + 1))
                        .set_index('visit_id')
                        )
    return visits_stacked_1


def run_translate_dask():
    # Approach 2: Dask, with individual writes to HDF
    inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient')
    outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient')
    stacked = dd.concat([inpatient_dask, outpatient_dask])
    visits = stacked.groupby('member_id').apply(create_visits_dask_version)
    visits.to_hdf('test_data_dask.h5', 'visits')

run_translate_dask()


This Dask approach takes 13 seconds(!)

While this is a big improvement, we're generally curious about a few things:


1. Given this simple example, is using Dask dataframes, concatenating them, and then using groupby/apply the best approach?
2. In reality, we have multiple processes like this that read from the same HDF file and write to the same HDF file. Our original codebase was structured so that the entire workflow could be run one member_id at a time. When we tried to parallelize those, it sometimes worked on small samples, but most of the time produced segmentation faults. Are there known issues with parallel workflows like this that read and write through HDF? We're working on producing a minimal example as well, but figured we'd post this here in case it triggers suggestions (or in case this code helps someone facing a similar problem); a rough sketch of the kind of parallelization we've been attempting follows this list.
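For concreteness, here is a minimal sketch of that per-member parallelization, reusing pull_member_data, create_visits and save_visits_to_hdf from above. The translate_by_member_locked / run_translate_parallel names and the explicit threading.Lock are purely illustrative of what we've been experimenting with, not a known-good pattern:

import threading
from dask import compute, delayed

hdf_lock = threading.Lock()  # serialize every access to the shared HDF file

@delayed
def translate_by_member_locked(member_i):
    # The HDF reads and the final append both touch test_data.h5, so guard them
    with hdf_lock:
        inpatient_slice, outpatient_slice = pull_member_data(member_i)
    # The translate step itself doesn't touch the file and can run concurrently
    visits_slice = create_visits(inpatient_slice, outpatient_slice)
    with hdf_lock:
        save_visits_to_hdf(visits_slice)

def run_translate_parallel():
    tasks = [translate_by_member_locked(member_i) for member_i in member_id]
    compute(*tasks, scheduler='threads')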


Any and all feedback is appreciated!

Best answer

In general, groupby-apply will be fairly slow. Data like this is usually challenging to work with, especially when memory is limited.

Generally I recommend using the Parquet format (dask.dataframe has to_parquet and read_parquet functions). You are much less likely to hit segfaults than with HDF files.
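A minimal sketch of how the translate step might look with Parquet instead of HDF, assuming either fastparquet or pyarrow is installed. The Parquet paths and the run_translate_dask_parquet name are illustrative only; create_visits_dask_version is the function defined in the question:

import dask.dataframe as dd

# One-time conversion of the existing HDF tables into Parquet datasets
dd.read_hdf('test_data.h5', 'inpatient').to_parquet('inpatient.parquet')
dd.read_hdf('test_data.h5', 'outpatient').to_parquet('outpatient.parquet')

def run_translate_dask_parquet():
    # Same translate step as before, but reading from and writing to Parquet
    inpatient_dask = dd.read_parquet('inpatient.parquet')
    outpatient_dask = dd.read_parquet('outpatient.parquet')
    stacked = dd.concat([inpatient_dask, outpatient_dask])
    visits = stacked.groupby('member_id').apply(create_visits_dask_version)
    visits.to_parquet('visits.parquet')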

A similar question, "python - Parallelizing an HDF read-translate-write with Dask", can be found on Stack Overflow: https://stackoverflow.com/questions/46497323/
