|
4 | 4 |
|
5 | 5 | import argparse |
6 | 6 | import os |
| 7 | +from pathlib import Path |
7 | 8 |
|
8 | 9 | import datasets |
9 | 10 | import ray |
|
28 | 29 |
|
29 | 30 | parser = argparse.ArgumentParser() |
30 | 31 | parser.add_argument("--as-test", action="store_true") |
| 32 | +parser.add_argument("--save-dir", type=str, required=True, help="Output directory") |
31 | 33 | args = parser.parse_args() |
| 34 | +args.save_dir = Path(args.save_dir) |
32 | 35 |
|
33 | 36 | SYSTEM_PROMPT = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." # noqa: E501 |
34 | 37 | MAX_TOKENS = 16384 |
| 38 | +# We explicitly set the target number of blocks to help tune performance. |
| 39 | +# For materialized datasets, the number of blocks determined by ray data can be small, |
| 40 | +# especially for a multi-stage pipeline like the one here. |
| 41 | +TARGET_NUM_ROWS_PER_BLOCK = 100 |
| 42 | + |
| 43 | +# Enable more detailed logging of tasks per actor |
| 44 | +ray.init(runtime_env={"env_vars": {"RAY_ENABLE_RECORD_ACTOR_TASK_LOGGING": 1}}) |
| 45 | + |
35 | 46 | # 1. Load datasets |
36 | | -apps_ds = datasets.load_dataset("codeparrot/apps", split="test", trust_remote_code=True) |
| 47 | +apps_ds = datasets.load_dataset( |
| 48 | + "codeparrot/apps", split="test", trust_remote_code=True |
| 49 | +) # 10K |
37 | 50 | taco_ds_medium = datasets.load_dataset( |
38 | | - "BAAI/TACO", split="test", name="MEDIUM", trust_remote_code=True |
39 | | -) |
| 51 | + "BAAI/TACO", split="train", name="MEDIUM", trust_remote_code=True |
| 52 | +) # 3244 |
| 53 | +taco_ds_test = datasets.load_dataset( |
| 54 | + "BAAI/TACO", split="test", name="ALL", trust_remote_code=True |
| 55 | +) # 1000 |
40 | 56 | numina_ds = datasets.load_dataset( |
41 | 57 | "AI-MO/NuminaMath-CoT", split="train", trust_remote_code=True |
42 | 58 | ) |
43 | 59 |
|
44 | 60 | # convert all to ray dataset |
45 | | -apps_ds = ray.data.from_huggingface(apps_ds) |
46 | | -taco_ds_medium = ray.data.from_huggingface(taco_ds_medium) |
| 61 | +apps_ds = ray.data.from_huggingface(apps_ds).repartition( |
| 62 | + num_blocks=None, target_num_rows_per_block=TARGET_NUM_ROWS_PER_BLOCK |
| 63 | +) |
| 64 | +taco_ds_medium = ray.data.from_huggingface( |
| 65 | + taco_ds_medium, |
| 66 | +).repartition(num_blocks=None, target_num_rows_per_block=TARGET_NUM_ROWS_PER_BLOCK) |
47 | 67 | taco_ds_medium = taco_ds_medium.map( |
48 | 68 | taco_coerce_types, fn_args=(taco_ds_medium.schema(),) |
49 | 69 | ) |
50 | | -numina_ds = ray.data.from_huggingface(numina_ds) |
| 70 | +taco_ds_test = ray.data.from_huggingface( |
| 71 | + taco_ds_test, |
| 72 | +).repartition(num_blocks=None, target_num_rows_per_block=TARGET_NUM_ROWS_PER_BLOCK) |
| 73 | +taco_ds_test = taco_ds_test.map(taco_coerce_types, fn_args=(taco_ds_test.schema(),)) |
| 74 | +numina_ds = ray.data.from_huggingface( |
| 75 | + numina_ds, |
| 76 | +).repartition(num_blocks=None, target_num_rows_per_block=TARGET_NUM_ROWS_PER_BLOCK) |
51 | 77 |
|
52 | 78 |
|
53 | 79 | # get subsets from numina based on the source column |
54 | | -numina_ds_amc_aime = numina_ds.filter(lambda x: x["source"] == "amc_aime") |
| 80 | +numina_ds_amc_aime = numina_ds.filter(lambda x: x["source"] == "amc_aime") # 4070 |
55 | 81 | numina_ds_olympiads = numina_ds.filter(lambda x: x["source"] == "olympiads").limit( |
56 | 82 | 20000 |
57 | | -) |
58 | | -numina_ds_math = numina_ds.filter(lambda x: x["source"] == "math") |
| 83 | +) # 20k |
| 84 | +numina_ds_math = numina_ds.filter(lambda x: x["source"] == "math") # 7477 |
59 | 85 |
|
60 | 86 |
|
61 | 87 | if args.as_test: |
62 | | - num_samples = 100 |
| 88 | + num_samples = 5000 |
63 | 89 | apps_ds = apps_ds.limit(num_samples) |
64 | 90 | taco_ds_medium = taco_ds_medium.limit(num_samples) |
| 91 | + taco_ds_test = taco_ds_test.limit(num_samples) |
65 | 92 | numina_ds_amc_aime = numina_ds_amc_aime.limit(num_samples) |
66 | 93 | numina_ds_olympiads = numina_ds_olympiads.limit(num_samples) |
67 | 94 | numina_ds_math = numina_ds_math.limit(num_samples) |
|
70 | 97 | datasets = [ |
71 | 98 | apps_ds, |
72 | 99 | taco_ds_medium, |
| 100 | + taco_ds_test, |
73 | 101 | numina_ds_amc_aime, |
74 | 102 | numina_ds_olympiads, |
75 | 103 | numina_ds_math, |
|
79 | 107 | preprocessors = [ |
80 | 108 | APPSPreprocessor, |
81 | 109 | TACOPreprocessor, |
| 110 | + TACOPreprocessor, |
82 | 111 | NUMINAPreprocessor, |
83 | 112 | NUMINAPreprocessor, |
84 | 113 | NUMINAPreprocessor, |
85 | 114 | ] |
86 | 115 |
|
87 | | -dataset_names = ["apps", "taco", "numina_amc_aime", "numina_math", "numina_olympiads"] |
| 116 | +dataset_names = [ |
| 117 | + "apps", |
| 118 | + "taco_train", |
| 119 | + "taco_test", |
| 120 | + "numina_amc_aime", |
| 121 | + "numina_math", |
| 122 | + "numina_olympiads", |
| 123 | +] |
88 | 124 | scorer_configs = [ |
89 | 125 | dict( |
90 | 126 | cls=APPSScorer, fn_constructor_kwargs=dict(response_column="formatted_response") |
|
93 | 129 | cls=TACOScorer, |
94 | 130 | fn_constructor_kwargs=dict(response_column="formatted_response", backend="ray"), |
95 | 131 | ), |
| 132 | + dict( |
| 133 | + cls=TACOScorer, |
| 134 | + fn_constructor_kwargs=dict(response_column="formatted_response", backend="ray"), |
| 135 | + ), |
96 | 136 | dict( |
97 | 137 | cls=MathEqualScorer, |
98 | 138 | fn_constructor_kwargs=dict( |
|
114 | 154 | ] |
115 | 155 |
|
116 | 156 | for i, ds in enumerate(datasets): |
117 | | - if i < 1: |
118 | | - continue |
119 | 157 | # 1. Preprocess and get model prompts |
120 | 158 | preprocess_cls = preprocessors[i] |
121 | 159 | datasets[i] = ds.map( |
|
126 | 164 | # 2. Get model responses |
127 | 165 |
|
128 | 166 | config = vLLMEngineProcessorConfig( |
129 | | - # model="Qwen/QwQ-32B-Preview", |
130 | | - model="Qwen/Qwen2-0.5B-Instruct", |
| 167 | + model="Qwen/QwQ-32B-Preview", |
| 168 | + # model="Qwen/Qwen2-0.5B-Instruct", |
131 | 169 | engine_kwargs=dict( |
132 | 170 | enable_prefix_caching=True, |
133 | 171 | enable_chunked_prefill=True, |
134 | | - max_num_batched_tokens=16384, |
| 172 | + max_num_batched_tokens=4096, |
| 173 | + tensor_parallel_size=4, |
135 | 174 | ), |
136 | 175 | concurrency=2, |
137 | | - batch_size=20, |
| 176 | + batch_size=128, |
138 | 177 | ) |
139 | 178 |
|
140 | 179 | processor = build_llm_processor( |
|
145 | 184 | {"role": "user", "content": row["user_input"]}, |
146 | 185 | ], |
147 | 186 | sampling_params=dict( |
148 | | - temperature=0.3, |
| 187 | + temperature=0, |
149 | 188 | max_tokens=MAX_TOKENS, |
150 | | - detokenize=False, |
151 | 189 | ), |
152 | 190 | ), |
153 | 191 | postprocess=lambda row: dict( |
|
166 | 204 | # number of processors to run in parallel |
167 | 205 | # Each handles a batch of requests |
168 | 206 | concurrency=1, |
169 | | - batch_size=64, |
| 207 | + batch_size=16, |
170 | 208 | ) |
171 | 209 | # define the reformatter |
172 | 210 | reformatter = build_llm_processor( |
|
219 | 257 | ) |
220 | 258 |
|
221 | 259 | # 6. Save datasets |
222 | | - dir_name = f"data/sky-t1-preview-{dataset_names[i]}" |
223 | | - datasets[i] = datasets[i].materialize() |
224 | | - datasets[i].write_json(os.path.abspath(dir_name)) |
| 260 | + dir_name = args.save_dir / f"sky-t1-preview-{dataset_names[i]}" |
| 261 | + # use absolute path while saving with ray data |
| 262 | + datasets[i].write_json(str(dir_name.expanduser().absolute())) |
225 | 263 |
|
226 | 264 |
|
227 | 265 | # 7. Union |
|
0 commit comments