Handling non-uniform JSON columns in a Spark DataFrame


Question

I would like to know the best practice for reading a newline-delimited JSON file into a DataFrame. Crucially, one of the (required) fields in each record maps to an object that is not guaranteed to have the same sub-fields (i.e., the schema is inconsistent across records).

For example, the input file might look like:

{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}
{"id": 3, "type": "baz", "data": {"key3": "moo"}}

In this case the fields `id`, `type`, and `data` appear in every record, but the structure that `data` maps to has a heterogeneous schema.

I can think of two ways to deal with the non-uniformity of the `data` column:

  1. Let Spark infer the schema:

    df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt')

The obvious problem with this approach is that every record has to be sampled in order to determine the superset of fields that makes up the final schema. For a dataset in the low hundreds of millions of records, this could be prohibitively expensive. Or…

  2. Tell Spark to treat the `data` field as a JSON string, so the schema is simply three top-level string fields: `id`, `type`, and `data`. Here I'm not quite sure of the best way to proceed. For example, I assume that merely declaring the `data` field as a string, as below, won't work, because it doesn't explicitly perform the equivalent of `json.dumps`:

    schema = StructType([
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("data", StringType(), True)
    ])
    df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema)

Assuming I want to avoid the overhead of scanning the entire dataset that option 1 incurs, what is the best way to ingest this file while keeping the `data` field as a JSON string?

Thanks


Answer:

I think your attempts and the overall idea are heading in the right direction. Here are two more approaches: one based on the built-in functions `get_json_object()`/`from_json()` (via the DataFrame API), and one based on `map` with Python's `json.dumps()` and `json.loads()` (via the RDD API).

Option 1: get_json_object() / from_json()

First let's try get_json_object(), which does not require a schema:

from pyspark.sql.types import StringType
import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

df.select(f.get_json_object("value", "$.id").alias("id"),
          f.get_json_object("value", "$.type").alias("type"),
          f.get_json_object("value", "$.data").alias("data")) \
  .show(10, False)

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

from_json(), by contrast, requires a schema definition:

from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f

df = spark.createDataFrame([
  ('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
  ('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
  ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

df.select(f.from_json("value", schema).getItem("id").alias("id"),
          f.from_json("value", schema).getItem("type").alias("type"),
          f.from_json("value", schema).getItem("data").alias("data")) \
  .show(10, False)

# +---+----+-----------------------------+
# |id |type|data                         |
# +---+----+-----------------------------+
# |1  |foo |{"key0":"foo","key2":"meh"}  |
# |2  |bar |{"key2":"poo","key3":"pants"}|
# |3  |baz |{"key3":"moo"}               |
# +---+----+-----------------------------+

Option 2: map / RDD API + json.dumps()

from pyspark.sql.types import StringType, StructType, StructField
import json

df = spark.createDataFrame([
  '{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
  '{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
  '{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())

def from_json(data):
  row = json.loads(data[0])
  return (row['id'], row['type'], json.dumps(row['data']))

json_rdd = df.rdd.map(from_json)

schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("data", StringType(), True)
])

spark.createDataFrame(json_rdd, schema).show(10, False)

# +---+----+--------------------------------+
# |id |type|data                            |
# +---+----+--------------------------------+
# |1  |foo |{"key2": "meh", "key0": "foo"}  |
# |2  |bar |{"key2": "poo", "key3": "pants"}|
# |3  |baz |{"key3": "moo"}                 |
# +---+----+--------------------------------+

The from_json function converts each string row into a tuple of (id, type, data). json.loads() parses the JSON string and returns a dict, from which we build and return the final tuple.
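Because the mapper is plain Python, it can be checked without Spark at all; a standalone sketch, feeding it a one-element tuple in place of a Row:

```python
import json

def from_json(data):
    # data mimics a Row whose first field holds the raw JSON line
    row = json.loads(data[0])
    # Re-serialize only the heterogeneous "data" object back to a string
    return (row['id'], row['type'], json.dumps(row['data']))

line = ('{"id": 3, "type": "baz", "data": {"key3": "moo"}}',)
result = from_json(line)
print(result)  # (3, 'baz', '{"key3": "moo"}')
```

One thing to watch: `id` comes back as a Python int here, so if the target schema declares it as a string, the mapper is the natural place to cast it.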