将dask数据框中的列转换为Doc2Vec的TaggedDocument


问题内容

介绍

目前,我正在尝试与gensim一起使用dask进行NLP文档计算,并且在将我的语料库转换为“
TaggedDocument
”时遇到了问题。

因为我已经尝试了多种解决该问题的方法,所以将列出我的尝试。

每次处理此问题的尝试都会遇到一些稍有不同的麻烦。

首先是一些初始给定。

数据

df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)



  claim_no   claim_txt I                                    CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0

期望的输出

>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', 'casing', 'elements', 'encase', 'active', 'materials', 'battery', 'cell', 'interior', 'space', 'wherein', 'least', 'one', 'gas', 'tight', 'seal', 'layer', 'arranged', 'first', 'second', 'contact', 'surfaces', 'seal', 'interior', 'space', 'characterized', 'one', 'first', 'second', 'contact', 'surfaces', 'comprises', 'electrically', 'insulating', 'void', 'volume', 'layer', 'first', 'second', 'contact', 'surfaces', 'comprises', 'formable', 'material', 'layer', 'fills', 'voids', 'surface', 'void', 'volume', 'layer', 'hermetically', 'assembled', 'position', 'form', 'seal', 'layer'], tags=['8697278-17'])
>>len(tagged_document) == len(df['claim_txt'])

错误编号1不允许生成器

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        yield gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

TypeError:无法序列化类型生成器的对象。

我仍然无法使用发电机来解决这个问题。解决这个问题的方法非常棒!因为这对于普通的熊猫来说效果很好。

错误号2仅每个分区的第一个元素

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    for i, line in enumerate(df[corp]):
        return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))

tagged_document = df.map_partitions(read_corpus_tag_sub,meta=TaggedDocument)
tagged_document = tagged_document.compute()

这个有点笨,因为该函数不会进行迭代(我知道),但是给出了所需的格式,但是只返回每个分区的第一行。

错误3函数调用挂起100%cpu

def read_corpus_tag_sub(df,corp='claim_txt',tags=['claim_no']):
    tagged_list = []
    for i, line in enumerate(df[corp]):
        tagged = gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(line), (list(df.loc[i,tags].values)))
        tagged_list.append(tagged)
    return tagged_list

据我所知,在循环外重构返回值时,此函数挂起将在dask客户端中建立内存,并且我的CPU利用率达到100%,但没有任何任务正在计算。请记住,我以相同的方式调用该函数。

熊猫解决方案

def tag_corp(corp,tag):
    return gensim.models.doc2vec.TaggedDocument(gensim.utils.simple_preprocess(corp), ([tag]))

tagged_document = [tag_corp(x,y) for x,y in list(zip(df_smple['claim_txt'],df_smple['claim_no']))]

List comp我还没有时间测试这个解决方案

其他熊猫解决方案

tagged_document = list(read_corpus_tag_sub(df))

该解决方案将持续几个小时。但是,完成后,我没有足够的内存来处理这个问题。

结论(?)

我觉得现在超级迷路了。这是我看过的线程列表。我承认自己真的是个新手,我花了这么多时间,感觉就像是在做一个傻瓜。

  1. 发电机包
  2. 使用Dask处理文字
  3. 使用Dask加快熊猫申请
  4. 如何利用一台计算机上的所有内核并行化Pandas Dataframe上的apply()?
  5. python dask DataFrame,是否支持(普通并行化)行?
  6. map_partitions在做什么?
  7. 简单的dask map_partitions示例
  8. 文件

问题答案:

我不熟悉Dask API /限制,但通常:

  • 如果您可以将数据迭代为(单词,标签)元组(甚至忽略Doc2Vec/TaggedDocument步骤),那么Dask端将已经处理,并将这些元组转换为TaggedDocument实例应该是微不足道的

  • 通常,对于大型数据集,您不想(并且可能没有足够的RAM来)将整个数据集实例化为list内存中的内容-因此,涉及list()或的尝试.append()可能在某种程度上都可以正常工作,但会耗尽本地内存(导致严重的交换)和/或仅未达到数据末尾。

大型数据集的首选方法是创建一个可迭代的对象,该对象每次被要求对数据进行迭代(因为Doc2Vec训练将需要多次通过)可以依次提供每个项目,但永远不要将整个数据集读入内存中的对象。

关于此模式的一个不错的博客文章是:Python中的数据流:生成器,迭代器,可迭代

给定您显示的代码,我怀疑适合您的方法可能是:

from gensim.utils import simple_preprocess

class MyDataframeCorpus(object):
    def __init__(self, source_df, text_col, tag_col):
        self.source_df = source_df
        self.text_col = text_col
        self.tag_col = tag_col

    def __iter__(self):
        for i, row in self.source_df.iterrows():
            yield TaggedDocument(words=simple_preprocess(row[self.text_col]), 
                                 tags=[row[self.tag_col]])

corpus_for_doc2vec = MyDataframeCorpus(df, 'claim_txt', 'claim_no')