Slowdown for Dataframes input #71

Open
mwiewior opened this issue Jan 14, 2025 · 3 comments
Labels
bug Something isn't working

Comments

@mwiewior (Collaborator) commented Jan 14, 2025

See results: https://biodatageeks.org/polars-bio/performance/#dataframes-comparison

Summary:

  1. I verified and excluded data-sharing overheads - the transfer seems to be zero-copy as promised (see the repro sketch after this list).
  2. The slowdown seems to be related to the output size - the larger the output, the larger the additional delay.
  3. There may be 2 sources of this slowdown - one related to threading, the other with some other, yet unknown, source.
  4. I tried to reproduce this issue while switching between Python 3.10 and 3.12, but I didn't observe anything like that.
  5. I tried adding pyo3_disable_reference_pool, but it had no impact.
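
A minimal repro sketch of the comparison behind these numbers, assuming pb.overlap accepts either file paths (the native scan path) or in-memory Polars DataFrames; the overlap signature, the output_type value and the input paths are assumptions, not confirmed API:

```python
# Repro sketch: native (file path) input vs DataFrame input for the same data.
# Assumptions: pb.overlap signature, output_type value, and the input paths.
import time

import polars as pl
import polars_bio as pb  # assumed import name

PATH_1 = "/data/exons.parquet"    # placeholder input files
PATH_2 = "/data/targets.parquet"


def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f} s")


# Native path: polars_bio scans the Parquet files itself.
timed("native", lambda: pb.overlap(PATH_1, PATH_2, output_type="polars.DataFrame"))

# DataFrame path: load the same data eagerly and pass the in-memory frames,
# which is where the 2-3x slowdown shows up.
df1, df2 = pl.read_parquet(PATH_1), pl.read_parquet(PATH_2)
timed("dataframes", lambda: pb.overlap(df1, df2, output_type="polars.DataFrame"))
```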

Problem:

  1. When passing DataFrames as an input, it turns out that the computations for larger DFs are 2-3x slower in total on 1 thread.
  2. When analyzing this in more detail, it turns out that the slowdown has at least two sources:
    a) the per-batch join is ~50% slower:
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) is done building a hash table from 147 batches, 1194285 rows, took 41 ms
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) finished execution, total processed batches: 1215, total join time: 6242 ms
vs native
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) is done building a hash table from 147 batches, 1194285 rows, took 49 ms
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) finished execution, total processed batches: 1215, total join time: 4079 ms

    b) an additional delay - for the native path, join time ~ total time:

INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) finished execution, total processed batches: 1215, total join time: 4079 ms
7-8
{
    "results": [
        {
            "name": "polars_bio",
            "min": 4.342850207933225,
            "max": 4.342850207933225,
            "mean": 4.342850207933225,
            "speedup": 1.0
        }
    ]
}

for DataFrame inputs:

INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(1) finished execution, total processed batches: 1215, total join time: 6379 ms
7-8
{
    "results": [
        {
            "name": "polars_bio_polars_eager",
            "min": 9.375052332994528,
            "max": 9.375052332994528,
            "mean": 9.375052332994528,
            "speedup": 1.0
        }
    ]
}

so there is another ~3 s spent somewhere...
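
A rough way to quantify this gap is to compare the wall-clock time of the whole call with the join time reported by the interval_join logger; pb.overlap and its arguments are the same assumptions as in the sketch above:

```python
# Sketch: wall-clock time vs operator-reported join time; the difference is
# the unaccounted ~3 s. pb.overlap and the inputs are assumptions.
import time

import polars as pl
import polars_bio as pb  # assumed import name

df1 = pl.read_parquet("/data/exons.parquet")    # placeholder inputs
df2 = pl.read_parquet("/data/targets.parquet")

start = time.perf_counter()
pb.overlap(df1, df2, output_type="polars.DataFrame")
wall_clock_s = time.perf_counter() - start

logged_join_s = 6.379  # "total join time: 6379 ms" from the log above
print(f"time outside the join operator: {wall_clock_s - logged_join_s:.2f} s")
```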

An additional observation is that this might be related to threading - if we set the DF target partition number to 2:

DataFrames:

INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(29) finished execution, total processed batches: 1215, total join time: 2908 ms
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(30) finished execution, total processed batches: 1215, total join time: 3591 ms
7-8
{
    "results": [
        {
            "name": "polars_bio_polars_eager",
            "min": 3.7248092079535127,
            "max": 3.7248092079535127,
            "mean": 3.7248092079535127,
            "speedup": 1.0
        }
    ]
}

native:

INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(31) finished execution, total processed batches: 1215, total join time: 1740 ms
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(32) finished execution, total processed batches: 1215, total join time: 2212 ms
7-8
{
    "results": [
        {
            "name": "polars_bio",
            "min": 2.328427125001326,
            "max": 2.328427125001326,
            "mean": 2.328427125001326,
            "speedup": 1.0
        }
    ]
}

3 observations:

  1. total time ~ max(thread processing time): 3591 ms vs 3724 ms, so the additional "delay" is gone
  2. in a system monitor we see that there are in fact 3 threads running (CPU ~300%) - likely 1 for Polars + 2 for DataFusion. This is unlike the native approach, where with target partitions set to 2 we see a constant ~200% CPU utilization
  3. because of point 2, the speedup is ~2.5x rather than ~2.0x when comparing to native:
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(31) finished execution, total processed batches: 1215, total join time: 1740 ms
INFO:sequila_core.physical_planner.joins.interval_join:ThreadId(32) finished execution, total processed batches: 1215, total join time: 2212 ms

Gap (native vs DataFrame input, join time):

for 1 thread: 4079 ms vs 6379 ms
for 2 threads: 1740 ms vs 2908 ms, 2212 ms vs 3591 ms
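
For reference, a hedged sketch of how the target-partitions setting used above might be applied; "datafusion.execution.target_partitions" is a real DataFusion configuration key, but pb.set_option is a hypothetical helper standing in for however polars_bio actually exposes this setting:

```python
# Hypothetical configuration sketch: request 2 DataFusion partitions for the
# interval join. pb.set_option is a placeholder, not confirmed polars_bio API.
import polars_bio as pb  # assumed import name

pb.set_option("datafusion.execution.target_partitions", "2")  # hypothetical helper
```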

@mwiewior added the bug label Jan 14, 2025
@mwiewior (Collaborator, Author) commented Jan 15, 2025

One more strange finding: data transfer Rust -> Python (DataFusion DF -> Polars DF), measured with cProfile:

native: ~0 s

 1215    0.010    0.000    0.010    0.000 {built-in method from_arrow_c_array}

DataFrame input: ~2.7 s

 1215    2.744    0.002    2.744    0.002 {built-in method from_arrow_c_array}
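
For completeness, a minimal sketch of how this kind of measurement can be taken with the standard-library profiler; only the cProfile/pstats usage is standard, while pb.overlap, the column names and the output_type value are assumptions:

```python
# cProfile sketch: profile the overlap call and look for from_arrow_c_array.
# Assumptions: pb.overlap signature, column names, and output_type value.
import cProfile
import pstats

import polars as pl
import polars_bio as pb  # assumed import name

df1 = pl.DataFrame({"chrom": ["chr1"], "start": [100], "end": [200]})
df2 = pl.DataFrame({"chrom": ["chr1"], "start": [150], "end": [250]})

profiler = cProfile.Profile()
profiler.enable()
pb.overlap(df1, df2, output_type="polars.DataFrame")
profiler.disable()

# Sort by cumulative time and restrict the report to the Arrow FFI entry point.
pstats.Stats(profiler).sort_stats("cumulative").print_stats("from_arrow_c_array")
```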

@mwiewior (Collaborator, Author) commented:

The problem seems to be related to the MemTable that we create from the input DataFrames.
The operation is zero-copy, but there is some significant overhead when running queries on top of it (maybe something related to the GIL?).
The current workaround is to use temp Parquet files for larger DataFrames (a sketch is shown below). Needs further investigation.
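
A minimal sketch of that workaround, assuming pb.overlap also accepts Parquet file paths; DataFrame.write_parquet and tempfile are standard, the overlap call itself is an assumption:

```python
# Workaround sketch: spill large Polars DataFrames to temporary Parquet files
# and pass the file paths to polars_bio, so the query runs on the native scan
# path instead of a MemTable. The pb.overlap call is an assumption.
import tempfile
from pathlib import Path

import polars as pl
import polars_bio as pb  # assumed import name


def overlap_via_parquet(df1: pl.DataFrame, df2: pl.DataFrame):
    with tempfile.TemporaryDirectory() as tmp:
        p1 = Path(tmp) / "df1.parquet"
        p2 = Path(tmp) / "df2.parquet"
        df1.write_parquet(p1)  # one-off serialization cost
        df2.write_parquet(p2)
        # The eager polars.DataFrame result is fully materialized before the
        # temporary directory is cleaned up.
        return pb.overlap(str(p1), str(p2), output_type="polars.DataFrame")
```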

@pkhamutou

I assume you mean this MemTable?
