如何在Spark中的RDD中跳过多于一行的标题


问题内容

我第一个RDD中的数据就像

1253
545553
12344896
1 2 1
1 43 2
1 46 1
1 53 2

现在,前三个整数是我需要广播的一些计数器。之后,所有行的格式都一样

1 2 1
1 43 2

在函数中对它们进行一些计算后,我会将3个计数器之后的所有这些值映射到新的RDD。但是我不明白如何将前三个值分开并正常映射其余三个值。

我的Python代码是这样的

documents = sc.textFile("file.txt").map(lambda line: line.split(" "))

final_doc = documents.map(lambda x: (int(x[0]), function1(int(x[1]), int(x[2])))).reduceByKey(lambda x, y: x + " " + y)

仅当前三个值不在文本文件中时,它才起作用,但使用它们会产生错误。

我不想跳过前三个值,而是将它们存储在3个广播变量中,然后将其余数据集传递给map函数。

是的,文本文件必须仅采用该格式。我无法删除这3个值/计数器

Function1只是进行一些计算并返回值。


问题答案:
  1. 导入Python 2

    from __future__ import print_function
    
  2. 准备伪数据:

    s = "1253\n545553\n12344896\n1 2 1\n1 43 2\n1 46 1\n1 53 2"
    

    with open(“file.txt”, “w”) as fw: fw.write(s)

  3. 读取原始输入:

    raw = sc.textFile("file.txt")
    
  4. 提取标题:

    header = raw.take(3)
    

    print(header)

    [u‘1253’, u‘545553’, u‘12344896’]

  5. 过滤线:

    • 使用 zipWithIndex

          content = raw.zipWithIndex().filter(lambda kv: kv[1] > 2).keys()
      

      print(content.first())

      1 2 1

    • 使用 mapPartitionsWithIndex

          from itertools import islice
      

      content = raw.mapPartitionsWithIndex(
      lambda i, iter: islice(iter, 3, None) if i == 0 else iter)

      print(content.first())

      1 2 1

注意
:所有功劳归于pzecevicSean
Owen
(请参阅链接来源)。