From 0317afb5db53f1e0ca2afe44f4934407c6c7ca96 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Fri, 21 May 2021 08:45:09 -0700 Subject: [PATCH 1/3] Write parquet with decimal columns in load_test --- gpu_bdb/bdb_tools/readers.py | 2 +- .../queries/load_test/gpu_bdb_load_test.py | 20 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/gpu_bdb/bdb_tools/readers.py b/gpu_bdb/bdb_tools/readers.py index ee273762..ed312052 100755 --- a/gpu_bdb/bdb_tools/readers.py +++ b/gpu_bdb/bdb_tools/readers.py @@ -127,7 +127,7 @@ def read(self, table, relevant_cols=None, **kwargs): if (table in SMALL_TABLES) or (table in SUPER_SMALL_TABLES): df = df.repartition(npartitions=1) - return df + return df.compute() class ORCReader(Reader): diff --git a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py index ab57193f..ab0a32c1 100755 --- a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py +++ b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py @@ -1,5 +1,6 @@ from bdb_tools.utils import benchmark, gpubdb_argparser, run_query from bdb_tools.readers import build_reader +from cudf.core.dtypes import Decimal64Dtype import os, subprocess, math, time @@ -33,22 +34,21 @@ def get_schema(table): schema = fp.read() names = [line.replace(",", "").split()[0] for line in schema.split("\n")] types = [ - line.replace(",", "") + line.replace(", ", "") .split()[1] .replace("bigint", "int") .replace("string", "str") for line in schema.split("\n") ] - types = [ - col_type.split("(")[0].replace("decimal", "float") for col_type in types - ] + return names, types def read_csv_table(table, chunksize="256 MiB"): # build dict of dtypes to use when reading CSV names, types = get_schema(table) - dtype = {names[i]: types[i] for i in range(0, len(names))} + types_float = ['float' if 'decimal' in t else t for t in types] + dtype = {names[i]: types_float[i] for i in range(0, len(names))} data_dir = config["data_dir"].split('parquet_')[0] base_path = f"{data_dir}/data/{table}" @@ -83,9 +83,15 @@ def read_csv_table(table, chunksize="256 MiB"): if os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=types, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=types_float, chunksize=chunksize, quoting=3 ) + for i, dtype in enumerate(types): + if 'decimal' in dtype: + precision = int(dtype[dtype.find("(")+1:dtype.find(",")]) + scale = int(dtype[dtype.find(",")+1:dtype.find(")")]) + df[names[i]] = df[names[i]].astype(Decimal64Dtype(precision, scale)) + return df @@ -133,7 +139,7 @@ def repartition(table, outdir, npartitions=None, chunksize=None, compression="sn def main(client, config): # location you want to write Parquet versions of the table data data_dir = config["data_dir"].split('parquet_')[0] - outdir = f"{data_dir}/parquet_{part_size}gb/" + outdir = f"{data_dir}/parquet_{part_size}gb_decimal/" t0 = time.time() for table in tables: From caad91c759fea440a52c2049412a2f154eda4151 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 24 May 2021 10:02:10 -0700 Subject: [PATCH 2/3] Read in as string --- gpu_bdb/bdb_tools/readers.py | 2 +- gpu_bdb/queries/load_test/gpu_bdb_load_test.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/gpu_bdb/bdb_tools/readers.py b/gpu_bdb/bdb_tools/readers.py index ed312052..ee273762 100755 --- a/gpu_bdb/bdb_tools/readers.py +++ b/gpu_bdb/bdb_tools/readers.py @@ -127,7 +127,7 @@ def read(self, table, relevant_cols=None, **kwargs): if (table in SMALL_TABLES) or (table in SUPER_SMALL_TABLES): df = df.repartition(npartitions=1) - return df.compute() + return df class ORCReader(Reader): diff --git a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py index ab0a32c1..a5932809 100755 --- a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py +++ b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py @@ -25,7 +25,7 @@ tables = [table.split(".")[0] for table in os.listdir(spark_schema_dir)] scale = [x for x in config["data_dir"].split("/") if "sf" in x][0] -part_size = 3 +part_size = 2 chunksize = "128 MiB" # Spark uses different names for column types, and RAPIDS doesn't yet support Decimal types. @@ -47,8 +47,8 @@ def get_schema(table): def read_csv_table(table, chunksize="256 MiB"): # build dict of dtypes to use when reading CSV names, types = get_schema(table) - types_float = ['float' if 'decimal' in t else t for t in types] - dtype = {names[i]: types_float[i] for i in range(0, len(names))} + types_str = ['str' if 'decimal' in t else t for t in types] + dtype_str = {names[i]: types_str[i] for i in range(0, len(names))} data_dir = config["data_dir"].split('parquet_')[0] base_path = f"{data_dir}/data/{table}" @@ -67,7 +67,7 @@ def read_csv_table(table, chunksize="256 MiB"): if "audit" not in fn and os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=dtype, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=dtype_str, chunksize=chunksize, quoting=3 ) else: paths = [ @@ -83,7 +83,7 @@ def read_csv_table(table, chunksize="256 MiB"): if os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=types_float, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=types_str, chunksize=chunksize, quoting=3 ) for i, dtype in enumerate(types): From 515e4dcb47b4ef2fc1cec5a587d25a82894217e6 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Tue, 25 May 2021 12:03:47 -0700 Subject: [PATCH 3/3] Cleanup --- gpu_bdb/queries/load_test/gpu_bdb_load_test.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py index a5932809..1e9dff6f 100755 --- a/gpu_bdb/queries/load_test/gpu_bdb_load_test.py +++ b/gpu_bdb/queries/load_test/gpu_bdb_load_test.py @@ -25,7 +25,7 @@ tables = [table.split(".")[0] for table in os.listdir(spark_schema_dir)] scale = [x for x in config["data_dir"].split("/") if "sf" in x][0] -part_size = 2 +part_size = 3 chunksize = "128 MiB" # Spark uses different names for column types, and RAPIDS doesn't yet support Decimal types. @@ -47,8 +47,8 @@ def get_schema(table): def read_csv_table(table, chunksize="256 MiB"): # build dict of dtypes to use when reading CSV names, types = get_schema(table) - types_str = ['str' if 'decimal' in t else t for t in types] - dtype_str = {names[i]: types_str[i] for i in range(0, len(names))} + types_dec_as_str = ['str' if 'decimal' in t else t for t in types] + dtype_dec_as_str = {names[i]: types_dec_as_str[i] for i in range(0, len(names))} data_dir = config["data_dir"].split('parquet_')[0] base_path = f"{data_dir}/data/{table}" @@ -67,7 +67,7 @@ def read_csv_table(table, chunksize="256 MiB"): if "audit" not in fn and os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=dtype_str, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=dtype_dec_as_str, chunksize=chunksize, quoting=3 ) else: paths = [ @@ -83,7 +83,7 @@ def read_csv_table(table, chunksize="256 MiB"): if os.path.getsize(f"{base_path}/{fn}") > 0 ] df = dask_cudf.read_csv( - paths, sep="|", names=names, dtype=types_str, chunksize=chunksize, quoting=3 + paths, sep="|", names=names, dtype=types_dec_as_str, chunksize=chunksize, quoting=3 ) for i, dtype in enumerate(types):