Zarządzanie zasobami obliczeniowymi i danymi w Qiskit Serverless
Wersje pakietów
Kod na tej stronie został opracowany przy użyciu następujących wymagań. Zalecamy korzystanie z tych wersji lub nowszych.
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Dzięki Qiskit Serverless możesz zarządzać zasobami obliczeniowymi i danymi w swoim wzorcu Qiskit, w tym procesorami CPU, QPU i innymi akceleratorami obliczeniowymi.
Ustawianie szczegółowych statusów
Zadania Serverless przechodzą przez kilka etapów w trakcie przepływu pracy. Domyślnie następujące statusy są dostępne przez job.status():
QUEUED: zadanie oczekuje w kolejce na zasoby klasyczneINITIALIZING: zadanie jest konfigurowaneRUNNING: zadanie jest aktualnie wykonywane na zasobach klasycznychDONE: zadanie zostało pomyślnie ukończone
Możesz również ustawiać niestandardowe statusy, które szczegółowiej opisują konkretny etap przepływu pracy, w następujący sposób.
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
Po pomyślnym zakończeniu tego zadania (z save_result()), status zostanie automatycznie zaktualizowany do DONE.
Równoległe przepływy pracy
W przypadku zadań klasycznych, które można zrównoleglić, użyj dekoratora @distribute_task, aby zdefiniować wymagania obliczeniowe potrzebne do wykonania zadania. Zacznij od przypomnienia sobie przykładu transpile_remote.py z tematu Napisz swój pierwszy program Qiskit Serverless z następującym kodem.
Poniższy kod wymaga, abyś wcześniej zapisał swoje dane uwierzytelniające.
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
W tym przykładzie udekorowałeś funkcję transpile_remote() dekoratorem @distribute_task(target={"cpu": 1}). Po uruchomieniu tworzy to asynchroniczne równoległe zadanie robocze z pojedynczym rdzeniem CPU i zwraca referencję do śledzenia tego zadania. Aby pobrać wynik, przekaż referencję do funkcji get(). Możemy to wykorzystać do uruchamiania wielu równoległych zadań:
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Eksplorowanie różnych konfiguracji zadań
Możesz elastycznie przydzielać zasoby CPU, GPU i pamięć dla swoich zadań za pomocą @distribute_task(). Dla Qiskit Serverless na platformie IBM Quantum® Platform każdy program jest wyposażony w 16 rdzeni CPU i 32 GB RAM, które można dynamicznie przydzielać zgodnie z potrzebami.
Rdzenie CPU można przydzielać jako pełne rdzenie lub nawet ułamkowe alokacje, jak pokazano poniżej.
Pamięć jest przydzielana w liczbie bajtów. Pamiętaj, że w kilobajcie jest 1024 bajty, w megabajcie 1024 kilobajty, a w gigabajcie 1024 megabajty. Aby przydzielić 2 GB pamięci dla swojego zadania roboczego, musisz ustawić "mem": 2 * 1024 * 1024 * 1024.
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Zarządzanie danymi w ramach programu
Qiskit Serverless umożliwia zarządzanie plikami w katalogu /data we wszystkich programach. Wiąże się to z kilkoma ograniczeniami:
- Obsługiwane są obecnie tylko pliki
tarih5 - Jest to wyłącznie płaski storage
/data— nie można tworzyć podkatalogów/data/folder/
Poni żej pokazano, jak przesyłać pliki. Upewnij się, że uwierzytelniłeś się w Qiskit Serverless za pomocą swojego konta IBM Quantum (instrukcje znajdziesz w sekcji Wdrażanie na platformę IBM Quantum).
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
Następnie możesz wyświetlić wszystkie pliki w katalogu data. Te dane są dostępne dla wszystkich programów.
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
Można to zrobić z poziomu programu, używając file_download() do pobrania pliku do środowiska programu i rozpakowania archiwum tar.
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
W tym momencie twój program może pracować z plikami tak samo jak w lokalnym eksperymencie. Funkcje file_upload(), file_download() i file_delete() mogą być wywoływane zarówno z lokalnego eksperymentu, jak i z przesłanego programu, zapewniając spójne i elastyczne zarządzanie danymi.
Następne kroki
- Zobacz pełny przykład, który przenosi istniejący kod do Qiskit Serverless.
- Zapoznaj się z artykułem, w którym naukowcy wykorzystali Qiskit Serverless i kwantowe superkomputery do eksploracji chemii kwantowej.