This repository was archived by the owner on Mar 21, 2024. It is now read-only.
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -40,6 +40,8 @@ gets uploaded to AzureML, by skipping all test folders.
 - ([#546](https://github.com/microsoft/InnerEye-DeepLearning/pull/546)) Environment and hello_world_model documentation updated
 - ([#525](https://github.com/microsoft/InnerEye-DeepLearning/pull/525)) Enable --store_dataset_sample
 - ([#495](https://github.com/microsoft/InnerEye-DeepLearning/pull/495)) Fix model comparison.
+- ([#547](https://github.com/microsoft/InnerEye-DeepLearning/pull/547)) Fixed a bug where the parameter pl_find_unused_parameters
+  was no longer used to initialize the DDP plugin.
 - ([#482](https://github.com/microsoft/InnerEye-DeepLearning/pull/482)) Check that bool parameters are either true or false.
 - ([#475](https://github.com/microsoft/InnerEye-DeepLearning/pull/475)) A bug in the AzureML SDK meant that we could no longer
   train large models because data loaders ran out of memory.
95 changes: 14 additions & 81 deletions InnerEye/ML/model_training.py
@@ -4,18 +4,14 @@
 # ------------------------------------------------------------------------------------------
 import logging
 import os
-import subprocess
 import sys
 from pathlib import Path
-from time import sleep
 from typing import Any, Dict, Optional, Tuple, TypeVar

-import numpy as np
 from pytorch_lightning import LightningModule, Trainer, seed_everything
 from pytorch_lightning.callbacks import ModelCheckpoint
 from pytorch_lightning.loggers import TensorBoardLogger
 from pytorch_lightning.plugins import DDPPlugin
-from pytorch_lightning.utilities.exceptions import MisconfigurationException

 from InnerEye.Azure.azure_runner import ENV_GLOBAL_RANK, ENV_LOCAL_RANK, ENV_NODE_RANK
 from InnerEye.Azure.azure_util import RUN_CONTEXT, is_offline_run_context
@@ -131,8 +127,15 @@ def create_lightning_trainer(container: LightningContainer,
     num_gpus = container.num_gpus_per_node
     effective_num_gpus = num_gpus * num_nodes
     # Accelerator should be "ddp" when running large models in AzureML (when using ddp_spawn, we run out of GPU memory).
-    # For unit tests, only "ddp_spawn" works
-    accelerator = "ddp" if effective_num_gpus > 1 else None
+    if effective_num_gpus > 1:
+        accelerator: Optional[str] = "ddp"
+        # Initialize the DDP plugin. The default for pl_find_unused_parameters is False. If True, the plugin prints out
+        # lengthy warnings about the performance impact of find_unused_parameters.
+        plugins = [DDPPlugin(num_nodes=num_nodes, sync_batchnorm=True,
+                             find_unused_parameters=container.pl_find_unused_parameters)]
+    else:
+        accelerator = None
+        plugins = []
     logging.info(f"Using {num_gpus} GPUs per node with accelerator '{accelerator}'")
     tensorboard_logger = TensorBoardLogger(save_dir=str(container.logs_folder), name="Lightning", version="")
     loggers = [tensorboard_logger, AzureMLLogger()]
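In essence, the new branch picks DDP only when more than one GPU is available across all nodes, and builds the plugin list at the same time. A minimal standalone sketch of that selection, assuming the PL 1.2-era DDPPlugin API used in this diff; the parameter names below are illustrative stand-ins for the container fields:

from typing import List, Optional, Tuple

from pytorch_lightning.plugins import DDPPlugin


def choose_accelerator(num_gpus_per_node: int,
                       num_nodes: int,
                       find_unused_parameters: bool = False) -> Tuple[Optional[str], List[DDPPlugin]]:
    # Use DDP only when more than one GPU is available in total; otherwise run single-device.
    if num_gpus_per_node * num_nodes > 1:
        # find_unused_parameters=True lets DDP tolerate parameters that receive no gradient,
        # at a performance cost, which is why it defaults to False here.
        plugin = DDPPlugin(num_nodes=num_nodes,
                           sync_batchnorm=True,
                           find_unused_parameters=find_unused_parameters)
        return "ddp", [plugin]
    return None, []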
Expand Down Expand Up @@ -172,6 +175,7 @@ def create_lightning_trainer(container: LightningContainer,
         deterministic=deterministic,
         benchmark=benchmark,
         accelerator=accelerator,
+        plugins=plugins,
         max_epochs=container.num_epochs,
         num_sanity_val_steps=container.pl_num_sanity_val_steps,
         callbacks=callbacks,
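Continuing the sketch above, the returned pair would plug straight into a PL 1.2-era Trainer; the GPU and epoch counts below are illustrative:

from pytorch_lightning import Trainer

accelerator, plugins = choose_accelerator(num_gpus_per_node=2, num_nodes=1)
trainer = Trainer(accelerator=accelerator,
                  plugins=plugins,
                  gpus=2,
                  num_nodes=1,
                  max_epochs=10)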
@@ -269,8 +273,10 @@ def model_train(checkpoint_path: Optional[Path],
     # Per-subject model outputs for regression models are written per rank, and need to be aggregated here.
     # Each thread per rank will come here, and upload its files to the run outputs. Rank 0 will later download them.
     if is_azureml_run and world_size > 1 and isinstance(lightning_model, ScalarLightning):
-        upload_output_file_as_temp(lightning_model.train_subject_outputs_logger.csv_path, container.outputs_folder)  # type: ignore
-        upload_output_file_as_temp(lightning_model.val_subject_outputs_logger.csv_path, container.outputs_folder)  # type: ignore
+        upload_output_file_as_temp(lightning_model.train_subject_outputs_logger.csv_path,  # type: ignore
+                                   container.outputs_folder)
+        upload_output_file_as_temp(lightning_model.val_subject_outputs_logger.csv_path,  # type: ignore
+                                   container.outputs_folder)
     # DDP will start multiple instances of the runner, one for each GPU. Those should terminate here after training.
     # We can now use the global_rank of the Lightning model, rather than environment variables, because DDP has set
     # all necessary properties.
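For context, a rough sketch of what a per-rank upload such as upload_output_file_as_temp could look like on top of the AzureML SDK; the rank environment variable and naming scheme below are assumptions, not InnerEye's actual implementation:

import os
from pathlib import Path

from azureml.core import Run


def upload_per_rank_file(csv_path: Path, outputs_folder: Path) -> None:
    # Suffix the uploaded name with the DDP rank (assumed to live in the RANK env var)
    # so that files from different ranks do not collide in the run outputs.
    rank = os.environ.get("RANK", "0")
    relative = csv_path.relative_to(outputs_folder)
    run = Run.get_context()
    run.upload_file(name=f"outputs/{relative}.rank{rank}", path_or_stream=str(csv_path))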
Expand Down Expand Up @@ -335,76 +341,3 @@ def aggregate_and_create_subject_metrics_file(outputs_folder: Path) -> None:
         else:
             # For all files but the first one, cut off the header line.
             f.write("\n".join(temp_file_contents.splitlines()[1:]))


-class InnerEyeDDPPlugin(DDPPlugin):
-    """
-    This is a temporary fix for the broken DDP plugin in PyTorch Lightning v1.2.8.
-    Hopefully we can remove it once it is fixed in PyTorch Lightning.
-    """
-
-    def _call_children_scripts(self) -> None:
-        # This is the only line changed compared to DDPPlugin
-        assert self.local_rank == 0
-
-        # The code below is the same as in the original DDPPlugin
-        self._check_can_spawn_children()
-        self._has_spawned_children = True
-
-        # DDP environment variables
-        os.environ["MASTER_ADDR"] = self.cluster_environment.master_address()  # type: ignore
-        os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())  # type: ignore
-
-        # Allow the user to pass the node rank
-        os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())  # type: ignore
-        os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())  # type: ignore
-
-        path_lib = os.path.abspath
-
-        # Pull out the command used to run the script and resolve the absolute file path
-        command = sys.argv
-        try:
-            full_path = path_lib(command[0])
-        except Exception:
-            full_path = os.path.abspath(command[0])
-
-        command[0] = full_path
-        # Run the command with the same Python interpreter
-        command = [sys.executable] + command
-
-        # The visible devices tell us how many GPUs we want to use.
-        # By the time the code reaches this point, the devices have already been scoped for the
-        # trainer script. Hence, leave CUDA visible devices alone, but forward the selected GPUs
-        # via environment variables.
-        if self.parallel_devices is None:
-            raise MisconfigurationException("you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)")
-
-        os.environ["PL_TRAINER_GPUS"] = ",".join([str(device.index) for device in self.parallel_devices])
-        os.environ["PL_IN_DDP_SUBPROCESS"] = "1"
-
-        if self.lightning_module.logger is not None:
-            os.environ["PL_EXP_VERSION"] = str(self.lightning_module.logger.version)
-
-        num_gpus = len(self.parallel_devices)
-        os.environ["WORLD_SIZE"] = f"{num_gpus * self.num_nodes}"
-
-        self.interactive_ddp_procs = []
-
-        for local_rank in range(1, self.num_processes):  # type: ignore
-            env_copy = os.environ.copy()
-            env_copy["LOCAL_RANK"] = f"{local_rank}"
-
-            # Remove the env var if the global seed is not set
-            if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy:
-                del env_copy["PL_GLOBAL_SEED"]
-
-            # Start the process. If Hydra is available and initialized, make sure to set the cwd correctly.
-            cwd: Optional[str] = None
-            proc = subprocess.Popen(command, env=env_copy, cwd=cwd)
-            self.interactive_ddp_procs.append(proc)
-
-            # Starting all processes at once can cause issues with dataloaders;
-            # stagger the starts with a delay of 1-5 seconds.
-            delay = np.random.uniform(1, 5, 1)[0]
-            sleep(delay)
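The deleted workaround reduces to the pattern below: re-launch the current script once per additional local rank, pass the DDP coordinates through environment variables, and stagger the launches with a small random delay. A generic sketch, not the PyTorch Lightning implementation; the process count is illustrative:

import os
import subprocess
import sys
from random import uniform
from time import sleep

num_processes_per_node = 2  # illustrative; normally derived from the number of visible GPUs
# Re-run the current script with the same interpreter and a resolved script path.
command = [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]

for local_rank in range(1, num_processes_per_node):
    env = os.environ.copy()
    env["LOCAL_RANK"] = str(local_rank)  # rank 0 is the current process itself
    subprocess.Popen(command, env=env)
    # Starting all child processes at once can cause dataloader issues; stagger the launches.
    sleep(uniform(1, 5))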