191 lines
6.8 KiB
Python
Executable File
191 lines
6.8 KiB
Python
Executable File
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime, timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
from . import USER_CONFIG_DIR
|
|
from .torch_utils import TORCH_1_9
|
|
|
|
if TYPE_CHECKING:
|
|
from ultralytics.engine.trainer import BaseTrainer
|
|
|
|
|
|
_RUN_TIMESTAMP_STORES = {}
|
|
|
|
|
|
def find_free_network_port() -> int:
|
|
"""Find a free port on localhost.
|
|
|
|
It is useful in single-node training when we don't want to connect to a real main node but have to set the
|
|
`MASTER_PORT` environment variable.
|
|
|
|
Returns:
|
|
(int): The available network port number.
|
|
"""
|
|
import socket
|
|
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.bind(("127.0.0.1", 0))
|
|
return s.getsockname()[1] # port
|
|
|
|
|
|
def _get_run_sync_port(base_name: str) -> int:
|
|
"""Derive a stable coordination port for sharing a run timestamp across nodes."""
|
|
override = os.getenv("ULTRALYTICS_RUN_SYNC_PORT")
|
|
if override:
|
|
try:
|
|
return int(override)
|
|
except ValueError as exc:
|
|
raise RuntimeError("ULTRALYTICS_RUN_SYNC_PORT must be an integer.") from exc
|
|
|
|
try:
|
|
master_port = int(os.getenv("MASTER_PORT", "29500"))
|
|
except ValueError:
|
|
master_port = 29500
|
|
|
|
offset = 11 + sum(ord(char) for char in base_name) % 97
|
|
return 1024 + ((master_port + offset - 1024) % 64512)
|
|
|
|
|
|
def get_distributed_run_timestamp(base_name: str, world_size: int, rank: int, *, timeout: timedelta = timedelta(seconds=120)) -> str:
|
|
"""Return a timestamp string that is shared across all distributed ranks."""
|
|
if world_size <= 1 or rank == -1:
|
|
return datetime.now().strftime("%Y%m%d%H%M%S")
|
|
|
|
master_addr = os.getenv("MASTER_ADDR")
|
|
if not master_addr:
|
|
raise RuntimeError("MASTER_ADDR is required to coordinate a distributed run name.")
|
|
|
|
from torch.distributed import TCPStore
|
|
|
|
sync_port = _get_run_sync_port(base_name)
|
|
safe_base_name = "".join(char if char.isalnum() else "_" for char in base_name)
|
|
key = f"ultralytics_run_timestamp_{safe_base_name}"
|
|
is_master = rank == 0
|
|
store_host = "0.0.0.0" if is_master else master_addr
|
|
|
|
try:
|
|
store = TCPStore(store_host, sync_port, world_size, is_master, timeout=timeout, wait_for_workers=True)
|
|
except TypeError:
|
|
store = TCPStore(store_host, sync_port, world_size, is_master, timeout)
|
|
except Exception as exc:
|
|
raise RuntimeError(
|
|
f"Failed to coordinate the distributed run timestamp via TCPStore at {master_addr}:{sync_port}. "
|
|
"Set ULTRALYTICS_RUN_SYNC_PORT to an open cross-node port if needed."
|
|
) from exc
|
|
|
|
# Keep the TCPStore alive for the full process lifetime. Rank 0 hosts the store server, so if this object is
|
|
# garbage collected as soon as build_run_name() returns, the other ranks can lose the connection mid-handshake.
|
|
_RUN_TIMESTAMP_STORES[(master_addr, sync_port, world_size, rank)] = store
|
|
|
|
if is_master:
|
|
store.set(key, datetime.now().strftime("%Y%m%d%H%M%S"))
|
|
|
|
timestamp = store.get(key)
|
|
return timestamp.decode("utf-8") if isinstance(timestamp, bytes) else str(timestamp)
|
|
|
|
|
|
def generate_ddp_file(trainer: BaseTrainer) -> str:
|
|
"""Generate a DDP (Distributed Data Parallel) file for multi-GPU training.
|
|
|
|
This function creates a temporary Python file that enables distributed training across multiple GPUs. The file
|
|
contains the necessary configuration to initialize the trainer in a distributed environment.
|
|
|
|
Args:
|
|
trainer (ultralytics.engine.trainer.BaseTrainer): The trainer containing training configuration and arguments.
|
|
Must have args attribute and be a class instance.
|
|
|
|
Returns:
|
|
(str): Path to the generated temporary DDP file.
|
|
|
|
Notes:
|
|
The generated file is saved in the USER_CONFIG_DIR/DDP directory and includes:
|
|
- Trainer class import
|
|
- Configuration overrides from the trainer arguments
|
|
- Model path configuration
|
|
- Training initialization code
|
|
"""
|
|
module, name = f"{trainer.__class__.__module__}.{trainer.__class__.__name__}".rsplit(".", 1)
|
|
|
|
content = f"""
|
|
# Ultralytics Multi-GPU training temp file (should be automatically deleted after use)
|
|
from pathlib import Path, PosixPath # For model arguments stored as Path instead of str
|
|
overrides = {vars(trainer.args)}
|
|
|
|
if __name__ == "__main__":
|
|
from {module} import {name}
|
|
from ultralytics.utils import DEFAULT_CFG_DICT
|
|
|
|
cfg = DEFAULT_CFG_DICT.copy()
|
|
cfg.update(save_dir='') # handle the extra key 'save_dir'
|
|
trainer = {name}(cfg=cfg, overrides=overrides)
|
|
trainer.args.model = "{getattr(trainer.hub_session, "model_url", trainer.args.model)}"
|
|
results = trainer.train()
|
|
"""
|
|
(USER_CONFIG_DIR / "DDP").mkdir(exist_ok=True)
|
|
with tempfile.NamedTemporaryFile(
|
|
prefix="_temp_",
|
|
suffix=f"{id(trainer)}.py",
|
|
mode="w+",
|
|
encoding="utf-8",
|
|
dir=USER_CONFIG_DIR / "DDP",
|
|
delete=False,
|
|
) as file:
|
|
file.write(content)
|
|
return file.name
|
|
|
|
|
|
def generate_ddp_command(trainer: BaseTrainer) -> tuple[list[str], str]:
|
|
"""Generate command for distributed training.
|
|
|
|
Args:
|
|
trainer (ultralytics.engine.trainer.BaseTrainer): The trainer containing configuration for distributed training.
|
|
|
|
Returns:
|
|
cmd (list[str]): The command to execute for distributed training.
|
|
file (str): Path to the temporary file created for DDP training.
|
|
"""
|
|
import __main__ # noqa local import to avoid https://github.com/Lightning-AI/pytorch-lightning/issues/15218
|
|
|
|
if not trainer.resume:
|
|
shutil.rmtree(trainer.save_dir) # remove the save_dir
|
|
file = generate_ddp_file(trainer)
|
|
dist_cmd = "torch.distributed.run" if TORCH_1_9 else "torch.distributed.launch"
|
|
port = find_free_network_port()
|
|
cmd = [
|
|
sys.executable,
|
|
"-m",
|
|
dist_cmd,
|
|
"--nproc_per_node",
|
|
f"{trainer.world_size}",
|
|
"--master_port",
|
|
f"{port}",
|
|
file,
|
|
]
|
|
return cmd, file
|
|
|
|
|
|
def ddp_cleanup(trainer: BaseTrainer, file: str) -> None:
|
|
"""Delete temporary file if created during distributed data parallel (DDP) training.
|
|
|
|
This function checks if the provided file contains the trainer's ID in its name, indicating it was created as a
|
|
temporary file for DDP training, and deletes it if so.
|
|
|
|
Args:
|
|
trainer (ultralytics.engine.trainer.BaseTrainer): The trainer used for distributed training.
|
|
file (str): Path to the file that might need to be deleted.
|
|
|
|
Examples:
|
|
>>> trainer = YOLOTrainer()
|
|
>>> file = "/tmp/ddp_temp_123456789.py"
|
|
>>> ddp_cleanup(trainer, file)
|
|
"""
|
|
if f"{id(trainer)}.py" in file: # if temp_file suffix in file
|
|
os.remove(file)
|