Skip to content

Index

Move generators for the configuration search tree.

A MoveGenerator produces candidate move sets from a configuration node. Different implementations enable different search strategies — exhaustive enumeration, goal-directed search, greedy grid growing, etc.

All generators yield candidate frozenset[LaneAddress]. Validation (lane validity, collision checks, transposition table lookups) is performed by ConfigurationTree.apply_move_set and higher-level helpers such as ConfigurationTree.expand_node, so generators are free to over-generate — invalid candidates are filtered out when moves are applied to the tree.

BusContext dataclass

BusContext(
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
    arch_spec: ArchSpec,
    pos_to_loc: dict[tuple[float, float], LocationAddress],
    collision_srcs: frozenset[LocationAddress],
)

Pre-computed context for AOD grid construction on a single bus.

Encapsulates the position map, collision set, bus identity, and the divide-and-conquer algorithm for building AOD-compatible rectangular grids. Create one per (move_type, bus_id, direction) group via :meth:from_tree.

The grid construction algorithm has two phases:

  1. Greedy init — O(n) sequential pass that forms initial clusters by greedily expanding rectangles one atom at a time.
  2. Merge — iterative passes that try to merge clusters. Clusters that don't participate in any merge are promoted to solved and removed from the active set, since merged clusters only grow and a cluster that can't merge now can never merge later.

build_aod_grids

build_aod_grids(
    entries: dict[int, LaneAddress],
) -> list[frozenset[LaneAddress]]

Build AOD-compatible rectangular grids from desired next-hop lanes.

Two phases: 1. Greedy sequential pass to form initial clusters (O(n)). 2. Merge pass to combine clusters that the greedy ordering missed. Clusters that cannot merge are marked solved and removed from the active set so they don't slow down later rounds.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def build_aod_grids(
    self,
    entries: dict[int, LaneAddress],
) -> list[frozenset[LaneAddress]]:
    """Build AOD-compatible rectangular grids from desired next-hop lanes.

    Two phases:
    1. Greedy sequential pass to form initial clusters (O(n)).
    2. Merge pass to combine clusters that the greedy ordering missed.
       Clusters that cannot merge are marked solved and removed from
       the active set so they don't slow down later rounds.
    """
    clusters = self.greedy_init(entries)
    solved = self.merge_clusters(clusters)

    return [moveset for xs, ys in solved if (moveset := self.rect_to_lanes(xs, ys))]

from_tree classmethod

from_tree(
    tree: ConfigurationTree,
    occupied: frozenset[LocationAddress],
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
) -> BusContext

Build a BusContext from a ConfigurationTree and occupied set.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@classmethod
def from_tree(
    cls,
    tree: ConfigurationTree,
    occupied: frozenset[LocationAddress],
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
) -> BusContext:
    """Build a BusContext from a ConfigurationTree and occupied set."""
    arch_spec = tree.arch_spec
    bus = (
        arch_spec.site_buses if move_type == MoveType.SITE else arch_spec.word_buses
    )[bus_id]

    if move_type == MoveType.SITE:
        src_locs = [
            LocationAddress(w, s) for w in arch_spec.has_site_buses for s in bus.src
        ]
    else:
        src_locs = [
            LocationAddress(w, s) for w in bus.src for s in arch_spec.has_word_buses
        ]

    pos_to_loc: dict[tuple[float, float], LocationAddress] = {}
    for loc in src_locs:
        pos = arch_spec.get_position(loc)
        pos_to_loc[pos] = loc

    # Bus src and dst are disjoint, so follow-moves cannot occur.
    collision: set[LocationAddress] = set()
    for loc in src_locs:
        if loc in occupied:
            lane = LaneAddress(
                move_type, loc.word_id, loc.site_id, bus_id, direction
            )
            _, dst = arch_spec.get_endpoints(lane)
            if dst in occupied:
                collision.add(loc)

    return cls(
        move_type=move_type,
        bus_id=bus_id,
        direction=direction,
        arch_spec=arch_spec,
        pos_to_loc=pos_to_loc,
        collision_srcs=frozenset(collision),
    )

greedy_init

greedy_init(
    entries: dict[int, LaneAddress],
) -> list[Cluster]

Form initial clusters via greedy sequential expansion.

Processes atoms in order and greedily expands a rectangle. Atoms that don't fit start a new cluster in the next pass. Repeats until all atoms are assigned or individually invalid.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def greedy_init(
    self,
    entries: dict[int, LaneAddress],
) -> list[Cluster]:
    """Form initial clusters via greedy sequential expansion.

    Processes atoms in order and greedily expands a rectangle. Atoms
    that don't fit start a new cluster in the next pass. Repeats
    until all atoms are assigned or individually invalid.
    """
    clusters: list[Cluster] = []
    remaining = dict(entries)

    while remaining:
        xs: set[float] = set()
        ys: set[float] = set()
        leftover: dict[int, LaneAddress] = {}

        for qid, lane in remaining.items():
            x, y = self.lane_position(lane)

            if x in xs and y in ys:
                continue

            new_xs = xs | {x}
            new_ys = ys | {y}

            if self.is_valid_rect(new_xs, new_ys):
                xs = new_xs
                ys = new_ys
            else:
                leftover[qid] = lane

        if xs and ys:
            clusters.append((xs, ys))
        else:
            break

        remaining = leftover

    return clusters

is_valid_rect

is_valid_rect(xs: set[float], ys: set[float]) -> bool

Check if every position in the X * Y rectangle is a valid bus source with no collision.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
106
107
108
109
110
111
112
113
114
def is_valid_rect(self, xs: set[float], ys: set[float]) -> bool:
    """Check if every position in the X * Y rectangle is a valid bus
    source with no collision."""
    for x in xs:
        for y in ys:
            loc = self.pos_to_loc.get((x, y))
            if loc is None or loc in self.collision_srcs:
                return False
    return True

lane_position

lane_position(lane: LaneAddress) -> tuple[float, float]

Get the physical (x, y) position of a lane's source.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
134
135
136
137
def lane_position(self, lane: LaneAddress) -> tuple[float, float]:
    """Get the physical (x, y) position of a lane's source."""
    src, _ = self.arch_spec.get_endpoints(lane)
    return self.arch_spec.get_position(src)

merge_clusters

merge_clusters(clusters: list[Cluster]) -> list[Cluster]

Merge clusters until no more merges are possible.

Each pass lets every active cluster try to absorb compatible neighbours. Clusters that don't participate in any merge are promoted to solved and removed from the active set — merged clusters only grow, so a cluster that can't merge now will never be able to merge later.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def merge_clusters(
    self,
    clusters: list[Cluster],
) -> list[Cluster]:
    """Merge clusters until no more merges are possible.

    Each pass lets every active cluster try to absorb compatible
    neighbours. Clusters that don't participate in any merge are
    promoted to solved and removed from the active set — merged
    clusters only grow, so a cluster that can't merge now will
    never be able to merge later.
    """
    solved: list[Cluster] = []

    while len(clusters) > 1:
        n = len(clusters)
        consumed: set[int] = set()
        merged_flags = [False] * n

        for i in range(n):
            if i in consumed:
                continue
            for j in range(i + 1, n):
                if j in consumed:
                    continue
                merged_xs = clusters[i][0] | clusters[j][0]
                merged_ys = clusters[i][1] | clusters[j][1]
                if self.is_valid_rect(merged_xs, merged_ys):
                    clusters[i] = (merged_xs, merged_ys)
                    consumed.add(j)
                    merged_flags[i] = True
                    merged_flags[j] = True

        if not any(merged_flags):
            break

        active: list[Cluster] = []
        for i in range(n):
            if i in consumed:
                continue
            if merged_flags[i]:
                active.append(clusters[i])
            else:
                solved.append(clusters[i])
        clusters = active

    solved.extend(clusters)
    return solved

rect_to_lanes

rect_to_lanes(
    xs: set[float], ys: set[float]
) -> frozenset[LaneAddress]

Convert an X * Y rectangle into the corresponding lane set.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def rect_to_lanes(self, xs: set[float], ys: set[float]) -> frozenset[LaneAddress]:
    """Convert an X * Y rectangle into the corresponding lane set."""
    lanes: list[LaneAddress] = []
    for x in xs:
        for y in ys:
            loc = self.pos_to_loc.get((x, y))
            if loc is not None:
                lanes.append(
                    LaneAddress(
                        self.move_type,
                        loc.word_id,
                        loc.site_id,
                        self.bus_id,
                        self.direction,
                    )
                )
    return frozenset(lanes)

EntropyNode

Bases: Protocol

Minimal node metadata required by HeuristicMoveGenerator.

ExhaustiveMoveGenerator dataclass

ExhaustiveMoveGenerator(
    max_x_capacity: int | None = None,
    max_y_capacity: int | None = None,
)

Enumerates all valid AOD grids from the configuration.

For each (move_type, bus_id, direction) group, finds all source positions, enumerates grid subsets within AOD capacity, and yields the full grid of lane addresses.

Pre-filters grids where an occupied source has an occupied destination (collision) as an optimization.

NOTE for Rust port: replace itertools.combinations with Gosper's hack for bitmask enumeration of exactly-k-set-bits subsets. This avoids iterating all 2^n masks when capacity is small. See #298.

max_x_capacity class-attribute instance-attribute

max_x_capacity: int | None = None

Maximum number of unique X positions the AOD can address.

max_y_capacity class-attribute instance-attribute

max_y_capacity: int | None = None

Maximum number of unique Y positions the AOD can address.

GreedyMoveGenerator dataclass

GreedyMoveGenerator(target: dict[int, LocationAddress])

Greedy shortest-path move generator.

For each unresolved qubit, computes the shortest path to its target while routing around all other atoms (but not itself) and takes the first lane. Those first-step lanes are grouped by (move_type, bus_id, direction) and partitioned into AOD-compatible rectangular grids via BusContext.

target instance-attribute

target: dict[int, LocationAddress]

Mapping of qubit ID to target location.

generate

generate(
    node: ConfigurationNode, tree: ConfigurationTree
) -> Iterator[frozenset[LaneAddress]]

Yield candidate move sets.

Steps: 1. For each qubit, find shortest path to target routing around all other atoms (but not itself). 2. Take the first lane from each path. 3. Group by (move_type, bus_id, direction). 4. Build AOD-compatible rectangular grids per group.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/greedy.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def generate(
    self,
    node: ConfigurationNode,
    tree: ConfigurationTree,
) -> Iterator[frozenset[LaneAddress]]:
    """Yield candidate move sets.

    Steps:
    1. For each qubit, find shortest path to target routing around
       all other atoms (but not itself).
    2. Take the first lane from each path.
    3. Group by (move_type, bus_id, direction).
    4. Build AOD-compatible rectangular grids per group.
    """
    occupied = node.occupied_locations | tree.blocked_locations

    first_lanes: dict[int, LaneAddress] = {}
    for qid, current in node.configuration.items():
        target_loc = self.target.get(qid)
        if target_loc is None or current == target_loc:
            continue

        result = tree.path_finder.find_path(
            current, target_loc, occupied=occupied - {current}
        )
        if result is None:
            continue

        lanes, _ = result
        if lanes:
            first_lanes[qid] = lanes[0]

    if not first_lanes:
        return

    groups: dict[
        tuple[MoveType, int, Direction],
        dict[int, LaneAddress],
    ] = {}
    for qid, lane in first_lanes.items():
        key = (lane.move_type, lane.bus_id, lane.direction)
        groups.setdefault(key, {})[qid] = lane

    for (mt, bid, d), entries in groups.items():
        ctx = BusContext.from_tree(tree, occupied, mt, bid, d)
        yield from ctx.build_aod_grids(entries)

HeuristicMoveGenerator dataclass

HeuristicMoveGenerator(
    scorer: CandidateScorer,
    params: SearchParams,
    search_nodes: dict[int, EntropyNode],
)

Generates ranked candidate movesets using entropy-weighted scoring.

Implements the MoveGenerator protocol. The traversal sets search_nodes before the search loop begins so the generator can read per-node entropy.

search_nodes instance-attribute

search_nodes: dict[int, EntropyNode]

Mapping of id(ConfigurationNode) -> entropy metadata node.

generate

generate(
    node: ConfigurationNode, tree: ConfigurationTree
) -> Iterator[frozenset[LaneAddress]]

Yield candidate movesets ranked by moveset score (descending).

Steps: 1. Get entropy from traversal metadata node. 2. Score all (qubit, move_type, bus_id, direction) pairs. 3. Per-qubit: keep top C triples. 4. Group by (move_type, bus_id, direction), collect positive-score qubits. 5. Resolve conflicts within each group (greedy by score). 6. Build moveset per group, score with score_moveset, sort descending.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/heuristic.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def generate(
    self,
    node: ConfigurationNode,
    tree: ConfigurationTree,
) -> Iterator[frozenset[LaneAddress]]:
    """Yield candidate movesets ranked by moveset score (descending).

    Steps:
    1. Get entropy from traversal metadata node.
    2. Score all (qubit, move_type, bus_id, direction) pairs.
    3. Per-qubit: keep top C triples.
    4. Group by (move_type, bus_id, direction), collect positive-score qubits.
    5. Resolve conflicts within each group (greedy by score).
    6. Build moveset per group, score with score_moveset, sort descending.
    """
    search_node = self.search_nodes.get(id(node))
    entropy = search_node.entropy if search_node is not None else 1

    scores = self.scorer.score_all_qubit_bus_pairs(node, entropy, tree)
    if not scores:
        return

    # Per-qubit: keep top C (move_type, bus_id, direction) triples
    qubit_top: dict[int, list[tuple[MoveType, int, Direction, float]]] = {}
    for (qid, mt, bid, d), score in scores.items():
        qubit_top.setdefault(qid, []).append((mt, bid, d, score))

    for qid in qubit_top:
        qubit_top[qid].sort(key=lambda x: x[3], reverse=True)
        qubit_top[qid] = qubit_top[qid][: self.params.top_c]

    # Group by (move_type, bus_id, direction): collect positive-scoring qubits
    groups: dict[
        tuple[MoveType, int, Direction],
        list[tuple[int, float]],
    ] = {}
    for qid, top_triples in qubit_top.items():
        for mt, bid, d, score in top_triples:
            if score > 0:
                groups.setdefault((mt, bid, d), []).append((qid, score))

    # Fallback: if no group has positive-scoring qubits, use best single entry
    if not groups:
        best_key = max(scores, key=lambda key: scores[key])
        qid, mt, bid, d = best_key
        groups[(mt, bid, d)] = [(qid, scores[best_key])]

    # Build one moveset per group with conflict resolution
    occupied = node.occupied_locations | tree.blocked_locations
    candidates: list[tuple[float, frozenset[LaneAddress]]] = []

    for (mt, bid, d), qubit_scores in groups.items():
        # Sort qubits by score descending for greedy conflict resolution
        qubit_scores.sort(key=lambda x: x[1], reverse=True)

        lanes: list[LaneAddress] = []
        used_dsts: set[LocationAddress] = set()

        for qid, _ in qubit_scores:
            loc = node.configuration[qid]
            lane = tree.lane_for_source(mt, bid, d, loc)
            if lane is None:
                continue
            _, dst = tree.arch_spec.get_endpoints(lane)
            if dst in occupied:
                continue
            # Destination conflict: another qubit in this group targets same dst
            if dst in used_dsts:
                continue
            # Lane compatibility check with existing lanes in this group
            if lanes and not all(
                tree.arch_spec.compatible_lanes(lane, existing)
                for existing in lanes
            ):
                continue
            lanes.append(lane)
            used_dsts.add(dst)

        if not lanes:
            continue

        moveset = frozenset(lanes)
        ms_score = self.scorer.score_moveset(moveset, node, tree)
        candidates.append((ms_score, moveset))

    # Sort by moveset score descending
    candidates.sort(key=lambda x: x[0], reverse=True)

    for _, moveset in candidates:
        yield moveset

MoveGenerator

Bases: Protocol

Interface for generating candidate move sets from a configuration.

Implementations yield candidate move sets. Validation and node creation are handled by ConfigurationTree.expand_node.

generate

generate(
    node: ConfigurationNode, tree: ConfigurationTree
) -> Iterator[frozenset[LaneAddress]]

Yield candidate move sets from the given configuration.

Parameters:

Name Type Description Default
node ConfigurationNode

The configuration to generate moves from.

required
tree ConfigurationTree

The configuration tree (provides arch_spec, path_finder).

required

Yields:

Type Description
frozenset[LaneAddress]

frozenset[LaneAddress] — each candidate parallel move set.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/base.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def generate(
    self,
    node: ConfigurationNode,
    tree: ConfigurationTree,
) -> Iterator[frozenset[LaneAddress]]:
    """Yield candidate move sets from the given configuration.

    Args:
        node: The configuration to generate moves from.
        tree: The configuration tree (provides arch_spec, path_finder).

    Yields:
        frozenset[LaneAddress] — each candidate parallel move set.
    """
    ...