How to implement a parallel version of trajectory collection? #5

@fokx

I know this may not be an issue, but I'd like to get an answer from Python and PyTorch experts. (I am not a CS major, but I love deep RL.)
In the Homework 2 PDF, you wrote:

A serious bottleneck in the learning, for more complex environments, is the sample collection time. In infrastructure/rl_trainer.py, we only collect trajectories in a single thread, but this process can be fully parallelized across threads to get a useful speedup. Implement the parallelization and report on the difference in training time.
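For reference, here is a minimal sketch of the thread-based variant the assignment text describes, plus a timer for the serial-vs-parallel comparison. It assumes a hypothetical `ToyEnv` with the old gym 4-tuple `step` API and a plain-function policy in place of the cs285 classes; `collect_quota` and `sample_trajectories_threaded` are illustrative names, not part of the homework code. Each thread gets its own env and a fixed share of the timestep budget, so no shared counter is needed. The `time.sleep` call stands in for simulator work that releases the GIL; an env written in pure Python would see little or no speedup from threads.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


class ToyEnv:
    """Hypothetical stand-in for a gym env (old 4-tuple step API)."""

    def __init__(self, seed):
        self.rng = random.Random(seed)
        self.t = 0

    def reset(self):
        self.t = 0
        return 0.0

    def step(self, action):
        self.t += 1
        time.sleep(0.001)  # stands in for simulator work that releases the GIL
        done = self.t >= 20 or self.rng.random() < 0.05
        return float(self.t), -abs(action), done, {}


def toy_policy(ob):
    """Hypothetical policy: maps one observation to one action."""
    return 0.5 * ob


def rollout(env, policy, max_path_length):
    ob = env.reset()
    obs, acs, rews = [], [], []
    for _ in range(max_path_length):
        ac = policy(ob)
        next_ob, rew, done, _ = env.step(ac)
        obs.append(ob)
        acs.append(ac)
        rews.append(rew)
        ob = next_ob
        if done:
            break
    return {"observation": obs, "action": acs, "reward": rews}


def collect_quota(env, policy, quota, max_path_length):
    """Collect whole paths until at least `quota` timesteps have been gathered."""
    paths, steps = [], 0
    while steps < quota:
        path = rollout(env, policy, max_path_length)
        paths.append(path)
        steps += len(path["reward"])
    return paths, steps


def sample_trajectories_threaded(make_env, policy, min_timesteps_per_batch,
                                 max_path_length, num_workers=4):
    # Split the budget evenly (ceiling division) so the total is >= the minimum.
    quota = -(-min_timesteps_per_batch // num_workers)
    envs = [make_env(i) for i in range(num_workers)]  # one env per thread
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(
            lambda env: collect_quota(env, policy, quota, max_path_length), envs))
    paths = [p for worker_paths, _ in results for p in worker_paths]
    return paths, sum(steps for _, steps in results)


if __name__ == "__main__":
    start = time.perf_counter()
    collect_quota(ToyEnv(0), toy_policy, 400, 50)
    serial = time.perf_counter() - start

    start = time.perf_counter()
    sample_trajectories_threaded(ToyEnv, toy_policy, 400, 50)
    threaded = time.perf_counter() - start
    print(f"serial {serial:.3f}s, threaded {threaded:.3f}s")
```

Giving each worker a fixed quota trades a slightly larger batch (up to one extra path per worker) for not needing any locking between workers.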

I have tried using multiprocessing in cs285.infrastructure.utils.sample_trajectories, as follows:

def sample_trajectories(env, policy, min_timesteps_per_batch, max_path_length, render=False, render_mode=('rgb_array')):
  import multiprocessing as mp
  try:
    mp.set_start_method('spawn')
  except RuntimeError:
    pass
  mp_paths_manager = mp.Manager()
  mp_paths = mp_paths_manager.list()
  mp_timesteps_this_batch = mp.Value('i', 0)
  enough_event = mp.Event()

  processes = []
  num_process = mp.cpu_count()
  # num_process = 6  # each spawned process also needs its own GPU memory

  for _ in range(num_process):
    p = mp.Process(target=sample_trajectory,
                   args=(mp_timesteps_this_batch, min_timesteps_per_batch, mp_paths, enough_event,
                         env, policy, max_path_length, render, render_mode))
    p.start()
    processes.append(p)
  enough_event.wait()
  for p in processes:
    p.terminate()
  for p in processes:
    p.join()
  with mp_timesteps_this_batch.get_lock():
    timesteps_this_batch = mp_timesteps_this_batch.value
  paths = list(mp_paths)
  return paths, timesteps_this_batch

and modified cs285.infrastructure.utils.sample_trajectory with:

def sample_trajectory(mp_timesteps_this_batch, min_timesteps_per_batch, mp_paths, enough_event,
                      env, policy, max_path_length, render=False, render_mode=('rgb_array')):
  while True:
    # initialize env for the beginning of a new rollout
    ob = env.reset()  # HINT: should be the output of resetting the env

    ......

    path_to_append = Path(obs, image_obs, acs, rewards, next_obs, terminals)
    len_path_to_append = get_pathlength(path_to_append)
    with mp_timesteps_this_batch.get_lock():
      if mp_timesteps_this_batch.value >= min_timesteps_per_batch:
        enough_event.set()
      elif mp_timesteps_this_batch.value + len_path_to_append >= min_timesteps_per_batch:
        mp_paths.append(path_to_append)
        mp_timesteps_this_batch.value += len_path_to_append
        enough_event.set()
      else:
        mp_paths.append(path_to_append)
        mp_timesteps_this_batch.value += len_path_to_append
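The counter-and-event logic above could also let each worker stop on its own instead of being killed with `terminate()`. The Python `multiprocessing` docs warn that terminating a process while it holds a lock, or while it is using a pipe or queue, can deadlock other processes or corrupt that channel. Below is a minimal sketch of the cooperative version using threads; `ToyEnv`, `SharedBatch`, `worker`, and `sample_trajectories_shared` are hypothetical names. Each worker checks the event at the top of its loop and updates the batch under one lock. The same structure should carry over to `mp.Value`/`mp.Event` with processes, but that variant is not shown here.

```python
import random
import threading


class ToyEnv:
    """Hypothetical stand-in for a gym env (old 4-tuple step API)."""

    def __init__(self, seed):
        self.rng = random.Random(seed)
        self.t = 0

    def reset(self):
        self.t = 0
        return 0.0

    def step(self, action):
        self.t += 1
        done = self.t >= 20 or self.rng.random() < 0.05
        return float(self.t), -abs(action), done, {}


def rollout(env, policy, max_path_length):
    ob, acs, rews = env.reset(), [], []
    for _ in range(max_path_length):
        ac = policy(ob)
        ob, rew, done, _ = env.step(ac)
        acs.append(ac)
        rews.append(rew)
        if done:
            break
    return {"action": acs, "reward": rews}


class SharedBatch:
    """Collected paths plus a timestep counter, all guarded by one lock."""

    def __init__(self, min_timesteps):
        self.min_timesteps = min_timesteps
        self.lock = threading.Lock()
        self.enough = threading.Event()
        self.paths = []
        self.timesteps = 0


def worker(batch, env, policy, max_path_length):
    # Check the event each iteration and return on our own, so the caller
    # only needs join() and never has to terminate() a worker mid-update.
    while not batch.enough.is_set():
        path = rollout(env, policy, max_path_length)
        with batch.lock:
            if batch.timesteps >= batch.min_timesteps:
                return  # batch already full: discard this path and exit
            batch.paths.append(path)
            batch.timesteps += len(path["reward"])
            if batch.timesteps >= batch.min_timesteps:
                batch.enough.set()


def sample_trajectories_shared(policy, min_timesteps, max_path_length, num_workers=4):
    batch = SharedBatch(min_timesteps)
    threads = [threading.Thread(target=worker,
                                args=(batch, ToyEnv(seed), policy, max_path_length))
               for seed in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns once every worker has seen that the batch is full
    return batch.paths, batch.timesteps
```

Because the check and the append happen under the same lock, the path that crosses `min_timesteps` is always the last one appended, and any path finished afterwards is simply dropped.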

However, because sample_trajectory calls policy.get_action, which runs CUDA operations, many errors are thrown, such as:
CUDA error: an illegal memory access was encountered.
Everything seems to work when running entirely on the CPU.
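Since rollouts already work on the CPU, one option is to keep every worker process CPU-only and have workers return their paths rather than share a `Manager` list. The sketch below uses `concurrent.futures.ProcessPoolExecutor` with the `spawn` start method (the PyTorch multiprocessing docs state that the CUDA runtime does not support `fork` in subprocesses). `ToyEnv`, `LinearPolicy`, `worker`, and `sample_trajectories_mp` are hypothetical. With a real PyTorch policy, the analogous step would presumably be to send `{k: v.cpu() for k, v in policy.state_dict().items()}` to each worker and `load_state_dict` it into a CPU copy of the network; that is an assumption about how the cs285 policy class would be adapted, not something it does today.

```python
import multiprocessing as mp
import random
from concurrent.futures import ProcessPoolExecutor


class ToyEnv:
    """Hypothetical stand-in for a gym env (old 4-tuple step API)."""

    def __init__(self, seed):
        self.rng = random.Random(seed)
        self.t = 0

    def reset(self):
        self.t = 0
        return 0.0

    def step(self, action):
        self.t += 1
        done = self.t >= 20 or self.rng.random() < 0.05
        return float(self.t), -abs(action), done, {}


class LinearPolicy:
    """Hypothetical CPU-only policy rebuilt inside each worker from plain parameters."""

    def __init__(self, weight):
        self.weight = weight

    def get_action(self, ob):
        return self.weight * ob


def worker(seed, policy_params, quota, max_path_length):
    # Build the env and policy inside the worker, so nothing GPU-related
    # has to be pickled or inherited from the parent.
    env, policy = ToyEnv(seed), LinearPolicy(**policy_params)
    paths, steps = [], 0
    while steps < quota:
        ob = env.reset()
        obs, acs, rews = [], [], []
        for _ in range(max_path_length):
            ac = policy.get_action(ob)
            next_ob, rew, done, _ = env.step(ac)
            obs.append(ob)
            acs.append(ac)
            rews.append(rew)
            ob = next_ob
            if done:
                break
        paths.append({"observation": obs, "action": acs, "reward": rews})
        steps += len(rews)
    return paths, steps  # sent back by pickling; no Manager or terminate() needed


def sample_trajectories_mp(policy_params, min_timesteps_per_batch, max_path_length,
                           num_workers=4):
    quota = -(-min_timesteps_per_batch // num_workers)  # ceiling division
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx) as pool:
        futures = [pool.submit(worker, seed, policy_params, quota, max_path_length)
                   for seed in range(num_workers)]
        results = [f.result() for f in futures]
    paths = [p for worker_paths, _ in results for p in worker_paths]
    return paths, sum(steps for _, steps in results)


if __name__ == "__main__":  # required with "spawn": children re-import this module
    paths, n = sample_trajectories_mp({"weight": 0.5}, 1000, 50)
    print(len(paths), "paths,", n, "timesteps")
```

Rebuilding the policy from plain parameters each iteration costs one pickle per worker per batch, which is usually small next to the rollouts themselves.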
So, can anyone give me some ideas on how to implement a parallel version of trajectory collection?
Thanks a lot!
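Another approach that keeps CUDA out of worker processes altogether is to step several environments in the main process and query the policy once per step for the whole batch of observations, so the GPU sees one batched forward pass instead of many tiny ones. The sketch below uses a hypothetical `ToyEnv` and a hypothetical `batched_policy`; with PyTorch, that call would be a single forward pass on a tensor of shape `(num_envs, ob_dim)`. Here the envs are stepped one after another. If env stepping dominates, it could be moved to CPU-only subprocesses (for example with gym's `gym.vector` module, if the installed gym version provides it) while the policy stays in the main process.

```python
import random


class ToyEnv:
    """Hypothetical stand-in for a gym env (old 4-tuple step API)."""

    def __init__(self, seed):
        self.rng = random.Random(seed)
        self.t = 0

    def reset(self):
        self.t = 0
        return 0.0

    def step(self, action):
        self.t += 1
        done = self.t >= 20 or self.rng.random() < 0.05
        return float(self.t), -abs(action), done, {}


def batched_policy(obs_batch):
    """Hypothetical policy: one call for all envs (one GPU forward pass in practice)."""
    return [0.5 * ob for ob in obs_batch]


def new_path():
    return {"observation": [], "action": [], "reward": []}


def sample_trajectories_vectorized(envs, policy, min_timesteps_per_batch, max_path_length):
    obs = [env.reset() for env in envs]
    current = [new_path() for _ in envs]  # one in-progress path per env
    paths, steps = [], 0
    while steps < min_timesteps_per_batch:
        actions = policy(obs)  # single batched policy call for all envs
        for i, (env, ac) in enumerate(zip(envs, actions)):
            next_ob, rew, done, _ = env.step(ac)
            path = current[i]
            path["observation"].append(obs[i])
            path["action"].append(ac)
            path["reward"].append(rew)
            obs[i] = next_ob
            if done or len(path["reward"]) >= max_path_length:
                paths.append(path)  # only finished paths count toward the batch
                steps += len(path["reward"])
                current[i] = new_path()
                obs[i] = env.reset()
    return paths, steps  # paths still in progress at the end are dropped
```

For example, `sample_trajectories_vectorized([ToyEnv(s) for s in range(8)], batched_policy, 1000, 50)` returns whole paths totalling at least 1000 timesteps, with every policy query covering all 8 envs at once.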
