
Tracking issue for the new streaming engine #20947

Open
38 of 56 tasks
coastalwhite opened this issue Jan 28, 2025 · 14 comments
Labels
new-streaming Features for or dependent on the new streaming engine

Comments

@coastalwhite
Collaborator

coastalwhite commented Jan 28, 2025

Note

TLDR
Starting with version 1.23.0, the old streaming engine will no longer be maintained and may become less usable over time. People currently using it should pin their Polars version to 1.22.0 or earlier.

The old streaming engine is being deprecated, and the new streaming engine is coming to replace it soon™. The new streaming engine is far along, but does not yet have 100% feature parity with the old one. At the same time, the burden of maintaining both engines is becoming too large. Therefore, we are deprecating the old streaming engine starting with Polars 1.23.0 and advising its users to pin their Polars version to an earlier release.

This issue tracks the progress of the new streaming engine. Unless mentioned otherwise, all unsupported features fall back to the in-memory engine.

The streaming engine can already be used via .collect(new_streaming=True) or by setting the environment variable POLARS_FORCE_NEW_STREAMING=1. The physical plan can be visualized as a Graphviz dot graph by setting POLARS_VISUALIZE_PHYSICAL_PLAN=filename.dot.

Sources

Sinks

Out-of-core

  • Group-by
  • Equi-join
  • Sort

Streaming Nodes

Aggregates

  • Sum
  • Mean
  • Min/max
  • Last/first
  • Var/std
  • NUnique/count
  • Implode
  • Median/quantile

Plan translation to streaming

  • Literal series in selections
  • Aggregates in selections
  • Sorts in selections
  • Filters in selections
  • .over() to group-by + join
  • .replace() to map (small replacement dictionary) or join (large replacement dictionary)

Datatypes

  • Everything

Other

  • Show plan with explain
  • Add support for profile
coastalwhite added a commit to coastalwhite/polars that referenced this issue Jan 28, 2025
Information on the new streaming engine: pola-rs#20947.
@coastalwhite coastalwhite changed the title Tracking issue for new streaming engine Tracking issue for the new streaming engine Jan 28, 2025
@orlp orlp added the new-streaming Features for or dependent on the new streaming engine label Jan 28, 2025
@deanm0000
Collaborator

What's the difference between the out-of-core group-by/equi-join and the streaming-node group-by/equi-join?

@orlp
Collaborator

orlp commented Jan 28, 2025

@deanm0000 Out-of-core operators will automatically spill to disk when the data you're processing is larger than available memory.

@daviskirk
Contributor

non-supported (unless mentioned otherwise) features fall back to the in-memory engine

What happens to the other non-supported features like the broken categoricals? Is there any way to opt out of streaming as soon as they are encountered?

@orlp
Collaborator

orlp commented Jan 28, 2025

@daviskirk Categoricals are currently the only thing that is simply broken on the new streaming engine (to my knowledge). Almost all unsupported operations automatically fall back to the in-memory engine (just for the parts of the computation that aren't supported); some give a hard error.

My plan is to fix categoricals on the new streaming engine soon.


@deanm0000
Collaborator

I saw that var/std are supported in the new streaming engine, so I tried:

import os
os.environ["POLARS_FORCE_NEW_STREAMING"] = "1"

import numpy as np
import polars as pl

df = pl.DataFrame({"a": np.random.random(100)})
print(df.lazy().select(pl.col("a").std()).explain(streaming=True))

 SELECT [col("a").std()] FROM
  STREAMING:
    DF ["a"]; PROJECT 1/1 COLUMNS

The std operation is outside of the streaming section. I'm not sure how to tell whether that's because explain doesn't show the new_streaming query plan or because something is awry with the new_streaming var/std.

@coastalwhite
Collaborator Author

Explain does not yet reflect the new streaming engine. To see what it is actually doing, use POLARS_VISUALIZE_PHYSICAL_PLAN=/path/to/graph.dot. I added explain to the list of things that need to be done.

@ion-elgreco
Contributor

Sinks should also be possible with deltalake since v0.25 if you pass a RecordBatchStream to the writer.

@jayceslesar

Any insight as to when this will be available with sink_* methods?

@ritchie46
Member

Any insight as to when this will be available with sink_* methods?

Next release the sinks will point to the new streaming engine.

@MPJansen

MPJansen commented Mar 6, 2025

Any insight as to when this will be available with sink_* methods?

Next release the sinks will point to the new streaming engine.

Hi!

Just checking, does that imply that writing partitioned lazyframes will be supported by the sink_ methods?
Or is that out of scope? If so, is that something Polars is interested in supporting?

KR

@coastalwhite
Collaborator Author

coastalwhite commented Mar 6, 2025

Just checking, does that imply that writing partitioned lazyframes will be supported by the sink_ methods? Or is that out of scope? If so, is that something Polars is interested in supporting?

I am unsure what you mean. If you mean partitioned sinking, yes. We already have #21573, but will support other variants as well. If you mean multiple sinks in separate lazy frames, not yet, but that will be supported eventually.

@gdementen

Will the new streaming engine improve the out-of-core/larger-than-RAM capabilities of Polars? I will soon (within 3 months) have to decide which solution to use for a new project with larger-than-RAM datasets (100-200 GB) and would love to use Polars, which I already use with great results on smaller datasets. Your benchmarks usually involve smaller datasets, and looking at the benchmarks at https://duckdblabs.github.io/db-benchmark/, I got the impression that Polars currently has trouble with those dataset sizes, but I am hopeful the new streaming engine will change that. Are my hopes unfounded? (Sorry if this is not the correct place to ask such a question.)

@coastalwhite
Collaborator Author

Currently, the new streaming engine is not really out-of-core yet. This will change in the near future and yes, we should get a lot better at out-of-core.
