提问者:小点点

SideInputs损坏了DataFlow管道中的数据


我有一个数据流管道(SDK 2.1.0,Apache Beam 2.2.0),它只需从GCS中读取RDF(在N-Triples中,所以它只是文本文件),以某种方式转换并将其写回GCS,但放在不同的桶中。在这个管道中,我使用三个单独的文件(每个侧输入一个文件),并在ParDo中使用它们。

为了在Java中使用RDF,我使用ApacheJena,所以每个文件都被读入Model类的一个实例中。由于Dataflow没有Coder,所以我自己开发了它(RDFModelCode,见下文)。它在我创建的其他管道中运行良好。

这个特定管道的问题是,当我添加侧输入时,执行失败,异常指示数据损坏,即添加了一些垃圾。一旦我删除了副输入,管道就成功地完成了执行。

异常(它是从RDFModelCoder抛出的,见下文):

Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
    at org.apache.jena.atlas.io.IO.exception(IO.java:233)
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)

在这里你可以看到垃圾(在最后):

<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������

管道:

val one = p.apply(TextIO.read().from(config.getString("source.one")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val two = p.apply(TextIO.read().from(config.getString("source.two")))
           .apply(Combine.globally(SingleValue()))
           .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val three = p.apply(TextIO.read().from(config.getString("source.three")))
             .apply(Combine.globally(SingleValue()))
             .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))

val sideInput = PCollectionList.of(one).and(two).and(three)
                .apply(Flatten.pCollections())
                .apply(View.asList())

p.apply(RDFIO.Read
                  .from(options.getSource())
                  .withSuffix(RDFLanguages.strLangNTriples))
 .apply(ParDo.of(SparqlConstructETL(config, sideInput))
                        .withSideInputs(sideInput))
 .apply(RDFIO.Write
                  .to(options.getDestination())
                  .withSuffix(RDFLanguages.NTRIPLES))

为了提供全貌,这里有SingleValueConvertToRDFModelParDos的实现:

class SingleValue : SerializableFunction<Iterable<String>, String> {
    override fun apply(input: Iterable<String>?): String {
        if (input != null) {
            return input.joinToString(separator = " ")
        }
        return ""
    }
}

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
    private val lang: String = outputLang.name

    @ProcessElement
    fun processElement(c: ProcessContext?) {
        if (c != null) {
            val model = ModelFactory.createDefaultModel()
            model.read(StringReader(c.element()), null, lang)
            c.output(model)
        }
    }
}

RDFModelCoder的实现:

class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
                    private val encodeLang: String = RDFLanguages.strLangNTriples)
    : AtomicCoder<Model>() {

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)

    override fun decode(inStream: InputStream): Model {
        val bytes = StreamUtils.getBytes(inStream)
        val model = ModelFactory.createDefaultModel()

        model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here

        return model
    }

    override fun encode(value: Model, outStream: OutputStream?) {
        value.write(outStream, encodeLang, null)
    }

}

我检查了侧输入文件多次,他们很好,他们有UTF-8编码。


共2个答案

匿名用户

最有可能的错误是在< code>RDFModelCoder的实现中。当实现< code > encode /< code > decode 时,必须记住所提供的< code>InputStream和< code>OutputStream并不专属于正在编码/解码的当前实例。例如,在当前< code >模型的编码形式之后,< code>InputStream中可能有更多数据。当使用< code > stream utils . getbytes(inStream)时,您将获取当前编码的< code >模型的数据以及流中的任何其他数据。

一般来说,当编写一个新的< code >编码器时,最好只结合现有的< code >编码器而不是手工解析流:这样更不容易出错。我建议将模型转换为< code>byte[],并使用< code>ByteArrayCoder.of()对其进行编码/解码。

匿名用户

Apache Jena提供了支持Hadoop IO的Elephas IO模块,由于Beam支持Hadoop InputFormat IO,您应该能够使用它来读取NTriple文件。

这可能会更有效,因为Elephas中的NTriples支持能够并行化IO,并避免将整个模型缓存到内存中(事实上,它根本不会使用< code>Model):

Configuration myHadoopConfiguration = new Configuration(false);

// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
                               NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need

// Read data only with Hadoop configuration.
p.apply("read",
         HadoopInputFormatIO.<LongWritable, TripleWritable>read()
        .withConfiguration(myHadoopConfiguration);

当然,这可能需要您在某种程度上重构整个管道。