提问者:小点点

使用Spark将CSV转换为镶木地板,保留分区


我正在尝试使用Spark将一堆csv文件转换为parque,有趣的是输入的csv文件已经按目录“分区”。所有输入文件都有相同的列集。输入文件结构如下所示:

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

我想用Spark读取这些文件,并将它们的数据写入hdfs中的parque表,保留分区(按输入目录分区),例如每个分区有一个输出文件。输出文件结构应如下所示:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet

到目前为止,我找到的最好的解决方案是在输入目录之间循环,将csv文件加载到dataframe中,并将dataframe写入parque表中的目标分区中。但这并不高效,因为我希望每个分区都有一个输出文件,写入hdfs是阻止循环的单个任务。我想知道如何以最大的并行性(并且不打乱集群中的数据)实现这一点。

谢谢!


共2个答案

匿名用户

将您的输入目录更改为苛刻dir=苛刻。然后执行:

spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output')

如果您无法重命名目录,您可以使用Hive Metastore。创建外部表和每个目录一个分区。然后加载此表并使用上述模式重写。

匿名用户

到目前为止,我找到的最佳解决方案(没有混洗和与输入目录一样多的线程):

>

  • 创建一个输入目录的rdd,其分区与输入目录一样多

    将其转换为输入文件的rdd(按目录保留分区)

    使用自定义csv解析器平面图

    将rdd转换为dataframe

    将数据帧写入由目录分区的镶木板表

    它需要编写自己的解析器。我找不到使用sc. text file或数据库csv解析器保留分区的解决方案。