[WIP] RL multinode #152

Open
wants to merge 23 commits into base: main
Changes from 12 commits
2 changes: 1 addition & 1 deletion conf/rl_gsm8k.yaml
@@ -42,7 +42,7 @@ vllm_config:
# VLLM get log probs OOM https://github.com/vllm-project/vllm/issues/5907
--enable-chunked-prefill: ""

output_dir: outputs/rl_gsm8k_deepspeed
output_dir: outputs/rl_gsm8k_deepspeed_llama31_70b_new_save_rl_no_kl_lr1e-6_1node
accelerate_cfg_path: conf/accelerate/accelerate_base.yaml
use_deepspeed: false

25 changes: 25 additions & 0 deletions conf/rl_llama31_70b.yaml
@@ -0,0 +1,25 @@
defaults:
  - rl_gsm8k
  - _self_

finetune:
  rl:
    algo: reinforce
    kl_coef: 0.0
    reward_minus_kl_coef: 0.0
    use_advantages: false
    relu_log_p_weights: true
  train_batch_size: 1
  gradient_accumulation_passes: 32
  learning_rate: 1e-6
force_restart: true
max_agent_forks: 5000
model_path: /mnt/llmd/base_models/Meta-Llama-3.1-70B-Instruct
n_workers_per_gpu: 16
get_logprobs_workers_per_gpu: 1
gpus_per_model_instance: 4
use_rejection_sampling: true
test_every_n_iterations: 10
attempts: 8
dataset_name: gsm8k
use_deepspeed: true
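
For context, this config only overrides a handful of fields on top of rl_gsm8k through the Hydra defaults list. A minimal sketch of how such an override resolves with OmegaConf (the base values below are illustrative, not taken from the actual rl_gsm8k config):

from omegaconf import OmegaConf

# Illustrative slice of the base config (values are made up)
base = OmegaConf.create({"finetune": {"rl": {"kl_coef": 0.1}, "learning_rate": 1e-5}, "use_deepspeed": False})
# Corresponding slice of rl_llama31_70b
override = OmegaConf.create({"finetune": {"rl": {"kl_coef": 0.0}, "learning_rate": 1e-6}, "use_deepspeed": True})
merged = OmegaConf.merge(base, override)
print(merged.finetune.rl.kl_coef, merged.finetune.learning_rate, merged.use_deepspeed)  # 0.0 1e-06 True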
File renamed without changes.
365 changes: 195 additions & 170 deletions examples/rl_gsm8k/orchestrate_rl.py

Large diffs are not rendered by default.

77 changes: 59 additions & 18 deletions examples/rl_gsm8k/utils.py
@@ -69,6 +69,11 @@ def __init__(
self.stderr_file: Optional[TextIO] = None
self.stats = {}

# Add node rank awareness
self.node_rank = int(os.environ.get("RANK", 0))
self.port_offset = self.node_rank * 1000 # Ensure different port ranges for each node
Collaborator: why different port ranges for each node?

Collaborator (Author): Theoretically we should be able to update the sync port if the one selected by the toolkit environment is already in use. But that's not true in practice, and I found out that the reason for those clashes is the subprocess subshell instead.

Collaborator (@rizar, Dec 21, 2024): I'm not sure I understand. These vLLMs are running on different nodes, aren't they?

self.port = port + self.port_offset

def get_base_urls(self) -> list[str]:
return [
f"http://127.0.0.1:{port}" for port in self.ports
@@ -133,9 +138,9 @@ def _start_service(self) -> None:

threads = []

for i, device_number in enumerate(generate_cuda_device_strings(torch.cuda.device_count(), self.gpus_per_model_instance )):
for i, device_number in enumerate(generate_cuda_device_strings(torch.cuda.device_count(), self.gpus_per_model_instance)):
# Adjust port based on both node rank and GPU index
port = self.port + i
# start_llm(device_number, port, assistant_procs, ports)
thread = threading.Thread(target=self._start_llm, args=(device_number, port))
threads.append(thread)
thread.start()
@@ -354,47 +359,83 @@ def launch_training(
ValueError: If no GPUs are available
RuntimeError: If training process fails
"""
# environment variables
GLOBAL_RANK = int(os.environ.get("RANK", 0))
MASTER_PORT = int(os.environ.get("MASTER_PORT"))
MASTER_ADDRESS = os.environ.get("MASTER_ADDR")
# this is the same as number_of_replicas
WORLD_SIZE = int(os.environ.get("WORLD_SIZE", 2))

# Check GPU availability
num_gpus = torch.cuda.device_count()
num_gpus = torch.cuda.device_count() * int(os.environ.get("WORLD_SIZE", 1))
print('###############################')
Collaborator: please use logger.info, and maybe a message like "I'm rank X, training on Y GPUs".

Collaborator (Author): That's just sanity checking, it will be deleted soon. It was there to check that that number matches "Num processes", but things are clear now.

print(f"Number of GPUs: {num_gpus}")
print('###############################')
is_multinode = num_gpus > 8
if num_gpus == 0:
raise ValueError("No GPUs available for finetuning")

base_cmd = [
"accelerate",
"launch",
"--mixed_precision=bf16",
"--config_file",
accelerate_cfg_path,
"examples/rl_gsm8k/run_finetune.py",
"--config-dir",
config_dir,
"--config-name",
config_name,
"--mixed_precision=bf16",
]

if num_gpus > 1:
if use_deepspeed:
base_cmd[2:2] = [
base_cmd.extend([
"--num_processes",
str(num_gpus),
])
if is_multinode:
base_cmd.extend([
"--num_machines",
str(WORLD_SIZE),
"--machine_rank",
str(GLOBAL_RANK),
"--main_process_ip",
MASTER_ADDRESS,
"--main_process_port",
str(MASTER_PORT),
])
base_cmd.extend([
"--use_deepspeed",
"--deepspeed_config_file",
"conf/accelerate/deepspeed_stage3_bf16.json",
]
"conf/accelerate/ds_multinode.json",
])
if is_multinode:
base_cmd.extend([
"--deepspeed_multinode_launcher",
"standard",
"--same_network",
])
Collaborator: Would multi-node training with accelerate work without DeepSpeed? If not, we should add an exception.

else:
base_cmd[2:2] = [
base_cmd.extend([
"--multi_gpu",
"--num_processes",
str(num_gpus),
]
])

base_cmd.extend([
"examples/rl_gsm8k/run_finetune.py",
"--config-dir",
config_dir,
"--config-name",
config_name,
])

logger.info(f"Launching training with command: {' '.join(base_cmd)}")
# try:
# os.execvp(base_cmd[0], base_cmd)
# except Exception as e:
# raise RuntimeError(f"Failed to launch training: {str(e)}")
try:
subprocess.run(
base_cmd,
check=True, # Raises CalledProcessError if return code != 0
text=True,
capture_output=False,
env=os.environ.copy(), # Ensure subprocess inherits environment variables
shell=False,
check=True, # Raises CalledProcessError if return code != 0
)

except subprocess.CalledProcessError as e:
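To make the flag composition in launch_training concrete, this is roughly the command it would assemble for a hypothetical two-node DeepSpeed run with 8 GPUs per node (flag order is approximate; the IP, port, and config_dir values below are placeholders, not from this PR):

# Hypothetical setup: WORLD_SIZE=2, RANK=0, 8 GPUs per node
# -> num_gpus = 8 * 2 = 16, is_multinode = True, use_deepspeed = True
expected_cmd = [
    "accelerate", "launch",
    "--mixed_precision=bf16",
    "--config_file", "conf/accelerate/accelerate_base.yaml",  # accelerate_cfg_path from the config
    "--num_processes", "16",
    "--num_machines", "2",            # WORLD_SIZE
    "--machine_rank", "0",            # GLOBAL_RANK
    "--main_process_ip", "10.0.0.1",  # placeholder for MASTER_ADDR
    "--main_process_port", "29500",   # placeholder for MASTER_PORT
    "--use_deepspeed",
    "--deepspeed_config_file", "conf/accelerate/ds_multinode.json",
    "--deepspeed_multinode_launcher", "standard",
    "--same_network",
    "examples/rl_gsm8k/run_finetune.py",
    "--config-dir", "conf",           # placeholder for config_dir
    "--config-name", "rl_llama31_70b",
]
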
8 changes: 6 additions & 2 deletions tapeagents/finetune/checkpoints.py
@@ -285,6 +285,7 @@ def save_model_and_tokenizer(
tokenizer: transformers.PreTrainedTokenizer | transformers.PreTrainedTokenizerFast,
lora: bool = False,
safe_serialization: bool = False,
convert_to_hf: bool = False,
):
logger.info("Saving model and tokenizer")
with get_temporary_folder_and_move(output_dir) as temp_dir:
@@ -294,6 +295,7 @@
unwrap=True,
lora=lora,
safe_serialization=safe_serialization,
convert_to_hf=convert_to_hf,
)
save_tokenizer_only(temp_dir, tokenizer)

@@ -304,6 +306,7 @@ def save_model_only(
unwrap: bool = True,
lora: bool = False,
safe_serialization: bool = False,
convert_to_hf: bool = False,
):
"""
Save model weights and config.
Expand All @@ -320,6 +323,7 @@ def save_model_only(

The accelerate version must be called on *all* accelerate processes because all of them must save their shards.
The DeepSpeed version is only called on the main process because the checkpointing and conversion mechanism will gather the shards from all processes.
The DeepSpeed conversion is only done for the `current` weights.
"""
assert not os.path.exists(output_dir) or output_dir.is_dir(), f"output_dir {output_dir} must be a directory"
accelerator.wait_for_everyone()
@@ -331,8 +335,8 @@
# saving using DeepSpeed's checkpoint mechanism
model.save_checkpoint(save_dir=output_dir)

# convert to HF format on main process
if accelerator.is_main_process:
# Only convert to HF format if requested (e.g. for inference checkpoints)
if convert_to_hf and accelerator.is_main_process:
from deepspeed.utils.zero_to_fp32 import convert_zero_checkpoint_to_fp32_state_dict
logger.info("Converting DeepSpeed checkpoint to HF format")

5 changes: 4 additions & 1 deletion tapeagents/finetune/finetune.py
@@ -259,14 +259,16 @@ def toggle_sync(sync: bool):

if time_to_save:
# Overwrite latest model at pytorch_model.bin (for later JGA evaluation *and* for resuming training)
# conversion to HF format is done only for DeepSpeed models
save_model_and_tokenizer(
current_dir,
model,
tokenizer,
args.lora.enabled,
safe_serialization=args.use_safetensors,
convert_to_hf=True,
)
# Save training state to training_state.pt (for resuming).
# Save training state to training_state.pt (for resuming)
save_training_state(
training_state_dir,
model,
Expand Down Expand Up @@ -298,6 +300,7 @@ def toggle_sync(sync: bool):
tokenizer,
args.lora.enabled,
safe_serialization=args.use_safetensors,
convert_to_hf=False,
)
dt = log_time(dt, "finetune/interim_save")
try:
10 changes: 7 additions & 3 deletions tapeagents/finetune/logging_.py
@@ -18,12 +18,16 @@ def init_wandb(
cfg: DictConfig,
run_dir: Path,
config_for_wandb: DictConfig | dict,
) -> wandb_run.Run:
"""Initialize W&B.
) -> wandb_run.Run | None:
"""Initialize W&B on the main process only.

config_for_wandb is the configuration that will be logged to W&B.

Returns None if not on main process.
"""
# Only initialize on main process (rank 0)
if os.environ.get('RANK', '0') != '0':
return None

if config_for_wandb is None:
config_for_wandb = cfg.dict()

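Since init_wandb now returns None on non-main ranks, callers presumably have to guard their logging calls. A minimal illustrative sketch (the log_metrics helper is hypothetical, not part of this PR):

run = init_wandb(cfg, run_dir, config_for_wandb)

def log_metrics(metrics: dict, step: int) -> None:
    # no-op on ranks where W&B was not initialized
    if run is not None:
        run.log(metrics, step=step)
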
17 changes: 12 additions & 5 deletions tapeagents/finetune/optim.py
@@ -24,20 +24,27 @@ def get_grouped_params(

def get_optimizer(name, model, learning_rate, weight_decay):
grouped_params = get_grouped_params(model, weight_decay)
# Common optimizer parameters
optimizer_kwargs = {
"lr": learning_rate,
"betas": (0.9, 0.999),
"eps": 1e-8,
"weight_decay": weight_decay
}

match name:
case "adamw_torch":
optimizer = AdamW(grouped_params, lr=learning_rate)
optimizer = AdamW(grouped_params, **optimizer_kwargs)
case "cpuadam":
import deepspeed.ops.adam
optimizer = deepspeed.ops.adam.DeepSpeedCPUAdam(grouped_params, **optimizer_kwargs)
case "adafactor":
optimizer = Adafactor(
grouped_params,
lr=learning_rate,
relative_step=False,
scale_parameter=False,
)
case "cpuadam":
import deepspeed.ops.adam

optimizer = deepspeed.ops.adam.DeepSpeedCPUAdam(grouped_params, lr=learning_rate)
case "lion":
optimizer = Lion(grouped_params, lr=learning_rate)
case _:
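For reference, a small usage sketch of the refactored optimizer factory (the hyperparameter values are illustrative):

# "cpuadam" pairs with DeepSpeed ZeRO CPU offload; "adamw_torch" remains the default GPU path
optimizer = get_optimizer("cpuadam", model, learning_rate=1e-6, weight_decay=0.01)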