2 changes: 2 additions & 0 deletions qiskit_machine_learning/optimizers/__init__.py
@@ -122,6 +122,7 @@
"""

from .adam_amsgrad import ADAM
from .nadam import NAdam
from .aqgd import AQGD
from .cg import CG
from .cobyla import COBYLA
@@ -157,6 +158,7 @@
"OptimizerResult",
"Minimizer",
"ADAM",
"NAdam",
"AQGD",
"CG",
"COBYLA",
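With the export in place, a minimal usage sketch for reviewers (the objective, learning rate, and starting point below are illustrative, not taken from this diff):

import numpy as np

from qiskit_machine_learning.optimizers import NAdam


def objective(x):
    """Illustrative convex objective; any callable returning a float works."""
    return x[0] ** 2 + x[1] ** 2


result = NAdam(maxiter=200, lr=0.1).minimize(objective, x0=np.array([1.0, 1.0]))
print(result.x, result.fun)  # expected to approach [0, 0] and 0.0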
148 changes: 148 additions & 0 deletions qiskit_machine_learning/optimizers/nadam.py
@@ -0,0 +1,148 @@
# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2019, 2025.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""NAdam Optimizer"""

from __future__ import annotations
import os
from typing import Callable

import numpy as np
from .optimizer import Optimizer, OptimizerResult, POINT, OptimizerSupportLevel


class NAdam(Optimizer):
"""NAdam optimizer (Nesterov-accelerated Adaptive Moment Estimation)."""

def __init__(
self,
maxiter: int = 200,
tol: float = 1e-6,
lr: float = 0.001,
beta_1: float = 0.9,
beta_2: float = 0.999,
eps: float = 1e-8,
noise_factor: float = 1e-8,
callback: Callable | None = None,
snapshot_dir: str | None = None,
) -> None:
"""Initialize NAdam optimizer."""
super().__init__()
self.maxiter = maxiter
self.tol = tol
self.lr = lr
self.beta_1 = beta_1
self.beta_2 = beta_2
self.eps = eps
self.noise_factor = noise_factor
self.callback = callback
self.snapshot_dir = snapshot_dir

# Internal state
self._m = None
self._v = None
self._t = 0

def get_support_level(self):
"""Return the support level for NAdam optimizer."""
return {
"gradient": OptimizerSupportLevel.ignored,
"bounds": OptimizerSupportLevel.ignored,
"initial_point": OptimizerSupportLevel.required,
}

@property
def settings(self):
"""Return optimizer settings as a dictionary."""
return {
"maxiter": self.maxiter,
"tol": self.tol,
"lr": self.lr,
"beta_1": self.beta_1,
"beta_2": self.beta_2,
"eps": self.eps,
"noise_factor": self.noise_factor,
"callback": self.callback,
"snapshot_dir": self.snapshot_dir,
}

def minimize(
self,
fun: Callable[[POINT], float],
x0: POINT,
jac: Callable[[POINT], POINT] | None = None,
bounds: list[tuple[float, float]] | None = None,
) -> OptimizerResult:
"""Minimize the scalar function using NAdam."""

result = OptimizerResult()
x = np.array(x0, dtype=float)
self._m = np.zeros_like(x)
self._v = np.zeros_like(x)
        self._t = 0
        nfev = 0  # running count of objective-function evaluations

for i in range(self.maxiter):
self._t += 1

            # Use the analytic gradient if provided, otherwise fall back to
            # forward finite differences
            if jac is not None:
                grad = np.asarray(jac(x), dtype=float)
            else:
                grad = self.gradient_num_diff(x, fun, self.eps)
                nfev += x.size + 1  # one evaluation at x plus one per dimension

            # Add optional stochastic noise to the gradient
            grad = grad + self.noise_factor * np.random.randn(*grad.shape)

# NAdam update rule
m_hat = self.beta_1 * self._m + (1 - self.beta_1) * grad
v_hat = self.beta_2 * self._v + (1 - self.beta_2) * (grad ** 2)

m_corr = m_hat / (1 - self.beta_1 ** self._t)
v_corr = v_hat / (1 - self.beta_2 ** self._t)

            # Nesterov momentum: combine the bias-corrected first moment with a
            # look-ahead contribution from the current gradient
            x_update = (
                self.lr
                * (self.beta_1 * m_corr + (1 - self.beta_1) * grad / (1 - self.beta_1 ** self._t))
                / (np.sqrt(v_corr) + self.eps)
            )
            x -= x_update

# Update state
self._m = m_hat
self._v = v_hat

            fval = fun(x)
            nfev += 1

# Callback
if self.callback is not None:
self.callback(self._t, x, fval)

# Save snapshot
if self.snapshot_dir is not None:
np.save(os.path.join(self.snapshot_dir, f"nadam_m_{i}.npy"), self._m)
np.save(os.path.join(self.snapshot_dir, f"nadam_v_{i}.npy"), self._v)
np.save(os.path.join(self.snapshot_dir, f"nadam_x_{i}.npy"), x)

# Check convergence
if np.linalg.norm(x_update) < self.tol:
break

result.x = x
result.fun = fun(x)
        result.nfev = nfev + 1  # includes the final fun(x) evaluation
result.nit = self._t

return result

def load_params(self, snapshot_dir: str):
"""Load optimizer state from snapshot files."""
        last_iter = max(
            (
                int(f.split("_")[-1].split(".")[0])
                for f in os.listdir(snapshot_dir)
                if f.startswith("nadam_x_")
            ),
            default=-1,
        )
if last_iter >= 0:
self._m = np.load(os.path.join(snapshot_dir, f"nadam_m_{last_iter}.npy"))
self._v = np.load(os.path.join(snapshot_dir, f"nadam_v_{last_iter}.npy"))
# x can also be restored if needed
# x = np.load(os.path.join(snapshot_dir, f"nadam_x_{last_iter}.npy"))
self._t = last_iter + 1
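For reference, the loop body above follows the standard NAdam formulation: Adam's bias-corrected moment estimates plus a Nesterov-style look-ahead on the first moment. A self-contained single-step sketch (names here are illustrative) that can be checked against minimize():

import numpy as np


def nadam_step(x, grad, m, v, t, lr=1e-3, beta_1=0.9, beta_2=0.999, eps=1e-8):
    """One NAdam update; mirrors the loop body of NAdam.minimize()."""
    m = beta_1 * m + (1 - beta_1) * grad       # first moment
    v = beta_2 * v + (1 - beta_2) * grad ** 2  # second moment
    m_corr = m / (1 - beta_1 ** t)             # bias corrections
    v_corr = v / (1 - beta_2 ** t)
    # Nesterov look-ahead: blend corrected momentum with the current gradient
    update = lr * (beta_1 * m_corr + (1 - beta_1) * grad / (1 - beta_1 ** t)) / (
        np.sqrt(v_corr) + eps
    )
    return x - update, m, v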
96 changes: 96 additions & 0 deletions test/optimizers/test_nadam.py
@@ -0,0 +1,96 @@
# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2019, 2025.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Test of NAdam optimizer"""

import unittest
import tempfile
import numpy as np
from ddt import ddt

# Import the test case base class
from test.algorithms_test_case import QiskitAlgorithmsTestCase
from qiskit_machine_learning.optimizers import NAdam
from qiskit_machine_learning.utils import algorithm_globals


@ddt
class TestOptimizerNAdam(QiskitAlgorithmsTestCase):
"""Test NAdam optimizer"""

def setUp(self):
super().setUp()
algorithm_globals.random_seed = 42
self.quadratic_objective = lambda x: x[0] ** 2 + x[1] ** 2
self.initial_point = np.array([1.0, 1.0])

def test_optimizer_minimize(self):
"""Test NAdam optimizer minimize method"""
nadam = NAdam(maxiter=200, tol=1e-6, lr=1e-1)
result = nadam.minimize(self.quadratic_objective, self.initial_point)
self.assertAlmostEqual(result.fun, 0.0, places=6)
self.assertTrue(np.allclose(result.x, np.zeros_like(self.initial_point), atol=1e-2))

def test_optimizer_with_noise(self):
"""Test NAdam optimizer with noise factor"""
nadam = NAdam(maxiter=150, tol=1e-6, lr=1e-1, noise_factor=1e-2)
result = nadam.minimize(self.quadratic_objective, self.initial_point)
self.assertAlmostEqual(result.fun, 0.0, places=4)
self.assertTrue(np.allclose(result.x, np.zeros_like(self.initial_point), atol=1e-2))

def test_save_load_params(self):
"""Test save and load optimizer parameters"""
with tempfile.TemporaryDirectory() as tmpdir:
nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1, snapshot_dir=tmpdir)
nadam.minimize(self.quadratic_objective, self.initial_point)
new_nadam = NAdam(snapshot_dir=tmpdir)
new_nadam.load_params(tmpdir)

self.assertTrue(np.allclose(nadam._m, new_nadam._m))
self.assertTrue(np.allclose(nadam._v, new_nadam._v))
self.assertEqual(nadam._t, new_nadam._t)

def test_settings(self):
"""Test settings property"""
nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1)
settings = nadam.settings
self.assertEqual(settings["maxiter"], 100)
self.assertEqual(settings["tol"], 1e-6)
self.assertEqual(settings["lr"], 1e-1)
self.assertEqual(settings["beta_1"], 0.9)
self.assertEqual(settings["beta_2"], 0.999)
self.assertEqual(settings["eps"], 1e-8)
self.assertEqual(settings["noise_factor"], 1e-8)
# NAdam does not have amsgrad, so just check key safely
self.assertIsNone(settings.get("amsgrad"))
self.assertEqual(settings["snapshot_dir"], None)

def test_callback(self):
"""Test using the callback."""
history = {"ite": [], "weights": [], "fvals": []}

def callback(n_t, weight, fval):
history["ite"].append(n_t)
history["weights"].append(weight)
history["fvals"].append(fval)

nadam = NAdam(maxiter=100, tol=1e-6, lr=1e-1, callback=callback)
nadam.minimize(self.quadratic_objective, self.initial_point)

expected_types = [int, np.ndarray, float]
for i, (key, values) in enumerate(history.items()):
self.assertTrue(all(isinstance(value, expected_types[i]) for value in values))
self.assertEqual(len(history[key]), 100)


if __name__ == "__main__":
unittest.main()
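A note for reviewers: assuming the repository's usual layout (tests run from the project root so the test package is importable), the new suite can be run on its own with the standard library runner:

python -m unittest test.optimizers.test_nadam -v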