我正在使用dask来读写镶木地板。我正在使用fastparquet引擎写作,并使用pyarrow引擎阅读。
我的工人有1 GB的内存。使用fastparquet时,内存使用情况很好,但是当我切换到pyarrow时,它会炸毁并导致工作人员重新启动。
我下面有一个可复制的示例,该示例在内存为1gb的工作者上无法使用pyarrow。
实际上,我的数据集比这大得多。使用pyarrow的唯一原因是,与fastparquet相比,它在扫描时可以提高速度(大约7x-8x)

敏捷:0.17.1

pyarrow:0.9.0.post1

快速拼花:0.1.3

import dask.dataframe as dd
import numpy as np
import pandas as pd

size = 9900000
tmpdir = '/tmp/test/outputParquet1'

d = {'a': np.random.normal(0, 0.3, size=size).cumsum() + 50,
    'b': np.random.choice(['A', 'B', 'C'], size=size),
    'c': np.random.choice(['D', 'E', 'F'], size=size),
    'd': np.random.normal(0, 0.4, size=size).cumsum() + 50,
    'e': np.random.normal(0, 0.5, size=size).cumsum() + 50,
    'f': np.random.normal(0, 0.6, size=size).cumsum() + 50,
    'g': np.random.normal(0, 0.7, size=size).cumsum() + 50}
df = dd.from_pandas(pd.DataFrame(d), 200)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
         engine='fastparquet')

#engine = 'pyarrow' #fails due to worker restart
engine = 'fastparquet' #works fine
df_partitioned = dd.read_parquet(tmpdir + "/*.parquet", engine=engine)
print(df_partitioned.count().compute())
df_partitioned.query("b=='A'").count().compute()

编辑:我的原始设置正在运行spark作业,该作业使用fastparquet将数据并行写入分区。因此,元数据文件是在最里面的分区而不是父目录中创建的,因此使用glob路径而不是父目录(fastparquet在读取父目录时要快得多,而pyarrow在使用glob路径进行扫描时会胜出)

最佳答案

我建议选择read_parquet调用中需要的列

df = dd.read_parquet('/path/to/*.parquet', engine='pyarrow', columns=['b'])

这样,您就可以高效地仅读取需要的几列,而不是一次读取所有列。

关于dask - 带有ready内存的dask read_parquet炸毁,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50873502/

10-12 23:24