diff --git a/hf_olmo/convert_olmo_to_hf.py b/hf_olmo/convert_olmo_to_hf.py
index 2e0a9e074..731488e9e 100644
--- a/hf_olmo/convert_olmo_to_hf.py
+++ b/hf_olmo/convert_olmo_to_hf.py
@@ -309,6 +309,10 @@ def main():
 
     print(f"Converted checkpoint saved to {args.destination_dir}")
 
+    # remove local dir copy
+    print(f"Removing temporary local dir: {local_checkpoint_dir}")
+    shutil.rmtree(local_checkpoint_dir)
+
 
 if __name__ == "__main__":
     main()
diff --git a/install_torch.sh b/install_torch.sh
new file mode 100644
index 000000000..5ac68ad6e
--- /dev/null
+++ b/install_torch.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+pip install torch
diff --git a/olmo/util.py b/olmo/util.py
index aad77eb1c..3f4093c7c 100644
--- a/olmo/util.py
+++ b/olmo/util.py
@@ -30,6 +30,7 @@
 
 from olmo_data.data import get_data_path
 
+
 from .aliases import PathOrStr
 from .exceptions import (
     OLMoCliError,
diff --git a/pyproject.toml b/pyproject.toml
index 87bd75591..3ccd96ab2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,8 @@ dependencies = [
     "cached_path>=1.6.2",
     "transformers",
     "importlib_resources",
+    "beaker-gantry",
+    "datasets",
 ]
 
 [project.optional-dependencies]
diff --git a/scripts/convert_checkpoints.sh b/scripts/convert_checkpoints.sh
new file mode 100644
index 000000000..311c0b420
--- /dev/null
+++ b/scripts/convert_checkpoints.sh
@@ -0,0 +1,95 @@
+#!/usr/bin/env bash
+
+# Converts s3 checkpoints to HF format and saves them to WEKA.
+# To be run from the root of the OLMo repository.
+# Requires Gantry and AWS access to WEKA.
+#
+# Usage: scripts/convert_checkpoints.sh CHECKPOINT_PATH [-s] [-c]
+#    -s  if a converted checkpoint is found on s3, also save it to weka
+#    -c  sanity check; don't actually run the conversion, just go through the motions and print what would be done
+#
+# Calls: convert_checkpoints_batch.py
+#    usage: convert_checkpoints_batch.py [-h]
+#           (--checkpoint-path CHECKPOINT_PATH | --checkpoint-path-file CHECKPOINT_PATH_FILE)
+#           [--weka-load-dir WEKA_LOAD_DIR]
+#           [--weka-prefix WEKA_PREFIX]
+#           [--sanity-check] [--save-to-weka]
+#
+# Example use:
+#    Run:
+#      sh scripts/convert_checkpoints.sh s3://ai2-llm/checkpoints/cheap_decisions/dolma-v1-6-and-sources-baseline-3x-code-1B-N-1T-D-mitchish1-001/step9*
+#    This will convert all models in the directory and save them to:
+#      weka://oe-eval-default/ai2-llm/checkpoints/cheap_decisions/dolma-v1-6-and-sources-baseline-3x-code-1B-N-1T-D-mitchish1-001/step9*-hf
+#
+#    Before converting, the script checks that the weka directory doesn't already exist AND that s3 doesn't have a corresponding converted directory (so conversions that already exist are not repeated).
+#
+# ASSUMPTIONS
+# - INPUT must be on s3. Multiple wildcards are allowed.
+# - OUTPUT is saved to weka under the same path as found on s3, with an "-hf" suffix appended.
+# - Assumes tokenizer allenai/gpt-neox-olmo-dolma-v1_5.
+#
+# OUTPUT logs
+# - saves log.jsonl. For every checkpoint found for the given input:
+#     - "unprocessed_path" := checkpoint path to convert
+#     - "converted_path" := path of the converted checkpoint
+#     - "conversion" := [new | existing (already in weka) | existing-downloaded (from s3)]
+#     - "date_time" := datestamp
+#     - "error" := error message if the conversion failed for any reason
+# - saves model_checkpoints.jsonl: an input file formatted for oe-eval-internal experiments
+# - example log files for the following run:
+#      > sh scripts/convert_checkpoints.sh s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step91*6-unsharded
+#      log.jsonl:
+#          {"unprocessed_path": "s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9176-unsharded", "converted_path": "weka://oe-eval-default/ianm/ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9176-unsharded-hf", "conversion": "existing", "date_time": "Oct-04-2024_2012", "error": ""}
+#          {"unprocessed_path": "s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9166-unsharded", "converted_path": "weka://oe-eval-default/ianm/ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9166-unsharded-hf", "conversion": "existing", "date_time": "Oct-04-2024_2012", "error": ""}
+#          {"unprocessed_path": "s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9186-unsharded", "converted_path": "weka://oe-eval-default/ianm/ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step9186-unsharded-hf", "conversion": "existing", "date_time": "Oct-04-2024_2012", "error": ""}
+#      model_checkpoints.jsonl:
+#          {"model_name": "baseline-300M-1xC", "checkpoints_location": "weka://oe-eval-default/ianm/ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC", "revisions": ["step9176-unsharded-hf", "step9166-unsharded-hf", "step9186-unsharded-hf"]}
+#
+# GANTRY RUN DEFAULTS:
+# - Budget: ai2/oe-eval (see below)
+# - WEKA bucket oe-eval-default is mounted at /data/input (see below)
+# - Gantry experiments are saved to the ai2/cheap-decisions Beaker workspace
+# - The weka:// prefix is used for paths written to model_checkpoints.jsonl
+#
+# TODOs
+# - Make the tokenizer configurable
+CHECKPOINT_PATH=$1
+SAVE_TO_WEKA=""
+SANITY_CHECK=""
+shift
+
+usage() {
+  echo "Usage: $0 CHECKPOINT_PATH [-s] [-c]"
+  echo "  -s  --save-to-weka"
+  echo "  -c  --sanity-check"
+  exit 1;
+}
+
+while getopts "sc" opt;
+do
+  case $opt in
+    s) SAVE_TO_WEKA="--save-to-weka" ;;
+    c) SANITY_CHECK="--sanity-check" ;;  # mostly useful for local test runs; prevents any copying or conversion
+    *) usage ;;
+  esac
+done
+
+#echo "python scripts/convert_checkpoints_batch.py --checkpoint-path $CHECKPOINT_PATH --weka-load-dir '/data/input' --weka-prefix 'weka://oe-eval-default' $SAVE_TO_WEKA $SANITY_CHECK"
+
+gantry run \
+  --description "Converting $CHECKPOINT_PATH" \
+  --allow-dirty \
+  --workspace ai2/cheap-decisions \
+  --priority normal \
+  --gpus 0 \
+  --preemptible \
+  --cluster ai2/jupiter-cirrascale-2 \
+  --budget ai2/oe-eval \
+  --env-secret AWS_ACCESS_KEY_ID=JENA_AWS_ACCESS_KEY_ID \
+  --env-secret AWS_SECRET_ACCESS_KEY=JENA_AWS_SECRET_ACCESS_KEY \
+  --shared-memory 10GiB \
+  --weka=oe-eval-default:/data/input \
+  --yes \
+  -- /bin/bash -c "python scripts/convert_checkpoints_batch.py --checkpoint-path $CHECKPOINT_PATH --weka-load-dir '/data/input' --weka-prefix 'weka://oe-eval-default' $SAVE_TO_WEKA $SANITY_CHECK"
+
diff --git a/scripts/convert_checkpoints_batch.py b/scripts/convert_checkpoints_batch.py
new file mode 100644
index 000000000..d6a3bfee1
--- /dev/null
+++ b/scripts/convert_checkpoints_batch.py
@@ -0,0 +1,275 @@
+"""
+Modification of s3_unshard_to_hf.py.
+Wrapper around hf_olmo/convert_olmo_to_hf.py.
+
+Takes unsharded model checkpoints stored on S3 and converts them to HF format.
+Saves the converted checkpoints to weka.
+Requires AWS credentials to be configured.
+"""
+
+import argparse
+import json
+import os
+import subprocess
+import time
+from pathlib import Path
+from typing import Dict, List
+
+import boto3
+from gantry import RESULTS_DIR
+
+# Possible locations of an already-converted checkpoint.
+# "self" is the target location where the converted model would be saved.
+# key: template, value: description
+# Each template MUST accept .format(load_dir, checkpoint_path).
+
+WEKA_CHECK_LOCATIONS_PREFIXES = {"{}/{}-hf/pytorch_model.bin": "self", "{}/ianm/{}-hf/pytorch_model.bin": "ian's"}
+
+
+def convert_checkpoint(cps, load_dir="/data/input", sanity_check=False, weka_prefix="/weka", save_to_weka=False):
+    s3_client = boto3.client("s3")
+    s3_resource = boto3.resource("s3")
+
+    cps = expand_paths(cps, s3_client)
+
+    print(f">>> Total of {len(cps)} paths to process. <<<", flush=True)
+
+    processed: Dict = {}
+    errored: Dict = {}
+
+    # Convert each checkpoint in turn.
+    for checkpoint_path in cps:
+        print("\n\n------------------------------------------------------------", flush=True)
+        print(f"\nProcessing Checkpoint: {checkpoint_path}\n", flush=True)
+
+        error = ""
+        converted_path = ""
+        existing_location = ""
+        conversion_status = ""
+
+        # sort out paths, bucket names, and so on ...
+        path_bits = checkpoint_path.strip("/").replace("s3://", "").split("/")
+        s3_bucket_name = path_bits[0]
+        s3_prefix = "/".join(path_bits[1:])
+        temp_path = "/".join(path_bits)  # checkpoint_path.replace('s3://', '').strip('/')
+        local_path = f"{load_dir}/{temp_path}-hf/"
+
+        # the converted model may already exist in local_path or in one of the other known locations
+        path_found = False
+        potential_existing_locations = [
+            candidate_loc.format(load_dir, temp_path) for candidate_loc in WEKA_CHECK_LOCATIONS_PREFIXES
+        ]
+        for loc in potential_existing_locations:
+            if os.path.exists(loc):
+                existing_location = loc.replace("/pytorch_model.bin", "")
+                path_found = True
+                break
+
+        # if one of the potential existing locations already has a converted model in it, use that
+        if path_found:
+            # then there is no conversion to do
+            conversion_status = "existing"
+            converted_path = existing_location
+            print(f"Converted Checkpoint Found: {converted_path}\n", flush=True)
+        else:
+            s3_bucket = s3_resource.Bucket(s3_bucket_name)
+            s3_hf_exists = s3_path_exists(s3_bucket, s3_prefix, s3_bucket_name)
+
+            # if s3 already has a location for converted model then use that
+            if s3_hf_exists is not None:
+                path_found = True
+                print(f"Converted Checkpoint Found: {s3_hf_exists}", flush=True)
+
+                # if save to weka flag is passed, then download the s3 converted model to the local path
+                if save_to_weka:
+                    copy_s3_to_local(
+                        s3_bucket, s3_prefix, local_path, local_path.replace(load_dir, weka_prefix), sanity_check
+                    )
+                    conversion_status = "existing-downloaded"
+                    converted_path = local_path
+                else:
+                    conversion_status = "existing"
+                    converted_path = s3_hf_exists
+
+        # if no existing conversions are found then process and save to local path
+        if not path_found:
+            conversion_status = "new"
+            converted_path = local_path
+            conversion_cmd = f"python hf_olmo/convert_olmo_to_hf.py --checkpoint-dir '{checkpoint_path}' --destination-dir '{local_path}' --tokenizer 'allenai/gpt-neox-olmo-dolma-v1_5'"
+
+            if sanity_check:
+                print("SANITY CHECK MODE (not running the conversion)")
+                print(conversion_cmd + "\n")
+            else:
+                try:
+                    subprocess.run(conversion_cmd, shell=True, check=True)
+                except subprocess.CalledProcessError as e:
+                    print(f"Error during checkpoint conversion: {checkpoint_path}")
+                    error = {"error_code": e.returncode, "error_stdout": e.stdout}
+                    conversion_status = "error"
+                    converted_path = ""
+
+        timestamp = time.strftime("%b-%d-%Y_%H%M", time.localtime())
+
+        # Keep info for log.jsonl
+        local_log = {
+            "unprocessed_path": checkpoint_path,
+            "converted_path": converted_path.replace(load_dir, weka_prefix),
+            "conversion": conversion_status,
+            "date_time": timestamp,
+            "error": error,
+        }
+
+        if conversion_status == "error":
+            errored[checkpoint_path] = {
+                "unprocessed_path": checkpoint_path,
+                "date_time": timestamp,
+                "error": error,
+            }
+        else:
+            # output model checkpoint location for eval scripts
+            curr = Path(converted_path)
+            parent = curr.parent
+            if parent.name not in processed:
+                processed[parent.name] = {
+                    "model_name": parent.name,
+                    "checkpoints_location": str(parent).replace(load_dir, weka_prefix),
+                    "revisions": [curr.name],
+                }
+            else:
+                processed[parent.name]["revisions"].append(curr.name)
+
+        # Output Log
+        if not sanity_check:
+            with open(os.path.join(RESULTS_DIR, "log.jsonl"), "a+") as fout:
+                fout.write(json.dumps(local_log) + "\n")
+
+    # Output checkpoint location for eval scripts
+    if not sanity_check:
+        with open(os.path.join(RESULTS_DIR, "model_checkpoints.jsonl"), "w") as fout:
+            for _, p in processed.items():
+                fout.write(json.dumps(p) + "\n")
+        if len(errored) > 0:
+            with open(os.path.join(RESULTS_DIR, "errors.jsonl"), "w") as fout:
+                for _, p in errored.items():
+                    fout.write(json.dumps(p) + "\n")
+
+
+def s3_path_exists(bucket, prefix, bucket_name):
+    # look for pytorch_model.bin in directories ending with -hf or -hf-olmo
+    objs = list(bucket.objects.filter(Prefix=prefix + "-hf/pytorch_model.bin"))
+    if len(objs) > 0:
+        return f"s3://{bucket_name}/{prefix}-hf"
+    else:
+        objs2 = list(bucket.objects.filter(Prefix=prefix + "-hf-olmo/pytorch_model.bin"))
+        return f"s3://{bucket_name}/{prefix}-hf-olmo" if (len(objs2) > 0) else None
+
+
+def copy_s3_to_local(bucket, prefix, local_path, display_name, sanity_check):
+    # if not os.path.exists(os.path.dirname(local_path)):
+    print(f"Downloading checkpoint to {display_name}\n", flush=True)
+    if not sanity_check:
+        for obj in bucket.objects.filter(Prefix=prefix):
+            target = os.path.join(local_path, os.path.relpath(obj.key, os.path.dirname(prefix)))
+            if not os.path.exists(os.path.dirname(target)):
+                os.makedirs(os.path.dirname(target))
+            if obj.key[-1] == "/":
+                continue
+            bucket.download_file(obj.key, target)
+
+
+def expand_paths(cps, s3):
+    expanded: List[str] = []
+
+    for cp in cps:
+        bucket = cp.split("/")[2]
+        segs = cp.split("*")
+        prefix = segs[0].replace("s3://" + bucket + "/", "")
+
+        relevant_dirs = []
+        skip_parent = []
+
+        paginator = s3.get_paginator("list_objects_v2")
+        page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
+        contents = {obj["Key"]: str(Path(obj["Key"]).parent) for page in page_iterator for obj in page["Contents"]}
+        paths = set(contents.values())
+
+        for path in contents:
+            p = Path(path)
+            parent = str(p.parent)
+            grandpa = str(p.parent.parent)
+
+            if parent in relevant_dirs or parent in skip_parent:
+                continue
+            if p.parent.name in ["optim", "train", "model"]:
+                if f"{grandpa}-unsharded" in paths:
+                    # an unsharded copy of this sharded checkpoint exists; skip the sharded one
+                    skip_parent.append(parent)
+                    continue
+                else:
+                    relevant_dirs.append(grandpa)
+            elif p.name == "model.pt":
+                relevant_dirs.append(parent)
+
+        search_segs = [seg for i, seg in enumerate(segs) if i > 0 and seg != ""]
+
+        # subselect the directories that match the remaining segments (for multiple '*' wildcards)
+        temp_dirs = relevant_dirs
+        if len(search_segs) > 0:
+            for s in search_segs:
+                temp_dirs = [d for d in temp_dirs if s in d]
+
+        exp = set([f"s3://{bucket}/{d}" for d in temp_dirs])
+
+        expanded += exp
+    return expanded
+
+
+def read_checkpoints(f):
+    with open(f, "r") as fin:
+        checkpoints = [line.strip() for line in fin if line.strip() != ""]
+    return checkpoints
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    group_batch = parser.add_mutually_exclusive_group(required=True)
+    group_batch.add_argument("--checkpoint-path", help="path to sharded checkpoint", type=str)
+    group_batch.add_argument(
+        "--checkpoint-path-file", help="file that lists sharded checkpoint paths (batch run option)", type=str
+    )
+    parser.add_argument("--weka-load-dir", help="mounted location of weka bucket", default="/data/input", type=str)
+    parser.add_argument("--weka-prefix", help="weka directory prefix for output", default="/weka", type=str)
+    parser.add_argument(
+        "--sanity-check", help="print what would be run; do not actually run conversion", action="store_true"
+    )
+    parser.add_argument(
+        "--save-to-weka", help="if checkpoints are found on s3, save them to loaded weka dir", action="store_true"
+    )
+
+    args = parser.parse_args()
+
+    if args.checkpoint_path is not None:
+        convert_checkpoint(
+            [args.checkpoint_path],
+            load_dir=args.weka_load_dir.rstrip("/"),
+            sanity_check=args.sanity_check,
+            weka_prefix=args.weka_prefix,
+            save_to_weka=args.save_to_weka,
+        )
+    else:
+        convert_checkpoint(
+            read_checkpoints(args.checkpoint_path_file),
+            load_dir=args.weka_load_dir.rstrip("/"),
+            sanity_check=args.sanity_check,
+            weka_prefix=args.weka_prefix,
+            save_to_weka=args.save_to_weka,
+        )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/convert_olmo_to_hf_new.py b/scripts/convert_olmo_to_hf_new.py
index 0f4ebe9f0..b0752e651 100644
--- a/scripts/convert_olmo_to_hf_new.py
+++ b/scripts/convert_olmo_to_hf_new.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import argparse
 import gc
 import json
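
Example invocations of the new wrapper script, sketched from the header comments in scripts/convert_checkpoints.sh above (the bucket/path and step wildcard are the illustrative ones from those comments; quoting the S3 path keeps the local shell from expanding the wildcard, since expansion is done server-side by expand_paths):

    # sanity check only: print what would be converted, without copying or converting anything
    sh scripts/convert_checkpoints.sh "s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step91*6-unsharded" -c

    # convert all matching checkpoints; if a conversion already exists on s3, also copy it to WEKA
    sh scripts/convert_checkpoints.sh "s3://ai2-llm/checkpoints/OLMo-ladder/baseline-300M-1xC/step91*6-unsharded" -s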