import os
import socket

import torch
import torch.distributed as dist


def main():
    # Derive rank and world size from the Flux environment
    rank = int(os.environ["FLUX_TASK_RANK"])
    world_size = int(os.environ["FLUX_JOB_SIZE"])

    # MASTER_ADDR and MASTER_PORT must be set identically on all ranks
    # in the Flux launcher (do NOT try to set them per-rank here).
    master_addr = os.environ.get("MASTER_ADDR")
    master_port = os.environ.get("MASTER_PORT")
    if not master_addr or not master_port:
        raise RuntimeError(
            "MASTER_ADDR and MASTER_PORT must be set identically on all ranks. "
            "Set them in the flux launcher, for example:\n"
            "  MASTER_ADDR=$(flux resource list -No host | head -n 1)\n"
            "  MASTER_PORT=23457\n"
            "  flux run ... --env=MASTER_ADDR=$MASTER_ADDR --env=MASTER_PORT=$MASTER_PORT ..."
        )

    local_hostname = socket.gethostname()
    if rank == 0:
        print(f"Rank 0 on node {local_hostname}", flush=True)

    # Export RANK and WORLD_SIZE for init_method='env://'
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    dist.init_process_group(
        backend="nccl",
        init_method="env://",
        rank=rank,
        world_size=world_size,
    )

    # Map rank to a GPU (assumes tasks are packed one per GPU per node,
    # so rank % device_count matches the local GPU index)
    device_index = rank % torch.cuda.device_count()
    torch.cuda.set_device(device_index)

    # Simple sanity check: sum the rank values across all processes
    tensor = torch.tensor([rank], device="cuda")
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print(f"Rank {rank}/{world_size} on {local_hostname}: tensor = {tensor.item()}", flush=True)

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
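
# A minimal launch sketch for this script under Flux. The node/task counts and
# the filename "flux_allreduce.py" are placeholders (assumptions, not part of
# the script above); adjust them to your allocation. It assumes 2 nodes with
# 4 GPUs each, one task per GPU:
#
#   MASTER_ADDR=$(flux resource list -No host | head -n 1)
#   MASTER_PORT=23457
#   flux run -N 2 -n 8 --gpus-per-task=1 \
#       --env=MASTER_ADDR=$MASTER_ADDR --env=MASTER_PORT=$MASTER_PORT \
#       python flux_allreduce.py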