Skip to content

Commit df15c4f

Browse files
authored
Merge pull request #11 from z275748353/main
add a new operator
2 parents ca22b89 + f41777c commit df15c4f

41 files changed

Lines changed: 2008 additions & 660 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Dockerfile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,4 @@ RUN git config --global --add safe.directory '*'
4343

4444
# Start fastapi API Server
4545
EXPOSE 8000
46-
# CMD ["df-server"]
47-
CMD ["uvicorn", "data_server.main:app", "--host", "0.0.0.0", "--port", "8000"]
4846

README.md

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ This project inherits the [Apache License 2.0](LICENSE) from Data Juicer.
3434
docker build -t dataflow . -f Dockerfile
3535
```
3636

37-
## Building data-flow-celery from Source
38-
39-
```
40-
docker build -t dataflow-celery . -f Dockerfile-celery
41-
```
4237

4338
## Prerequisites
4439

@@ -80,6 +75,7 @@ docker run -d --name dataflow-redis \
8075

8176
docker run -d --name dataflow-api -p 8000:8000 \
8277
-v /home/apidata:/data/dataflow_data \
78+
-c "uvicorn data_server.main:app --host 0.0.0.0 --port 8000" \
8379
-e DATA_DIR=/data/dataflow_data \
8480
-e CSGHUB_ENDPOINT=https://hub.opencsg.com \
8581
-e MAX_WORKERS=99 \
@@ -107,6 +103,7 @@ docker run -d --name dataflow-api -p 8000:8000 \
107103

108104
docker run -d --name celery-work -p 8001:8001 \
109105
-v /home/celery-data:/data/dataflow_celery \
106+
-c "celery -A data_celery.main:celery_app worker --loglevel=info --pool=gevent" \
110107
-e DATA_DIR=/data/dataflow_celery \
111108
-e CSGHUB_ENDPOINT=https://hub.opencsg.com \
112109
-e MAX_WORKERS=99 \
@@ -146,14 +143,6 @@ uvicorn data_server.main:app --reload
146143
## Run data-flow-celery server in development mode locally
147144

148145
```bash
149-
# Create virtual python 3.10 environment
150-
conda create -n dataflow python=3.10
151-
152-
# Install dependencies
153-
pip install '.[dist]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
154-
pip install '.[tools]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
155-
pip install '.[sci]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
156-
pip install -r docker/requirements.txt
157146

158147
# Run the celery server locally
159148
celery -A data_celery.main:celery_app worker --loglevel=info --pool=gevent
3.06 KB
Loading
3.35 KB
Loading
2.8 KB
Loading
3.36 KB
Loading
3.01 KB
Loading

data_celery/formatify/tasks.py

Lines changed: 88 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,19 @@
2828
from data_engine.utils.env import GetHubEndpoint
2929
@celery_app.task
3030
def format_task(task_id: int, user_name: str, user_token: str):
31+
3132
tmp_path: str = None
3233
db_session: Session = None
3334
format_task: DataFormatTask = None
35+
3436
try:
3537
db_session: Session = get_sync_session()
3638
format_task: DataFormatTask = FormatifyManager.get_formatify_task(db_session, task_id)
3739
tmp_path = get_format_folder_path(format_task.task_uid)
38-
insert_formatity_task_log_info(format_task.task_uid, f"Create a temporary directory:{tmp_path}")
40+
insert_formatity_task_log_info(format_task.task_uid, f"Create temporary directory:{tmp_path}")
3941
ensure_directory_exists(tmp_path)
4042

41-
insert_formatity_task_log_info(format_task.task_uid, f"Start downloading the source directory...")
43+
insert_formatity_task_log_info(format_task.task_uid, f"Start downloading directory....")
4244
ingesterCSGHUB = load_ingester(
4345
dataset_path=tmp_path,
4446
repo_id=format_task.from_csg_hub_repo_id,
@@ -47,18 +49,25 @@ def format_task(task_id: int, user_name: str, user_token: str):
4749
user_token=user_token,
4850
)
4951
ingester_result = ingesterCSGHUB.ingest()
50-
insert_formatity_task_log_info(format_task.task_uid, f"Download of the source directory completed... Directory address:{ingester_result}")
52+
insert_formatity_task_log_info(format_task.task_uid, f"Download directory completed... Directory address:{ingester_result}")
5153
work_dir = Path(tmp_path).joinpath('work')
52-
insert_formatity_task_log_info(format_task.task_uid, f"Start converting files...")
54+
file_bool = search_files(tmp_path,[format_task.from_data_type])
55+
56+
if not file_bool:
57+
insert_formatity_task_log_info(format_task.task_uid, f"file not found. task ended....")
58+
format_task.task_status = DataFormatTaskStatusEnum.ERROR.value
59+
db_session.commit()
60+
return
61+
insert_formatity_task_log_info(format_task.task_uid, f"Start converting file...")
5362

5463
format_task_func(
5564
tmp_path=ingester_result,
5665
from_type=format_task.from_data_type,
5766
to_type=format_task.to_data_type,
5867
task_uid=format_task.task_uid,
5968
)
60-
insert_formatity_task_log_info(format_task.task_uid, f"File conversion completed...")
61-
insert_formatity_task_log_info(format_task.task_uid, f"Start uploading the target directory...")
69+
insert_formatity_task_log_info(format_task.task_uid, f"Conversion file complete....")
70+
insert_formatity_task_log_info(format_task.task_uid, f"Start uploading directory...")
6271

6372
exporter = load_exporter(
6473
export_path=ingester_result,
@@ -78,13 +87,13 @@ def format_task(task_id: int, user_name: str, user_token: str):
7887
traceback.print_exc()
7988
format_task.task_status = DataFormatTaskStatusEnum.ERROR.value
8089
db_session.commit()
81-
insert_formatity_task_log_error(format_task.task_uid, f"The conversion task failed.: {str(e)}")
90+
insert_formatity_task_log_error(format_task.task_uid, f"Conversion task failed: {str(e)}")
8291
finally:
8392
pass
8493

8594
if tmp_path:
8695
shutil.rmtree(tmp_path)
87-
insert_formatity_task_log_info(format_task.task_uid, f"Delete the temporary directory:{tmp_path}")
96+
insert_formatity_task_log_info(format_task.task_uid, f"Delete temporary directory:{tmp_path}")
8897

8998

9099
def format_task_func(
@@ -94,7 +103,7 @@ def format_task_func(
94103
task_uid: str
95104
):
96105
insert_formatity_task_log_info(task_uid,
97-
f"Convert directory{tmp_path},Source file type:{getFormatTypeName(from_type)}Target file type:{getFormatTypeName(to_type)}")
106+
f"Change the table of contents{tmp_path},Source file type:{getFormatTypeName(from_type)}Source file type:{getFormatTypeName(to_type)}")
98107
match from_type:
99108
case DataFormatTypeEnum.Excel.value:
100109
match to_type:
@@ -131,28 +140,30 @@ def convert_excel_to_csv(file_path: str, task_uid):
131140
df = pd.read_excel(file_path)
132141
new_file = os.path.splitext(file_path)[0] + '.csv'
133142
df.to_csv(new_file, index=False)
134-
insert_formatity_task_log_info(task_uid, f'The conversion of the file {new_file} was successful.')
143+
insert_formatity_task_log_info(task_uid, f'convert file {new_file} succeed')
135144
os.remove(file_path)
136145
return True
137146
except Exception as e:
138-
insert_formatity_task_log_error(task_uid, f"An error occurred while converting the file {file_path}: {e}")
147+
print(f"convert file {file_path} error: {e}")
148+
insert_formatity_task_log_error(task_uid, f"convert file {file_path} error: {e}")
139149
return False
140150
else:
141151
return True
142152

143153

144154
def convert_excel_to_json(file_path: str, task_uid):
145155
if file_path.lower().endswith(('.xlsx', '.xls')):
146-
insert_formatity_task_log_info(task_uid, f'Source file address: {file_path}')
156+
insert_formatity_task_log_info(task_uid, f'Source file address{file_path}')
147157
try:
148158
df = pd.read_excel(file_path)
149159
new_file = os.path.splitext(file_path)[0] + '.json'
150160
df.to_json(new_file, orient='records', force_ascii=False)
151-
insert_formatity_task_log_info(task_uid, f'The file {new_file} has been converted successfully.')
161+
insert_formatity_task_log_info(task_uid, f'convert file {new_file} succeed')
152162
os.remove(file_path)
153163
return True
154164
except Exception as e:
155-
insert_formatity_task_log_error(task_uid, f"When converting the file {file_path}, an error occurred: {e}")
165+
print(f"convert file {file_path} error: {e}")
166+
insert_formatity_task_log_error(task_uid, f"convert file {file_path} error: {e}")
156167

157168
return False
158169
else:
@@ -162,24 +173,25 @@ def convert_excel_to_json(file_path: str, task_uid):
162173

163174
def convert_excel_to_parquet(file_path: str, task_uid):
    """Convert a single Excel file (.xlsx/.xls) to Parquet in place.

    The source file is deleted after a successful conversion; files with
    any other extension are left untouched and reported as success so the
    caller can iterate over mixed directories.

    Args:
        file_path: Path of the candidate file.
        task_uid: Task identifier used for progress/error logging.

    Returns:
        True on success (or when the file is not an Excel file),
        False when the conversion raised an exception.
    """
    if file_path.lower().endswith(('.xlsx', '.xls')):
        insert_formatity_task_log_info(task_uid, f'Source file address{file_path}')
        try:
            df = pd.read_excel(file_path)
            new_file = os.path.splitext(file_path)[0] + '.parquet'
            # BUG FIX: the previous code wrote to `new_file + '.parquet'`,
            # producing 'name.parquet.parquet' on disk while logging the
            # un-suffixed path. Write to `new_file` so the output matches
            # the logged path and the pattern used by the csv/json siblings.
            df.to_parquet(new_file, index=False)
            insert_formatity_task_log_info(task_uid, f'convert file {new_file} succeed')
            os.remove(file_path)
            return True
        except Exception as e:
            print(f"convert file {file_path} error: {e}")
            insert_formatity_task_log_error(task_uid, f"convert file {file_path} error: {e}")
            return False
    else:
        return True
178190

179191

180192
def convert_word_to_markdown(file_path: str, task_uid):
181193
if file_path.lower().endswith(('.docx', '.doc')):
182-
insert_formatity_task_log_info(task_uid, f'Source file address: {file_path}')
194+
insert_formatity_task_log_info(task_uid, f'Source file address{file_path}')
183195
try:
184196
with open(file_path, "rb") as docx_file:
185197
result = mammoth.convert_to_html(docx_file)
@@ -188,11 +200,12 @@ def convert_word_to_markdown(file_path: str, task_uid):
188200
markdown_file_path = os.path.splitext(file_path)[0] + '.md'
189201
with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
190202
md_file.write(markdown_content)
191-
insert_formatity_task_log_info(task_uid, f'The file {markdown_file_path} has been converted successfully.')
203+
insert_formatity_task_log_info(task_uid, f'convert file {markdown_file_path} succeed')
192204
os.remove(file_path)
193205
return True
194206
except Exception as e:
195-
insert_formatity_task_log_error(task_uid, f"When converting the file {file_path}, an error occurred: {e}")
207+
print(f"convert file {file_path} error: {e}")
208+
insert_formatity_task_log_error(task_uid, f"convert file {file_path} error: {e}")
196209
return False
197210
else:
198211

@@ -201,12 +214,12 @@ def convert_word_to_markdown(file_path: str, task_uid):
201214

202215
def convert_ppt_to_markdown(file_path: str, task_uid):
203216
if file_path.lower().endswith(('.pptx', '.ppt')):
204-
insert_formatity_task_log_info(task_uid, f'Source file address: {file_path}')
217+
insert_formatity_task_log_info(task_uid, f'Source file address{file_path}')
205218
try:
206219
prs = Presentation(file_path)
207220
markdown_content = ""
208221
for i, slide in enumerate(prs.slides):
209-
markdown_content += f" PPT {i + 1}\n\n"
222+
markdown_content += f" lantern slide {i + 1}\n\n"
210223
for shape in slide.shapes:
211224
if hasattr(shape, "text") and shape.text.strip():
212225
text_content = shape.text.strip()
@@ -218,12 +231,63 @@ def convert_ppt_to_markdown(file_path: str, task_uid):
218231
markdown_file_path = os.path.splitext(file_path)[0] + '.md'
219232
with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
220233
md_file.write(markdown_content)
221-
insert_formatity_task_log_info(task_uid, f'The file {markdown_file_path} has been converted successfully.')
234+
insert_formatity_task_log_info(task_uid, f'convert file {markdown_file_path} succeed')
222235
os.remove(file_path)
223236
return True
224237
except Exception as e:
225-
insert_formatity_task_log_error(task_uid, f"When converting the file {file_path}, an error occurred: {e}")
238+
print(f"convert file {file_path} error: {e}")
239+
insert_formatity_task_log_error(task_uid, f"convert file {file_path} error: {e}")
226240
return False
227241
else:
228242

229243
return True
244+
245+
246+
from typing import Dict, List


def search_files(folder_path: str, types: List[int]) -> bool:
    """Return True if *folder_path*, searched recursively, contains at least
    one file whose extension matches any of the requested format types.

    Args:
        folder_path: Root directory to search.
        types: Format-type codes: 0 = PPT, 1 = Word, 3 = Excel.
               Unknown codes are ignored.

    Returns:
        True when at least one matching file exists, False otherwise
        (including when *types* yields no known extensions).

    Note:
        The previous annotation claimed ``Tuple[bool, List[str]]`` but the
        function has always returned a bare bool (and the caller relies on
        that: ``if not file_bool``), so the annotation is corrected here.
    """
    # Map format-type codes to the file extensions they cover.
    type_map: Dict[int, List[str]] = {
        0: ['.ppt', '.pptx'],   # PPT
        1: ['.doc', '.docx'],   # Word
        3: ['.xls', '.xlsx'],   # Excel
    }

    target_extensions = {
        ext.lower()
        for file_type in types
        for ext in type_map.get(file_type, [])
    }
    if not target_extensions:
        return False

    # os.walk replaces the hand-rolled recursive listdir traversal; with the
    # default onerror=None it skips unreadable directories instead of
    # aborting, matching the old PermissionError handling. We can stop at
    # the first hit since only existence matters.
    for _dirpath, _dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            if os.path.splitext(filename)[1].lower() in target_extensions:
                return True
    return False

data_celery/job/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def run_pipline_job(job_uuid,user_id, user_name, user_token):
3737
if job_obj is not None and job_obj.job_celery_uuid is not None and job_obj.job_celery_uuid != "":
3838
job_celery_uuid = job_obj.job_celery_uuid
3939
break
40-
# time.sleep(1)
40+
time.sleep(1)
4141
if job_celery_uuid == "":
4242
insert_pipline_job_run_task_log_error(job_uuid, f"not found job celery uuid : {job_uuid}")
4343
return False

data_celery/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def get_process_resource_usage_task(worker_name:str,current_ip:str):
197197

198198
def get_process_resource_usage(redis_celery,job_uuid,process_id):
199199
try:
200-
# print(1)
200+
201201
process = psutil.Process(int(process_id))
202202
if process.is_running():
203203
cpu_usage = process.cpu_percent(interval=1)

0 commit comments

Comments
 (0)