如何从包含整数的JavaRDD
创建DataFrame
。我做了下面这样的事情,但没有工作。
List<Integer> input = Arrays.asList(101, 103, 105);
JavaRDD<Integer> inputRDD = sc.parallelize(input);
DataFrame dataframe = sqlcontext.createDataFrame(inputRDD, Integer.class);
我得到了ClassCastException
说org. apache.park.sql.types.IntegerType$不能转换为org.apache.park.sql.type.StructType
我怎样才能做到这一点?
显然(虽然不是直观的),这个createDataFrame
重载只能用于Bean类型,这意味着类型不对应于任何内置的SparkSQL类型。
你可以看到,在源代码中,你传递的类与JavaTypeIncentce.的SparkSQL类型匹配,结果被转换为
结构类型
(参见SQLContext. getSchema
中的dataType.asInstanceOf[结构类型]
-但是内置的“原始”类型(如IntegerType
)不是结构类型
…对我来说,看起来像是bug或未记录的行为….
工作方法:
>
用“bean”类包装您的Integer
(我知道这很丑陋):
public static class MyBean {
final int value;
MyBean(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
List<MyBean> input = Arrays.asList(new MyBean(101), new MyBean(103), new MyBean(105));
JavaRDD<MyBean> inputRDD = sc.parallelize(input);
DataFrame dataframe = sqlcontext.createDataFrame(inputRDD, MyBean.class);
dataframe.show(); // this works...
转换为RDD
// convert to Rows:
JavaRDD<Row> rowRdd = inputRDD.map(new Function<Integer, Row>() {
@Override
public Row call(Integer v1) throws Exception {
return RowFactory.create(v1);
}
});
// create schema (this looks nicer in Scala...):
StructType schema = new StructType(new StructField[]{new StructField("number", IntegerType$.MODULE$, false, Metadata.empty())});
DataFrame dataframe = sqlcontext.createDataFrame(rowRdd, schema);
dataframe.show(); // this works...
现在在Spark 2.2中,您可以执行以下操作来创建数据集。
Dataset<Integer> dataSet = sqlContext().createDataset(javardd.rdd(), Encoders.INT());