Skip to content

Split static placement

SplitStaticPlacement dataclass

SplitStaticPlacement(
    split_policy: Callable[[Block], list[Block]],
)

Bases: RewriteRule


              flowchart TD
              bloqade.lanes.rewrite.split_static_placement.SplitStaticPlacement[SplitStaticPlacement]

              

              click bloqade.lanes.rewrite.split_static_placement.SplitStaticPlacement href "" "bloqade.lanes.rewrite.split_static_placement.SplitStaticPlacement"
            

Split a StaticPlacement body into multiple StaticPlacement statements.

The policy receives the body block (with its fully-threaded state chain) and returns a list of new blocks. Each block becomes one StaticPlacement. If the policy returns ≤1 block the rewriter is a no-op.

The policy is responsible for state threading: each output block must start with a block argument of StateType and end with place.Yield. The last block must carry the classical results from the original Yield.

cz_layer_split_policy

cz_layer_split_policy(body_block: Block) -> list[ir.Block]

Split a StaticPlacement body into CZ-anchored groups (policy A).

Accumulates SQ gates (R, Rz, StarRz); when a CZ is encountered, flushes [accumulated SQ + CZ] as one group. Remaining SQ after the last CZ forms the final group. Returns the original block unchanged if there is at most one CZ (no split needed).

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/rewrite/split_static_placement.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def cz_layer_split_policy(body_block: ir.Block) -> list[ir.Block]:
    """Split a StaticPlacement body into CZ-anchored groups (policy A).

    Accumulates SQ gates (R, Rz, StarRz); when a CZ is encountered, flushes
    [accumulated SQ + CZ] as one group. Remaining SQ after the last CZ
    forms the final group. Returns the original block unchanged if there
    is at most one CZ (no split needed).
    """
    old_yield = body_block.last_stmt
    assert isinstance(old_yield, place.Yield)
    classical_results = tuple(old_yield.classical_results)

    _supported = (place.R, place.Rz, place.StarRz, place.CZ)
    for stmt in body_block.stmts:
        if not isinstance(stmt, (*_supported, place.Yield)):
            return [body_block]

    stmts: list[_GateStmt] = [s for s in body_block.stmts if isinstance(s, _supported)]

    if not any(isinstance(s, place.CZ) for s in stmts):
        return [body_block]

    groups: list[list[_GateStmt]] = []
    sq_accum: list[_GateStmt] = []

    for stmt in stmts:
        if isinstance(stmt, place.CZ):
            groups.append(sq_accum + [stmt])
            sq_accum = []
        else:
            sq_accum.append(stmt)

    if sq_accum:
        groups.append(sq_accum)

    if len(groups) <= 1:
        return [body_block]

    new_blocks: list[ir.Block] = []
    for group_idx, group in enumerate(groups):
        new_block = ir.Block()
        curr_state = new_block.args.append_from(StateType, "entry_state")

        for stmt in group:
            remapped = stmt.from_stmt(stmt, args=(curr_state, *stmt.args[1:]))
            new_block.stmts.append(remapped)
            curr_state = remapped.state_after

        is_last = group_idx == len(groups) - 1
        extra = classical_results if is_last else ()
        new_block.stmts.append(place.Yield(curr_state, *extra))
        new_blocks.append(new_block)

    return new_blocks