
global context not imported in transform_spec function with reader_pool_type="process" #524

kaiwenw opened this issue Mar 27, 2020 · 5 comments


kaiwenw commented Mar 27, 2020

Hello,
I'm blocked on trying to perform some preprocessing (i.e. a transform) on the data before it is returned from the (PyTorch) DataLoader. However, my preprocessing function's return type is a dataclass rather than a dictionary. My solution is to "flatten" the typed dataclass object to a dictionary in the transform, then "unflatten" it back to a dataclass object in collate_fn (sketched below), which is a bit messy: it involves the flattening and unflattening, as well as specifying a new schema for the flattened dictionary.
BTW: I also know that I can simply preprocess within collate_fn, but that runs only in the main process and cannot be parallelized; I would prefer to have the preprocessing run in parallel.
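To make that concrete, the pattern looks roughly like this (the field names and helper bodies are illustrative, not the actual code from the attachment):

from dataclasses import dataclass

import numpy as np


@dataclass
class PreprocessedTrainingBatch:
    state: np.ndarray
    reward: np.ndarray


def flatten_batch(batch):
    # transform side: dataclass -> plain dict so Petastorm can serialize it
    return {"state": batch.state, "reward": batch.reward}


def unflatten_rows(rows):
    # collate_fn side: rows coming out of the DataLoader -> dataclass again
    return PreprocessedTrainingBatch(
        state=np.stack([row["state"] for row in rows]),
        reward=np.stack([row["reward"] for row in rows]),
    )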

So first, I was wondering if there's a better way to do this with Petastorm?

In the attached code example (transform.txt), I drafted a simple example using this method. When I ran it with reader_pool_type="thread", I saw no parallelization across cores, which I suspect is due to Python's GIL. So I tried reader_pool_type="process", which gives me the error "NameError: name 'PreprocessedTrainingBatch' is not defined". Essentially, the global context is not visible in the transform function. For reference, I also tried running examples/mnist/tests/pytorch_example.py with reader_pool_type="process" and it failed with a similar error, saying that the package "transforms" isn't defined.

My second question is whether setting reader_pool_type="process" is the way to get it running on multiple cores, and whether there's a fix for this issue with the transform functions.

Thanks a lot!
Kaiwen
cc @selitvin
Attached: transform.txt

selitvin (Collaborator) commented

Hi.

Second question: the fix is simple: don't define the classes that are used on the workers in the main module, as that breaks serialization. I was able to fix it by moving the if __name__ == "__main__": clause into a separate module, so that the classes used by the workers are defined in a module the worker processes can import by name.
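For example, the layout could look roughly like this (module and file names are illustrative):

# batch_types.py -- anything the worker processes need lives in an importable module,
# so it can be resolved as batch_types.PreprocessedTrainingBatch during deserialization
from dataclasses import dataclass

import numpy as np


@dataclass
class PreprocessedTrainingBatch:
    state: np.ndarray
    reward: float


# run_benchmark.py -- only the entry point stays in the script you actually execute
import numpy as np

from batch_types import PreprocessedTrainingBatch


def main():
    batch = PreprocessedTrainingBatch(state=np.zeros(64, dtype=np.float32), reward=0.0)
    print(type(batch))


if __name__ == "__main__":
    main()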

In your data structure you are using only vectors, and it appears that you do not need multidimensional arrays. In that case it could be better for you to use vanilla Parquet stores, without Petastorm metadata:

from pyspark.sql import SparkSession


# DATASET_SIZE and PETASTORM_URI are constants defined elsewhere in the script.
def make_dataset():
    def row_generator(x):
        return {
            "state": [x] * 64,
            "action": x % 4,
            "reward": 1.,
            "mdp_id": x,
            "action_probability": [x] * 4,
        }

    spark = SparkSession.builder.config("spark.driver.memory", "2g").master("local[2]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    rows_rdd = (
        sc.parallelize(range(DATASET_SIZE))
        .map(row_generator)
    )

    df = spark.createDataFrame(rows_rdd)
    df = df.repartition(10)
    # plain Parquet write -- no materialize_dataset / Petastorm metadata needed
    df.write.mode("overwrite").parquet(PETASTORM_URI)

To read a vanilla Parquet store with Petastorm, you can use make_batch_reader:

import time

from petastorm import make_batch_reader
from petastorm.pytorch import DataLoader
from petastorm.transform import TransformSpec


# MockDiscreteDQNBatchPreprocessor, transform_row, batch_size and READER_POOL_TYPE
# come from the attached script.
def time_with_transform():
    preprocessor = MockDiscreteDQNBatchPreprocessor()
    transform = TransformSpec(transform_row(preprocessor))
    data_reader = make_batch_reader(PETASTORM_URI, num_epochs=None,
                                    reader_pool_type=READER_POOL_TYPE, transform_spec=transform)

    # collate_fn = collate_and_unflatten(PreprocessedTrainingBatch)
    with DataLoader(data_reader, batch_size=batch_size) as dataloader:
        # warm up the reader before starting the timer
        for _ in range(10):
            next(data_reader)
        start_time = time.time()
        for i, data in enumerate(dataloader):
            if i == 0:
                print(f"{i} data {type(data)}")
            if i > 50:
                break
        print(f"{i} data {type(data)}")

    end_time = time.time()
    print(f"with transform: {end_time - start_time} secs")
  • If your consumer is PyTorch code, you don't need to declare your schema precisely. That matters for TensorFlow, since we need to know the types to construct the computational graph.
  • Working with make_batch_reader would be faster, since you can do vectorized processing in the transform function on an entire pandas DataFrame at once (a rough sketch follows this list).
  • I modified the time-measuring code so it does not include the reader setup time and starts measuring only after some warmup cycles.
  • I might have missed your main point about dataclasses though... I don't know if this is applicable to your use case, but I think it would be best to keep your data as a pd.DataFrame (which Petastorm converts to a pyarrow Table) until you can actually feed it into the model, to speed up the system.
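Here is a rough sketch of what I mean by vectorized processing (the normalization is just an example, not something from your code; note that the transform must be a module-level function for reader_pool_type="process"):

import numpy as np

from petastorm import make_batch_reader
from petastorm.transform import TransformSpec


def preprocess_batch(df):
    # with make_batch_reader the transform receives a whole pandas DataFrame,
    # so we operate on entire columns instead of row by row
    df["reward"] = (df["reward"] - df["reward"].mean()) / (df["reward"].std() + 1e-6)
    df["action"] = df["action"].astype(np.int64)
    return df


transform = TransformSpec(preprocess_batch)
# PETASTORM_URI is the vanilla Parquet store created by make_dataset() above
reader = make_batch_reader(PETASTORM_URI, transform_spec=transform, reader_pool_type="process")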

Here is how I modified your code.


kaiwenw commented Mar 31, 2020

Hi @selitvin, thanks for the detailed response!

For my use case, each row will probably be 1,000 to 10,000 floats. Do you think Petastorm would make sense in that case?

My consumer is always PyTorch, but I found that when using Petastorm there wasn't a way around declaring a schema, when:

  • making the dataset via materialize_dataset
  • specifying the transform output schema (when I added new fields in the transform, it raised an error about unexpected fields).

Yes, I agree that it's fastest to leave the data as a pd.DataFrame. However, my existing codebase has a bunch of preprocessors that return a dataclass rather than a dictionary, which is why I wanted to do the flattening conversion. So to use the original preprocessors unchanged, I'd have to do this conversion to a dictionary, which causes two challenges:

  1. I need to specify the output schema via edited and removed fields, which is messy right now (sketched below).
  2. I need to flatten to a dictionary in the transform and then unflatten back to the dataclass in collate_fn.
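To make challenge 1 concrete, the spec ends up looking roughly like this (the flattened field names and the flatten function are illustrative, and I may be off on the exact edit_fields tuple format, so treat it as a sketch):

import numpy as np

from petastorm.transform import TransformSpec


def flatten_preprocessed_batch(df):
    # illustrative stand-in for the real flattening: produce the flattened columns
    # and drop the original columns that the preprocessor consumed
    df["preprocessed_state"] = df["state"]
    df["preprocessed_reward"] = df["reward"].astype(np.float32)
    return df.drop(columns=["state", "action", "reward", "mdp_id", "action_probability"])


# every flattened field has to be spelled out by hand as (name, dtype, shape, is_nullable)
transform = TransformSpec(
    flatten_preprocessed_batch,
    edit_fields=[
        ("preprocessed_state", np.float32, (64,), False),
        ("preprocessed_reward", np.float32, (), False),
    ],
    removed_fields=["state", "action", "reward", "mdp_id", "action_probability"],
)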

Do you see a better way to make this work with Petastorm?

Thanks a lot!


selitvin commented Apr 1, 2020

> My consumer is always PyTorch, but I found that when using Petastorm there wasn't a way around declaring a schema, when:

In my example I do create a TransformSpec without defining the output schema. Is it different from your case?

If you have no choice and you'd need to go from a dataframe to a list of dataclasses and back to a dataframe, then indeed, working with make_reader might end up being faster. It might also be possible to iron out the code path from after the transform to self.publish_func(all_cols) in process() in petastorm/py_dict_reader_worker.py; then you might be able to get your datastructs forwarded to your main process. Hacking this should not be too hard an endeavor. If you find this useful, we can think of a way to land this kind of change.


kaiwenw commented Apr 1, 2020

For defining the output schema, your example worked because the fields of the transformed dictionary are the same as before. In my case, if I want to "flatten" the dataclass, I would end up with differently named fields than before (as you can see from as_flatten_dict and from_flatten_dict).

Also, why would make_reader end up being faster than make_batch_reader with this dataclass example, if the row size stays the same?


selitvin commented Apr 2, 2020

Pretty soon (#512) we will have an automatic mechanism in place to derive the schema of the data produced by the transform. I hope it will simplify your code (feel free to try infer_schema_from_a_row=True on that development branch; it worked for me).

The reason why the code might run faster with make_reader (I am guessing here, so I might be misleading you) is that make_batch_reader would result in the following chain of transformations in your case:

pyarrow_table -> pandas -> user_transform (to datastruct via your custom func, back to pandas frame) -> pyarrow_table -> to main process -> convert to datastruct

While with some massaging, make_reader would do:

pyarrow_table -> list of dicts -> user_transform (dict to datastruct) -> to main process -> no additional conversion needed

So the bottom line: going from a pandas frame to a list of objects and back, and then to a list of objects again, seems expensive to me.
