Gestire le risorse di calcolo e dati con Qiskit Serverless
Versioni dei pacchetti
Il codice in questa pagina è stato sviluppato utilizzando i seguenti requisiti. Si consiglia di usare queste versioni o versioni più recenti.
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Con Qiskit Serverless puoi gestire le risorse di calcolo e dati nel tuo pattern Qiskit, tra cui CPU, QPU e altri acceleratori di calcolo.
Impostare stati dettagliati​
I carichi di lavoro Serverless attraversano diverse fasi durante un workflow. Per impostazione predefinita, i seguenti stati sono visualizzabili tramite job.status():
QUEUED: il carico di lavoro è in coda per le risorse classicheINITIALIZING: il carico di lavoro è in fase di configurazioneRUNNING: il carico di lavoro è attualmente in esecuzione sulle risorse classicheDONE: il carico di lavoro è stato completato con successo
Puoi anche impostare stati personalizzati che descrivono più in dettaglio la fase specifica del workflow, come mostrato di seguito.
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
Al termine del carico di lavoro (tramite save_result()), lo stato verrà aggiornato automaticamente a DONE.
Workflow paralleli​
Per le attività classiche che possono essere parallelizzate, usa il decoratore @distribute_task per definire i requisiti di calcolo necessari per eseguire un'attività . Inizia riprendendo l'esempio transpile_remote.py dall'argomento Scrivi il tuo primo programma Qiskit Serverless con il codice seguente.
Il codice seguente richiede che tu abbia già salvato le tue credenziali.
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
In questo esempio, hai decorato la funzione transpile_remote() con @distribute_task(target={"cpu": 1}). Durante l'esecuzione, questo crea un task worker parallelo asincrono con un singolo core CPU e restituisce un riferimento per monitorare il worker. Per recuperare il risultato, passa il riferimento alla funzione get(). Possiamo usarlo per eseguire più task in parallelo:
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Esplorare diverse configurazioni di task​
Puoi allocare in modo flessibile CPU, GPU e memoria per i tuoi task tramite @distribute_task(). Per Qiskit Serverless sulla piattaforma IBM Quantum®, ogni programma dispone di 16 core CPU e 32 GB di RAM, che possono essere allocati dinamicamente in base alle necessità .
I core CPU possono essere allocati come core interi o anche come allocazioni frazionarie, come mostrato di seguito.
La memoria viene allocata in numero di byte. Ricorda che ci sono 1024 byte in un kilobyte, 1024 kilobyte in un megabyte e 1024 megabyte in un gigabyte. Per allocare 2 GB di memoria per il tuo worker, devi impostare "mem": 2 * 1024 * 1024 * 1024.
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Gestire i dati nel tuo programma​
Qiskit Serverless ti permette di gestire file nella directory /data attraverso tutti i tuoi programmi. Questo comporta alcune limitazioni:
- Sono supportati oggi solo i file
tareh5 - Si tratta solo di uno storage piatto
/datae non può avere sottodirectory/data/folder/
Di seguito viene mostrato come caricare i file. Assicurati di esserti autenticato a Qiskit Serverless con il tuo account IBM Quantum (consulta Deploy su IBM Quantum Platform per le istruzioni).
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
Successivamente, puoi elencare tutti i file nella tua directory data. Questi dati sono accessibili a tutti i programmi.
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
Questo può essere fatto da un programma usando file_download() per scaricare il file nell'ambiente del programma e decomprimere il tar.
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
A questo punto, il tuo programma può interagire con i file come faresti in un esperimento locale. file_upload(), file_download() e file_delete() possono essere chiamati dal tuo esperimento locale o dal tuo programma caricato, per una gestione dei dati coerente e flessibile.
Passi successivi​
- Consulta un esempio completo che porta il codice esistente su Qiskit Serverless.
- Leggi un articolo scientifico in cui i ricercatori hanno usato Qiskit Serverless e il supercalcolo quantistico-centrico per esplorare la chimica quantistica.