Skip to content

Commit d68cc42

Browse files
JingTYHaiHui886
authored and committed
Download the required files for the flagged_words_filter during the build process.
1 parent 77827f3 commit d68cc42

4 files changed

Lines changed: 115 additions & 15 deletions

File tree

Dockerfile

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,26 @@ RUN git config --global user.email "dataflow@opencsg.com" && \
5454
git config --global user.name "dataflow" && \
5555
git config --global --add safe.directory '*'
5656

57+
# Download required resources for offline deployment
58+
# Create default cache directories
59+
RUN mkdir -p /root/.cache/data_engine/assets && \
60+
mkdir -p /root/.cache/data_engine/models
61+
62+
# Download JSON resources (flagged_words and stopwords)
63+
RUN wget -O /root/.cache/data_engine/assets/flagged_words.json \
64+
https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/flagged_words.json && \
65+
wget -O /root/.cache/data_engine/assets/stopwords.json \
66+
https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/stopwords.json
67+
68+
# Download SentencePiece models (Chinese and English)
69+
RUN wget -O /root/.cache/data_engine/models/zh.sp.model \
70+
https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/models/zh.sp.model && \
71+
wget -O /root/.cache/data_engine/models/en.sp.model \
72+
https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/models/en.sp.model
73+
74+
# Verify downloaded files
75+
RUN ls -lh /root/.cache/data_engine/assets/ && \
76+
ls -lh /root/.cache/data_engine/models/
77+
5778
# Start fastapi API Server
5879
EXPOSE 8000

data_engine/ops/filter/flagged_words_filter.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
from ..common import (SPECIAL_CHARACTERS, get_words_from_document,
1414
words_refinement)
1515
from ..op_fusion import INTER_WORDS
16+
from loguru import logger
17+
import os
18+
from data_celery.mongo_tools.tools import (
19+
insert_pipline_job_run_task_log_info,
20+
insert_pipline_job_run_task_log_warning,
21+
insert_pipline_job_run_task_log_error
22+
)
1623

1724
OP_NAME = 'flagged_words_filter'
1825

@@ -63,16 +70,58 @@ def __init__(self,
6370
self.words_aug_join_char = words_aug_join_char
6471
self.model_key = None
6572

73+
# Log flagged_words_filter initialization
74+
msg = f"[flagged_words_filter] Initializing with lang='{lang}', tokenization={tokenization}"
75+
logger.info(msg)
76+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
77+
78+
msg = f"[flagged_words_filter] flagged_words_dir: {flagged_words_dir}"
79+
logger.info(msg)
80+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
81+
82+
# Check if flagged_words file exists before loading
83+
expected_file = os.path.join(flagged_words_dir, 'flagged_words.json')
84+
if os.path.exists(expected_file):
85+
msg = f"[flagged_words_filter] ✓ Found local flagged_words.json at: {expected_file}"
86+
logger.info(msg)
87+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
88+
else:
89+
msg = f"[flagged_words_filter] ✗ Local flagged_words.json NOT found at: {expected_file}, will attempt download"
90+
logger.warning(msg)
91+
insert_pipline_job_run_task_log_warning(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
92+
93+
# Load flagged words
94+
msg = "[flagged_words_filter] Loading flagged words..."
95+
logger.info(msg)
96+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
97+
6698
self.FLAGGED_WORDS = load_words_asset(words_dir=flagged_words_dir,
6799
words_type='flagged_words')
100+
101+
total_words = sum(len(words) for words in self.FLAGGED_WORDS.values())
102+
msg = f"[flagged_words_filter] ✓ Successfully loaded flagged_words: {len(self.FLAGGED_WORDS)} languages, {total_words} total words"
103+
logger.info(msg)
104+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
68105

69106
if 'all' not in self.FLAGGED_WORDS:
70107
self.FLAGGED_WORDS['all'] = [
71108
val for vals in self.FLAGGED_WORDS.values() for val in vals
72109
]
110+
73111
if tokenization:
112+
msg = f"[flagged_words_filter] Tokenization enabled, preparing sentencepiece model for lang='{lang}'"
113+
logger.info(msg)
114+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
115+
116+
msg = f"[flagged_words_filter] Expected model file: {lang}.sp.model"
117+
logger.info(msg)
118+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
119+
74120
self.model_key = prepare_model(model_type='sentencepiece',
75121
lang=lang)
122+
msg = f"[flagged_words_filter] ✓ Successfully prepared sentencepiece model"
123+
logger.info(msg)
124+
insert_pipline_job_run_task_log_info(self.job_uid, msg, operator_name=OP_NAME, operator_index=self.pipline_index)
76125

77126
def compute_stats(self, sample, context=False):
78127
# check if it's computed already

data_engine/utils/asset_utils.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,35 @@ def load_words_asset(words_dir: str, words_type: str):
3333
words_dict = {}
3434
os.makedirs(words_dir, exist_ok=True)
3535

36+
logger.info(f'[load_words_asset] Searching for {words_type} in directory: {words_dir}')
37+
3638
# try to load words from `words_type` file
3739
for filename in os.listdir(words_dir):
3840
if filename.endswith('.json') and words_type in filename:
39-
with open(os.path.join(words_dir, filename), 'r') as file:
41+
file_path = os.path.join(words_dir, filename)
42+
logger.info(f'[load_words_asset] ✓ Found local file: {file_path}')
43+
with open(file_path, 'r') as file:
4044
loaded_words = json.load(file)
4145
for key in loaded_words:
4246
if key in words_dict:
4347
words_dict[key] += loaded_words[key]
4448
else:
4549
words_dict[key] = loaded_words[key]
50+
logger.info(f'[load_words_asset] ✓ Successfully loaded from local file (no network access)')
51+
4652
# if the asset file is not found, then download it from ASSET_LINKS
4753
if not bool(words_dict):
48-
logger.info(f'Specified {words_dir} does not contain '
49-
f'any {words_type} files in json format, now '
50-
'download the one cached by data_engine team')
51-
response = requests.get(ASSET_LINKS[words_type])
54+
download_url = ASSET_LINKS[words_type]
55+
logger.warning(f'[load_words_asset] ✗ Local file NOT found in {words_dir}')
56+
logger.info(f'[load_words_asset] ⬇ Attempting to download from: {download_url}')
57+
58+
response = requests.get(download_url)
5259
words_dict = response.json()
60+
5361
# cache the asset file locally
5462
cache_path = os.path.join(words_dir, f'{words_type}.json')
5563
with open(cache_path, 'w') as file:
5664
json.dump(words_dict, file)
65+
logger.info(f'[load_words_asset] ✓ Downloaded and cached to: {cache_path}')
5766

5867
return words_dict

data_engine/utils/model_utils.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,38 +65,53 @@ def check_model(model_name, force=False):
6565
the model file maybe incomplete for some reason, so need to
6666
download again forcefully.
6767
"""
68+
logger.info(f'[check_model] Checking model: {model_name}, force={force}')
69+
6870
# check for local model
6971
if os.path.exists(model_name):
72+
logger.info(f'[check_model] ✓ Found model at absolute path: {model_name}')
7073
return model_name
7174

7275
if not os.path.exists(DJMC):
76+
logger.info(f'[check_model] Creating models cache directory: {DJMC}')
7377
os.makedirs(DJMC)
7478

7579
# check if the specified model exists. If it does not exist, download it
7680
cached_model_path = os.path.join(DJMC, model_name)
81+
logger.info(f'[check_model] Expected model path: {cached_model_path}')
82+
83+
if os.path.exists(cached_model_path):
84+
logger.info(f'[check_model] ✓ Found cached model (no network access needed)')
85+
if not force:
86+
return cached_model_path
87+
7788
if force:
7889
if os.path.exists(cached_model_path):
7990
os.remove(cached_model_path)
80-
logger.info(
81-
f'Model [{cached_model_path}] invalid, force to downloading...'
91+
logger.warning(
92+
f'[check_model] Model [{cached_model_path}] marked invalid, force downloading...'
8293
)
8394
else:
84-
logger.info(
85-
f'Model [{cached_model_path}] not found. Downloading...')
95+
logger.warning(
96+
f'[check_model] ✗ Model [{cached_model_path}] not found. Attempting download...')
8697

8798
try:
8899
model_link = os.path.join(MODEL_LINKS, model_name)
100+
logger.info(f'[check_model] ⬇ Downloading from primary link: {model_link}')
89101
wget.download(model_link, cached_model_path, bar=None)
102+
logger.info(f'[check_model] ✓ Successfully downloaded to: {cached_model_path}')
90103
except: # noqa: E722
91104
try:
92105
backup_model_link = os.path.join(
93106
get_backup_model_link(model_name), model_name)
107+
logger.warning(f'[check_model] Primary download failed, trying backup: {backup_model_link}')
94108
wget.download(backup_model_link, cached_model_path, bar=None)
109+
logger.info(f'[check_model] ✓ Successfully downloaded from backup')
95110
except: # noqa: E722
96111
logger.error(
97-
f'Downloading model [{model_name}] error. '
98-
f'Please retry later or download it into {DJMC} '
99-
f'manually from {model_link} or {backup_model_link} ')
112+
f'[check_model] ✗ Download failed for [{model_name}]. '
113+
f'Please download it manually into {DJMC} '
114+
f'from {model_link} or {backup_model_link} ')
100115
exit(1)
101116
return cached_model_path
102117

@@ -127,12 +142,18 @@ def prepare_sentencepiece_model(model_path):
127142
"""
128143
import sentencepiece
129144

130-
logger.info('Loading sentencepiece model...')
145+
logger.info(f'[prepare_sentencepiece_model] Preparing sentencepiece model: {model_path}')
131146
sentencepiece_model = sentencepiece.SentencePieceProcessor()
132147
try:
133-
sentencepiece_model.load(check_model(model_path))
148+
model_file = check_model(model_path)
149+
logger.info(f'[prepare_sentencepiece_model] Loading model from: {model_file}')
150+
sentencepiece_model.load(model_file)
151+
logger.info(f'[prepare_sentencepiece_model] ✓ Successfully loaded sentencepiece model (no download needed)')
134152
except: # noqa: E722
135-
sentencepiece_model.load(check_model(model_path, force=True))
153+
logger.warning(f'[prepare_sentencepiece_model] First load attempt failed, retrying with force=True...')
154+
model_file = check_model(model_path, force=True)
155+
sentencepiece_model.load(model_file)
156+
logger.info(f'[prepare_sentencepiece_model] ✓ Successfully loaded sentencepiece model after download')
136157
return sentencepiece_model
137158

138159

0 commit comments

Comments (0)