Multiplexing queries

In the Sources and Sinks page, we already discussed multiplexing as a way to split a query into multiple sinks. This page goes a bit deeper into this concept, as it is important to understand when combining LazyFrames with procedural programming constructs.
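
For reference, sink multiplexing looks roughly like the sketch below. It assumes a recent Polars version in which the sink_* methods accept lazy=True and return uncollected LazyFrames; the file names and column are placeholders:

import polars as pl

lf = pl.LazyFrame({"n": [1, 2, 3]}).with_columns(double=pl.col("n") * 2)

# Each lazy sink returns an uncollected LazyFrame that ends in a sink node
q1 = lf.sink_parquet("out.parquet", lazy=True)
q2 = lf.sink_ipc("out.ipc", lazy=True)

# One pass evaluates both sinks and shares the common subplan
pl.collect_all([q1, q2])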

When dealing with eager dataframes, it is very common to keep state in a temporary variable. Let's look at the following example. Below we create a DataFrame with 10 unique elements in a random order (so that Polars doesn't hit any fast paths for sorted keys).

import numpy as np
import polars as pl
import polars.testing  # makes pl.testing.assert_frame_equal available below

np.random.seed(0)
a = np.arange(0, 10)
np.random.shuffle(a)
df = pl.DataFrame({"n": a})
print(df)
shape: (10, 1)
┌─────┐
│ n   │
│ --- │
│ i64 │
╞═════╡
│ 2   │
│ 8   │
│ 4   │
│ 9   │
│ 1   │
│ 6   │
│ 7   │
│ 3   │
│ 0   │
│ 5   │
└─────┘

Eager

If you use the Polars eager API, storing the intermediate result in a variable and slicing that temporary DataFrame gives the result you expect, as the result of the group-by is materialized in df1. Even though the output order of a group-by is unstable, it doesn't matter here because the group-by is evaluated eagerly, exactly once. The following snippet therefore doesn't raise and the assert passes.

# A group-by doesn't guarantee order
df1 = df.group_by("n").len()

# Take the lower half and the upper half in a list
out = [df1.slice(offset=i * 5, length=5) for i in range(2)]

# Assert df1 is equal to the sum of both halves
pl.testing.assert_frame_equal(df1, pl.concat(out))

Lazy

If we try the same approach naively with a LazyFrame, it fails.

lf1 = df.lazy().group_by("n").len()

# Each collect call re-executes the (order-unstable) group-by
out = [lf1.slice(offset=i * 5, length=5).collect() for i in range(2)]

# This collect is yet another, independent evaluation of the group-by
pl.testing.assert_frame_equal(lf1.collect(), pl.concat(out))
AssertionError: DataFrames are different (value mismatch for column 'n')
[left]:  [9, 2, 0, 5, 3, 1, 7, 8, 6, 4]
[right]: [0, 9, 6, 8, 2, 5, 4, 3, 1, 7]

The reason this fails is that lf1 doesn't contain the materialized result of df.lazy().group_by("n").len(); it holds the query plan instead.

This means that every time we branch off this LazyFrame and call collect, we re-evaluate the group-by. Besides being expensive, this also leads to unexpected results if you assume that the output is stable (which isn't the case here).

In the example above you are actually evaluating two query plans:

Plan 1 and Plan 2 (diagrams of the two independent plans; each repeats the full group-by before taking its own slice)
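
You can verify this duplication yourself by printing each branch's plan with LazyFrame.explain; the group-by aggregation shows up in both (output omitted here):

# Each branch's plan contains its own AGGREGATE node,
# so collecting both runs the group-by twice
for i in range(2):
    print(lf1.slice(offset=i * 5, length=5).explain())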

Combine the query plans

To circumvent this, we must give Polars the opportunity to look at all the query plans in a single optimization and execution pass. This can be done by passing the diverging LazyFrames to the collect_all function.

lf1 = df.lazy().group_by("n").len()

# Diverging plans that all share the group-by subplan
out = [lf1.slice(offset=i * 5, length=5) for i in range(2)]
# Evaluate all plans in a single optimization and execution pass
results = pl.collect_all([lf1] + out)

pl.testing.assert_frame_equal(results[0], pl.concat(results[1:]))

If we explain the combined queries with pl.explain_all, we can also observe that they are shared under a single "SINK_MULTIPLE" evaluation and that the optimizer has recognized that parts of the query come from the same subplan, indicated by the inserted "CACHE" nodes.
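
The call mirrors the collect_all one above (lf1 and out taken from the previous snippet):

print(pl.explain_all([lf1] + out))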

SINK_MULTIPLE
  PLAN 0:
    CACHE[id: 0, cache_hits: 2]
      AGGREGATE[maintain_order: false]
        [len()] BY [col("n")]
        FROM
        DF ["n"]; PROJECT */1 COLUMNS
  PLAN 1:
    SLICE[offset: 0, len: 5]
      CACHE[id: 0, cache_hits: 2]
        AGGREGATE[maintain_order: false]
          [len()] BY [col("n")]
          FROM
          DF ["n"]; PROJECT */1 COLUMNS
  PLAN 2:
    SLICE[offset: 5, len: 5]
      CACHE[id: 0, cache_hits: 2]
        AGGREGATE[maintain_order: false]
          [len()] BY [col("n")]
          FROM
          DF ["n"]; PROJECT */1 COLUMNS
END SINK_MULTIPLE

Combining related subplans in a single execution unit with pl.collect_all can thus lead to large performance increases, and it enables diverging query plans, temporary intermediate tables, and a more procedural programming style.
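
To close with a sketch of that procedural style (the modulus filters are made up for illustration): build any number of diverging branches off a shared plan in ordinary Python, then evaluate them together so the shared subplan only has to be computed once.

# Diverge one shared plan into several branches in a plain Python loop
base = df.lazy().group_by("n").len()
branches = [base.filter(pl.col("n") % k == 0) for k in (2, 3, 5)]

# A single optimization/execution pass over all branches; the optimizer
# can cache the shared group-by instead of recomputing it per branch
results = pl.collect_all(branches)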