Python源码示例:tablib.Dataset()
示例1
def test_split_amounts(self):
    """Import rows that use separate ``amount_in``/``amount_out`` columns.

    Three statement lines are expected. Per the assertions, the income row
    is stored as a positive amount, and both outgoing values ("100.56" and
    "-102.56") are stored as negative amounts.
    """
    source_rows = [
        ["15/6/2016", "", "100.56", "Example payment"],
        ["16/6/2016", "60.31", "", "Example income"],
        ["17/6/2016", "", "-102.56", "Example payment 2"],
    ]
    dataset = tablib.Dataset(
        *source_rows,
        headers=["date", "amount_in", "amount_out", "description"]
    )
    self.makeResource().import_data(dataset)
    self.assertEqual(StatementLine.objects.count(), 3)

    # Compare each stored line, ordered by date, against the expected values.
    lines = StatementLine.objects.all().order_by("date")
    expected = [
        (date(2016, 6, 15), Decimal("-100.56"), "Example payment"),
        (date(2016, 6, 16), Decimal("60.31"), "Example income"),
        (date(2016, 6, 17), Decimal("-102.56"), "Example payment 2"),
    ]
    for position, (when, amount, description) in enumerate(expected):
        line = lines[position]
        self.assertEqual(line.date, when)
        self.assertEqual(line.amount, amount)
        self.assertEqual(line.description, description)
示例2
def process(self, bom_file):
    """Read an uploaded BOM file into ``self.data``.

    The file extension (taken from ``bom_file.name``) selects how the
    content is read: ``.csv``/``.tsv`` content is decoded as UTF-8 text,
    ``.xls``/``.xlsx`` content is passed on as raw bytes.

    :param bom_file: file-like object exposing ``name`` and ``read()``.
    :raises ValidationError: keyed on ``bom_file`` when the extension is
        unsupported, or when tablib rejects the data (unsupported format
        or inconsistent row size).
    """
    self.data = None

    ext = os.path.splitext(bom_file.name)[-1].lower()

    if ext in ['.csv', '.tsv']:
        # These file formats need string decoding
        raw_data = bom_file.read().decode('utf-8')
    elif ext in ['.xls', '.xlsx']:
        raw_data = bom_file.read()
    else:
        # Translate the template first, then substitute the extension;
        # formatting before the lookup would never match a catalogue entry.
        raise ValidationError(
            {'bom_file': _('Unsupported file format: {f}').format(f=ext)}
        )

    try:
        self.data = tablib.Dataset().load(raw_data)
    except tablib.UnsupportedFormat as exc:
        raise ValidationError(
            {'bom_file': _('Error reading BOM file (invalid data)')}
        ) from exc
    except tablib.core.InvalidDimensions as exc:
        raise ValidationError(
            {'bom_file': _('Error reading BOM file (incorrect row size)')}
        ) from exc
示例3
def create_dataset(self, in_stream):
    """
    Create a dataset from the workbook's active sheet.

    ``in_stream`` is wrapped in ``BytesIO`` and opened with openpyxl in
    read-only mode. The first row supplies the headers; every following
    row is appended as data.
    """
    from io import BytesIO
    import openpyxl

    workbook = openpyxl.load_workbook(BytesIO(in_stream), read_only=True)
    result = tablib.Dataset()
    active_sheet = workbook.active

    # ``rows`` is consumed as an iterator: take the header row first, the
    # loop below then continues with the remaining rows.
    row_iter = active_sheet.rows
    header_cells = next(row_iter)
    result.headers = [cell.value for cell in header_cells]

    for cells in row_iter:
        result.append([cell.value for cell in cells])
    return result
#: These are the default formats for import and export. Whether they can be
#: used depends on their implementation in the tablib library.
示例4
def dataset(self):
    """Return a Tablib Dataset built from the rows of this collection.

    An empty collection yields an empty Dataset. Otherwise the keys of the
    first row become the headers, and each row's values are passed through
    ``_reduce_datetimes`` before being appended.
    """
    result = tablib.Dataset()

    # Materialise the collection to find out whether it has any rows.
    if not list(self):
        return result

    # Column names come from the first record.
    result.headers = self[0].keys()

    for record in self.all():
        result.append(_reduce_datetimes(record.values()))

    return result
示例5
def get_csv_from_url(self, sheet_url):
    """
    Return the contents of a protected Google sheet URL as CSV text.

    The first row of values supplies the headers, with colons, commas,
    quotes and newlines stripped. Rows shorter than the header row are
    padded with empty strings before being added to the dataset.
    """
    key = extract_key_from_csv_url(sheet_url)
    values = self.get_sheet_values(key)

    raw_headers = values.pop(0)
    headers = [re.sub("[:,\"'\n]", "", h) for h in raw_headers]
    logger.error("Sheet Headers: %s" % headers)

    # TODO: this should be shared across screendoor importer
    sheet = Dataset(headers=headers)
    width = len(headers)
    for row in values:
        missing = width - len(row)
        if missing > 0:
            row += [""] * missing
        sheet.append(row)

    return sheet.export("csv")
示例6
def report(self) -> t.Optional[str]:
    """
    Create a CSV report and write it to the configured output.

    The target is ``self.misc["out"]``; ``"-"`` skips the folder check.
    If the target's folder does not exist, an error is logged and the
    process exits with status 1. No value is explicitly returned by the
    code visible here.
    """
    out_target = self.misc["out"]
    if out_target != "-":
        folder = os.path.dirname(out_target)
        if not os.path.exists(folder):
            logging.error("Folder for report ({}) doesn't exist".format(os.path.dirname(self.misc["out"])))
            exit(1)

    with click.open_file(self.misc["out"], mode='w') as f:
        import tablib

        # Each configured column entry may hold several comma separated
        # names; flatten them into a single sequence for the first row.
        column_names = itertools.chain.from_iterable(
            entry.split(",") for entry in self.misc["columns"]
        )
        table = tablib.Dataset(column_names)
        for line in self._table():
            table.append(line)
        f.write(table.csv)
        # NOTE(review): presumably adjusts ownership of the written file;
        # confirm against the helper's definition.
        chown(f)
示例7
def load(app_id):
    """Locate the exported data for ``app_id`` and wrap it in ``Data``.

    Raises ``IOError`` when ``find_experiment_export`` returns ``None``.
    """
    export_path = find_experiment_export(app_id)
    if export_path is not None:
        return Data(export_path)
    raise IOError("Dataset {} could not be found.".format(app_id))
示例8
def __init__(self, path):
    """Expose the CSV file at ``path`` as an odo resource and a Tablib Dataset.

    :param path: filesystem path of the CSV file to load.
    """
    self.odo_resource = odo.resource(path)
    # Use a context manager so the file handle is closed as soon as the
    # content has been read, instead of waiting for garbage collection.
    with open(path) as csv_file:
        self.tablib_dataset = tablib.Dataset().load(csv_file.read(), "csv")
示例9
def get_dataset(self):
    """Build a Dataset from the CSV reader, using column-derived headers.

    When ``self.has_headings`` is set, the first row of the reader is
    discarded. Each header is the column's ``to_field`` or, failing that,
    ``"col_<column_number>"``.
    """
    csv_rows = self._get_csv_reader()
    if self.has_headings:
        # Skip the heading row contained in the file itself.
        six.next(csv_rows)
    body = list(csv_rows)

    headers = []
    for column in self.columns.all():
        headers.append(column.to_field or "col_%s" % column.column_number)

    return Dataset(*body, headers=headers)
示例10
def test_import_one(self):
    """Import a single row and check the fields of the stored line."""
    column_names = ["date", "amount", "description"]
    single_row = ["15/6/2016", "5.10", "Example payment"]
    dataset = tablib.Dataset(single_row, headers=column_names)

    self.makeResource().import_data(dataset)

    self.assertEqual(StatementLine.objects.count(), 1)
    line = StatementLine.objects.get()
    self.assertEqual(line.date, date(2016, 6, 15))
    self.assertEqual(line.amount, Decimal("5.10"))
    self.assertEqual(line.description, "Example payment")
示例11
def test_import_skip_duplicates(self):
    """Importing the same dataset twice must leave only one statement line."""
    column_names = ["date", "amount", "description"]
    dataset = tablib.Dataset(
        ["15/6/2016", "5.10", "Example payment"], headers=column_names
    )

    # First import, then the very same data again with a fresh resource.
    for _attempt in range(2):
        self.makeResource().import_data(dataset)

    # The record in the second import should have been ignored
    self.assertEqual(StatementLine.objects.count(), 1)
示例12
def test_import_skip_duplicates_whitespace(self):
    """A row differing only by trailing whitespace counts as a duplicate."""
    column_names = ["date", "amount", "description"]
    original = tablib.Dataset(
        ["15/6/2016", "5.10", "Example payment"], headers=column_names
    )
    with_whitespace = tablib.Dataset(
        ["15/6/2016", "5.10", "Example payment "],  # Whitespace added
        headers=column_names
    )

    self.makeResource().import_data(original)
    self.makeResource().import_data(with_whitespace)

    # The record in the second import should have been ignored
    self.assertEqual(StatementLine.objects.count(), 1)
示例13
def test_import_two_identical(self):
    """Two identical rows in one import must both be stored.

    Neither row may be skipped as a duplicate: identical rows within the
    same import most likely represent two identical transactions.
    """
    repeated_row = ["15/6/2016", "5.10", "Example payment"]
    dataset = tablib.Dataset(
        list(repeated_row),
        list(repeated_row),
        headers=["date", "amount", "description"]
    )

    self.makeResource().import_data(dataset)

    self.assertEqual(StatementLine.objects.count(), 2)
示例14
def test_import_a_few_with_identical_transactions(self):
    """Import four rows, two of them identical, and expect four lines stored."""
    source_rows = [
        ["15/6/2016", "5.10", "Example payment"],
        ["16/6/2016", "10.91", "Another payment"],
        ["16/6/2016", "10.91", "Another payment"],
        ["17/6/2016", "-1.23", "Paying someone"],
    ]
    dataset = tablib.Dataset(
        *source_rows,
        headers=["date", "amount", "description"]
    )
    self.makeResource().import_data(dataset)
    self.assertEqual(StatementLine.objects.count(), 4)

    # Stored lines, ordered by primary key, must mirror the input order.
    lines = StatementLine.objects.all().order_by("pk")
    expected = [
        (date(2016, 6, 15), Decimal("5.10"), "Example payment"),
        (date(2016, 6, 16), Decimal("10.91"), "Another payment"),
        (date(2016, 6, 16), Decimal("10.91"), "Another payment"),
        (date(2016, 6, 17), Decimal("-1.23"), "Paying someone"),
    ]
    for position, (when, amount, description) in enumerate(expected):
        line = lines[position]
        self.assertEqual(line.date, when)
        self.assertEqual(line.amount, amount)
        self.assertEqual(line.description, description)
示例15
def test_error_no_date(self):
    """A dataset without a date column yields one row error mentioning "No date"."""
    dataset = tablib.Dataset(
        ["5.10", "Example payment"],
        headers=["amount", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("No date", str(first_error))
示例16
def test_error_empty_date(self):
    """An empty date value yields one row error mentioning the expected format."""
    dataset = tablib.Dataset(
        ["", "5.10", "Example payment"],
        headers=["date", "amount", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Expected dd/mm/yyyy", str(first_error))
示例17
def test_error_empty_amounts(self):
    """Empty ``amount_in`` and ``amount_out`` yield one "Value required" row error."""
    dataset = tablib.Dataset(
        ["15/6/2016", "", "", "Example payment"],
        headers=["date", "amount_in", "amount_out", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Value required", str(first_error))
示例18
def test_error_both_amounts(self):
    """Values in both amount columns yield one "Values found for both" row error."""
    dataset = tablib.Dataset(
        ["15/6/2016", "5.10", "1.20", "Example payment"],
        headers=["date", "amount_in", "amount_out", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Values found for both", str(first_error))
示例19
def test_error_neither_amount(self):
    """No value in either amount column yields one row error containing "either"."""
    dataset = tablib.Dataset(
        ["15/6/2016", "", "", "Example payment"],
        headers=["date", "amount_in", "amount_out", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("either", str(first_error))
示例20
def test_error_invalid_in_amount(self):
    """A non-numeric ``amount_in`` yields one row error containing "Invalid"."""
    dataset = tablib.Dataset(
        ["15/6/2016", "a", "", "Example payment"],
        headers=["date", "amount_in", "amount_out", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Invalid", str(first_error))
示例21
def test_error_invalid_out_amount(self):
    """A non-numeric ``amount_out`` yields one row error containing "Invalid"."""
    dataset = tablib.Dataset(
        ["15/6/2016", "", "a", "Example payment"],
        headers=["date", "amount_in", "amount_out", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Invalid", str(first_error))
示例22
def test_error_invalid_amount(self):
    """A non-numeric ``amount`` yields one row error containing "Invalid"."""
    dataset = tablib.Dataset(
        ["15/6/2016", "a", "Example payment"],
        headers=["date", "amount", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("Invalid", str(first_error))
示例23
def test_error_zero_amount(self):
    """An ``amount`` of "0" yields one row error containing "zero not allowed"."""
    dataset = tablib.Dataset(
        ["15/6/2016", "0", "Example payment"],
        headers=["date", "amount", "description"]
    )
    result = self.makeResource().import_data(dataset)

    row_errors = result.row_errors()
    self.assertEqual(len(row_errors), 1)
    first_error = row_errors[0][1][0].error
    self.assertIn("zero not allowed", str(first_error))
示例24
def __init__(self, target, thread=100, path=None, format='csv'):
    """Initialise the takeover check with its inputs and empty working state.

    :param target: stored as ``self.target``.
    :param thread: stored as ``self.thread`` (default 100).
    :param path: stored as ``self.path``.
    :param format: stored as ``self.format`` (default ``'csv'``).
    """
    Module.__init__(self)

    # Fixed identification of this module.
    self.module = 'Check'
    self.source = 'Takeover'

    # Caller-supplied settings.
    self.target = target
    self.thread = thread
    self.path = path
    self.format = format

    # Working state, empty until the check runs.
    self.subdomains = set()
    self.fingerprints = None
    self.subdomainq = Queue()
    self.cnames = []
    self.results = Dataset()
示例25
def create_dataset(self, in_stream):
    """
    Create a dataset from the first sheet of an xls workbook.

    ``in_stream`` is handed to xlrd as the file contents. Row 0 of the
    sheet supplies the headers; all later rows are appended as data.
    """
    import xlrd

    workbook = xlrd.open_workbook(file_contents=in_stream)
    result = tablib.Dataset()
    first_sheet = workbook.sheets()[0]

    result.headers = first_sheet.row_values(0)
    # Data starts on the row after the header row.
    data_rows = (
        first_sheet.row_values(index)
        for index in range(1, first_sheet.nrows)
    )
    for values in data_rows:
        result.append(values)
    return result
示例26
def get_data(self, format="", verbose=False, **kwargs):
    """
    Download the data and return it, optionally exported to a format.

    :param format: extension name of data format. Available: json, xls, yaml, csv, dbf, tsv, html, latex, xlsx, ods
    :param verbose: (optional) when set, headers and rows are printed and
        nothing is returned.
    :param kwargs: Optional arguments forwarded to the data downloader.
    :return: the exported data when ``format`` is given; otherwise the
        ``(headers, rows)`` pair.
    :raises tablib.UnsupportedFormat: when ``format`` is not among the
        dataset's known formats; raised before any download happens.
    """
    if format:
        # Validate the requested format up front, before downloading.
        data = tablib.Dataset()
        if format not in data._formats:
            raise tablib.UnsupportedFormat(
                "Format {0} cannot be exported.".format(format)
            )

    self._download_data(**kwargs)

    # Verbose mode takes precedence over export: print and return nothing.
    if verbose:
        print(*self.headers, sep=", ")
        for record in self.rows:
            print(*record, sep=", ")
        return None

    if not format:
        return self.headers, self.rows

    data.headers = self.headers
    for record in self.rows:
        data.append(record)
    return data.export(format)
示例27
def dataset(self):
    """Return a single-row Tablib Dataset built from this row.

    The row's keys become the headers; its values are passed through
    ``_reduce_datetimes`` before being appended.
    """
    single = tablib.Dataset()
    single.headers = self.keys()
    single.append(_reduce_datetimes(self.values()))
    return single
示例28
def csv_precheck(csv_data):
    """
    Do some basic sanity checks on a CSV.

    Currently checks that every header name is unique.

    :param csv_data: CSV text to load into a Dataset.
    :raises UniqueColumnError: for the first header name seen a second time.
    """
    data = Dataset().load(csv_data, format="csv")
    # A set keeps the membership test constant-time, so wide sheets are
    # not scanned quadratically.
    seen = set()
    for header in data.headers:
        if header in seen:
            raise UniqueColumnError(header)
        seen.add(header)
示例29
def clean_csv_headers(csv):
    """
    Return the CSV with commas, quotes and line breaks removed from headers.

    These characters break the translation from CSV to a database table;
    CSVKit in particular does not accept them in header columns. Data rows
    are copied over unchanged.
    """
    source = Dataset().load(csv, format="csv")

    sanitized = []
    for header in source.headers:
        sanitized.append(re.sub("[,\"'\n]", "", header))

    cleaned = Dataset(headers=sanitized)
    for record in source:
        cleaned.append(record)
    return cleaned.export("csv")
# NOTE: InvalidDimensions
示例30
def test_can_build_csv(self, mockget):
    """The importer builds a non-empty CSV whose headers include the email question.

    ``mockget`` is not used in the body; it is presumably injected by a
    patch decorator outside this view.
    """
    importer = ScreendoorImporter(api_key="KEY", base_url="https://fake.tld")
    csv = importer.build_csv_from_data(LIST_FORMS[0], LIST_RESPONSES)
    self.assertTrue(csv)

    parsed_csv = Dataset().load(csv)
    # assertIn reports the actual headers on failure, unlike assertTrue.
    self.assertIn(
        "What's your email address? (ID: xyejrz01)", parsed_csv.headers
    )