Skip to content

Commit

Permalink
merge upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
doichanj committed Aug 9, 2024
2 parents 78ce8b0 + ad7b86f commit 1082ac9
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 130 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ jobs:
- name: Run Tests
run: |
set -e
pip check
rm -rf qiskit_aer
stestr run --slowest
shell: bash
Expand Down
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,3 @@ docs/_build/
docs/stubs/
docs/api/

# Ignore DASK temporary files
dask-worker-space/*
60 changes: 3 additions & 57 deletions docs/howtos/parallel.rst
Original file line number Diff line number Diff line change
@@ -1,31 +1,18 @@
.. _dask:
.. _threadpool:

Running with Threadpool and DASK
Running with Threadpool
================================

Qiskit Aer runs simulation jobs on a single-worker Python multiprocessing ThreadPool executor
so that all parallelization is handled by low-level OpenMP and CUDA code.
However to customize job-level parallel execution of multiple circuits a user can specify
a custom multiprocessing executor and control the splitting of circuits using
the ``executor`` and ``max_job_size`` backend options.
For large scale job parallelization on HPC clusters Qiskit Aer executors support
the distributed Clients from the `DASK <http://dask.org>`__.

Installation of DASK packages with Aer
---------------------------------------

If you want to install dask client at the same time as Qiskit Aer,
please add the ``dask`` extra as follows.
This option installs Aer, dask, and distributed packages.

.. code-block:: sh
pip install .[dask]

Usage of executor
-----------------

To use Threadpool or DASK as an executor, you need to set
To use Threadpool as an executor, you need to set
``executor`` and ``max_job_size`` by ``set_options`` function.
If both ``executor`` (default None) and ``max_job_size`` (default None) are set,
Aer splits the multiple circuits to some chunk of circuits and submits them to the executor.
Expand Down Expand Up @@ -69,44 +56,3 @@ Example: Threadpool execution
qbackend.set_options(max_job_size=1)
result = qbackend.run(circ_list).result()
Example: Dask execution
'''''''''''''''''''''''

The Dask client uses ``multiprocessing`` so you need to
guard it by an ``if __name__ == "__main__":`` block.

.. code-block:: python
import qiskit
from qiskit_aer import AerSimulator
from dask.distributed import LocalCluster, Client
from math import pi
def q_exec():
# Generate circuits
circ = qiskit.QuantumCircuit(15, 15)
circ.h(0)
circ.cx(0, 1)
circ.cx(1, 2)
circ.p(pi/2, 2)
circ.measure([0, 1, 2], [0, 1 ,2])
circ2 = qiskit.QuantumCircuit(15, 15)
circ2.h(0)
circ2.cx(0, 1)
circ2.cx(1, 2)
circ2.p(pi/2, 2)
circ2.measure([0, 1, 2], [0, 1 ,2])
circ_list = [circ, circ2]
exc = Client(address=LocalCluster(n_workers=2, processes=True))
# Set executor and max_job_size
qbackend = AerSimulator()
qbackend.set_options(executor=exc)
qbackend.set_options(max_job_size=1)
result = qbackend.run(circ_list).result()
if __name__ == '__main__':
q_exec()
6 changes: 4 additions & 2 deletions qiskit_aer/backends/aer_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ def _inline_initialize(self, circ, optype):
if isinstance(optype, set) and Initialize not in optype:
return circ

for inst, _, _ in circ.data:
for datum in circ.data:
inst = datum.operation
if isinstance(inst, Initialize) and (
(not isinstance(inst.params[0], complex)) or (len(inst.params) == 1)
):
Expand All @@ -131,7 +132,8 @@ def _inline_initialize(self, circ, optype):

new_circ = circ.copy()
new_circ.data = []
for inst, qargs, cargs in circ.data:
for datum in circ.data:
inst, qargs, cargs = datum.operation, datum.qubits, datum.clbits
if isinstance(inst, Initialize) and (
(not isinstance(inst.params[0], complex)) or (len(inst.params) == 1)
):
Expand Down
3 changes: 2 additions & 1 deletion qiskit_aer/backends/aerbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ def _assemble_noise_model(self, circuits, optypes, **run_options):
):
updated_circ = False
new_data = []
for inst, qargs, cargs in circ.data:
for datum in circ.data:
inst, qargs, cargs = datum.operation, datum.qubits, datum.clbits
if isinstance(inst, QuantumChannelInstruction):
updated_circ = True
if not updated_noise:
Expand Down
2 changes: 1 addition & 1 deletion qiskit_aer/jobs/aerjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
circuits(list of QuantumCircuit): circuits to execute.
parameter_binds(list): parameters for circuits.
run_options(dict): run_options to execute.
executor(ThreadPoolExecutor or dask.distributed.client):
executor(ThreadPoolExecutor):
The executor to be used to submit the job.
Raises:
Expand Down
3 changes: 0 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
else True
)

extras_requirements = {"dask": ["dask", "distributed"]}

requirements = [
"qiskit>=1.1.0",
"numpy>=1.16.3",
Expand Down Expand Up @@ -110,7 +108,6 @@
install_requires=requirements,
include_package_data=False,
package_data={"qiskit_aer": ["VERSION.txt"], "qiskit_aer.library": ["*.csv"]},
extras_require=extras_requirements,
cmake_args=cmake_args,
keywords="qiskit, simulator, quantum computing, backend",
zip_safe=False,
Expand Down
63 changes: 0 additions & 63 deletions test/terra/backends/aer_simulator/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@
from qiskit_aer.jobs import AerJob
from test.terra.backends.simulator_test_case import SimulatorTestCase, supported_methods


DASK = False

try:
from dask.distributed import LocalCluster, Client

DASK = True
except ImportError:
DASK = False


def run_random_circuits(backend, shots=None, **run_options):
"""Test random circuits on different executor fictures"""
job_size = 10
Expand Down Expand Up @@ -107,58 +96,6 @@ def backend(self, **options):
return super().backend(executor=self._test_executor, **options)


@ddt
class TestDaskExecutor(CBFixture):
"""Tests of Dask executor"""

@classmethod
def setUpClass(cls):
super().setUpClass()
if DASK:
cls._test_executor = Client(address=LocalCluster(n_workers=1, processes=True))

def setUp(self):
super().setUp()
if not DASK:
self.skipTest("Dask not installed, skipping ClusterBackend-dask tests")

@supported_methods(["statevector"], [None, 1, 2, 3])
def test_random_circuits_job(self, method, device, max_job_size):
"""Test random circuits with custom executor."""
shots = 4000
backend = self.backend(method=method, device=device, max_job_size=max_job_size)
result, circuits, targets = run_random_circuits(backend, shots=shots)
self.assertSuccess(result)
self.compare_counts(result, circuits, targets, hex_counts=False, delta=0.05 * shots)

@supported_methods(["statevector"], [None, 1, 1, 1], [None, 100, 500, 1000])
def test_noise_circuits_job(self, method, device, max_job_size, max_shot_size):
"""Test random circuits with custom executor."""
shots = 4000
backend = self.backend(
method=method, device=device, max_job_size=max_job_size, max_shot_size=max_shot_size
)

circuits = ref_kraus_noise.kraus_gate_error_circuits()
noise_models = ref_kraus_noise.kraus_gate_error_noise_models()
targets = ref_kraus_noise.kraus_gate_error_counts(shots)

for circuit, noise_model, target in zip(circuits, noise_models, targets):
backend.set_options(noise_model=noise_model)
result = backend.run(circuit, shots=shots).result()
self.assertSuccess(result)
self.compare_counts(result, [circuit], [target], delta=0.05 * shots)

@supported_methods(["statevector"], [None, 1, 2, 3])
def test_result_time_val(self, method, device, max_job_size):
"""Test random circuits with custom executor."""
shots = 4000
backend = self.backend(method=method, device=device, max_job_size=max_job_size)
result, _, _ = run_random_circuits(backend, shots=shots)
self.assertSuccess(result)
self.assertGreaterEqual(result.time_taken, 0)


@ddt
class TestThreadPoolExecutor(CBFixture):
"""Tests of ThreadPool executor"""
Expand Down

0 comments on commit 1082ac9

Please sign in to comment.