-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson_to_csv.py
More file actions
113 lines (89 loc) · 3.95 KB
/
json_to_csv.py
File metadata and controls
113 lines (89 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
"""
JSON to CSV Flattener
This script converts a JSON array of objects into a flat CSV. It can handle nested
lists (like 'LoyaltyPoints' inside a 'User' object) by denormalizing them.
Arguments and Options:
Positional Arguments:
input_file Path to the JSON file to be converted.
Optional Arguments:
--output-file, -o Path for the CSV output. (Default: '<input>.csv')
--nested-key, -k The key containing the list of items to flatten.
(Default: None - treats the file as a simple list of objects)
--parent-keys, -p Keys from the parent object to include in every row.
Repeatable: -p UserId -p UserName
Usage Examples:
Flatten a simple list:
$ python json_to_csv.py data.json
Flatten nested 'LoyaltyPoints' and keep the 'UserId':
$ python json_to_csv.py data.json --nested-key LoyaltyPoints --parent-keys UserId
"""
import json
import pandas as pd
import argparse
import time
import sys
from pathlib import Path
class JSONToCSV:
def __init__(self, input_path, output_path=None, nested_key=None, parent_keys=None):
self.input_path = Path(input_path)
self.output_path = Path(output_path) if output_path else self.input_path.with_suffix(".csv")
self.nested_key = nested_key
self.parent_keys = parent_keys or []
def run(self):
if not self.input_path.exists():
print(f"❌ Error: File not found: {self.input_path}")
return False
print(f"🚀 Starting transformation: {self.input_path.name}")
start_time = time.time()
try:
with self.input_path.open(encoding="utf-8") as f:
data = json.load(f)
rows = []
row_count = 0
# If no nested key is provided, assume the top level is the list to convert
if not self.nested_key:
rows = data
row_count = len(data)
else:
# Denormalize nested structures
for idx, entry in enumerate(data, start=1):
# Extract parent info
parent_data = {k: entry.get(k) for k in self.parent_keys}
children = entry.get(self.nested_key, [])
if not isinstance(children, list):
continue
for child in children:
# Combine parent info with child info
row = {**parent_data, **child}
rows.append(row)
row_count += 1
if row_count % 1000 == 0:
print(f"⏳ Processed {row_count} rows... (Record {idx}/{len(data)})")
# Save using Pandas
df = pd.DataFrame(rows)
df.to_csv(self.output_path, index=False, encoding="utf-8-sig")
end_time = time.time()
print(f"✅ Done! {row_count} rows written to {self.output_path}")
print(f"🕒 Total time: {end_time - start_time:.2f} seconds")
return True
except Exception as e:
print(f"❌ Error during transformation: {e}")
return False
def main():
parser = argparse.ArgumentParser(description="Convert and flatten JSON files to CSV.")
parser.add_argument("input_file", help="Path to the JSON file")
parser.add_argument("--output-file", "-o", help="Path for the output CSV")
parser.add_argument("--nested-key", "-k", help="The key to flatten (e.g., LoyaltyPoints)")
parser.add_argument("--parent-keys", "-p", action="append", help="Parent keys to retain (can be used multiple times)")
args = parser.parse_args()
transformer = JSONToCSV(
input_path=args.input_file,
output_path=args.output_file,
nested_key=args.nested_key,
parent_keys=args.parent_keys
)
if not transformer.run():
sys.exit(1)
if __name__ == "__main__":
main()