Skip to content

Commit 3d5e2ba

Browse files
author
zhanglongbin
committed
Fix the dataflow bug (ID #29)
1 parent 97918b6 commit 3d5e2ba

1 file changed

Lines changed: 102 additions & 24 deletions

File tree

data_celery/datasource/mongo/tasks.py

Lines changed: 102 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,52 @@
11
import shutil
22
from data_celery.main import celery_app
3-
import time,os,json
3+
import time, os, json
44
from data_server.database.session import get_sync_session
55
from sqlalchemy.orm import Session
6-
from data_server.datasource.DatasourceModels import CollectionTask,DataSourceTaskStatusEnum,DataSourceTypeEnum
6+
from data_server.datasource.DatasourceModels import CollectionTask, DataSourceTaskStatusEnum, DataSourceTypeEnum
77
from data_celery.db.DatasourceManager import get_collection_task_by_uid
88
from data_celery.utils import (ensure_directory_exists,
99
get_current_ip, get_current_time, get_datasource_temp_parquet_dir,
1010
ensure_directory_exists_remove, get_datasource_csg_hub_server_dir)
1111
from data_server.datasource.services.datasource import get_datasource_connector
12-
from data_celery.mongo_tools.tools import insert_datasource_run_task_log_info,insert_datasource_run_task_log_error
12+
from data_celery.mongo_tools.tools import insert_datasource_run_task_log_info, insert_datasource_run_task_log_error
1313
from data_engine.exporter.load import load_exporter
1414
from pathlib import Path
1515
import pandas as pd
1616
from loguru import logger
1717

18+
# Import BSON types for MongoDB ObjectId conversion
19+
from datetime import datetime, date
20+
21+
# Guard the bson dependency: pymongo may not be installed in every
# environment, so degrade gracefully when the import fails.
try:
    from bson import ObjectId
    from bson.errors import InvalidId

    BSON_AVAILABLE = True
except ImportError:
    BSON_AVAILABLE = False
    ObjectId = None


def convert_mongo_document(doc):
    """Recursively convert a MongoDB document into JSON-serializable data.

    ObjectId values become their string form, datetime/date values become
    ISO-8601 strings, and dicts/lists are converted element by element.
    Any other value is returned unchanged.
    """
    if isinstance(doc, dict):
        converted = {}
        for key, value in doc.items():
            converted[key] = convert_mongo_document(value)
        return converted
    if isinstance(doc, list):
        return [convert_mongo_document(item) for item in doc]
    # Only test against ObjectId when bson actually imported (it is None otherwise).
    if BSON_AVAILABLE and isinstance(doc, ObjectId):
        return str(doc)
    if isinstance(doc, (datetime, date)):
        return doc.isoformat()
    return doc
46+
1847

1948
@celery_app.task(name="collection_mongo_task")
20-
def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
49+
def collection_mongo_task(task_uid: str, user_name: str, user_token: str):
2150
"""
2251
Collection task
2352
Args:
@@ -64,7 +93,7 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
6493
collection_task.start_run_at = get_current_time()
6594
db_session.commit()
6695
# Read data source
67-
extra_config = collection_task.datasource.extra_config
96+
extra_config = json.loads(collection_task.datasource.extra_config)
6897
if not extra_config:
6998
collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
7099
insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} has no extra configuration.")
@@ -76,13 +105,19 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
76105
return False
77106
mongo_config = extra_config["mongo"]
78107
max_line = 10000
79-
csg_hub_dataset_id = 0
108+
csg_hub_dataset_id = ''
80109
csg_hub_dataset_default_branch = "main"
81110
if "csg_hub_dataset_default_branch" in extra_config:
82111
csg_hub_dataset_default_branch = extra_config["csg_hub_dataset_default_branch"]
83-
if "csg_hub_dataset_id" in extra_config and isinstance(extra_config['csg_hub_dataset_id'], int):
112+
if "csg_hub_dataset_id" in extra_config:
84113
csg_hub_dataset_id = extra_config["csg_hub_dataset_id"]
85-
if csg_hub_dataset_id <= 0:
114+
# Read csg_hub_dataset_name if provided, otherwise use default branch
115+
csg_hub_dataset_name = None
116+
if "csg_hub_dataset_name" in extra_config and extra_config['csg_hub_dataset_name'] != '':
117+
csg_hub_dataset_name = extra_config["csg_hub_dataset_name"]
118+
else:
119+
csg_hub_dataset_name = csg_hub_dataset_default_branch
120+
if csg_hub_dataset_id is None or csg_hub_dataset_id == '':
86121
collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
87122
insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} has no CSG Hub Dataset ID.")
88123
return False
@@ -96,7 +131,8 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
96131
connector = get_datasource_connector(collection_task.datasource)
97132
if not connector.test_connection():
98133
collection_task.task_status = DataSourceTaskStatusEnum.ERROR.value
99-
insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to connect to the database.")
134+
insert_datasource_run_task_log_error(task_uid,
135+
f"Task with UID {task_uid} failed to connect to the database.")
100136
return False
101137

102138
total_count = 0
@@ -117,11 +153,22 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
117153
while True:
118154
# Execute pagination query (specific implementation depends on connector details)
119155
rows = connector.query_collection(collection_name, offset=(page - 1) * page_size,
120-
limit=page_size)
156+
limit=page_size)
121157

122158
if not rows:
123159
break # If there is no more data, exit the loop
124160

161+
# Add rows to buffer, converting MongoDB types to JSON-serializable format
162+
if isinstance(rows, list):
163+
# Convert each document to handle ObjectId and other BSON types
164+
converted_rows = [convert_mongo_document(row) for row in rows]
165+
rows_buffer.extend(converted_rows)
166+
else:
167+
# If rows is a generator or iterator, convert to list first
168+
rows_list = list(rows)
169+
converted_rows = [convert_mongo_document(row) for row in rows_list]
170+
rows_buffer.extend(converted_rows)
171+
125172
# If the number of rows in the buffer list reaches or exceeds the maximum number of rows, write to the file and clear the buffer list
126173
if len(rows_buffer) >= max_line:
127174
file_path = os.path.join(table_dir, f"data_{file_index:04d}.parquet")
@@ -130,7 +177,8 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
130177
current_file_row_count += len(rows_buffer)
131178
records_count += len(rows_buffer)
132179
collection_task.records_count = records_count
133-
insert_datasource_run_task_log_info(task_uid, f"Task with UID {task_uid} get data count {records_count}...")
180+
insert_datasource_run_task_log_info(task_uid,
181+
f"Task with UID {task_uid} get data count {records_count}...")
134182
db_session.commit()
135183
file_index += 1
136184
rows_buffer = [] # Clear the buffer list
@@ -143,16 +191,18 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
143191
current_file_row_count += len(rows_buffer)
144192
records_count += len(rows_buffer)
145193
collection_task.records_count = records_count
146-
insert_datasource_run_task_log_info(task_uid, f"Task with UID {task_uid} get data count {records_count}...")
194+
insert_datasource_run_task_log_info(task_uid,
195+
f"Task with UID {task_uid} get data count {records_count}...")
147196
db_session.commit()
148197

149198
except Exception as e:
150-
insert_datasource_run_task_log_error(task_uid, f"Task with UID {task_uid} failed to get collection document {collection_name}: {e}")
199+
insert_datasource_run_task_log_error(task_uid,
200+
f"Task with UID {task_uid} failed to get collection document {collection_name}: {e}")
151201
collection_task.records_count = total_count
152202
collection_task.total_count = total_count
153203
db_session.commit()
154204
upload_to_csg_hub_server(csg_hub_dataset_id,
155-
csg_hub_dataset_default_branch,
205+
csg_hub_dataset_name,
156206
user_name, user_token, db_session,
157207
collection_task, datasource_temp_parquet_dir,
158208
datasource_csg_hub_server_dir)
@@ -176,15 +226,15 @@ def collection_mongo_task(task_uid: str,user_name: str,user_token: str):
176226
return True
177227

178228

179-
def upload_to_csg_hub_server(csg_hub_dataset_id: int,
229+
def upload_to_csg_hub_server(csg_hub_dataset_id: str,
180230
csg_hub_dataset_default_branch: str,
181-
user_name: str,user_token: str,db_session: Session,
182-
collection_task: CollectionTask,datasource_temp_json_dir: str,
231+
user_name: str, user_token: str, db_session: Session,
232+
collection_task: CollectionTask, datasource_temp_json_dir: str,
183233
datasource_csg_hub_server_dir: str):
184234
"""
185235
Upload to CSG Hub server
186236
Args:
187-
csg_hub_dataset_id (int): CSG Hub dataset ID
237+
csg_hub_dataset_id (str): CSG Hub dataset ID
188238
csg_hub_dataset_default_branch (str): CSG Hub dataset default branch
189239
user_name (str): User name
190240
user_token (str): User token
@@ -198,26 +248,54 @@ def upload_to_csg_hub_server(csg_hub_dataset_id: int,
198248
# Upload to CSG Hub server
199249

200250
ensure_directory_exists_remove(datasource_csg_hub_server_dir)
201-
insert_datasource_run_task_log_info(collection_task.task_uid, f"Starting upload csg hub-server the task[{collection_task.task_uid}]...")
251+
insert_datasource_run_task_log_info(collection_task.task_uid,
252+
f"Starting upload csg hub-server the task[{collection_task.task_uid}]...")
202253
exporter = load_exporter(
203254
export_path=datasource_temp_json_dir,
204-
repo_id=str(csg_hub_dataset_id),
255+
repo_id=csg_hub_dataset_id,
205256
branch=csg_hub_dataset_default_branch,
206257
user_name=user_name,
207258
user_token=user_token,
208259
work_dir=datasource_csg_hub_server_dir
209260
)
210-
upload_path: Path = Path(datasource_csg_hub_server_dir)
261+
upload_path: Path = Path(datasource_temp_json_dir)
262+
# Check whether the uploaded directory exists and is not empty
263+
if not os.path.exists(upload_path):
264+
insert_datasource_run_task_log_error(collection_task.task_uid,
265+
f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} does not exist")
266+
return False
267+
268+
# List all files in the upload directory for debugging
269+
file_list = []
270+
for root, dirs, files in os.walk(upload_path):
271+
for file in files:
272+
file_list.append(os.path.join(root, file))
273+
insert_datasource_run_task_log_info(collection_task.task_uid,
274+
f"Files to upload: {len(file_list)} files found in {upload_path}")
275+
if len(file_list) == 0:
276+
insert_datasource_run_task_log_error(collection_task.task_uid,
277+
f"the task[{collection_task.task_uid}] upload csg hub-server fail: upload path {upload_path} is empty")
278+
return False
279+
211280
output_branch_name = exporter.export_from_files(upload_path)
212281

213282
if output_branch_name:
214283
collection_task.csg_hub_branch = output_branch_name
215284
db_session.commit()
216-
insert_datasource_run_task_log_info(collection_task.task_uid, f"the task[{collection_task.task_uid}] upload csg hub-server success...")
285+
insert_datasource_run_task_log_info(collection_task.task_uid,
286+
f"the task[{collection_task.task_uid}] upload csg hub-server success...")
217287
else:
218-
insert_datasource_run_task_log_error(collection_task.task_uid, f"the task[{collection_task.task_uid}] upload csg hub-server fail...")
288+
insert_datasource_run_task_log_error(collection_task.task_uid,
289+
f"the task[{collection_task.task_uid}] upload csg hub-server fail: export_from_files returned None")
219290
except Exception as e:
220291
logger.error(e)
221-
insert_datasource_run_task_log_error(collection_task.task_uid,f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {e}")
292+
error_msg = str(e)
293+
# Check if this is a "nothing to commit" error
294+
if "nothing to commit" in error_msg.lower() or "working tree clean" in error_msg.lower():
295+
insert_datasource_run_task_log_error(collection_task.task_uid,
296+
f"the task[{collection_task.task_uid}] upload csg hub-server fail: No files to commit. This may happen if: 1) Files are already committed in the branch, 2) Files are ignored by .gitignore, 3) File paths are incorrect. Error: {error_msg}")
297+
else:
298+
insert_datasource_run_task_log_error(collection_task.task_uid,
299+
f"Task UID {collection_task.task_uid} Error occurred while uploading to CSG Hub server: {error_msg}")
222300
return False
223301
return True

0 commit comments

Comments
 (0)