import os
import socket
from time import time

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import TensorDataset


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, local_rank):
    local_hostname = socket.gethostname()
    if rank == 0:
        print("Rank 0 on node {}".format(local_hostname), flush=True)

    # Select the GPU matching our local rank on the node; this assumes we
    # started exactly one rank per GPU on the node.
    ngpus_per_node = torch.cuda.device_count()
    print(f"ngpus per node {ngpus_per_node}", flush=True)

    if ngpus_per_node > 0:
        # local_rank is passed in from spmd_main
        torch.cuda.set_device(local_rank)
        device = torch.device("cuda")
        print(
            f"Rank: {rank} on host {local_hostname} has local_rank: {local_rank} "
            f"cuda device {torch.cuda.current_device()}",
            flush=True,
        )
        ddp_device_ids = [local_rank]
    else:
        local_rank = 0
        device = torch.device("cpu")
        print(
            f"Rank: {rank} on host {local_hostname} has no CUDA, using CPU",
            flush=True,
        )
        ddp_device_ids = None

    # First, the small ToyModel DDP example (kept from the original tutorial).
    # It gets its own SGD optimizer so it stays independent of the
    # ZeroRedundancyOptimizer created for the Discriminator below.
    model = ToyModel().to(device)
    ddp_model = DDP(model, device_ids=ddp_device_ids)
    loss_fn = nn.MSELoss()
    toy_optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    batch_size = 32
    n_data_per_rank = batch_size * 60
    image_size = 256
    nc = 1
    ndf = 256
    num_epochs = 10
    lr = 0.0002

    class Discriminator(nn.Module):
        def __init__(self):
            super(Discriminator, self).__init__()
            self.main = nn.Sequential(
                # input is (nc) x 256 x 256
                nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf) x 128 x 128
                nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
                nn.BatchNorm2d(ndf * 2),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*2) x 64 x 64
                nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(ndf * 4),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*4) x 32 x 32
                nn.Conv2d(ndf * 4, ndf * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(ndf * 4),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*4) x 16 x 16
                nn.Conv2d(ndf * 4, ndf * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(ndf * 4),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*4) x 8 x 8
                nn.Conv2d(ndf * 4, ndf * 4, 4, 2, 1, bias=False),
                nn.BatchNorm2d(ndf * 4),
                nn.LeakyReLU(0.2, inplace=True),
                # state size. (ndf*4) x 4 x 4
                nn.Conv2d(ndf * 4, 1, 4, 1, 0, bias=False),
                nn.Sigmoid(),
            )

        def forward(self, input):
            return self.main(input)

    # Fake per-rank dataset: random "images" with random binary labels.
    X = torch.rand((n_data_per_rank, nc, image_size, image_size))
    Y = torch.randint(low=0, high=2, size=(n_data_per_rank, 1, 1, 1)).float()

    model = Discriminator()
    model.to(device)  # move to GPU or CPU
    model = nn.parallel.DistributedDataParallel(
        model,
        device_ids=ddp_device_ids,
    )

    criterion = nn.BCELoss()
    optimizer = ZeroRedundancyOptimizer(
        model.parameters(),
        optimizer_class=torch.optim.Adam,
        lr=lr,
    )

    train_dataset = TensorDataset(X, Y)
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True
    )

    if rank == 0:
        print("Starting Training Loop...", flush=True)

    for epoch in range(num_epochs):
        t0 = time()
        for batch_idx, (data, target) in enumerate(train_loader):
            # Discriminator step on the fake image batch, driven by the
            # ZeroRedundancyOptimizer.
            optimizer.zero_grad()
            data = data.to(device)
            target = target.to(device)
            output = model(data)
            d_loss = criterion(output, target)
            d_loss.backward()
            optimizer.step()

            # ToyModel regression step on random data, driven by its own
            # SGD optimizer (kept from the original small DDP example).
            toy_optimizer.zero_grad()
            outputs = ddp_model(torch.randn(20, 10).to(device))
            labels = torch.randn(20, 5).to(device)
            loss = loss_fn(outputs, labels)
            loss.backward()
            toy_optimizer.step()
        t1 = time()
        if rank == 0:
            print(
                f"Epoch {epoch:03d} D-Loss {d_loss.item():0.4f} "
                f"Toy-Loss {loss.item():0.4f} Time {t1 - t0:0.4f}",
                flush=True,
            )


def spmd_main(rank, size, local_hostname, hostname):
    # Equivalent to MPI init, but using Flux environment variables.
    flux_rank = int(os.environ["FLUX_TASK_RANK"])
    flux_size = int(os.environ["FLUX_JOB_SIZE"])

    # For init_method="env://", PyTorch reads RANK and WORLD_SIZE.
    os.environ["RANK"] = str(flux_rank)
    os.environ["WORLD_SIZE"] = str(flux_size)

    # Compute the local rank for this node and pick a backend to match.
    ngpus_per_node = torch.cuda.device_count()
    if ngpus_per_node > 0:
        local_rank = flux_rank % ngpus_per_node
        backend = "nccl"
        # device_id must be a torch.device; binding the process group to a
        # single device avoids the barrier() device-guessing warning.
        init_kwargs = {"device_id": torch.device("cuda", local_rank)}
    else:
        local_rank = 0
        backend = "gloo"
        init_kwargs = {}

    torch.distributed.init_process_group(
        backend,
        init_method="env://",
        world_size=flux_size,
        rank=flux_rank,
        **init_kwargs,
    )

    # Look up the number of ranks in the job, and our rank.
    size = torch.distributed.get_world_size()
    rank = torch.distributed.get_rank()

    # Basic sanity checks.
    assert size == flux_size
    assert rank == flux_rank

    demo_basic(rank, local_rank)

    # Tear down the process group.
    dist.barrier()
    dist.destroy_process_group()


if __name__ == "__main__":
    try:
        rank = int(os.environ["FLUX_TASK_RANK"])
        size = int(os.environ["FLUX_JOB_SIZE"])
    except KeyError as e:
        raise RuntimeError(
            "This script must be run under Flux with FLUX_TASK_RANK and "
            "FLUX_JOB_SIZE set (e.g., via flux run / flux submit)."
        ) from e

    print("Rank {} of {} has been initialized.".format(rank, size), flush=True)

    # MASTER_ADDR / MASTER_PORT are expected to be set by the launcher.
    master_addr = os.environ.get("MASTER_ADDR")
    master_port = os.environ.get("MASTER_PORT")
    if not master_addr or not master_port:
        raise RuntimeError(
            "MASTER_ADDR and MASTER_PORT must be set identically on all ranks. "
            "Set them in the flux launcher, for example:\n"
            "  MASTER_ADDR=$(flux resource list -n 0 -o host | head -n 1)\n"
            "  MASTER_PORT=23457\n"
            "  flux run ... --env=MASTER_ADDR=$MASTER_ADDR --env=MASTER_PORT=$MASTER_PORT ..."
        )

    local_hostname = socket.gethostname()
    hostname = master_addr  # kept for compatibility with the spmd_main signature
    spmd_main(rank, size, local_hostname, hostname)
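# Example launch sketch (an assumption, not enforced by this script): the node,
# task, and GPU counts below are placeholders for your own allocation, and the
# script filename is hypothetical; only the MASTER_ADDR / MASTER_PORT plumbing
# is required by the checks above.
#
#   MASTER_ADDR=$(flux resource list -n 0 -o host | head -n 1)
#   MASTER_PORT=23457
#   flux run -N 2 -n 8 -g 1 \
#       --env=MASTER_ADDR=$MASTER_ADDR --env=MASTER_PORT=$MASTER_PORT \
#       python flux_pytorch_ddp.py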