
Commit 325d59a

Merge pull request #16 from z275748353/main
Add newly developed operators, internationalize tools
2 parents 9db0688 + ae70b5b commit 325d59a

34 files changed: 1357 additions & 624 deletions

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+name: 'Assess and score educational value'
+description: 'Use a large model to assess the educational value of a portion of the text and score it, then filter samples by score range.'
+type: 'data_refine'
+buildin: true
+project_name: 'dataflow-demo-process'
+dataset_path: '/path/to/your/dataset'
+export_path: '/path/to/your/dataset.jsonl'
+np: 1
+open_tracer: false
+trace_num: 3
+process:
+  - annotate_edu_train_bert_scorer_mapper:
+      auth_token: ''
+      model_url: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
+      model_name: 'text-embedding-v4'
+      dimensions: 1024
+      query_text: 'What is Deep Learning?'
+  - text_high_score_filter:
+      score_field: 'text_score'
+      min_score: 0
+      max_score: 5
+  - text_bloom_filter:
+      hash_func: md5
+      initial_capacity: 100
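
For context, a minimal sketch (not part of this commit) of what the score-range filter's parameters imply: keep only samples whose score_field value lies within [min_score, max_score]. The record layout and helper name below are hypothetical; the real text_high_score_filter operator lives in data_engine.

# Hypothetical illustration of the min_score/max_score semantics configured above.
def keep_by_score(samples, score_field="text_score", min_score=0, max_score=5):
    return [s for s in samples if min_score <= s.get(score_field, 0) <= max_score]

samples = [{"text": "a", "text_score": 4.2}, {"text": "b", "text_score": 6.1}]
print(keep_by_score(samples))  # keeps only the first sample
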
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+name: 'Generate a high-quality Chinese multi-turn dialogue dataset'
+description: 'High-quality Chinese multi-turn dialogue data are generated automatically with large models, then screened through quality scoring and semantic deduplication.'
+type: 'data_refine'
+buildin: true
+project_name: 'dataflow-demo-process'
+dataset_path: ''
+export_path: '/path/to/your/dataset.jsonl'
+np: 1
+open_tracer: false
+trace_num: 3
+process:
+  - pipeline_magpie_zh_mapper:
+      auth_token: ''
+      model_url: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
+      model_name: 'qwen-plus'
+  - gather_generated_data_filter:
+  - encode_and_get_nearest_mapper:
+      auth_token: ''
+      model_url: 'https://dashscope.aliyuncs.com/compatible-mode/v1'
+      model_name: 'text-embedding-v4'
+      dimensions: 1024
+  - dedup_and_save_deduplicator:
+      similarity_threshold: 0.5
+      nn_indices_key: 'nn_indices'
+      nn_scores_key: 'nn_scores'
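
For reference, a minimal sketch (not part of this commit) of the kind of nearest-neighbor annotations encode_and_get_nearest_mapper is expected to produce and dedup_and_save_deduplicator consumes: per-sample neighbor indices and similarity scores derived from embeddings. The cosine-similarity math and array shapes below are illustrative assumptions, not the operators' actual implementation.

import numpy as np

# Toy embeddings (the real pipeline uses 1024-dim text-embedding-v4 vectors).
emb = np.random.default_rng(0).normal(size=(4, 8))
emb /= np.linalg.norm(emb, axis=1, keepdims=True)

sim = emb @ emb.T                 # cosine similarity matrix
np.fill_diagonal(sim, -np.inf)    # ignore self-matches
nn_indices = sim.argmax(axis=1)   # nearest neighbor per sample
nn_scores = sim.max(axis=1)       # its similarity score

# Fields the deduplicator reads via nn_indices_key / nn_scores_key.
sample = {"nn_indices": [int(nn_indices[0])], "nn_scores": [float(nn_scores[0])]}
print(sample)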

data_engine/config/config.py

Lines changed: 4 additions & 0 deletions
@@ -73,6 +73,10 @@ def init_configs(args=None,redirect=True):
                         type=str,
                         default='hello_world',
                         help='Name of your data process project.')
+    parser.add_argument('--tool_name',
+                        type=str,
+                        default='',
+                        help='Name of the tool being executed.')
     parser.add_argument(
         '--executor_type',
         type=str,
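
A minimal sketch (not part of this commit) of how the new --tool_name flag behaves: when it is omitted, the parsed config carries an empty string, so the executor's output_only check below does not trigger. The standalone parser here is an assumption used only for illustration; the real argument is registered inside init_configs().

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--tool_name',
                    type=str,
                    default='',
                    help='Name of the tool being executed.')

cfg = parser.parse_args([])   # no flag given -> ''
print(cfg.tool_name == 'template_executor_06_common_internal')  # False: normal ingest path

cfg = parser.parse_args(['--tool_name', 'template_executor_06_common_internal'])
print(cfg.tool_name)          # output_only tool name, ingester setup is skipped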

data_engine/core/executor.py

Lines changed: 75 additions & 43 deletions
@@ -71,14 +71,27 @@ def __init__(
         logger.info('Setting up data ingester...')
         insert_pipline_job_run_task_log_info(self.job_uid,
                                              'Setting up data ingester...')
-        # Only have one embedded ingester: from csghub
-        self.ingester = load_ingester(
-            dataset_path = self.cfg.dataset_path,
-            repo_id = self.cfg.repo_id,
-            branch = self.cfg.branch,
-            user_name = self.user_name,
-            user_token = self.user_token
-        )
+
+        # Check if this is the specific output_only tool by tool name
+        tool_name = getattr(self.cfg, 'tool_name', '')
+        is_specific_output_only = (tool_name == 'template_executor_06_common_internal')
+
+        # Normal path: keep the original ingester setup
+        if not is_specific_output_only:
+            # Only have one embedded ingester: from csghub
+            self.ingester = load_ingester(
+                dataset_path = self.cfg.dataset_path,
+                repo_id = self.cfg.repo_id,
+                branch = self.cfg.branch,
+                user_name = self.user_name,
+                user_token = self.user_token
+            )
+        # Output-only tool: skip ingester creation
+        else:
+            logger.info('Skipping ingester setup for output_only tool')
+            insert_pipline_job_run_task_log_info(self.job_uid,
+                                                 'Skipping ingester setup for output_only tool')
+            self.ingester = None
         # assign src_path as dataset_path to format creation

         # whether to use checkpoint mechanism. If it's true, Executor will
@@ -193,44 +206,62 @@ def run(self, load_data_np=None):
         :return: processed dataset.
         """
         # 0. ingest data
-        with TRACE_HELPER.trace_block(
-                "ingest",
-                parent=get_telemetry_envelope_metadata(),
-        ):
-            self.src_path = self.ingester.ingest()
-            logger.info(f'Data ingested from {self.src_path}')
+        # Skip data ingestion for specific output_only tool
+        if self.ingester is not None:
+            with TRACE_HELPER.trace_block(
+                    "ingest",
+                    parent=get_telemetry_envelope_metadata(),
+            ):
+                self.src_path = self.ingester.ingest()
+                logger.info(f'Data ingested from {self.src_path}')
+                insert_pipline_job_run_task_log_info(self.job_uid,
+                                                     f'Data ingested from {self.src_path}')
+        else:
+            logger.info('Skipping data ingestion for output_only tool')
             insert_pipline_job_run_task_log_info(self.job_uid,
-                                                 f'Data ingested from {self.src_path}')
+                                                 'Skipping data ingestion for output_only tool')
+            self.src_path = None
         # set src_path to format, let format continue its job

-        # 1. setup formatter
-        with TRACE_HELPER.trace_block(
-                "format",
-                parent=get_telemetry_envelope_metadata(),
-        ):
-            logger.info('Setting up data formatter...')
-            insert_pipline_job_run_task_log_info(self.job_uid,
-                                                 'Setting up data formatter...')
-            self.formatter = load_formatter(
-                self.src_path,
-                self.cfg.generated_dataset_config,
-                self.cfg.text_keys, self.cfg.suffixes,
-                self.cfg.add_suffix
-            )
-
-        # 2. format data
-        if self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
-            logger.info('Loading dataset from checkpoint...')
-            insert_pipline_job_run_task_log_info(self.job_uid,
-                                                 'Loading dataset from checkpoint...')
-            dataset = self.ckpt_manager.load_ckpt()
-        else:
-            logger.info('Loading dataset from data formatter...')
+        # 1. setup formatter and load data (skip for output_only tools)
+        if self.ingester is not None:
+            with TRACE_HELPER.trace_block(
+                    "format",
+                    parent=get_telemetry_envelope_metadata(),
+            ):
+                logger.info('Setting up data formatter...')
                 insert_pipline_job_run_task_log_info(self.job_uid,
-                                                 'Loading dataset from data formatter...')
-            if load_data_np is None:
-                load_data_np = self.cfg.np
-            dataset = self.formatter.load_dataset(load_data_np, self.cfg)
+                                                     'Setting up data formatter...')
+                self.formatter = load_formatter(
+                    self.src_path,
+                    self.cfg.generated_dataset_config,
+                    self.cfg.text_keys, self.cfg.suffixes,
+                    self.cfg.add_suffix
+                )
+
+            # 2. format data
+            if self.cfg.use_checkpoint and self.ckpt_manager.ckpt_available:
+                logger.info('Loading dataset from checkpoint...')
+                insert_pipline_job_run_task_log_info(self.job_uid,
+                                                     'Loading dataset from checkpoint...')
+                dataset = self.ckpt_manager.load_ckpt()
+            else:
+                logger.info('Loading dataset from data formatter...')
+                insert_pipline_job_run_task_log_info(self.job_uid,
+                                                     'Loading dataset from data formatter...')
+                if load_data_np is None:
+                    load_data_np = self.cfg.np
+                dataset = self.formatter.load_dataset(load_data_np, self.cfg)
+        else:
+            logger.info('Skipping data formatting and loading for output_only tool')
+            insert_pipline_job_run_task_log_info(self.job_uid,
+                                                 'Skipping data formatting and loading for output_only tool')
+            # Create an empty dataset for output_only tools
+            import datasets
+            # Create an empty Arrow table with basic schema
+            empty_table = datasets.Dataset.from_dict({})
+            from data_engine.core.data import NestedDataset
+            dataset = NestedDataset(empty_table)

         # 3. extract processes
         logger.info('Preparing process operators...')
@@ -242,10 +273,11 @@ def run(self, load_data_np=None):
         # 4. data process
         # - If tracer is open, trace each op after it's processed
        # - If checkpoint is open, clean the cache files after each process
+        dataset_count = len(dataset) if dataset is not None else 0
         with TRACE_HELPER.trace_block(
                 "run",
                 parent=get_telemetry_envelope_metadata(),
-                extraAttributes={"dataset_count": len(dataset)}
+                extraAttributes={"dataset_count": dataset_count}
         ):
             logger.info('Processing data...')
             insert_pipline_job_run_task_log_info(self.job_uid,
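
A minimal sketch (not part of the commit) of the empty-dataset fallback the output_only branch relies on: an Arrow-backed Hugging Face dataset built from an empty dict has zero rows, so the dataset_count attribute recorded by the telemetry block is simply 0. Wrapping the result in NestedDataset from data_engine.core.data is assumed to leave the row count unchanged.

from datasets import Dataset

# Empty Arrow table, as in the output_only branch above.
empty_table = Dataset.from_dict({})
print(len(empty_table))  # 0 rows

# Mirrors the guarded count used for the telemetry attribute.
dataset = empty_table
dataset_count = len(dataset) if dataset is not None else 0
print(dataset_count)     # 0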

data_engine/core/executor_tools.py

Lines changed: 1 addition & 1 deletion
@@ -24,6 +24,7 @@ def __init__(self, *, tool_def: Tool_def, params: ExecutedParams):
         """
         self.tool_def = tool_def
         self.executed_params = params
+
         logger.info(f'Using user_id={self.executed_params.user_id}, '
                     f'user_name={self.executed_params.user_name}, '
                     f'user_token={"xxxxxx" if self.executed_params.user_token is not None and len(self.executed_params.user_token)>0 else None}')
@@ -49,7 +50,6 @@ def run(self):
         """
         # 1. setup tool
         logger.info('Preparing tool...')
-
         tool_obj: TOOL = load_tool(self.tool_def, self.executed_params)

         with TRACE_HELPER_TOOL.trace_block(

data_engine/exporter/csghub_exporter.py

Lines changed: 2 additions & 0 deletions
@@ -200,6 +200,8 @@ def find_next_version(self, origin_branch: str, valid_branches: List):
         for b in valid_branches:
             if origin_branch == "main" and re.match(r"^v\d+", b):
                 numStr = b.split(".")[0][1:]
+                if not numStr.isdigit():
+                    continue
                 num = int(numStr)
                 latestNum = max(latestNum, num)
             elif b.startswith(origin_branch) and len(b) > len(origin_branch):
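
A minimal sketch (not part of the commit) of why the isdigit() guard matters in find_next_version: a branch such as 'v2beta.0' matches the ^v\d+ pattern, but its leading segment stripped of 'v' is not purely numeric, so int() would previously raise ValueError. The standalone loop below is a simplified rendering of the surrounding code, assuming it iterates branch names the same way.

import re

valid_branches = ["v1.0", "v3.2", "v2beta.0", "feature-x"]
latestNum = 0

for b in valid_branches:
    if re.match(r"^v\d+", b):
        numStr = b.split(".")[0][1:]
        if not numStr.isdigit():   # new guard: skip 'v2beta' instead of crashing
            continue
        latestNum = max(latestNum, int(numStr))

print(latestNum)  # 3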

data_engine/format/load.py

Lines changed: 2 additions & 0 deletions
@@ -7,6 +7,7 @@ def load_formatter(dataset_path,
                    text_keys=None,
                    suffixes=[],
                    add_suffix=False,
+                   max_samples=None,
                    **kwargs) -> BaseFormatter:
     """
     Load mixture formatter for multiple different data formats with an optional
@@ -37,5 +38,6 @@ def load_formatter(dataset_path,
                                 text_keys=text_keys,
                                 suffixes=suffixes,
                                 add_suffix=add_suffix,
+                                max_samples=max_samples,
                                 **kwargs)
     return formatter
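
A usage sketch (not part of the commit) of the new max_samples parameter threaded through load_formatter. The cap presumably limits how many samples the underlying formatter loads; the call below assumes the data_engine package is installed and a local dataset path exists.

from data_engine.format.load import load_formatter

# Hypothetical call: same keyword arguments as before, plus the new cap.
formatter = load_formatter(
    dataset_path='/path/to/your/dataset',
    text_keys=['text'],
    suffixes=[],
    add_suffix=False,
    max_samples=1000,   # assumption: cap on the number of samples loaded
)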

data_engine/ops/deduplicator/dedup_and_save_deduplicator.py

Lines changed: 35 additions & 5 deletions
@@ -2,7 +2,7 @@
 import numpy as np
 from loguru import logger

-from data_engine.utils.constant import HashKeys
+from data_engine.utils.constant import HashKeys, Fields, StatsKeys
 from ..base_op import OPERATORS, Deduplicator, Sample, Param, DataType

 OP_NAME = 'dedup_and_save_deduplicator'
@@ -17,31 +17,38 @@ class DedupAndSaveDeduplicator(Deduplicator):
     """

     def __init__(self,
-                 similarity_threshold: float = 0.95,
+                 similarity_threshold: float = 0.5,
                  nn_indices_key: str = 'nn_indices',
                  nn_scores_key: str = 'nn_scores',
+                 fields_to_filter: list = None,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.similarity_threshold = similarity_threshold
         self.nn_indices_key = nn_indices_key
         self.nn_scores_key = nn_scores_key
+        self.fields_to_filter = fields_to_filter or ['embedding', 'nn_indices', 'nn_scores', 'text', 'instruction', 'response']

     def compute_hash(self, sample):
         # This method is a placeholder to fit the framework.
         # The actual logic doesn't rely on this hash.
         if self.nn_indices_key not in sample or self.nn_scores_key not in sample:
             sample[self.nn_indices_key] = [[]]
             sample[self.nn_scores_key] = [[]]
-        sample[HashKeys.similarity_hash] = f"similarity_data_{id(sample)}"
+        # Do not create the similarity_hash field because the actual deduplication logic does not require it
+        # sample[HashKeys.similarity_hash] = f"similarity_data_{id(sample)}"
         return sample

     def process(self, dataset, show_num=0):
+        print(f"[dedup_and_save_deduplicator] Input: {len(dataset)} samples")
+
         if len(dataset) <= 1:
+            print(f"[dedup_and_save_deduplicator] Output: {len(dataset)} samples (no deduplication needed)")
             return dataset, {}

         # Convert dataset to pandas DataFrame for easier graph processing
         df = dataset.to_pandas()
+        print(f"[dedup_and_save_deduplicator] Processing similarity graph with threshold: {self.similarity_threshold}")

         # Create a graph and add all samples as nodes
         G = nx.Graph()
7784

7885
# Filter the original dataset to keep only the selected samples
7986
filtered_dataset = dataset.select(indices_to_keep)
87+
print(f"[dedup_and_save_deduplicator] Output: {len(filtered_dataset)} samples after deduplication (removed {len(dataset) - len(filtered_dataset)} duplicates)")
8088

8189
# For tracing, sample some duplicate pairs from components with more than one member
8290
dup_pairs = {}
@@ -89,7 +97,28 @@ def process(self, dataset, show_num=0):
8997
dup_pairs[group_key] = [dataset[i] for i in sorted_component[:2]]
9098
processed_components += 1
9199

92-
return filtered_dataset, dup_pairs
100+
print(f"[dedup_and_save_deduplicator] Found {len(connected_components)} connected components")
101+
102+
# Unified processing of field filtering - Move the specified field to stats
103+
def move_fields_to_stats(sample):
104+
if Fields.stats not in sample:
105+
sample[Fields.stats] = {}
106+
107+
# move_the_specified_field_to_stats
108+
for field in self.fields_to_filter:
109+
if field in sample:
110+
# Obtain the corresponding StatsKeys constant based on the field name
111+
stats_key = getattr(StatsKeys, field, field)
112+
sample[Fields.stats][stats_key] = sample[field]
113+
del sample[field]
114+
115+
return sample
116+
117+
# applicationFieldFiltering
118+
final_dataset = filtered_dataset.map(move_fields_to_stats)
119+
print(f"[dedup_and_save_deduplicator] Filtered fields {self.fields_to_filter} to stats")
120+
121+
return final_dataset, dup_pairs
93122

94123
@classmethod
95124
@property
@@ -119,7 +148,8 @@ def sample(cls):
119148
@property
120149
def init_params(cls):
121150
return [
122-
Param("similarity_threshold", DataType.FLOAT, {}, 0.95),
151+
Param("similarity_threshold", DataType.FLOAT, {}, 0.5),
123152
Param("nn_indices_key", DataType.STRING, {}, "nn_indices"),
124153
Param("nn_scores_key", DataType.STRING, {}, "nn_scores"),
154+
Param("fields_to_filter", DataType.LIST, {}, ["embedding", "nn_indices", "nn_scores", "text", "instruction", "response"]),
125155
]
