Yes, you can perform evaluations for Direct Preference Optimization (DPO) using the Axolotl codebase. The evaluation process typically involves loading a prepared dataset, applying the model to generate predictions, and then comparing these predictions against the ground truth to compute metrics such as accuracy or loss. Here's a step-by-step guide on how you might approach this, based on the Axolotl codebase structure and the provided context:
Load the Evaluation Dataset: Use the `load_prepare_dpo_datasets` function to load your evaluation dataset. This function loads and prepares datasets for DPO training and evaluation, so make sure your evaluation dataset configuration (the `test_datasets` entries it reads) is specified properly.
cfg = {
    # your configuration details such as dataset paths, types, etc.
}
_, eval_dataset = load_prepare_dpo_datasets(cfg)
Prepare the Model for Evaluation: Ensure your DPO model is loaded and ready for evaluation. If you're using a model that's been trained with DPO strategies, make sure it's in evaluation mode to disable dropout layers or batch normalization layers that might interfere with the evaluation.
model.eval() # Assuming `model` is your DPO model
Generate Predictions: Use your model to generate predictions on the evaluation dataset. This might involve iterating over the dataset and collecting the model's outputs.
import torch

predictions = []
for batch in eval_dataset:
    with torch.no_grad():
        # assumes batches have already been tokenized/collated into model inputs
        outputs = model(**batch)
        predictions.append(outputs)
Compute Evaluation Metrics: Depending on the nature of your task (e.g., classification, regression), compute the relevant metrics using the predictions and the true labels from your evaluation dataset. For instance, if you're working on a classification task, you might compute accuracy or F1 score.
from sklearn.metrics import accuracy_score

true_labels = ...       # extract true labels from your eval_dataset
predicted_labels = ...  # process your predictions to match the true labels' format

accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Accuracy: {accuracy}")
Use Callbacks for Evaluation: If you're using the Axolotl framework's training loop, you can integrate evaluation into your training process using callbacks such as `BenchEvalCallback` or `LogPredictionCallback`. These callbacks can be configured to run evaluation at certain points in the training process (e.g., after each epoch).
from axolotl.utils.callbacks import BenchEvalCallback, LogPredictionCallback

# check the callback's actual signature in your axolotl version; the arguments below are illustrative
eval_callback = BenchEvalCallback(eval_dataset=eval_dataset, metric_name="accuracy")
# add `eval_callback` to your training loop's callbacks
Remember, the exact implementation details will depend on your specific model architecture, the nature of your DPO task, and how you've structured your data and training loop. The steps outlined above provide a general framework for performing evaluations within the context of DPO using the Axolotl codebase.
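Beyond generic metrics such as accuracy, one metric that is specific to DPO is preference accuracy: the fraction of preference pairs for which the policy assigns a higher log-probability to the chosen response than to the rejected one. The sketch below is a minimal, hypothetical illustration of that idea, not axolotl's built-in evaluation; it assumes the evaluation dataset has already been mapped to plain-text `prompt`, `chosen`, and `rejected` columns (the format `load_prepare_dpo_datasets` expects when no `type` transform is applied) and that `model`/`tokenizer` are a Hugging Face causal LM and its tokenizer, with the model already on `device`.

```python
import torch


@torch.no_grad()
def sequence_logprob(model, tokenizer, prompt, completion, device="cuda"):
    """Sum of token log-probs the model assigns to `completion` given `prompt`."""
    prompt_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    full_ids = tokenizer(prompt + completion, return_tensors="pt").input_ids.to(device)
    logits = model(full_ids).logits[:, :-1, :]  # position t predicts token t + 1
    targets = full_ids[:, 1:]
    logprobs = torch.log_softmax(logits, dim=-1).gather(-1, targets.unsqueeze(-1)).squeeze(-1)
    # approximation: assumes tokenizing `prompt` yields a prefix of tokenizing `prompt + completion`
    completion_start = prompt_ids.shape[1] - 1
    return logprobs[:, completion_start:].sum().item()


def dpo_preference_accuracy(model, tokenizer, eval_dataset, device="cuda"):
    """Fraction of pairs where the chosen answer out-scores the rejected one."""
    model.eval()
    correct = 0
    for row in eval_dataset:
        chosen = sequence_logprob(model, tokenizer, row["prompt"], row["chosen"], device)
        rejected = sequence_logprob(model, tokenizer, row["prompt"], row["rejected"], device)
        correct += int(chosen > rejected)
    return correct / len(eval_dataset)
```

This roughly mirrors the rewards/accuracies style of metric that TRL's DPO trainer reports during evaluation, but computed by hand for clarity.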
def validate_dpo_beta(cls, dpo_beta):
    if dpo_beta is not None:
        LOG.warning("dpo_beta is deprecated, use rl_beta instead")
    return dpo_beta
def eval(self):
    """
    Sets the optimizer to "eval" mode. Useful for optimizers like `schedule_free`
    """
    return self.optimizer.eval()
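The wrapper above matters for optimizers such as `schedule_free`, which keep separate parameter states for training and evaluation. A hedged sketch of how such a wrapper is typically toggled around an evaluation pass, assuming it also exposes a matching `train()` method; `run_evaluation` is a hypothetical callable:

```python
def evaluate_with_schedule_free(model, optimizer_wrapper, run_evaluation):
    # switch the optimizer (and model) to their evaluation state
    optimizer_wrapper.eval()
    model.eval()
    try:
        metrics = run_evaluation(model)
    finally:
        # restore training state before the next optimization step
        optimizer_wrapper.train()
        model.train()
    return metrics
```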
""" User-defined DPO strategies """
def load_prepare_dpo_datasets(cfg):
    def load_split(dataset_cfgs, _cfg):
        split_datasets: List[Any] = []
        for i, ds_cfg in enumerate(dataset_cfgs):
            if ds_cfg["ds_type"] == "json":
                for data_file in ds_cfg["data_files"]:
                    data_files = {ds_cfg["split"]: data_file}
                    ds = load_dataset(  # pylint: disable=invalid-name
                        "json",
                        data_files=data_files,
                        split=ds_cfg["split"],
                    )
                    split_datasets.insert(i, ds)
            else:
                ds = load_dataset(  # pylint: disable=invalid-name
                    ds_cfg["path"],
                    split=ds_cfg["split"],
                )
                split_datasets.insert(i, ds)

        tokenizer = None

        for i, data_set in enumerate(split_datasets):
            _type = dataset_cfgs[i]["type"]
            if _type:
                if isinstance(_type, DictDefault):
                    _type = "user_defined.default"
                if _cfg.rl == "orpo":
                    ds_transform_fn = load_orpo(_type, _cfg, dataset_idx=i)
                elif _cfg.rl == "kto":
                    ds_transform_fn = load_kto(_type, _cfg, dataset_idx=i)
                else:
                    ds_transform_fn = load_dpo(_type, _cfg, dataset_idx=i)
                split_datasets[i] = map_dataset(
                    cfg, data_set, ds_transform_fn, tokenizer
                )
            elif _cfg.rl == "kto":
                ds_transform_fn = load_kto(_type, _cfg, dataset_idx=i)
                split_datasets[i] = map_dataset(
                    cfg, data_set, ds_transform_fn, tokenizer
                )
            else:
                # If no `type` is provided, assume the dataset is already in the expected format with
                # "prompt", "chosen" and "rejected" already preprocessed
                split_datasets[i] = data_set

        return concatenate_datasets(split_datasets)

    with zero_first(is_main_process()):
        train_is_preprocessed = False
        eval_is_preprocessed = False
        if train_dataset := _load_preprocessed_ds(cfg, cfg.datasets):
            train_is_preprocessed = True
        else:
            train_dataset = load_split(cfg.datasets, cfg)

        eval_dataset = None
        if cfg.test_datasets:
            if eval_dataset := _load_preprocessed_ds(cfg, cfg.test_datasets):
                eval_is_preprocessed = True
            else:
                eval_dataset = load_split(cfg.test_datasets, cfg)
        if not eval_dataset:
            eval_dataset = None

        if not train_is_preprocessed:
            _save_preprocessed_ds(cfg, cfg.datasets, train_dataset)
        if eval_dataset and not eval_is_preprocessed:
            _save_preprocessed_ds(cfg, cfg.test_datasets, eval_dataset)

    return train_dataset, eval_dataset
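To make the expected configuration concrete, here is a hedged sketch of the keys this function actually reads: `datasets` / `test_datasets` entries with either a Hub `path` or `ds_type: json` plus `data_files`, a `split`, an optional `type` transform, and an `rl` value on the top-level config. The dataset paths are placeholders, and it is assumed that `DictDefault` can be imported from `axolotl.utils.dict`.

```python
from axolotl.utils.dict import DictDefault  # assumption: attribute-style access over a plain dict

cfg = DictDefault(
    {
        "rl": "dpo",
        "datasets": [
            {
                "path": "Intel/orca_dpo_pairs",  # Hugging Face Hub dataset
                "split": "train",
                "type": "chatml.intel",
            }
        ],
        "test_datasets": [
            {
                "ds_type": "json",  # local JSONL file(s) used for evaluation
                "data_files": ["eval_pairs.jsonl"],  # placeholder path
                "split": "train",
                "type": "chatml.intel",
            }
        ],
    }
)

train_dataset, eval_dataset = load_prepare_dpo_datasets(cfg)
```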
class AxolotlDPOConfig(AxolotlTrainingMixins, DPOConfig):
    """
    DPO config for DPO training
    """
Our evaluation script enables two modes of evaluation (controlled by the `eval_mode` argument):
- `e2e` - end-to-end evaluation, which returns EM (exact match) and F1 scores calculated for the downstream task.
- `retrieval` - which returns precision@k of the documents retrieved for provided inputs.
The evaluation script expects paths to two files:
- `evaluation_set` - a path to a file specifying the evaluation dataset, a single input per line.
- `gold_data_path` - a path to a file containing ground truth answers for datapoints from the `evaluation_set`, a single output per line.

Check below for expected formats of the gold data files.

For `retrieval` evaluation, we expect a gold data file where each line consists of a tab-separated list of document titles constituting positive contexts for the respective datapoint from the `evaluation_set`. E.g. given the question `who sings does he love me with reba` in the `evaluation_set`, a respective ground truth line could look as follows:
Does He Love You Does He Love You Red Sandy Spika dress of Reba McEntire Greatest Hits Volume Two (Reba McEntire album) Shoot for the Moon (album)
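To make the retrieval metric concrete, below is a small, hypothetical sketch of how precision@k could be computed from a gold file in this format and a predictions file holding one tab-separated list of retrieved document titles per line. It illustrates the metric only; it is not the `eval_rag.py` implementation.

```python
def precision_at_k(gold_path, preds_path, k=1):
    """Average fraction of the top-k retrieved titles that appear among the gold titles."""
    scores = []
    with open(gold_path) as gold_file, open(preds_path) as preds_file:
        for gold_line, pred_line in zip(gold_file, preds_file):
            gold_titles = set(gold_line.rstrip("\n").split("\t"))
            retrieved = pred_line.rstrip("\n").split("\t")[:k]
            hits = sum(1 for title in retrieved if title in gold_titles)
            scores.append(hits / k)
    return sum(scores) / len(scores)
```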
We demonstrate how to evaluate retrieval against DPR evaluation data. You can download respective files from links listed here.
Download and unzip the gold data file. We use the `biencoder-nq-dev` file from https://dl.fbaipublicfiles.com/dpr/data/retriever/biencoder-nq-dev.json.gz.
wget https://dl.fbaipublicfiles.com/dpr/data/retriever/biencoder-nq-dev.json.gz && gzip -d biencoder-nq-dev.json.gz
Parse the unzipped file using the `parse_dpr_relevance_data.py` script:
mkdir output  # or wherever you want to save this
python examples/research_projects/rag/parse_dpr_relevance_data.py \
    --src_path biencoder-nq-dev.json \
    --evaluation_set output/biencoder-nq-dev.questions \
    --gold_data_path output/biencoder-nq-dev.pages
Run evaluation:
python examples/research_projects/rag/eval_rag.py \
    --model_name_or_path facebook/rag-sequence-nq \
    --model_type rag_sequence \
    --evaluation_set output/biencoder-nq-dev.questions \
    --gold_data_path output/biencoder-nq-dev.pages \
    --predictions_path output/retrieval_preds.tsv \
    --eval_mode retrieval \
    --k 1
# EXPLANATION
python examples/research_projects/rag/eval_rag.py \
    --model_name_or_path facebook/rag-sequence-nq \       # model name or path of the model we're evaluating
    --model_type rag_sequence \                           # RAG model type (rag_token or rag_sequence)
    --evaluation_set output/biencoder-nq-dev.questions \  # an input dataset for evaluation
    --gold_data_path output/biencoder-nq-dev.pages \      # a dataset containing ground truth answers for samples from the evaluation_set
    --predictions_path output/retrieval_preds.tsv \       # name of file where predictions will be stored
    --eval_mode retrieval \                               # indicates whether we're performing retrieval evaluation or e2e evaluation
    --k 1                                                 # parameter k for the precision@k metric
We support two formats of the gold data file (controlled by the `gold_data_mode` parameter; a small parsing sketch follows the list):
- `qa` - where a single line has the following format: `input [tab] output_list`, e.g.: `who is the owner of reading football club ['Xiu Li Dai', 'Dai Yongge', 'Dai Xiuli', 'Yongge Dai']`
- `ans` - where a single line contains a single expected answer, e.g.: `Xiu Li Dai`
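As an illustration only (not part of the evaluation script), reading a gold file in either mode could look like this; `ast.literal_eval` handles the Python-style answer list used by the `qa` format:

```python
import ast


def read_gold_file(path, gold_data_mode="qa"):
    """Return a list of acceptable-answer lists, one entry per evaluation example."""
    answers = []
    with open(path) as gold_file:
        for line in gold_file:
            line = line.rstrip("\n")
            if gold_data_mode == "qa":
                # format: "<input>\t['answer 1', 'answer 2', ...]"
                _question, answer_list = line.split("\t", 1)
                answers.append(ast.literal_eval(answer_list))
            else:  # "ans": a single expected answer per line
                answers.append([line])
    return answers
```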
Predictions of the model for the samples from the `evaluation_set` will be saved under the path specified by the `predictions_path` parameter.
If this path already exists, the script will use saved predictions to calculate metrics. Add the `--recalculate` parameter to force the script to perform inference from scratch.
An example e2e evaluation run could look as follows:
python examples/research_projects/rag/eval_rag.py \
    --model_name_or_path facebook/rag-sequence-nq \
    --model_type rag_sequence \
    --evaluation_set path/to/test.source \
    --gold_data_path path/to/gold_data \
    --predictions_path path/to/e2e_preds.txt \
    --eval_mode e2e \
    --gold_data_mode qa \
    --n_docs 5 \             # You can experiment with retrieving different number of documents at evaluation time
    --print_predictions \
    --recalculate            # adding this parameter will force recalculating predictions even if predictions_path already exists
Object detection models are commonly evaluated with a set of [COCO-style metrics](https://cocodataset.org/#detection-eval).
You can use one of the existing metrics implementations, but here you'll use the one from `torchvision` to evaluate the final model that you pushed to the Hub.
To use the `torchvision` evaluator, you'll need to prepare a ground truth COCO dataset. The API to build a COCO dataset requires the data to be stored in a certain format, so you'll need to save images and annotations to disk first. Just like when you prepared your data for training, the annotations from `cppe5["test"]` need to be formatted. However, the images should stay as they are.
The evaluation step requires a bit of work, but it can be split into three major steps. First, prepare the `cppe5["test"]` set: format the annotations and save the data to disk.
>>> import json >>> # format annotations the same as for training, no need for data augmentation >>> def val_formatted_anns(image_id, objects): ... annotations = [] ... for i in range(0, len(objects["id"])): ... new_ann = { ... "id": objects["id"][i], ... "category_id": objects["category"][i], ... "iscrowd": 0, ... "image_id": image_id, ... "area": objects["area"][i], ... "bbox": objects["bbox"][i], ... } ... annotations.append(new_ann) ... return annotations >>> # Save images and annotations into the files torchvision.datasets.CocoDetection expects >>> def save_cppe5_annotation_file_images(cppe5): ... output_json = {} ... path_output_cppe5 = f"{os.getcwd()}/cppe5/" ... if not os.path.exists(path_output_cppe5): ... os.makedirs(path_output_cppe5) ... path_anno = os.path.join(path_output_cppe5, "cppe5_ann.json") ... categories_json = [{"supercategory": "none", "id": id, "name": id2label[id]} for id in id2label] ... output_json["images"] = [] ... output_json["annotations"] = [] ... for example in cppe5: ... ann = val_formatted_anns(example["image_id"], example["objects"]) ... output_json["images"].append( ... { ... "id": example["image_id"], ... "width": example["image"].width, ... "height": example["image"].height, ... "file_name": f"{example['image_id']}.png", ... } ... ) ... output_json["annotations"].extend(ann) ... output_json["categories"] = categories_json ... with open(path_anno, "w") as file: ... json.dump(output_json, file, ensure_ascii=False, indent=4) ... for im, img_id in zip(cppe5["image"], cppe5["image_id"]): ... path_img = os.path.join(path_output_cppe5, f"{img_id}.png") ... im.save(path_img) ... return path_output_cppe5, path_anno
Next, prepare an instance of a `CocoDetection` class that can be used with `cocoevaluate`.
>>> import torchvision

>>> class CocoDetection(torchvision.datasets.CocoDetection):
...     def __init__(self, img_folder, image_processor, ann_file):
...         super().__init__(img_folder, ann_file)
...         self.image_processor = image_processor
...
...     def __getitem__(self, idx):
...         # read in PIL image and target in COCO format
...         img, target = super(CocoDetection, self).__getitem__(idx)
...         # preprocess image and target: converting target to DETR format,
...         # resizing + normalization of both image and target
...         image_id = self.ids[idx]
...         target = {"image_id": image_id, "annotations": target}
...         encoding = self.image_processor(images=img, annotations=target, return_tensors="pt")
...         pixel_values = encoding["pixel_values"].squeeze()  # remove batch dimension
...         target = encoding["labels"][0]  # remove batch dimension
...         return {"pixel_values": pixel_values, "labels": target}

>>> image_processor = AutoImageProcessor.from_pretrained("devonho/detr-resnet-50_finetuned_cppe5")
>>> path_output_cppe5, path_anno = save_cppe5_annotation_file_images(cppe5["test"])
>>> test_ds_coco_format = CocoDetection(path_output_cppe5, image_processor, path_anno)
Finally, load the metrics and run the evaluation.
>>> import evaluate >>> from tqdm import tqdm >>> model = AutoModelForObjectDetection.from_pretrained("devonho/detr-resnet-50_finetuned_cppe5") >>> module = evaluate.load("ybelkada/cocoevaluate", coco=test_ds_coco_format.coco) >>> val_dataloader = torch.utils.data.DataLoader( ... test_ds_coco_format, batch_size=8, shuffle=False, num_workers=4, collate_fn=collate_fn ... ) >>> device = torch.device("cuda") if torch.cuda.is_available() else "cpu" >>> model.to(device) >>> with torch.no_grad(): ... for idx, batch in enumerate(tqdm(val_dataloader)): ... pixel_values = batch["pixel_values"].to(device) ... pixel_mask = batch["pixel_mask"].to(device) ... labels = [ ... {k: v for k, v in t.items()} for t in batch["labels"] ... ] # these are in DETR format, resized + normalized ... # forward pass ... outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask) ... orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0) ... # convert outputs of model to Pascal VOC format (xmin, ymin, xmax, ymax) ... results = image_processor.post_process_object_detection(outputs, threshold=0, target_sizes=orig_target_sizes) ... ... module.add(prediction=results, reference=labels) ... del batch >>> results = module.compute() >>> print(results) Accumulating evaluation results... DONE (t=0.08s). IoU metric: bbox Average Precision (AP) @[ IoU=0.50:0.95 | area= all | maxDets=100 ] = 0.352 Average Precision (AP) @[ IoU=0.50 | area= all | maxDets=100 ] = 0.681 Average Precision (AP) @[ IoU=0.75 | area= all | maxDets=100 ] = 0.292 Average Precision (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.168 Average Precision (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.208 Average Precision (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.429 Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets= 1 ] = 0.274 Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets= 10 ] = 0.484 Average Recall (AR) @[ IoU=0.50:0.95 | area= all | maxDets=100 ] = 0.501 Average Recall (AR) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.191 Average Recall (AR) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.323 Average Recall (AR) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.590
These results can be further improved by adjusting the hyperparameters in [`~transformers.TrainingArguments`]. Give it a go!
def test_do_eval_no_train(self):
    # testing only zero3 since zero2 makes no sense with inference
    self.run_and_check(
        stage=ZERO3,
        dtype=FP16,
        eval_steps=1,
        distributed=False,
        do_train=False,
        do_eval=True,
    )
"""data handling specific to DPO"""
def evaluation_loop(model, eval_dataloader, processor, normalizer, metric, forced_decoder_ids, accelerator): model.eval() predictions = [] references = [] normalized_predictions = [] normalized_references = [] for _, batch in enumerate(tqdm(eval_dataloader)): with torch.cuda.amp.autocast(): with torch.no_grad(): generated_tokens = ( model.generate( input_features=batch["input_features"], forced_decoder_ids=forced_decoder_ids, max_new_tokens=255, ) .cpu() .numpy() ) labels = batch["labels"].cpu().numpy() labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id) decoded_preds = processor.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True) decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True) predictions.extend(decoded_preds) references.extend(decoded_labels) normalized_predictions.extend([normalizer(pred).strip() for pred in decoded_preds]) normalized_references.extend([normalizer(label).strip() for label in decoded_labels]) del generated_tokens, labels, batch gc.collect() wer = 100 * metric.compute(predictions=predictions, references=references) normalized_wer = 100 * metric.compute(predictions=normalized_predictions, references=normalized_references) eval_metrics = {"eval/wer": wer, "eval/normalized_wer": normalized_wer} if accelerator.get_tracker("wandb"): sample_size = min(len(predictions), 256) ids = [randint(0, len(predictions) - 1) for p in range(0, sample_size)] sample_predictions = [predictions[i] for i in ids] sample_references = [references[i] for i in ids] sample_normalized_predictions = [normalized_predictions[i] for i in ids] sample_normalized_references = [normalized_references[i] for i in ids] table_rows = [ list(r) for r in zip( sample_predictions, sample_references, sample_normalized_predictions, sample_normalized_references ) ] eval_metrics["eval_samples"] = wandb.Table( columns=["predictions", "references", "normalized_predictions", "normalized_references"], rows=table_rows, ) return eval_metrics
eval_dataloader = DataLoader( eval_dataset_for_model, collate_fn=data_collator, batch_size=args.per_device_eval_batch_size )
def on_evaluate( self, args: AxolotlTrainingArguments, state: TrainerState, # pylint: disable=unused-argument control: TrainerControl, # pylint: disable=unused-argument metrics: Dict[str, float], # pylint: disable=unused-argument **kwargs, # pylint: disable=unused-argument ): data_loader = trainer.get_bench_dataloader( bench_dataset.remove_columns(["input", "subject", "output", "name"]) ) trainer.model.eval() preds, refs = [], [] loss_bench = 0 for batch in tqdm(data_loader, total=len(data_loader)): (loss, logits, labels) = trainer.prediction_step( trainer.model, batch, prediction_loss_only=False, ) # There are two tokens, the output, and eos token. for i, logit in enumerate(logits): label_non_zero_id = (batch["labels"][i] != IGNORE_INDEX).nonzero()[ 0 ][0] logit_abcd = logit[label_non_zero_id - 1][abcd_idx] preds.append(torch.argmax(logit_abcd).item()) labels = labels[labels != IGNORE_INDEX].view(-1, 2)[:, 0] refs += [ abcd_idx.index(label) if label in abcd_idx else -1 for label in labels.tolist() ] loss_bench += loss.item() # Extract results by subject. bench_name = bench_dataset["name"] bench_names: dict = {s: {"refs": [], "preds": []} for s in set(bench_name)} for s, p, r in zip(bench_name, preds, refs): # pylint: disable=invalid-name bench_names[s]["preds"].append(p) bench_names[s]["refs"].append(r) barrier() local_bench_names = bench_names gathered_bench_names: List[Dict] = [{} for _ in range(get_world_size())] # Gather results from all GPUs to GPU 0 loss_bench_ranks = gather_scalar_from_all_ranks( lambda: loss_bench, get_world_size() ) len_data_loader_ranks = gather_scalar_from_all_ranks( lambda: len(data_loader), get_world_size() ) results = {} if is_distributed() and not is_main_process(): dist.gather_object(local_bench_names, dst=0) else: if is_distributed(): dist.gather_object(local_bench_names, gathered_bench_names, dst=0) else: gathered_bench_names = [local_bench_names] bench_loss = sum(loss_bench_ranks) / sum(len_data_loader_ranks) results = {f"{bench_split}_bench_loss": bench_loss} # Combine results from all GPUs combined_bench_names: Dict[str, Dict[str, List]] = {} for bench_name in gathered_bench_names: for name, data in bench_name.items(): if name not in combined_bench_names: combined_bench_names[name] = {"refs": [], "preds": []} combined_bench_names[name]["refs"].extend(data["refs"]) combined_bench_names[name]["preds"].extend(data["preds"]) bench_scores = [] bench_refs = [] bench_preds = [] for ( bench_name ) in combined_bench_names: # pylint: disable=consider-using-dict-items bench_score = accuracy.compute( references=combined_bench_names[bench_name]["refs"], predictions=combined_bench_names[bench_name]["preds"], )["accuracy"] bench_refs.extend(combined_bench_names[bench_name]["refs"]) bench_preds.extend(combined_bench_names[bench_name]["preds"]) if not pd.isna(bench_score): results[ f"{bench_split}_bench_accuracy_{bench_name}" ] = bench_score bench_scores.append(bench_score) else: results[f"{bench_split}_bench_accuracy_{bench_name}"] = 0.0 bench_scores.append(0.0) results[f"{bench_split}_bench_average_accuracy"] = np.mean(bench_scores) results[f"{bench_split}_bench_total_accuracy"] = accuracy.compute( references=bench_refs, predictions=bench_preds )["accuracy"] trainer.log(results) results = broadcast_dict(results) for key, val in results.items(): metrics[key] = val
# silence the warnings. Please re-enable for inference!
model.eval()
def on_evaluate( self, args: AxolotlTrainingArguments, # pylint: disable=unused-argument state: TrainerState, control: TrainerControl, train_dataloader, # pylint: disable=unused-argument eval_dataloader, **kwargs, # pylint: disable=unused-argument ): eval_table_size = self.cfg.eval_table_size if eval_table_size <= 0: return control trainer.model.eval() device = torch.device(self.cfg.device) # pylint: disable=duplicate-code generation_config = GenerationConfig( max_new_tokens=self.cfg.eval_max_new_tokens, bos_token_id=tokenizer.bos_token_id, eos_token_id=tokenizer.eos_token_id, pad_token_id=tokenizer.pad_token_id, do_sample=False, use_cache=True, return_dict_in_generate=True, output_attentions=False, output_hidden_states=False, output_scores=False, ) def logits_to_tokens(logits) -> torch.Tensor: probabilities = torch.softmax(logits, dim=-1) # Get the predicted token ids (the ones with the highest probability) predicted_token_ids = torch.argmax(probabilities, dim=-1) return predicted_token_ids def find_ranges(lst): ranges = [] start = 0 for i in range(1, len(lst)): if lst[i] == 0: ranges.append((start, i - 1)) start = i end = len(lst) - 1 ranges.append((start, end)) return ranges def log_table_from_dataloader(name: str, table_dataloader): table_data: Dict[str, List[Any]] = { "id": [], "Prompt": [], "Correct Completion": [], "Predicted Completion (model.generate)": [], "Predicted Completion (trainer.prediction_step)": [], } row_index = 0 for batch in tqdm(table_dataloader): if row_index > eval_table_size: break batch_labels = batch["labels"].to(device) batch_input_ids = batch["input_ids"].to(device) if "position_ids" in batch: batch_pos_ids = batch["position_ids"].tolist() else: batch_pos_ids = [None] * len(batch["input_ids"]) (_, batch_logits, _) = trainer.prediction_step( trainer.model, batch, prediction_loss_only=False, ) prompt_token_ids_list = [] pred_step_token_ids_list = [] completion_token_ids_list = [] for input_ids_all, labels_all, pos_ids, logits in zip( batch_input_ids, batch_labels, batch_pos_ids, batch_logits, ): if pos_ids is None: pos_ranges = [(0, len(input_ids_all) - 1)] else: pos_ranges = find_ranges(pos_ids) for pos_range in pos_ranges: start, end = pos_range if start == end: continue input_ids = input_ids_all[start : end + 1] labels = labels_all[start : end + 1] tokens_without_loss = labels == IGNORE_INDEX tokens_with_loss = labels != IGNORE_INDEX tokens_exclude_padding = input_ids != tokenizer.pad_token_id prompt_token_includes = ( tokens_without_loss & tokens_exclude_padding ) prompt_token_ids = input_ids[prompt_token_includes] prompt_token_ids_list.append(prompt_token_ids) completion_token_ids = input_ids[tokens_with_loss] completion_token_ids_list.append(completion_token_ids) pred_step_token_ids = logits_to_tokens( logits[start : end + 1] )[tokens_with_loss] pred_step_token_ids_list.append(pred_step_token_ids) prompt_texts = tokenizer.batch_decode( prompt_token_ids_list, skip_special_tokens=True ) completion_texts = tokenizer.batch_decode( completion_token_ids_list, skip_special_tokens=True ) pred_step_texts = tokenizer.batch_decode( pred_step_token_ids_list, skip_special_tokens=True ) with torch.no_grad(): prompt_encoding = tokenizer( prompt_texts, padding=True, return_tensors="pt" ).to(self.cfg.device) predictions = trainer.model.generate( **prompt_encoding, generation_config=generation_config ) prediction_all_tokens = predictions["sequences"].cpu().tolist() prediction_without_prompt_tokens_list = [] for prompt_token_ids, prediction_tokens in zip( 
prompt_token_ids_list, prediction_all_tokens ): prediction_without_prompt_tokens = prediction_tokens[ len(prompt_token_ids) : ] prediction_without_prompt_tokens_list.append( prediction_without_prompt_tokens ) predicted_texts = tokenizer.batch_decode( prediction_without_prompt_tokens_list, skip_special_tokens=True ) for ( prompt_text, completion_text, prediction_text, pred_step_text, ) in zip( prompt_texts, completion_texts, predicted_texts, pred_step_texts ): table_data["id"].append(row_index) table_data["Prompt"].append(prompt_text) table_data["Correct Completion"].append(completion_text) table_data["Predicted Completion (model.generate)"].append( prediction_text ) table_data[ "Predicted Completion (trainer.prediction_step)" ].append(pred_step_text) row_index += 1 if logger == "wandb": wandb.run.log({f"{name} - Predictions vs Ground Truth": pd.DataFrame(table_data)}) # type: ignore[attr-defined] elif logger == "mlflow" and is_mlflow_available(): import mlflow tracking_uri = AxolotlInputConfig( **self.cfg.to_dict() ).mlflow_tracking_uri mlflow.log_table( data=table_data, artifact_file="PredictionsVsGroundTruth.json", tracking_uri=tracking_uri, ) if is_main_process(): log_table_from_dataloader("Eval", eval_dataloader) return control
prediction = post_processing_function(eval_examples, eval_dataset, all_preds)
eval_metric = metric.compute(predictions=prediction.predictions, references=prediction.label_ids)
logger.info(f"Evaluation metrics: {eval_metric}")
class UserDefinedDPOType(BaseModel):
    """User defined typing for DPO"""

    field_system: Optional[str] = None
    field_prompt: Optional[str] = None
    field_chosen: Optional[str] = None
    field_rejected: Optional[str] = None
    prompt_format: Optional[str] = None
    chosen_format: Optional[str] = None
    rejected_format: Optional[str] = None
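For illustration, these fields could be filled in like this when your preference data uses custom column names; the column names and prompt templates below are placeholders, not a recommended configuration:

```python
# assumes the UserDefinedDPOType model defined above is in scope
dpo_type = UserDefinedDPOType(
    field_system="system",
    field_prompt="question",
    field_chosen="chosen_response",
    field_rejected="rejected_response",
    prompt_format="[INST] {prompt} [/INST]",
    chosen_format="{chosen}",
    rejected_format="{rejected}",
)
print(dpo_type.field_prompt)  # "question"
```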
def main():
    parser = argparse.ArgumentParser()

    # Required parameters
    parser.add_argument(
        "--src_path",
        type=str,
        default="biencoder-nq-dev.json",
        help="Path to raw DPR training data",
    )
    parser.add_argument(
        "--evaluation_set",
        type=str,
        help="where to store parsed evaluation_set file",
    )
    parser.add_argument(
        "--gold_data_path",
        type=str,
        help="where to store parsed gold_data_path file",
    )
    args = parser.parse_args()

    with open(args.src_path, "r") as src_file, open(args.evaluation_set, "w") as eval_file, open(
        args.gold_data_path, "w"
    ) as gold_file:
        dpr_records = json.load(src_file)
        for dpr_record in tqdm(dpr_records):
            question = dpr_record["question"]
            contexts = [context["title"] for context in dpr_record["positive_ctxs"]]
            eval_file.write(question + "\n")
            gold_file.write("\t".join(contexts) + "\n")
def on_evaluate( self, args: AxolotlTrainingArguments, # pylint: disable=unused-argument state: TrainerState, control: TrainerControl, train_dataloader, # pylint: disable=unused-argument eval_dataloader, **kwargs, # pylint: disable=unused-argument ): trainer.model.eval() device = torch.device(self.cfg.device) # pylint: disable=duplicate-code generation_config = GenerationConfig( max_new_tokens=self.cfg.eval_max_new_tokens, bos_token_id=tokenizer.bos_token_id, eos_token_id=tokenizer.eos_token_id, pad_token_id=tokenizer.pad_token_id, do_sample=False, use_cache=True, return_dict_in_generate=True, output_attentions=False, output_hidden_states=False, output_scores=False, ) def find_ranges(lst): ranges = [] start = 0 for i in range(1, len(lst)): if lst[i] == 0: ranges.append((start, i - 1)) start = i end = len(lst) - 1 ranges.append((start, end)) return ranges def compute(metric: evaluate.Metric, **kwargs): # safely compute a metric and return the score if the format is correct metric_score = None try: # Only pass the kwargs that are in the metric's feature list metric_kwargs = { k: kwargs[k] for k in metric._feature_names() # pylint: disable=protected-access if k in kwargs } metric_score = metric.compute(**metric_kwargs) return ( metric_score["score"] if "score" in metric_score else metric_score["mean_score"] ) except Exception: # pylint: disable=broad-exception-caught traceback.print_exc() LOG.debug( f"Failed to compute metric {metric.name} with kwargs {kwargs.keys()}" ) return metric_score def evaluate_preds(sources, predictions, references): scores = {} for metric_name, metric in self.metrics.items(): score = compute( metric, references=references, predictions=predictions, sources=sources, ) if score is None: score = compute( metric, references=[[r] for r in references], predictions=predictions, ) scores[metric_name] = score return scores def predict_with_generate(): eval_src, eval_pred, eval_ref = [], [], [] for batch in tqdm(eval_dataloader): batch_labels = batch["labels"].to(device) batch_input_ids = batch["input_ids"].to(device) if "position_ids" in batch: batch_pos_ids = batch["position_ids"].tolist() else: batch_pos_ids = [None] * len(batch["input_ids"]) prompt_token_ids_list = [] completion_token_ids_list = [] for input_ids_all, labels_all, pos_ids in zip( batch_input_ids, batch_labels, batch_pos_ids, ): if pos_ids is None: pos_ranges = [(0, len(input_ids_all) - 1)] else: pos_ranges = find_ranges(pos_ids) for pos_range in pos_ranges: start, end = pos_range if start == end: continue input_ids = input_ids_all[start : end + 1] labels = labels_all[start : end + 1] tokens_without_loss = labels == IGNORE_INDEX tokens_with_loss = labels != IGNORE_INDEX tokens_exclude_padding = input_ids != tokenizer.pad_token_id prompt_token_includes = ( tokens_without_loss & tokens_exclude_padding ) prompt_token_ids = input_ids[prompt_token_includes] prompt_token_ids_list.append(prompt_token_ids) completion_token_ids = input_ids[tokens_with_loss] completion_token_ids_list.append(completion_token_ids) prompt_texts = tokenizer.batch_decode( prompt_token_ids_list, skip_special_tokens=True ) completion_texts = tokenizer.batch_decode( completion_token_ids_list, skip_special_tokens=True ) with torch.no_grad(): prompt_encoding = tokenizer( prompt_texts, padding=True, return_tensors="pt" ).to(self.cfg.device) predictions = trainer.model.generate( **prompt_encoding, generation_config=generation_config ) prediction_all_tokens = predictions["sequences"].cpu().tolist() prediction_without_prompt_tokens_list = [] for 
prompt_token_ids, prediction_tokens in zip( prompt_token_ids_list, prediction_all_tokens ): prediction_without_prompt_tokens = prediction_tokens[ len(prompt_token_ids) : ] prediction_without_prompt_tokens_list.append( prediction_without_prompt_tokens ) predicted_texts = tokenizer.batch_decode( prediction_without_prompt_tokens_list, skip_special_tokens=True ) eval_src.extend(prompt_texts) eval_pred.extend(predicted_texts) eval_ref.extend(completion_texts) return eval_src, eval_pred, eval_ref if is_main_process(): eval_preds = predict_with_generate() trainer.log(evaluate_preds(*eval_preds)) return control
""" DPO strategies for chatml """
# New Code #
def evaluate(args, model, eval_dataloader, accelerator, eval_dataset):
    model.eval()
    losses = []
    for step, batch in enumerate(eval_dataloader):
        with torch.no_grad():
            outputs = model(**batch)

        loss = outputs.loss
        losses.append(accelerator.gather_for_metrics(loss.repeat(args.per_device_eval_batch_size)))

    losses = torch.cat(losses)
    try:
        eval_loss = torch.mean(losses)
        perplexity = math.exp(eval_loss)
    except OverflowError:
        perplexity = float("inf")
    return perplexity, eval_loss
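A hedged sketch of how this helper might be called from a training loop; `args`, `model`, `accelerator`, the dataloaders, and `num_epochs` are assumed to be set up as in the surrounding script:

```python
best_perplexity = float("inf")
for epoch in range(num_epochs):
    # ... one epoch of training ...
    perplexity, eval_loss = evaluate(args, model, eval_dataloader, accelerator, eval_dataset)
    accelerator.print(f"epoch {epoch}: eval_loss={eval_loss.item():.4f} perplexity={perplexity:.2f}")
    if perplexity < best_perplexity:
        best_perplexity = perplexity
        accelerator.save_state(args.output_dir)  # keep the best checkpoint so far
```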
eval_dataloader = DataLoader(eval_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)
To perform distributed evaluation, pass your validation dataloader to the [`~Accelerator.prepare`] method:
validation_dataloader = accelerator.prepare(validation_dataloader)
Each device in your distributed setup only receives a part of the evaluation data, which means you should group your predictions together with the [`~Accelerator.gather_for_metrics`] method. This method requires all tensors to be the same size on each process, so if your tensors have different sizes on each process (for instance when dynamically padding to the maximum length in a batch), you should use the [`~Accelerator.pad_across_processes`] method to pad your tensors to the largest size across processes. Note that the tensors need to be 1D and that we concatenate the tensors along the first dimension.
for inputs, targets in validation_dataloader:
    predictions = model(inputs)
    # Gather all predictions and targets
    all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
    # Example of use with a *Datasets.Metric*
    metric.add_batch(all_predictions, all_targets)
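When each process produces variable-length (for example token-level) predictions, a hedged sketch of the padding step described above could look like this; using `tokenizer.pad_token_id` as the pad value and the 2D `(batch, sequence)` shape are assumptions made for the example:

```python
for inputs, targets in validation_dataloader:
    predictions = model(inputs).logits.argmax(dim=-1)  # (batch, sequence) token ids
    # pad to the same sequence length on every process before gathering
    predictions = accelerator.pad_across_processes(predictions, dim=1, pad_index=tokenizer.pad_token_id)
    targets = accelerator.pad_across_processes(targets, dim=1, pad_index=tokenizer.pad_token_id)
    all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
    metric.add_batch(predictions=all_predictions, references=all_targets)
```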
For more complex cases (e.g. 2D tensors, when you don't want to concatenate tensors, or a dict of 3D tensors), you can pass `use_gather_object=True` to `gather_for_metrics`. This will return the list of objects after gathering. Note that using it with GPU tensors is not well supported and is inefficient.
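For instance, gathering a per-process dictionary of differently shaped tensors might look like the following hedged sketch; `batch_ids` and `predicted_boxes` are hypothetical names, and `use_gather_object` is assumed to be available in your Accelerate version:

```python
# each process contributes a Python object; shapes may differ across processes
local_results = {"ids": batch_ids, "boxes": predicted_boxes}
all_results = accelerator.gather_for_metrics(local_results, use_gather_object=True)
# `all_results` is a list containing every process's object, in rank order
```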
>[!TIP]
>Data at the end of a dataset may be duplicated so the batch can be equally divided among all workers. The [`~Accelerator.gather_for_metrics`] method automatically removes the duplicated data to calculate a more accurate metric.
PEFT_TYPE="boft"
BLOCK_NUM=8
BLOCK_SIZE=0
N_BUTTERFLY_FACTOR=1
ITER_NUM=50000

export RUN_NAME="${PEFT_TYPE}_${BLOCK_NUM}${BLOCK_SIZE}${N_BUTTERFLY_FACTOR}"
export MODEL_NAME="stabilityai/stable-diffusion-2-1"
# export MODEL_NAME="runwayml/stable-diffusion-v1-5"
export DATASET_NAME="oftverse/control-celeba-hq"
export CKPT_NAME="checkpoint-${ITER_NUM}"
export OUTPUT_DIR="./output/${DATASET_NAME}/${RUN_NAME}/${CKPT_NAME}"
export CONTROLNET_PATH="${OUTPUT_DIR}/controlnet/model.safetensors"
export UNET_PATH="${OUTPUT_DIR}/unet/${RUN_NAME}"

accelerate launch eval.py \
    --pretrained_model_name_or_path=$MODEL_NAME \
    --dataset_name=$DATASET_NAME \
    --controlnet_path=$CONTROLNET_PATH \
    --unet_path=$UNET_PATH \
    --adapter_name=$RUN_NAME \
    --output_dir=$OUTPUT_DIR \
    --dataset_name=$DATASET_NAME \
    --vis_overlays \
---
title: "RLHF (Beta)"
description: "Reinforcement Learning from Human Feedback is a method whereby a language model is optimized from data using human feedback."
---
### Overview
Reinforcement Learning from Human Feedback is a method whereby a language model is optimized from data using human feedback. Various methods include, but are not limited to:
- Proximal Policy Optimization (PPO) (not yet supported in axolotl)
- Direct Preference Optimization (DPO)
- Identity Preference Optimization (IPO)
### RLHF using Axolotl
>[!IMPORTANT]
>This is a BETA feature and many features are not fully implemented. You are encouraged to open new PRs to improve the integration and functionality.
The RL training methods are implemented in trl and wrapped via axolotl. Below are examples of how you can use various preference datasets to train models that use ChatML.
#### DPO
```yaml
rl: dpo
datasets:
  - path: Intel/orca_dpo_pairs
    split: train
    type: chatml.intel
  - path: argilla/ultrafeedback-binarized-preferences
    split: train
    type: chatml.argilla
```
#### IPO

rl: ipo

#### ORPO

Paper: https://arxiv.org/abs/2403.07691

rl: orpo
orpo_alpha: 0.1
remove_unused_columns: false
chat_template: chatml
datasets:
  - path: argilla/ultrafeedback-binarized-preferences-cleaned
    type: chat_template.argilla
datasets:
  - ds_type: json
    data_files:
      - orca_rlhf.jsonl
    split: train
    type: chatml.intel
TRL supports auto-unwrapping PEFT models, so a reference model does not need to be loaded separately, reducing the VRAM needed. This is on by default. To turn it off, pass the following config:
# load ref model when adapter training
rl_adapter_ref_model: true
New parameter-efficient fine-tuning methods are developed all the time. If you would like to add a new and promising method to PEFT, please follow these steps.
def training_function(config, args): # For testing only if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1": config["num_epochs"] = 2 # New Code # ddp_comm_hook_type = DDPCommunicationHookType(args.ddp_comm_hook) ddp_comm_wrapper = DDPCommunicationHookType(args.ddp_comm_wrapper) ddp_kwargs = DistributedDataParallelKwargs(comm_hook=ddp_comm_hook_type, comm_wrapper=ddp_comm_wrapper) # Initialize accelerator accelerator = Accelerator(cpu=args.cpu, mixed_precision=args.mixed_precision, kwargs_handlers=[ddp_kwargs]) # Sample hyper-parameters for learning rate, batch size, seed and a few other HPs lr = config["lr"] num_epochs = int(config["num_epochs"]) seed = int(config["seed"]) batch_size = int(config["batch_size"]) metric = evaluate.load("glue", "mrpc") set_seed(seed) train_dataloader, eval_dataloader = get_dataloaders(accelerator, batch_size) # Instantiate the model (we build the model here so that the seed also control new weights initialization) model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True) # We could avoid this line since the accelerator is set with `device_placement=True` (default value). # Note that if you are placing tensors on devices manually, this line absolutely needs to be before the optimizer # creation otherwise training will not work on TPU (`accelerate` will kindly throw an error to make us aware of that). model = model.to(accelerator.device) # Instantiate optimizer optimizer = AdamW(params=model.parameters(), lr=lr) # Instantiate scheduler lr_scheduler = get_linear_schedule_with_warmup( optimizer=optimizer, num_warmup_steps=100, num_training_steps=(len(train_dataloader) * num_epochs), ) # Prepare everything # There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the # prepare method. model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, eval_dataloader, lr_scheduler ) # Now we train the model for epoch in range(num_epochs): model.train() for step, batch in enumerate(train_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch.to(accelerator.device) # We use the new `accumulate` context manager to perform gradient accumulation with accelerator.accumulate(model): output = model(**batch) loss = output.loss accelerator.backward(loss) optimizer.step() lr_scheduler.step() optimizer.zero_grad() model.eval() for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) metric.add_batch( predictions=predictions, references=references, ) eval_metric = metric.compute() # Use accelerator.print to print only on the main process. accelerator.print(f"epoch {epoch}:", eval_metric)
Distributed Data Parallel (DDP) communication hooks provide a generic interface to control how gradients are communicated across workers by overriding the vanilla allreduce in `DistributedDataParallel`. A few built-in communication hooks are provided, and users can easily apply any of these hooks to optimize communication.
- FP16 compression hook: compresses gradients to half precision (`torch.float16`), reducing communication overhead.
- BF16 compression hook: compresses gradients to Brain Floating Point (`torch.bfloat16`), which can be more efficient on certain hardware.

In this tutorial, you will see how to quickly set up DDP communication hooks and perform training with the utilities provided in 🤗 Accelerate, which can be as simple as adding just one new line of code!
With plain PyTorch DDP:

import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.algorithms.ddp_comm_hooks import default_hooks

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

model = MyModel()
model = DDP(model, device_ids=[torch.cuda.current_device()])
model.register_comm_hook(state=None, hook=default_hooks.fp16_compress_hook)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
With 🤗 Accelerate:

from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs
import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

# DDP Communication Hook setup
ddp_kwargs = DistributedDataParallelKwargs(comm_hook=DDPCommunicationHookType.FP16)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
data_loader = DataLoader(dataset, batch_size=16)
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()
BF16 Compression Hook API is experimental, and it requires NCCL version later than 2.9.6.
With plain PyTorch DDP:

import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.algorithms.ddp_comm_hooks import default_hooks

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

model = MyModel()
model = DDP(model, device_ids=[torch.cuda.current_device()])
model.register_comm_hook(state=None, hook=default_hooks.bf16_compress_hook)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
With 🤗 Accelerate:

from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs
import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

# DDP Communication Hook setup
ddp_kwargs = DistributedDataParallelKwargs(comm_hook=DDPCommunicationHookType.BF16)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
data_loader = DataLoader(dataset, batch_size=16)
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()
PowerSGD typically requires extra memory of the same size as the model’s gradients to enable error feedback, which can compensate for biased compressed communication and improve accuracy.
With plain PyTorch DDP:

import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

model = MyModel()
model = DDP(model, device_ids=[torch.cuda.current_device()])
state = powerSGD_hook.PowerSGDState(process_group=None)
model.register_comm_hook(state=state, hook=powerSGD_hook.powerSGD_hook)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
With 🤗 Accelerate:

from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs
import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

# DDP Communication Hook setup
ddp_kwargs = DistributedDataParallelKwargs(comm_hook=DDPCommunicationHookType.POWER_SGD)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
data_loader = DataLoader(dataset, batch_size=16)
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()
There are two additional utilities for supporting optional functionalities with the communication hooks.
`comm_wrapper` is an option to wrap a communication hook with additional functionality. For example, it can be used to combine FP16 compression with other communication strategies. Currently supported wrappers are `no`, `fp16`, and `bf16`.
from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs
import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

# DDP Communication Hook setup
ddp_kwargs = DistributedDataParallelKwargs(
    comm_hook=DDPCommunicationHookType.POWER_SGD,
    comm_wrapper=DDPCommunicationHookType.FP16
)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
data_loader = DataLoader(dataset, batch_size=16)
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()
`comm_state_option` allows you to pass additional state information required by certain communication hooks. This is particularly useful for stateful hooks like `PowerSGD`, which require maintaining hyperparameters and internal states across training steps. Below is an example showcasing the use of `comm_state_option` with the `PowerSGD` hook.
from accelerate import Accelerator, DDPCommunicationHookType, DistributedDataParallelKwargs
import torch

class MyModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(10, 10)

    def forward(self, x):
        return self.layer(x)

# DDP Communication Hook setup
ddp_kwargs = DistributedDataParallelKwargs(
    comm_hook=DDPCommunicationHookType.POWER_SGD,
    comm_state_option={"matrix_approximation_rank": 2}
)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

model = MyModel()
optimizer = torch.optim.Adam(model.parameters())
data_loader = DataLoader(dataset, batch_size=16)
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)

# Training loop
for data, targets in data_loader:
    outputs = model(data)
    loss = criterion(outputs, targets)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()
For more advanced usage and additional hooks, refer to the PyTorch DDP Communication Hooks documentation.
Set up the accelerate config by running `accelerate config` and answering the SageMaker questions. To use SageMaker DDP, select it when asked `What is the distributed mode? ([0] No distributed training, [1] data parallelism):`.
Example config below:
base_job_name: accelerate-sagemaker-1
compute_environment: AMAZON_SAGEMAKER
distributed_type: DATA_PARALLEL
ec2_instance_type: ml.p3.16xlarge
iam_role_name: xxxxx
image_uri: null
mixed_precision: fp16
num_machines: 1
profile: xxxxx
py_version: py38
pytorch_version: 1.10.2
region: us-east-1
transformers_version: 4.17.0
use_cpu: false
currently in development, will be supported soon.
🤗 Accelerate currently uses the 🤗 DLCs, with `transformers`, `datasets` and `tokenizers` pre-installed. If you want to use different/other Python packages, you can do this by adding them to the `requirements.txt`. These packages will be installed before your training script is started.
The local mode in the SageMaker SDK allows you to run your training script locally inside the HuggingFace DLC (Deep Learning container) or using your custom container image. This is useful for debugging and testing your training script inside the final container environment. Local mode uses Docker compose (Note: Docker Compose V2 is not supported yet). The SDK will handle the authentication against ECR to pull the DLC to your local environment. You can emulate CPU (single and multi-instance) and GPU (single instance) SageMaker training jobs.
To use local mode, you need to set your `ec2_instance_type` to `local`.
ec2_instance_type: local
The configuration allows you to override parameters for the Estimator.
These settings have to be applied in the config file and are not part of `accelerate config`. You can control many additional aspects of the training job, e.g. use Spot instances, enable network isolation, and many more.
additional_args:
  # enable network isolation to restrict internet access for containers
  enable_network_isolation: True
You can find all available configuration here.
You can use Spot Instances, e.g. with the following settings (see Advanced configuration):
additional_args:
  use_spot_instances: True
  max_wait: 86400
Note: Spot Instances can be terminated at any time, in which case training has to be continued from a checkpoint. This is not handled in 🤗 Accelerate out of the box. Contact us if you would like this feature.
undecided if feature is needed. Contact us if you would like this feature.
def check_early_stopping(self, eval_loss):
    delta = self.lowest_loss - eval_loss
    if delta >= self.min_delta:
        self.lowest_loss = eval_loss
        self.counter = 0
    else:
        self.counter += 1
        if self.counter >= self.patience:
            return True
    return False
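A hedged, self-contained usage sketch of this helper, assuming it lives on a small state object initialised with `patience`, `min_delta`, `lowest_loss = float('inf')`, and `counter = 0`:

```python
class EarlyStopping:
    """Minimal wrapper around the check_early_stopping logic shown above."""

    def __init__(self, patience=3, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.lowest_loss = float("inf")
        self.counter = 0

    def check_early_stopping(self, eval_loss):
        delta = self.lowest_loss - eval_loss
        if delta >= self.min_delta:
            self.lowest_loss = eval_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False


early_stopper = EarlyStopping(patience=3, min_delta=0.01)
for eval_loss in [0.91, 0.85, 0.86, 0.87, 0.88]:  # stand-in eval losses
    if early_stopper.check_early_stopping(eval_loss):
        print("stopping early: no improvement for", early_stopper.patience, "evaluations")
        break
```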
def training_function(config, args): # For testing only if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1": config["num_epochs"] = 2 # New Code # # Pass the advanced FSDP settings not part of the accelerate config by creating fsdp_plugin fsdp_plugin = FullyShardedDataParallelPlugin( state_dict_config=FullStateDictConfig(offload_to_cpu=False, rank0_only=False), optim_state_dict_config=FullOptimStateDictConfig(offload_to_cpu=False, rank0_only=False), ) # Initialize accelerator if args.with_tracking: accelerator = Accelerator( cpu=args.cpu, mixed_precision=args.mixed_precision, log_with="wandb", project_dir=args.logging_dir, fsdp_plugin=fsdp_plugin, ) else: accelerator = Accelerator(fsdp_plugin=fsdp_plugin) accelerator.print(accelerator.distributed_type) if hasattr(args.checkpointing_steps, "isdigit"): if args.checkpointing_steps == "epoch": checkpointing_steps = args.checkpointing_steps elif args.checkpointing_steps.isdigit(): checkpointing_steps = int(args.checkpointing_steps) else: raise ValueError( f"Argument `checkpointing_steps` must be either a number or `epoch`. `{args.checkpointing_steps}` passed." ) else: checkpointing_steps = None # Sample hyper-parameters for learning rate, batch size, seed and a few other HPs lr = config["lr"] num_epochs = int(config["num_epochs"]) seed = int(config["seed"]) batch_size = int(config["batch_size"]) # We need to initialize the trackers we use, and also store our configuration if args.with_tracking: experiment_config = vars(args) accelerator.init_trackers("fsdp_glue_no_trainer", experiment_config) tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path) datasets = load_dataset("glue", "mrpc") metric = evaluate.load("glue", "mrpc") def tokenize_function(examples): # max_length=None => use the model max length (it's actually the default) outputs = tokenizer(examples["sentence1"], examples["sentence2"], truncation=True, max_length=None) return outputs # Apply the method we just defined to all the examples in all the splits of the dataset # starting with the main process first: with accelerator.main_process_first(): tokenized_datasets = datasets.map( tokenize_function, batched=True, remove_columns=["idx", "sentence1", "sentence2"], ) # We also rename the 'label' column to 'labels' which is the expected name for labels by the models of the # transformers library tokenized_datasets = tokenized_datasets.rename_column("label", "labels") # If the batch size is too big we use gradient accumulation gradient_accumulation_steps = 1 if batch_size > MAX_GPU_BATCH_SIZE and accelerator.distributed_type != DistributedType.XLA: gradient_accumulation_steps = batch_size // MAX_GPU_BATCH_SIZE batch_size = MAX_GPU_BATCH_SIZE def collate_fn(examples): # On TPU it's best to pad everything to the same length or training will be very slow. max_length = 128 if accelerator.distributed_type == DistributedType.XLA else None # When using mixed precision we want round multiples of 8/16 if accelerator.mixed_precision == "fp8": pad_to_multiple_of = 16 elif accelerator.mixed_precision != "no": pad_to_multiple_of = 8 else: pad_to_multiple_of = None return tokenizer.pad( examples, padding="longest", max_length=max_length, pad_to_multiple_of=pad_to_multiple_of, return_tensors="pt", ) # Instantiate dataloaders. 
train_dataloader = DataLoader( tokenized_datasets["train"], shuffle=True, collate_fn=collate_fn, batch_size=batch_size ) eval_dataloader = DataLoader( tokenized_datasets["validation"], shuffle=False, collate_fn=collate_fn, batch_size=EVAL_BATCH_SIZE ) set_seed(seed) # Instantiate the model (we build the model here so that the seed also control new weights initialization) model = AutoModelForSequenceClassification.from_pretrained( args.model_name_or_path, return_dict=True, low_cpu_mem_usage=True ) no_decay = ["bias", "LayerNorm.weight"] optimizer_grouped_parameters = [ { "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], "weight_decay": 0.003, }, { "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], "weight_decay": 0.0, }, ] optimizer = torch.optim.AdamW(params=optimizer_grouped_parameters, lr=lr, weight_decay=2e-4) # Instantiate scheduler lr_scheduler = get_linear_schedule_with_warmup( optimizer=optimizer, num_warmup_steps=10, num_training_steps=(len(train_dataloader) * num_epochs) // gradient_accumulation_steps, ) model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, eval_dataloader, lr_scheduler ) overall_step = 0 # Potentially load in the weights and states from a previous save if args.resume_from_checkpoint: if args.resume_from_checkpoint is not None or args.resume_from_checkpoint != "": accelerator.print(f"Resumed from checkpoint: {args.resume_from_checkpoint}") accelerator.load_state(args.resume_from_checkpoint) path = os.path.basename(args.resume_from_checkpoint) else: # Get the most recent checkpoint dirs = [f.name for f in os.scandir(os.getcwd()) if f.is_dir()] dirs.sort(key=os.path.getctime) path = dirs[-1] # Sorts folders by date modified, most recent checkpoint is the last # Extract `epoch_{i}` or `step_{i}` training_difference = os.path.splitext(path)[0] if "epoch" in training_difference: num_epochs -= int(training_difference.replace("epoch_", "")) resume_step = None else: resume_step = int(training_difference.replace("step_", "")) num_epochs -= resume_step // len(train_dataloader) # If resuming by step, we also need to know exactly how far into the DataLoader we went resume_step = (num_epochs * len(train_dataloader)) - resume_step # Now we train the model for epoch in range(num_epochs): # New Code # # context manager to track the peak memory usage during the training epoch with TorchTracemalloc() as tracemalloc: model.train() if args.with_tracking: total_loss = 0 for step, batch in enumerate(train_dataloader): # We need to skip steps until we reach the resumed step if args.resume_from_checkpoint and epoch == 0: if resume_step is not None and step < resume_step: pass # We could avoid this line since we set the accelerator with `device_placement=True`. 
batch.to(accelerator.device) outputs = model(**batch) loss = outputs.loss # We keep track of the loss at each epoch if args.with_tracking: total_loss += loss.detach().float() accelerator.backward(loss) if step % gradient_accumulation_steps == 0: optimizer.step() lr_scheduler.step() optimizer.zero_grad() # accelerator.print(lr_scheduler.get_lr()) overall_step += 1 if isinstance(checkpointing_steps, int): output_dir = f"step_{overall_step}" if overall_step % checkpointing_steps == 0: if args.output_dir is not None: output_dir = os.path.join(args.output_dir, output_dir) accelerator.save_state(output_dir) # New Code # # Printing the GPU memory usage details such as allocated memory, peak memory, and total memory usage accelerator.print(f"Memory before entering the train : {b2mb(tracemalloc.begin)}") accelerator.print(f"Memory consumed at the end of the train (end-begin): {tracemalloc.used}") accelerator.print(f"Peak Memory consumed during the train (max-begin): {tracemalloc.peaked}") accelerator.print( f"Total Peak Memory consumed during the train (max): {tracemalloc.peaked + b2mb(tracemalloc.begin)}" ) # Logging the peak memory usage of the GPU to the tracker if args.with_tracking: accelerator.log( { "train_total_peak_memory": tracemalloc.peaked + b2mb(tracemalloc.begin), }, step=epoch, ) # New Code # # context manager to track the peak memory usage during the evaluation with TorchTracemalloc() as tracemalloc: model.eval() for step, batch in enumerate(eval_dataloader): # We could avoid this line since we set the accelerator with `device_placement=True`. batch.to(accelerator.device) with torch.no_grad(): outputs = model(**batch) predictions = outputs.logits.argmax(dim=-1) predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) metric.add_batch( predictions=predictions, references=references, ) eval_metric = metric.compute() # Use accelerator.print to print only on the main process. accelerator.print(f"epoch {epoch}:", eval_metric) if args.with_tracking: accelerator.log( { "accuracy": eval_metric["accuracy"], "f1": eval_metric["f1"], "train_loss": total_loss.item() / len(train_dataloader), }, step=epoch, ) if checkpointing_steps == "epoch": output_dir = f"epoch_{epoch}" if args.output_dir is not None: output_dir = os.path.join(args.output_dir, output_dir) accelerator.save_state(output_dir) # New Code # # Printing the GPU memory usage details such as allocated memory, peak memory, and total memory usage accelerator.print(f"Memory before entering the eval : {b2mb(tracemalloc.begin)}") accelerator.print(f"Memory consumed at the end of the eval (end-begin): {tracemalloc.used}") accelerator.print(f"Peak Memory consumed during the eval (max-begin): {tracemalloc.peaked}") accelerator.print( f"Total Peak Memory consumed during the eval (max): {tracemalloc.peaked + b2mb(tracemalloc.begin)}" ) # Logging the peak memory usage of the GPU to the tracker if args.with_tracking: accelerator.log( { "eval_total_peak_memory": tracemalloc.peaked + b2mb(tracemalloc.begin), }, step=epoch, ) if args.with_tracking: accelerator.end_training()