Skip to content

Aod grouping

AOD-compatible rectangular grid construction.

Provides :class:BusContext, which encapsulates the bus-level state and the divide-and-conquer algorithm for grouping lanes into valid AOD rectangular grids.

BusContext dataclass

BusContext(
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
    arch_spec: ArchSpec,
    pos_to_loc: dict[tuple[float, float], LocationAddress],
    collision_srcs: frozenset[LocationAddress],
)

Pre-computed context for AOD grid construction on a single bus.

Encapsulates the position map, collision set, bus identity, and the divide-and-conquer algorithm for building AOD-compatible rectangular grids. Create one per (move_type, bus_id, direction) group via :meth:from_tree.

The grid construction algorithm has two phases:

  1. Greedy init — O(n) sequential pass that forms initial clusters by greedily expanding rectangles one atom at a time.
  2. Merge — iterative passes that try to merge clusters. Clusters that don't participate in any merge are promoted to solved and removed from the active set, since merged clusters only grow and a cluster that can't merge now can never merge later.

build_aod_grids

build_aod_grids(
    entries: dict[int, LaneAddress],
) -> list[frozenset[LaneAddress]]

Build AOD-compatible rectangular grids from desired next-hop lanes.

Two phases: 1. Greedy sequential pass to form initial clusters (O(n)). 2. Merge pass to combine clusters that the greedy ordering missed. Clusters that cannot merge are marked solved and removed from the active set so they don't slow down later rounds.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def build_aod_grids(
    self,
    entries: dict[int, LaneAddress],
) -> list[frozenset[LaneAddress]]:
    """Build AOD-compatible rectangular grids from desired next-hop lanes.

    Two phases:
    1. Greedy sequential pass to form initial clusters (O(n)).
    2. Merge pass to combine clusters that the greedy ordering missed.
       Clusters that cannot merge are marked solved and removed from
       the active set so they don't slow down later rounds.
    """
    clusters = self.greedy_init(entries)
    solved = self.merge_clusters(clusters)

    return [moveset for xs, ys in solved if (moveset := self.rect_to_lanes(xs, ys))]

from_tree classmethod

from_tree(
    tree: ConfigurationTree,
    occupied: frozenset[LocationAddress],
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
) -> BusContext

Build a BusContext from a ConfigurationTree and occupied set.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@classmethod
def from_tree(
    cls,
    tree: ConfigurationTree,
    occupied: frozenset[LocationAddress],
    move_type: MoveType,
    bus_id: int,
    direction: Direction,
) -> BusContext:
    """Build a BusContext from a ConfigurationTree and occupied set."""
    arch_spec = tree.arch_spec
    bus = (
        arch_spec.site_buses if move_type == MoveType.SITE else arch_spec.word_buses
    )[bus_id]

    if move_type == MoveType.SITE:
        src_locs = [
            LocationAddress(w, s) for w in arch_spec.has_site_buses for s in bus.src
        ]
    else:
        src_locs = [
            LocationAddress(w, s) for w in bus.src for s in arch_spec.has_word_buses
        ]

    pos_to_loc: dict[tuple[float, float], LocationAddress] = {}
    for loc in src_locs:
        pos = arch_spec.get_position(loc)
        pos_to_loc[pos] = loc

    # Bus src and dst are disjoint, so follow-moves cannot occur.
    collision: set[LocationAddress] = set()
    for loc in src_locs:
        if loc in occupied:
            lane = LaneAddress(
                move_type, loc.word_id, loc.site_id, bus_id, direction
            )
            _, dst = arch_spec.get_endpoints(lane)
            if dst in occupied:
                collision.add(loc)

    return cls(
        move_type=move_type,
        bus_id=bus_id,
        direction=direction,
        arch_spec=arch_spec,
        pos_to_loc=pos_to_loc,
        collision_srcs=frozenset(collision),
    )

greedy_init

greedy_init(
    entries: dict[int, LaneAddress],
) -> list[Cluster]

Form initial clusters via greedy sequential expansion.

Processes atoms in order and greedily expands a rectangle. Atoms that don't fit start a new cluster in the next pass. Repeats until all atoms are assigned or individually invalid.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def greedy_init(
    self,
    entries: dict[int, LaneAddress],
) -> list[Cluster]:
    """Form initial clusters via greedy sequential expansion.

    Processes atoms in order and greedily expands a rectangle. Atoms
    that don't fit start a new cluster in the next pass. Repeats
    until all atoms are assigned or individually invalid.
    """
    clusters: list[Cluster] = []
    remaining = dict(entries)

    while remaining:
        xs: set[float] = set()
        ys: set[float] = set()
        leftover: dict[int, LaneAddress] = {}

        for qid, lane in remaining.items():
            x, y = self.lane_position(lane)

            if x in xs and y in ys:
                continue

            new_xs = xs | {x}
            new_ys = ys | {y}

            if self.is_valid_rect(new_xs, new_ys):
                xs = new_xs
                ys = new_ys
            else:
                leftover[qid] = lane

        if xs and ys:
            clusters.append((xs, ys))
        else:
            break

        remaining = leftover

    return clusters

is_valid_rect

is_valid_rect(xs: set[float], ys: set[float]) -> bool

Check if every position in the X * Y rectangle is a valid bus source with no collision.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
106
107
108
109
110
111
112
113
114
def is_valid_rect(self, xs: set[float], ys: set[float]) -> bool:
    """Check if every position in the X * Y rectangle is a valid bus
    source with no collision."""
    for x in xs:
        for y in ys:
            loc = self.pos_to_loc.get((x, y))
            if loc is None or loc in self.collision_srcs:
                return False
    return True

lane_position

lane_position(lane: LaneAddress) -> tuple[float, float]

Get the physical (x, y) position of a lane's source.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
134
135
136
137
def lane_position(self, lane: LaneAddress) -> tuple[float, float]:
    """Get the physical (x, y) position of a lane's source."""
    src, _ = self.arch_spec.get_endpoints(lane)
    return self.arch_spec.get_position(src)

merge_clusters

merge_clusters(clusters: list[Cluster]) -> list[Cluster]

Merge clusters until no more merges are possible.

Each pass lets every active cluster try to absorb compatible neighbours. Clusters that don't participate in any merge are promoted to solved and removed from the active set — merged clusters only grow, so a cluster that can't merge now will never be able to merge later.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def merge_clusters(
    self,
    clusters: list[Cluster],
) -> list[Cluster]:
    """Merge clusters until no more merges are possible.

    Each pass lets every active cluster try to absorb compatible
    neighbours. Clusters that don't participate in any merge are
    promoted to solved and removed from the active set — merged
    clusters only grow, so a cluster that can't merge now will
    never be able to merge later.
    """
    solved: list[Cluster] = []

    while len(clusters) > 1:
        n = len(clusters)
        consumed: set[int] = set()
        merged_flags = [False] * n

        for i in range(n):
            if i in consumed:
                continue
            for j in range(i + 1, n):
                if j in consumed:
                    continue
                merged_xs = clusters[i][0] | clusters[j][0]
                merged_ys = clusters[i][1] | clusters[j][1]
                if self.is_valid_rect(merged_xs, merged_ys):
                    clusters[i] = (merged_xs, merged_ys)
                    consumed.add(j)
                    merged_flags[i] = True
                    merged_flags[j] = True

        if not any(merged_flags):
            break

        active: list[Cluster] = []
        for i in range(n):
            if i in consumed:
                continue
            if merged_flags[i]:
                active.append(clusters[i])
            else:
                solved.append(clusters[i])
        clusters = active

    solved.extend(clusters)
    return solved

rect_to_lanes

rect_to_lanes(
    xs: set[float], ys: set[float]
) -> frozenset[LaneAddress]

Convert an X * Y rectangle into the corresponding lane set.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/search/generators/aod_grouping.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def rect_to_lanes(self, xs: set[float], ys: set[float]) -> frozenset[LaneAddress]:
    """Convert an X * Y rectangle into the corresponding lane set."""
    lanes: list[LaneAddress] = []
    for x in xs:
        for y in ys:
            loc = self.pos_to_loc.get((x, y))
            if loc is not None:
                lanes.append(
                    LaneAddress(
                        self.move_type,
                        loc.word_id,
                        loc.site_id,
                        self.bus_id,
                        self.direction,
                    )
                )
    return frozenset(lanes)