How to transpose a simple dask dataframe (convert columns to rows) to follow tidy data principles


Question

TLDR: I created a dask dataframe from a dask bag. The dask dataframe treats every observation (event) as a column, so instead of having a row of data for each event, I have a column for each event. The goal is to transpose the columns into rows in the same way that pandas can transpose a dataframe with df.T.
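For reference, df.T in pandas swaps the row and column labels, which is exactly the operation being asked for. A tiny illustration with made-up data:

import pandas as pd

# two "events" recorded as columns; .T turns them into rows
df = pd.DataFrame({'event1': [1, 2], 'event2': [3, 4]}, index=['a', 'b'])
print(df.T)
#         a  b
# event1  1  2
# event2  3  4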

Details
I have sample Twitter data from my timeline. To reach my starting point, here is the code for reading the json from disk into a dask.bag and then converting it into a dask.dataframe:

import dask.bag as db
import dask.dataframe as dd
import json


# read the raw json from disk into a bag, parsing each line, then convert to a dataframe
b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()

The problem: all of my individual events (i.e. tweets) are recorded as columns rather than rows. In keeping with tidy principles, I would like a row for each event.
pandas has a transpose method for dataframes, and dask.array has a transpose method for arrays. My goal is to perform the same transpose operation on a dask dataframe. How would I go about that? (A possible workaround is sketched after the list below.)

  1. Convert columns to rows
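One possible workaround (a minimal sketch, assuming the frame is small enough to materialize in memory; df here is the dask dataframe built above) is to compute down to pandas, transpose there, and rebuild a dask dataframe:

import dask.dataframe as dd

# materialize to pandas, transpose, and wrap back up as a dask dataframe
pdf = df.compute()                               # dask -> pandas
ddf_rows = dd.from_pandas(pdf.T, npartitions=1)  # events are now rows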

Edit: Solution

This code resolves the original transpose problem, cleans the Twitter json files by defining the columns to keep and dropping the rest, and creates a new column by applying a function to a Series. Afterwards, we write the much smaller, cleaned files back to disk.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob
import os
import re

# pull in all files (glob does not expand '~', so expand it explicitly)
filenames = glob.glob(os.path.expanduser('~/sampleTwitter*.json'))


# read each file lazily with pandas, then stitch the pieces into one dask dataframe
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
df = dd.from_delayed(dfs)


# see all the fields of the dataframe 
fields = list(df.columns)

# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']

# remove the keepers from the column list so only the fields to drop remain
for f in keepers:
    if f in fields:
        fields.remove(f)

# drop the unwanted fields, keeping only what's necessary
df = df.drop(fields, axis=1)

# flatten the nested coordinates dict into a (lon, lat) tuple column;
# guard against tweets with no coordinates, and use object dtype in meta
# since tuple is not a numpy dtype
clean = df.coordinates.apply(
    lambda x: (x['coordinates'][0], x['coordinates'][1]) if x else None,
    meta=('coords', object))
df['coords'] = clean

# derive new filenames from the old ones for saving the cleaned files
newfilenames = []
for fn in filenames:
    newfilenames.append(re.search(r'(?<=/).+?(?=\.)', fn).group() + 'cleaned.json')

# custom saver function for dataframes using newfilenames
def saver(frame,filename):
    return frame.to_json('./'+filename)

# convert back to a list of delayed objects, one pandas frame per partition
dfs = df.to_delayed()
# delayed(saver)(frame, fn) defers the call instead of running saver eagerly
writes = [delayed(saver)(frame, fn) for frame, fn in zip(dfs, newfilenames)]

# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)
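As a quick sanity check, one of the cleaned files can be read back with pandas to confirm that only the kept columns plus the new coords column survived (the filename below is illustrative, following the newfilenames pattern above):

import pandas as pd

# read one cleaned file back and inspect its columns
check = pd.read_json('./sampleTwittercleaned.json')
print(check.columns.tolist())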

Answer:

I think you can get the result you want by bypassing the bag entirely, e.g.:

import glob

import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

filenames = glob.glob('sampleTwitter*.json')
dfs = [delayed(pd.read_json)(fn, 'records') for fn in filenames]
ddf = dd.from_delayed(dfs)
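Because the 'records' orient tells pandas to treat each JSON object as one row, the resulting dask dataframe already has one row per tweet, so no transpose step should be needed. A quick check:

# each tweet should now appear as a row, with its fields as columns
print(ddf.head())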