Skip to content

Exclusive

ExclusiveRemoteTask dataclass

ExclusiveRemoteTask(
    _task_ir: QuEraTaskSpecification | None,
    _metadata: Dict[str, ParamType],
    _parallel_decoder: ParallelDecoder | None,
    _http_handler: HTTPHandlerABC = HTTPHandler(),
    _task_id: str | None = None,
    _task_result_ir: QuEraTaskResults | None = None,
)

Bases: CustomRemoteTaskABC

pull

pull(poll_interval: float = 20)

Blocking pull to get the task result. poll_interval is the time interval to poll the task status. Please ensure that it is relatively large, otherwise the server could get overloaded with queries.

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def pull(self, poll_interval: float = 20):
    """Block until the task reaches a final state, then return this task.

    On every pass the locally stored result is inspected first; the remote
    status is only queried (via ``self.status()``) when the stored result is
    not yet final. When the remote status is ``Completed`` or ``Partial`` the
    results are fetched through the HTTP handler and stored on the task.

    Args:
        poll_interval: Seconds to sleep between two status queries. Keep it
            relatively large, otherwise the server could get overloaded
            with queries.

    Returns:
        This task object (``self``).

    Raises:
        ValueError: If the stored result reports the ``Unsubmitted`` status.
    """
    # Stored states for which no further polling is needed.
    settled_states = (
        QuEraTaskStatusCode.Completed,
        QuEraTaskStatusCode.Partial,
        QuEraTaskStatusCode.Failed,
        QuEraTaskStatusCode.Unaccepted,
        QuEraTaskStatusCode.Cancelled,
    )
    # Remote states for which results are downloaded.
    fetchable_states = (QuEraTaskStatusCode.Completed, QuEraTaskStatusCode.Partial)

    while True:
        stored_status = self._task_result_ir.task_status
        if stored_status is QuEraTaskStatusCode.Unsubmitted:
            raise ValueError("Task ID not found.")
        if stored_status in settled_states:
            return self

        if self.status() in fetchable_states:
            self._task_result_ir = self._http_handler.fetch_results(self._task_id)
            return self

        time.sleep(poll_interval)

HTTPHandler

HTTPHandler(
    zapier_webhook_url: str = None,
    zapier_webhook_key: str = None,
    vercel_api_url: str = None,
)

Bases: HTTPHandlerABC

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    zapier_webhook_url: str = None,
    zapier_webhook_key: str = None,
    vercel_api_url: str = None,
):
    self.zapier_webhook_url = zapier_webhook_url or os.environ["ZAPIER_WEBHOOK_URL"]
    self.zapier_webhook_key = zapier_webhook_key or os.environ["ZAPIER_WEBHOOK_KEY"]
    self.verrcel_api_url = vercel_api_url or os.environ["VERCEL_API_URL"]

fetch_results

fetch_results(task_id: str)

Fetch the task results from the AirTable.

Parameters:

Name Type Description Default
task_id str

The task id to be queried.

required

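Returns: the task results (a QuEraTaskResults) when the matched record's status is "Completed". Returns None when the HTTP request fails, when the response contains no match list, or when more than one match is found.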

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def fetch_results(self, task_id: str):
    """Fetch the results of a completed task.

    The task is looked up by ``task_id`` through a GET request to
    ``self.verrcel_api_url``. When exactly one record matches and its status
    is ``"Completed"``, the file referenced by the record's
    ``resultsFileUrl`` is downloaded and parsed into a ``QuEraTaskResults``.

    Args:
        task_id: The task id to be queried.

    Returns:
        A ``QuEraTaskResults`` built from the downloaded JSON, or ``None``
        when the lookup request fails, when the lookup does not yield exactly
        one record, or when the record is not in the ``"Completed"`` state.

    Raises:
        Whatever ``res.raise_for_status()`` raises when the download of the
        results file fails.
    """
    response = request(
        "GET",
        self.verrcel_api_url,
        params={
            "searchPattern": task_id,
            "magicToken": self.zapier_webhook_key,
            "useRegex": False,
        },
    )
    if response.status_code != 200:
        print(f"HTTP request failed with status code: {response.status_code}")
        print("HTTP responce: ", response.text)
        return None

    # "matches" is expected to be a list of record dictionaries.
    matches = response.json().get("matches", None)
    if not matches:
        # Covers a missing key as well as an empty list, which would
        # otherwise fail on the indexing below.
        print("No task found with the given ID.")
        return None
    if len(matches) > 1:
        print("Multiple tasks found with the given ID.")
        return None

    record = matches[0]
    if record.get("status") != "Completed":
        # Nothing is downloaded for other states; return None explicitly
        # rather than referencing a result that was never assigned.
        return None

    # The record stores a preview URL; convert it to a download URL first.
    download_url = convert_preview_to_download(record.get("resultsFileUrl"))
    res = get(download_url)
    res.raise_for_status()
    return QuEraTaskResults(**res.json())

query_task_status

query_task_status(task_id: str)

Query the task status from the AirTable.

Parameters:

Name Type Description Default
task_id str

The task id to be queried.

required

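Returns: the status string of the matched record. If the HTTP request fails, the string "HTTP Request Failed." is returned; if the lookup does not yield exactly one match, the string "Task searching Failed" is returned.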

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def query_task_status(self, task_id: str):
    """Query the status of a task.

    The task is looked up by ``task_id`` through a GET request to
    ``self.verrcel_api_url``. For a record whose status is
    ``"Failed validation"`` the referenced results file is downloaded and its
    ``statusCode`` and ``message`` fields are printed.

    Args:
        task_id: The task id to be queried.

    Returns:
        The ``status`` value of the single matching record. The string
        ``"HTTP Request Failed."`` is returned when the lookup request does
        not answer with status code 200, and ``"Task searching Failed"`` when
        the lookup does not yield exactly one record.

    Raises:
        Whatever ``res.raise_for_status()`` raises when the download of the
        validation report fails.
    """
    response = request(
        "GET",
        self.verrcel_api_url,
        params={
            "searchPattern": task_id,
            "magicToken": self.zapier_webhook_key,
            "useRegex": False,
        },
    )
    if response.status_code != 200:
        return "HTTP Request Failed."

    # "matches" is expected to be a list of record dictionaries.
    matches = response.json().get("matches", None)
    if not matches:
        # Covers a missing key as well as an empty list, which would
        # otherwise fail on the indexing below.
        print("No task found with the given ID.")
        return "Task searching Failed"
    if len(matches) > 1:
        print("Multiple tasks found with the given ID.")
        return "Task searching Failed"

    record = matches[0]
    status = record.get("status")

    if status == "Failed validation":
        # The record stores a preview URL; convert it to a download URL first.
        googledoc = convert_preview_to_download(record.get("resultsFileUrl"))
        res = get(googledoc)
        res.raise_for_status()
        data = res.json()
        # Surface the validation error details to the user.
        status_code = data.get("statusCode", "NA")
        message = data.get("message", "NA")
        print(
            f"Task validation failed with status code: {status_code}, message: {message}"
        )

    return status

submit_task_via_zapier

submit_task_via_zapier(
    task_ir: QuEraTaskSpecification,
    task_id: str,
    task_note: str,
)

Submit a task and add task_id to the task fields for querying later.

Parameters:

Name Type Description Default
task_ir QuEraTaskSpecification

The task to be submitted.

required
task_id str

The task id to be added to the task fields.

required

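Returns: the "status" field of the Zapier webhook's JSON response, or the string "HTTP Request Failed" when the webhook does not answer with status code 200. Used for error handling.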

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def submit_task_via_zapier(
    self, task_ir: QuEraTaskSpecification, task_id: str, task_note: str
):
    """POST a task to the Zapier webhook, tagged with ``task_id``.

    The task is serialized with ``task_ir.json(exclude_none=True,
    exclude_unset=True)`` and sent as the request body; the webhook key and
    ``task_id`` (as the ``note`` parameter) go into the query string.

    Args:
        task_ir: The task to be submitted.
        task_id: The task id to be added to the task fields.
        task_note: Accepted for interface compatibility; this implementation
            does not use it.

    Returns:
        The ``status`` entry of the webhook's JSON response (``None`` when
        absent), or ``"HTTP Request Failed"`` when the webhook does not
        answer with status code 200.
    """
    query = {"key": self.zapier_webhook_key, "note": task_id}
    payload = task_ir.json(exclude_none=True, exclude_unset=True)

    response = request("POST", self.zapier_webhook_url, params=query, data=payload)

    if response.status_code != 200:
        print(f"HTTP request failed with status code: {response.status_code}")
        print("HTTP responce: ", response.text)
        return "HTTP Request Failed"

    return response.json().get("status", None)

HTTPHandlerABC

fetch_results abstractmethod

fetch_results(task_id: str)

Fetch the task results from the AirTable.

Parameters:

Name Type Description Default
task_id str

The task id to be queried.

required

Returns: response — the response from the AirTable, used for error handling.

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
50
51
52
53
54
55
56
57
58
59
60
61
62
@abc.abstractmethod
def fetch_results(self, task_id: str):
    """Fetch the task results from the AirTable.

    Abstract hook; implementations provide the actual lookup.

    Args:
        task_id: The task id to be queried.

    Returns:
        The response from the AirTable, used for error handling.
    """
    ...

query_task_status abstractmethod

query_task_status(task_id: str)

Query the task status from the AirTable.

Parameters:

Name Type Description Default
task_id str

The task id to be queried.

required

Returns: response — the response from the AirTable, used for error handling.

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
37
38
39
40
41
42
43
44
45
46
47
48
@abc.abstractmethod
def query_task_status(self, task_id: str):
    """Query the task status from the AirTable.

    Abstract hook; implementations provide the actual lookup.

    Args:
        task_id: The task id to be queried.

    Returns:
        The response from the AirTable, used for error handling.
    """
    ...

submit_task_via_zapier abstractmethod

submit_task_via_zapier(
    task_ir: QuEraTaskSpecification, task_id: str
)

Submit a task and add task_id to the task fields for querying later.

Parameters:

Name Type Description Default
task_ir QuEraTaskSpecification

The task to be submitted.

required
task_id str

The task id to be added to the task fields.

required

Returns: response — the response from the Zapier webhook, used for error handling.

Source code in .venv/lib/python3.12/site-packages/bloqade/analog/task/exclusive.py
23
24
25
26
27
28
29
30
31
32
33
34
35
@abc.abstractmethod
def submit_task_via_zapier(
    self,
    task_ir: QuEraTaskSpecification,
    task_id: str,
    task_note: str | None = None,
):
    """Submit a task and add task_id to the task fields for querying later.

    Abstract hook; implementations provide the actual submission.

    Args:
        task_ir: The task to be submitted.
        task_id: The task id to be added to the task fields.
        task_note: Optional note passed along with the submission.

    Returns:
        The response from the Zapier webhook, used for error handling.
    """
    ...