Skip to content

Path

PathFinder dataclass

PathFinder(spec: ArchSpec)

physical_address_map class-attribute instance-attribute

# Lookup from a site's LocationAddress to its graph node index.
# Excluded from the generated __init__ (init=False); each instance starts
# with its own empty dict via default_factory.
physical_address_map: dict[LocationAddress, int] = field(
    init=False, default_factory=dict
)

Map from (zone_id, word_id, site_id) tuple to graph node index.

physical_addresses class-attribute instance-attribute

# Reverse lookup: the list position is the graph node index, the value is
# that node's LocationAddress.
# Excluded from the generated __init__ (init=False); each instance starts
# with its own empty list via default_factory.
physical_addresses: list[LocationAddress] = field(
    init=False, default_factory=list
)

Map from graph node index to (zone_id, word_id, site_id) tuple.

site_graph class-attribute instance-attribute

# Directed graph whose nodes are sites and whose edges are lanes.
# Excluded from the generated __init__ (init=False); each instance starts
# with its own empty PyDiGraph via default_factory.
site_graph: PyDiGraph = field(
    init=False, default_factory=PyDiGraph
)

Graph representing all sites and edges as lanes.

extract_lanes_from_path

extract_lanes_from_path(
    path: list[int],
) -> tuple[LaneAddress, ...]

Given a path as node indices, extract the lane addresses.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/arch/path.py
132
133
134
135
136
137
138
139
140
141
142
143
144
def extract_lanes_from_path(self, path: list[int]) -> tuple[LaneAddress, ...]:
    """Given a path as node indices, extract the lane addresses.

    Args:
        path: Node indices of `site_graph`, in traversal order.

    Returns:
        The edge data stored on `site_graph` for each consecutive pair of
        nodes in `path`, in the same order as the path.

    Raises:
        ValueError: If `path` has fewer than two nodes, or if two consecutive
            nodes are not joined by a lane.
    """
    if len(path) < 2:
        raise ValueError("Path must have at least two nodes to extract lanes.")
    graph = self.site_graph
    lanes = []
    for start_node, end_node in zip(path, path[1:]):
        # rustworkx's `get_edge_data` raises `NoEdgeBetweenNodes` for a missing
        # edge instead of returning None, so probe with `has_edge` first;
        # otherwise the ValueError below could never be reached for that case.
        lane = (
            graph.get_edge_data(start_node, end_node)
            if graph.has_edge(start_node, end_node)
            else None
        )
        if lane is None:
            raise ValueError(
                f"No lane exists between nodes {start_node} and {end_node}."
            )
        lanes.append(lane)
    return tuple(lanes)

extract_locations_from_path

extract_locations_from_path(
    path: list[int],
) -> tuple[LocationAddress, ...]

Given a path as node indices, extract the location addresses.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/arch/path.py
146
147
148
149
150
def extract_locations_from_path(
    self, path: list[int]
) -> tuple[LocationAddress, ...]:
    """Translate a path of node indices into location addresses.

    Args:
        path: Node indices, each used to index `physical_addresses`.

    Returns:
        The entries of `physical_addresses` at those indices, in path order.
    """
    locations = []
    for node_index in path:
        locations.append(self.physical_addresses[node_index])
    return tuple(locations)

find_path

find_path(
    start: LocationAddress,
    end: LocationAddress,
    occupied: frozenset[LocationAddress] = frozenset(),
    path_heuristic: Callable[
        [
            tuple[LaneAddress, ...],
            tuple[LocationAddress, ...],
        ],
        float,
    ] = lambda _, __: 0.0,
    edge_weight: (
        Callable[[LaneAddress], float] | None
    ) = None,
) -> (
    tuple[
        tuple[LaneAddress, ...], tuple[LocationAddress, ...]
    ]
    | None
)

Find a weighted shortest path from start to end.

Parameters:

Name Type Description Default
start LocationAddress

The starting location.

required
end LocationAddress

The ending location.

required
occupied frozenset[LocationAddress]

Locations to exclude when searching for a path. If this excludes start or end, no path is returned.

frozenset()
path_heuristic Callable[[tuple[LaneAddress, ...], tuple[LocationAddress, ...]], float]

A tie-breaker over candidate shortest paths. It is called with each candidate's lane sequence and location sequence, and the candidate with the lowest returned value is selected.

lambda _, __: 0.0
edge_weight Callable[[LaneAddress], float] | None

Optional edge weight function used for shortest-path costs. Defaults to Metrics.get_lane_duration_us when not provided.

None

Returns:

Type Description
tuple[tuple[LaneAddress, ...], tuple[LocationAddress, ...]] | None

A tuple containing two items: the selected path as LaneAddress values, and the same path as LocationAddress values (including start and end).

tuple[tuple[LaneAddress, ...], tuple[LocationAddress, ...]] | None

Returns None when no valid path exists.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/arch/path.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def find_path(
    self,
    start: LocationAddress,
    end: LocationAddress,
    occupied: frozenset[LocationAddress] = frozenset(),
    path_heuristic: Callable[
        [tuple[LaneAddress, ...], tuple[LocationAddress, ...]], float
    ] = lambda _, __: 0.0,
    edge_weight: Callable[[LaneAddress], float] | None = None,
) -> tuple[tuple[LaneAddress, ...], tuple[LocationAddress, ...]] | None:
    """Find a weighted shortest path from start to end.

    Args:
        start: The starting location.
        end: The ending location.
        occupied: Locations removed from the graph before searching. If this
            removes `start` or `end`, no path is returned. Note that the
            `start == end` case returns before `occupied` is consulted.
        path_heuristic: A tie-breaker over candidate shortest paths. It is
            called with each candidate's lane tuple and location tuple; the
            candidate with the lowest value is returned (the first such
            candidate when several share the lowest value).
        edge_weight: Optional edge weight function used for shortest-path costs.
            Defaults to `self.metrics.get_lane_duration_us` when not provided.

    Returns:
        A tuple containing:
            - The selected path as `LaneAddress` values.
            - The same path as `LocationAddress` values (including start and end).
        When `start == end` and the location is known, this is `((), (start,))`.
        Returns `None` when either location is unknown or no valid path exists.
    """
    address_to_node = self.physical_address_map
    source = address_to_node.get(start)
    target = address_to_node.get(end)
    if source is None or target is None:
        return None
    if start == end:
        # Trivial path: no lanes, a single location.
        return (), (start,)

    # Search only over sites that are not occupied.
    free_nodes = []
    for index, address in enumerate(self.physical_addresses):
        if address not in occupied:
            free_nodes.append(index)
    subgraph, sub_to_original = self.site_graph.subgraph_with_nodemap(free_nodes)
    # The node map goes subgraph index -> original index; invert it so the
    # endpoints can be located inside the subgraph.
    original_to_sub = {
        original_index: sub_index
        for sub_index, original_index in sub_to_original.items()
    }

    if not (source in original_to_sub and target in original_to_sub):
        return None

    weight_fn = (
        self.metrics.get_lane_duration_us if edge_weight is None else edge_weight
    )

    try:
        sub_paths = nx.all_shortest_paths(
            subgraph,
            original_to_sub[source],
            original_to_sub[target],
            weight_fn=weight_fn,
        )
    except nx.NoPathFound:
        return None

    candidates = []
    for sub_path in sub_paths:
        # Translate subgraph indices back to indices of the full site graph.
        node_path = [sub_to_original[node] for node in sub_path]
        if len(node_path) < 2:
            continue
        candidates.append(
            (
                self.extract_lanes_from_path(node_path),
                self.extract_locations_from_path(node_path),
            )
        )
    if not candidates:
        return None
    return min(candidates, key=lambda candidate: path_heuristic(*candidate))

get_endpoints

get_endpoints(lane: LaneAddress)

Get the start and end LocationAddress for a given LaneAddress.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/arch/path.py
165
166
167
168
169
def get_endpoints(self, lane: LaneAddress):
    """Get the start and end LocationAddress for a given LaneAddress.

    Args:
        lane: The lane to look up in `end_points_cache`.

    Returns:
        The entry stored in `end_points_cache` for `lane`, or `(None, None)`
        when the lane has no entry.
    """
    return self.end_points_cache.get(lane, (None, None))

get_lane

get_lane(
    start: LocationAddress, end: LocationAddress
) -> LaneAddress | None

Get the LaneAddress connecting two LocationAddress sites.

Source code in .venv/lib/python3.12/site-packages/bloqade/lanes/arch/path.py
152
153
154
155
156
157
158
159
160
161
162
163
def get_lane(
    self, start: LocationAddress, end: LocationAddress
) -> LaneAddress | None:
    """Get the LaneAddress connecting two LocationAddress sites.

    Args:
        start: The location the lane leaves from.
        end: The location the lane arrives at.

    Returns:
        The edge data stored on `site_graph` for the edge from `start` to
        `end`, or `None` when either location is missing from
        `physical_address_map` or no such edge exists.
    """
    start_node = self.physical_address_map.get(start)
    end_node = self.physical_address_map.get(end)
    if start_node is None or end_node is None:
        return None
    # rustworkx's `get_edge_data` raises `NoEdgeBetweenNodes` for a missing
    # edge instead of returning None, so check `has_edge` first to honour the
    # declared `LaneAddress | None` return type.
    if not self.site_graph.has_edge(start_node, end_node):
        return None
    return self.site_graph.get_edge_data(start_node, end_node)