|
33 | 33 | SYSTEM_PROMPT = "You are a helpful and harmless assistant. You are Qwen developed by Alibaba. You should think step-by-step." # noqa: E501 |
34 | 34 | MAX_TOKENS = 16384 |
35 | 35 | # 1. Load datasets |
36 | | -apps_ds = datasets.load_dataset( |
37 | | - "codeparrot/apps", |
38 | | - split="test", |
| 36 | +apps_ds = datasets.load_dataset("codeparrot/apps", split="test", trust_remote_code=True) |
| 37 | +taco_ds_medium = datasets.load_dataset( |
| 38 | + "BAAI/TACO", split="test", name="MEDIUM", trust_remote_code=True |
| 39 | +) |
| 40 | +numina_ds = datasets.load_dataset( |
| 41 | + "AI-MO/NuminaMath-CoT", split="train", trust_remote_code=True |
39 | 42 | ) |
40 | | -taco_ds_medium = datasets.load_dataset("BAAI/TACO", split="test", name="MEDIUM") |
41 | | -numina_ds = datasets.load_dataset("AI-MO/NuminaMath-CoT", split="train") |
42 | 43 |
|
43 | 44 | # convert all to ray dataset |
44 | 45 | apps_ds = ray.data.from_huggingface(apps_ds) |
|
75 | 76 | ] |
76 | 77 |
|
77 | 78 | # these are user-defined simple preprocessing functions to go from entry -> prompt |
78 | | -preprocess_fns = [ |
79 | | - APPSPreprocessor(), |
80 | | - TACOPreprocessor(), |
81 | | - NUMINAPreprocessor(), |
82 | | - NUMINAPreprocessor(), |
83 | | - NUMINAPreprocessor(), |
| 79 | +preprocessors = [ |
| 80 | + APPSPreprocessor, |
| 81 | + TACOPreprocessor, |
| 82 | + NUMINAPreprocessor, |
| 83 | + NUMINAPreprocessor, |
| 84 | + NUMINAPreprocessor, |
84 | 85 | ] |
85 | 86 |
|
86 | | -numina_scorer = MathEqualScorer( |
87 | | - response_column="formatted_response", answer_column="solution" |
88 | | -) |
89 | | -scorers = [ |
90 | | - APPSScorer(response_column="formatted_response"), |
91 | | - TACOScorer(response_column="formatted_response", backend="ray"), |
92 | | - numina_scorer, |
93 | | - numina_scorer, |
94 | | - numina_scorer, |
95 | | -] |
96 | 87 | dataset_names = ["apps", "taco", "numina_amc_aime", "numina_math", "numina_olympiads"] |
| 88 | +scorer_configs = [ |
| 89 | + dict( |
| 90 | + cls=APPSScorer, fn_constructor_kwargs=dict(response_column="formatted_response") |
| 91 | + ), |
| 92 | + dict( |
| 93 | + cls=TACOScorer, |
| 94 | + fn_constructor_kwargs=dict(response_column="formatted_response", backend="ray"), |
| 95 | + ), |
| 96 | + dict( |
| 97 | + cls=MathEqualScorer, |
| 98 | + fn_constructor_kwargs=dict( |
| 99 | + response_column="formatted_response", answer_column="solution" |
| 100 | + ), |
| 101 | + ), |
| 102 | + dict( |
| 103 | + cls=MathEqualScorer, |
| 104 | + fn_constructor_kwargs=dict( |
| 105 | + response_column="formatted_response", answer_column="solution" |
| 106 | + ), |
| 107 | + ), |
| 108 | + dict( |
| 109 | + cls=MathEqualScorer, |
| 110 | + fn_constructor_kwargs=dict( |
| 111 | + response_column="formatted_response", answer_column="solution" |
| 112 | + ), |
| 113 | + ), |
| 114 | +] |
| 115 | + |
97 | 116 | for i, ds in enumerate(datasets): |
98 | | - datasets[i] = ds.map(preprocess_fns[i]) |
| 117 | + if i < 1: |
| 118 | + continue |
| 119 | + # 1. Preprocess and get model prompts |
| 120 | + preprocess_cls = preprocessors[i] |
| 121 | + datasets[i] = ds.map( |
| 122 | + preprocess_cls, |
| 123 | + concurrency=5, |
| 124 | + ) |
| 125 | + |
| 126 | + # 2. Get model responses |
99 | 127 |
|
100 | 128 | config = vLLMEngineProcessorConfig( |
101 | | - model="Qwen/QwQ-32B-Preview", |
102 | | - # model="Qwen/Qwen2-0.5B-Instruct", |
| 129 | + # model="Qwen/QwQ-32B-Preview", |
| 130 | + model="Qwen/Qwen2-0.5B-Instruct", |
103 | 131 | engine_kwargs=dict( |
104 | 132 | enable_prefix_caching=True, |
105 | 133 | enable_chunked_prefill=True, |
106 | 134 | max_num_batched_tokens=16384, |
107 | 135 | ), |
108 | 136 | concurrency=2, |
109 | | - batch_size=64, |
| 137 | + batch_size=20, |
110 | 138 | ) |
111 | 139 |
|
112 | 140 | processor = build_llm_processor( |
|
118 | 146 | ], |
119 | 147 | sampling_params=dict( |
120 | 148 | temperature=0.3, |
121 | | - max_tokens=20, |
| 149 | + max_tokens=MAX_TOKENS, |
122 | 150 | detokenize=False, |
123 | 151 | ), |
124 | 152 | ), |
|
130 | 158 | datasets[i] = processor(datasets[i]) |
131 | 159 |
|
132 | 160 | # 3. Reformat the examples into a structured format |
| 161 | + |
133 | 162 | # define a configuration for the reformatter |
134 | 163 | config = HttpRequestProcessorConfig( |
135 | 164 | url="https://api.openai.com/v1/chat/completions", |
136 | 165 | headers={"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"}, |
137 | 166 | # number of processors to run in parallel |
138 | 167 | # Each handles a batch of requests |
139 | 168 | concurrency=1, |
| 169 | + batch_size=64, |
140 | 170 | ) |
141 | 171 | # define the reformatter |
142 | 172 | reformatter = build_llm_processor( |
|
170 | 200 | datasets[i] = reformatter(datasets[i]) |
171 | 201 |
|
172 | 202 | # 4. Rejection Sampling based on scoring |
173 | | - datasets[i] = datasets[i].map(scorers[i]) |
174 | | - score_column = scorers[i].SCORE_COLUMN |
| 203 | + scorer_cls, fn_constructor_kwargs = ( |
| 204 | + scorer_configs[i]["cls"], |
| 205 | + scorer_configs[i]["fn_constructor_kwargs"], |
| 206 | + ) |
| 207 | + datasets[i] = datasets[i].map( |
| 208 | + scorer_cls, concurrency=4, fn_constructor_kwargs=fn_constructor_kwargs |
| 209 | + ) |
| 210 | + score_column = scorer_cls.SCORE_COLUMN |
175 | 211 | datasets[i] = datasets[i].filter(lambda x, sc=score_column: x[sc]) |
176 | 212 |
|
177 | 213 | # 5. Convert to ShareGPT format |
178 | | - datasets[i] = datasets[i].map(convert_to_sharegpt_format) |
| 214 | + datasets[i] = datasets[i].map( |
| 215 | + convert_to_sharegpt_format, |
| 216 | + fn_kwargs=dict( |
| 217 | + prompt_column="user_input", response_column="formatted_response" |
| 218 | + ), |
| 219 | + ) |
179 | 220 |
|
180 | 221 | # 6. Save datasets |
181 | 222 | dir_name = f"data/sky-t1-preview-{dataset_names[i]}" |
182 | 223 | datasets[i] = datasets[i].materialize() |
183 | 224 | datasets[i].write_json(os.path.abspath(dir_name)) |
| 225 | + |
| 226 | + |
| 227 | +# 7. Union |
| 228 | + |
| 229 | +# final_dataset = datasets[0].union(*datasets[1:]) |
| 230 | +# dir_name = f"data/sky-t1-preview-full" |
| 231 | +# # save in folder as a single JSON file |
| 232 | +# final_dataset.repartition(1).write_json(os.path.abspath(dir_name)) |
0 commit comments