Multiplexing queries
In the Sources and Sinks page, we already discussed multiplexing as a way to
split a query into multiple sinks. This page goes a bit deeper into this concept, as it is
important to understand when combining LazyFrames with procedural programming constructs.
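As a quick refresher, below is a minimal sketch of sink multiplexing. It assumes a recent Polars version whose sink methods accept a lazy=True flag (returning an unevaluated sink query); the file names are placeholders.

import polars as pl

lf = pl.LazyFrame({"n": [1, 2, 3]})

# Build two lazy sink queries from the same plan; nothing runs yet
# (assumes sink_* methods support lazy=True in your Polars version)
q1 = lf.sink_parquet("out.parquet", lazy=True)
q2 = lf.sink_ipc("out.arrow", lazy=True)

# Execute both sinks in a single optimization and execution pass
pl.collect_all([q1, q2])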
When dealing with eager DataFrames, it is very common to keep state in a temporary variable. Let's
look at the following example, where we create a DataFrame with 10 unique elements in random
order (so that Polars doesn't hit any fast paths for sorted keys).
import numpy as np
import polars as pl
import polars.testing  # makes pl.testing.assert_frame_equal available

# Shuffle 10 unique values so the group-by keys are not sorted
np.random.seed(0)
a = np.arange(0, 10)
np.random.shuffle(a)
df = pl.DataFrame({"n": a})
print(df)
shape: (10, 1)
┌─────┐
│ n │
│ --- │
│ i64 │
╞═════╡
│ 2 │
│ 8 │
│ 4 │
│ 9 │
│ 1 │
│ 6 │
│ 7 │
│ 3 │
│ 0 │
│ 5 │
└─────┘
Eager
If you use the Polars eager API, making a variable and iterating over that temporary
DataFrame gives the result you expect, as the materialized result of the group-by is stored in
df1. Even though the output order is unstable, that doesn't matter here because the group-by is
evaluated eagerly, exactly once. The following snippet therefore doesn't raise, and the assert passes.
# A group-by doesn't guarantee order
df1 = df.group_by("n").len()
# Take the lower half and the upper half in a list
out = [df1.slice(offset=i * 5, length=5) for i in range(2)]
# Assert that df1 equals the concatenation of both halves
pl.testing.assert_frame_equal(df1, pl.concat(out))
Lazy
Now, if we try this naively with LazyFrames, it fails.
lf1 = df.lazy().group_by("n").len()
out = [lf1.slice(offset=i * 5, length=5).collect() for i in range(2)]
pl.testing.assert_frame_equal(lf1.collect(), pl.concat(out))
AssertionError: DataFrames are different (value mismatch for column 'n')
[left]: [9, 2, 0, 5, 3, 1, 7, 8, 6, 4]
[right]: [0, 9, 6, 8, 2, 5, 4, 3, 1, 7]
The reason this fails is that lf1 doesn't contain the materialized result of
df.lazy().group_by("n").len(); instead, it holds the query plan. This means that every time we
branch off this LazyFrame and call collect, we re-evaluate the group-by. Besides being expensive,
this also leads to unexpected results if you assume that the output is stable (which isn't the
case here).
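You can verify that lf1 is only a description of work, not a result, by rendering its plan as text; a minimal sketch using the standard LazyFrame.explain method:

# lf1 holds a plan, not data; explain() renders that plan as text
lf1 = df.lazy().group_by("n").len()
print(lf1.explain())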
In the example above, you are actually evaluating 2 separate query plans, one per collected slice:

[Plan 1: diagram of the first slice query]
[Plan 2: diagram of the second slice query]
Combine the query plans
To circumvent this, we must give Polars the opportunity to look at all the query plans in a single
optimization and execution pass. This can be done by passing the diverging LazyFrames to the
collect_all function.
lf1 = df.lazy().group_by("n").len()
out = [lf1.slice(offset=i * 5, length=5) for i in range(2)]
results = pl.collect_all([lf1] + out)
pl.testing.assert_frame_equal(results[0], pl.concat(results[1:]))
If we explain the combined queries with pl.explain_all, we can observe that they are combined
under a single "SINK_MULTIPLE" evaluation and that the optimizer has recognized that parts of the
query come from the same subplan, as indicated by the inserted "CACHE" nodes.
SINK_MULTIPLE
  PLAN 0:
    CACHE[id: dc7555a5-6c1a-4e8a-8e62-b334212758ac]
      simple π 2/2 ["n", "len"]
        AGGREGATE[maintain_order: false]
          [len()] BY [col("n")]
          FROM
          DF ["n"]; PROJECT["n"] 1/1 COLUMNS
  PLAN 1:
    SLICE[offset: 0, len: 5]
      CACHE[id: dc7555a5-6c1a-4e8a-8e62-b334212758ac]
        simple π 2/2 ["n", "len"]
          AGGREGATE[maintain_order: false]
            [len()] BY [col("n")]
            FROM
            DF ["n"]; PROJECT["n"] 1/1 COLUMNS
  PLAN 2:
    SLICE[offset: 5, len: 5]
      CACHE[id: dc7555a5-6c1a-4e8a-8e62-b334212758ac]
        simple π 2/2 ["n", "len"]
          AGGREGATE[maintain_order: false]
            [len()] BY [col("n")]
            FROM
            DF ["n"]; PROJECT["n"] 1/1 COLUMNS
END SINK_MULTIPLE
Combining related subplans in a single execution unit with pl.collect_all can thus lead to large
performance increases, and it allows for diverging query plans, temporary tables, and a more
procedural programming style.
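As a closing sketch of this more procedural style, the hypothetical example below derives several unrelated summaries from one shared group-by subplan and collects them together, so the optimizer can compute the shared part only once (the branch names are illustrative):

# One shared subplan...
base = df.lazy().group_by("n").len()

# ...with several diverging branches
totals = base.select(pl.col("len").sum().alias("total"))
top3 = base.sort("len", descending=True).head(3)
evens = base.filter(pl.col("n") % 2 == 0)

# One optimization and execution pass for all three branches
totals_df, top3_df, evens_df = pl.collect_all([totals, top3, evens])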