# geomexp/kernels/kernels.py

"""Kernel functions and kernelised clustering algorithms.

This module provides kernel function abstractions, a kernel K-means implementation,
and the kernelised geometric expectile clustering algorithm. In the dual representation, centroids
and index vectors are expressed as linear combinations of feature-mapped data points, so all
computations reduce to operations on the Gram matrix :math:`K_{ij} = k(X_i, X_j)`.
"""

from abc import ABC, abstractmethod
from collections.abc import Callable

import numpy as np
from scipy.special import gamma, kv

from geomexp.clustering.clustering_base import BaseClusterer, ClusterResult
from geomexp.utils.validation import (
    validate_index_radius,
    validate_positive_float,
    validate_positive_int,
)


class Kernel(ABC):
    """Abstract base class for kernel functions.

    A kernel :math:`k : \\mathcal{X} \\times \\mathcal{X} \\to \\mathbb{R}` implicitly
    defines a feature map :math:`\\varphi` into a reproducing kernel Hilbert space
    :math:`\\mathcal{H}` such that
    :math:`k(x, y) = \\langle \\varphi(x), \\varphi(y) \\rangle_{\\mathcal{H}}`.
    """

    @abstractmethod
    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        """Evaluate the kernel on all pairs of rows.

        Args:
            X: Array of shape ``(n_x, d)``.
            Y: Array of shape ``(n_y, d)``.

        Returns:
            Gram sub-matrix of shape ``(n_x, n_y)`` with entry
            :math:`K_{ij} = k(X_i, Y_j)`.
        """
    def gram_matrix(self, X: np.ndarray) -> np.ndarray:
        """Compute the full Gram matrix :math:`K_{ij} = k(X_i, X_j)`.

        Args:
            X: Data array of shape ``(n, d)``.

        Returns:
            Symmetric positive semi-definite matrix of shape ``(n, n)``.
        """
        return self(X, X)

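
# Illustrative sketch (not part of the library): a minimal custom kernel built
# on the same ABC. The Laplacian kernel k(x, y) = exp(-gamma * ||x - y||_1) is
# a standard positive-definite kernel; the class name and parameterisation
# below are assumptions for demonstration only.
class LaplacianKernel(Kernel):
    """Laplacian kernel: :math:`k(x, y) = \\exp(-\\gamma \\|x - y\\|_1)`."""

    def __init__(self, gamma: float = 1) -> None:
        validate_positive_float(gamma, "gamma")
        self.gamma = gamma

    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        # Pairwise L1 distances via broadcasting: shape (n_x, n_y).
        l1 = np.sum(np.abs(X[:, None, :] - Y[None, :, :]), axis=-1)
        return np.asarray(np.exp(-self.gamma * l1))
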
class LinearKernel(Kernel):
    """Linear (inner product) kernel: :math:`k(x, y) = \\langle x, y \\rangle`."""

    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        return np.asarray(X @ Y.T)

class RBFKernel(Kernel):
    """Radial basis function (Gaussian) kernel.

    .. math::

        k(x, y) = \\exp\\!\\bigl(-\\gamma \\|x - y\\|^2\\bigr)

    Attributes:
        gamma: Bandwidth parameter :math:`\\gamma > 0`.
    """

    def __init__(self, gamma: float = 1) -> None:
        validate_positive_float(gamma, "gamma")
        self.gamma = gamma

    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        # Pairwise squared distances via ||x||^2 - 2<x, y> + ||y||^2.
        dist_sq = (
            np.sum(X**2, axis=1, keepdims=True)
            - 2 * X @ Y.T
            + np.sum(Y**2, axis=1, keepdims=True).T
        )
        # Clamp tiny negative values caused by floating-point cancellation.
        np.maximum(dist_sq, 0, out=dist_sq)
        return np.asarray(np.exp(-self.gamma * dist_sq))

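
# Illustrative sketch (not part of the library): RBFKernel relies on the
# expansion ||x - y||^2 = ||x||^2 - 2<x, y> + ||y||^2 to vectorise the pairwise
# distances. The check below compares it against an explicit double loop on
# small random data; names and sizes are for demonstration only.
def _check_rbf_expansion() -> None:
    rng = np.random.RandomState(0)
    X, Y = rng.randn(5, 3), rng.randn(4, 3)
    kernel = RBFKernel(gamma=0.5)
    direct = np.array(
        [[np.exp(-0.5 * np.sum((x - y) ** 2)) for y in Y] for x in X]
    )
    assert np.allclose(kernel(X, Y), direct)
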
class PolynomialKernel(Kernel):
    """Polynomial kernel.

    .. math::

        k(x, y) = (\\langle x, y \\rangle + c)^p

    Attributes:
        degree: Polynomial degree :math:`p`.
        coef0: Additive constant :math:`c`.
    """

    def __init__(self, degree: int = 3, coef0: float = 1) -> None:
        validate_positive_int(degree, "degree")
        self.degree = degree
        self.coef0 = coef0

    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        return np.asarray((X @ Y.T + self.coef0) ** self.degree)

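
# Illustrative sketch (not part of the library): for degree p = 2, c = 0 and
# d = 2 input dimensions, the polynomial kernel has the explicit feature map
# phi(x) = (x1^2, sqrt(2) x1 x2, x2^2), so k(x, y) = <phi(x), phi(y)>. The
# numeric check below is a demonstration under those assumptions.
def _check_polynomial_feature_map() -> None:
    def phi(x: np.ndarray) -> np.ndarray:
        return np.array([x[0] ** 2, np.sqrt(2) * x[0] * x[1], x[1] ** 2])

    rng = np.random.RandomState(0)
    x, y = rng.randn(2), rng.randn(2)
    kernel = PolynomialKernel(degree=2, coef0=0)
    assert np.isclose(kernel(x[None, :], y[None, :])[0, 0], phi(x) @ phi(y))
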
class MaternKernel(Kernel):
    """Matern kernel implementing the Matern covariance function.

    .. math::

        k(x, y) = \\frac{2^{1-\\nu}}{\\Gamma(\\nu)}
        \\left(\\frac{\\sqrt{2\\nu}\\,\\|x - y\\|}{\\ell}\\right)^{\\nu}
        K_{\\nu}\\!\\left(\\frac{\\sqrt{2\\nu}\\,\\|x - y\\|}{\\ell}\\right)

    where :math:`K_{\\nu}` is the modified Bessel function of the second kind.
    Special cases: :math:`\\nu = 0.5` gives the exponential kernel,
    :math:`\\nu = 1.5` and :math:`\\nu = 2.5` have closed-form expressions, and
    :math:`\\nu \\to \\infty` recovers the RBF (squared-exponential) kernel.

    Attributes:
        nu: Smoothness parameter :math:`\\nu > 0`.
        lengthscale: Lengthscale parameter :math:`\\ell > 0`.
    """

    def __init__(self, nu: float = 1.5, lengthscale: float = 1) -> None:
        """Initialize Matern kernel.

        Args:
            nu: Smoothness parameter. Common values: 0.5, 1.5, 2.5, ``np.inf``.
            lengthscale: Lengthscale parameter (must be positive).
        """
        validate_positive_float(lengthscale, "lengthscale")
        self.nu = nu
        self.lengthscale = lengthscale

    def __call__(self, X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        dists = np.sqrt(np.maximum(np.sum((X[:, None, :] - Y[None, :, :]) ** 2, axis=-1), 0))
        dists /= self.lengthscale
        if self.nu == 0.5:
            return np.asarray(np.exp(-dists))
        if self.nu == np.inf:
            return np.asarray(np.exp(-0.5 * dists**2))
        if self.nu == 1.5:
            sqrt3d = np.sqrt(3) * dists
            return np.asarray((1 + sqrt3d) * np.exp(-sqrt3d))
        if self.nu == 2.5:
            sqrt5d = np.sqrt(5) * dists
            return np.asarray((1 + sqrt5d + 5 * dists**2 / 3) * np.exp(-sqrt5d))
        # General case via the Bessel function, using the standard sqrt(2 * nu)
        # scaling so it is consistent with the closed forms above.
        scaled = np.sqrt(2 * self.nu) * dists
        K = np.ones_like(scaled)
        nonzero = scaled > 0
        factor = (2 ** (1 - self.nu)) / gamma(self.nu)
        K[nonzero] = factor * (scaled[nonzero] ** self.nu) * kv(self.nu, scaled[nonzero])
        return np.asarray(K)

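
# Illustrative sketch (not part of the library): the nu = 1.5 closed form
# should agree with the general Bessel expression
# 2^{1-nu} / Gamma(nu) * (sqrt(2 nu) d)^nu * K_nu(sqrt(2 nu) d) at nu = 1.5.
# The special-case branch short-circuits, so the general formula is recomputed
# by hand here for comparison on strictly positive distances.
def _check_matern_closed_form() -> None:
    nu = 1.5
    d = np.linspace(0.1, 3, 20)  # scaled distances ||x - y|| / lengthscale
    closed_form = (1 + np.sqrt(3) * d) * np.exp(-np.sqrt(3) * d)
    scaled = np.sqrt(2 * nu) * d
    bessel = (2 ** (1 - nu)) / gamma(nu) * scaled**nu * kv(nu, scaled)
    assert np.allclose(closed_form, bessel)
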
def _kmeanspp_feature_space(
    gram: np.ndarray, n_clusters: int, rng: np.random.RandomState
) -> np.ndarray:
    """K-means++ initialisation in feature space using the precomputed Gram matrix.

    Args:
        gram: Gram matrix of shape ``(n, n)``.
        n_clusters: Number of seed indices to select.
        rng: Random state.

    Returns:
        Integer array of shape ``(n_clusters,)`` with selected data indices.
    """
    n = gram.shape[0]
    diag_K = np.diag(gram)
    init_indices = np.empty(n_clusters, dtype=int)
    init_indices[0] = rng.choice(n)
    for k in range(1, n_clusters):
        # Squared feature-space distance to the nearest already-chosen seed:
        # ||phi(x_i) - phi(x_j)||^2 = K_ii - 2 K_ij + K_jj.
        min_dist_sq = np.full(n, np.inf)
        for j in range(k):
            d2 = diag_K - 2 * gram[:, init_indices[j]] + diag_K[init_indices[j]]
            np.maximum(d2, 0, out=d2)
            np.minimum(min_dist_sq, d2, out=min_dist_sq)
        total = min_dist_sq.sum()
        if not np.isfinite(total) or total <= 0:
            # Degenerate case (e.g. duplicate points): fall back to a uniform
            # draw over the indices not yet selected.
            remaining = np.setdiff1d(np.arange(n), init_indices[:k], assume_unique=False)
            init_indices[k] = rng.choice(remaining)
        else:
            probs = min_dist_sq / total
            init_indices[k] = rng.choice(n, p=probs)
    return init_indices


def _multi_restart_loop(
    clusterer: BaseClusterer,
    X: np.ndarray,
    n_init: int,
    init_fn: Callable[[np.ndarray], dict[str, object]],
    iteration_fn: Callable[[np.ndarray, dict[str, object]], dict[str, object]],
    objective_fn: Callable[[np.ndarray, dict[str, object]], float],
    convergence_fn: Callable[[dict[str, object]], bool],
    extract_fn: Callable[[dict[str, object], float, int, bool], ClusterResult],
) -> ClusterResult:
    """Run a multi-restart loop, returning the result with the lowest objective.

    This is a shared helper for :class:`KernelKMeans` and
    :class:`KernelGeometricExpectileClustering`.
    """
    best_result: ClusterResult | None = None
    base_seed = clusterer.random_state if clusterer.random_state is not None else 0
    for trial in range(n_init):
        # Each restart gets its own deterministic seed derived from the base.
        clusterer._rng = np.random.RandomState(base_seed + trial)
        state = init_fn(X)
        obj_old = objective_fn(X, state)
        obj_new = obj_old
        converged = False
        n_iter = 0
        for n_iter in range(clusterer.max_iter):  # noqa: B007
            state = iteration_fn(X, state)
            obj_new = objective_fn(X, state)
            if abs(obj_old - obj_new) <= clusterer.tol or convergence_fn(state):
                converged = True
                break
            obj_old = obj_new
        result = extract_fn(state, obj_new, n_iter + 1, converged)
        if best_result is None or result.objective < best_result.objective:
            best_result = result
    assert best_result is not None
    return best_result

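
# Illustrative sketch (not part of the library): _kmeanspp_feature_space uses
# the identity ||phi(x_i) - phi(x_j)||^2 = K_ii - 2 K_ij + K_jj, which needs
# only Gram-matrix entries. With a linear kernel this reduces to the ordinary
# squared Euclidean distance, checked below on toy data.
def _check_feature_space_distance() -> None:
    rng = np.random.RandomState(0)
    X = rng.randn(6, 2)
    gram = LinearKernel().gram_matrix(X)
    diag_K = np.diag(gram)
    i, j = 1, 4
    d2_dual = diag_K[i] - 2 * gram[i, j] + diag_K[j]
    assert np.isclose(d2_dual, np.sum((X[i] - X[j]) ** 2))
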
class KernelKMeans(BaseClusterer):
    """Kernel K-means clustering (dual representation).

    Minimises the within-cluster sum of squared distances in the feature space
    :math:`\\mathcal{H}` induced by a kernel :math:`k`:

    .. math::

        J = \\sum_{k=1}^K \\sum_{i \\in C_k} \\|\\varphi(X_i) - c_k\\|_{\\mathcal{H}}^2

    where :math:`c_k = \\frac{1}{|C_k|} \\sum_{j \\in C_k} \\varphi(X_j)` is the
    cluster centroid in feature space. Since :math:`\\varphi` may not have an
    explicit form, all computations are expressed through the Gram matrix
    :math:`K_{ij} = k(X_i, X_j)`:

    .. math::

        \\|\\varphi(X_i) - c_k\\|^2 = K_{ii}
        - \\frac{2}{|C_k|} \\sum_{j \\in C_k} K_{ij}
        + \\frac{1}{|C_k|^2} \\sum_{j,l \\in C_k} K_{jl}

    With a linear kernel this recovers standard K-means.

    Attributes:
        n_clusters: Number of clusters.
        kernel: Kernel function.
        n_init: Number of random restarts (best result is kept).
        max_iter: Maximum number of iterations.
        tol: Convergence tolerance.
        random_state: Random seed.

    Example:
        >>> import numpy as np
        >>> from geomexp.kernels import KernelKMeans, RBFKernel
        >>> X = np.random.randn(100, 2)
        >>> model = KernelKMeans(n_clusters=3, kernel=RBFKernel(gamma=0.5))
        >>> result = model.fit(X)
        >>> print(result.assignments.shape)
        (100,)
    """

    def __init__(
        self,
        n_clusters: int,
        kernel: Kernel,
        n_init: int = 10,
        max_iter: int = 100,
        tol: float = 1e-7,
        random_state: int | None = None,
    ) -> None:
        """Initialize kernel K-means clusterer.

        Args:
            n_clusters: Number of clusters.
            kernel: Kernel function instance.
            n_init: Number of independent restarts. The run with the lowest
                objective is returned.
            max_iter: Maximum number of iterations per restart.
            tol: Convergence tolerance.
            random_state: Random seed.

        Raises:
            ValueError: If parameters are invalid.
        """
        validate_positive_int(n_init, "n_init")
        super().__init__(
            n_clusters=n_clusters, max_iter=max_iter, tol=tol, random_state=random_state
        )
        self.kernel = kernel
        self.n_init = n_init

    def fit(self, X: np.ndarray) -> ClusterResult:
        """Fit kernel K-means with multiple random restarts.

        The Gram matrix is computed once and reused across all ``n_init`` restarts.

        Args:
            X: Data array of shape ``(n_samples, n_features)``.

        Returns:
            ClusterResult from the best restart.
        """
        X = self._validate_input(X)
        self._gram = self.kernel.gram_matrix(X)
        return _multi_restart_loop(
            self,
            X,
            self.n_init,
            self._initialize,
            self._fit_iteration,
            self._compute_objective,
            self._additional_convergence_check,
            self._extract_result,
        )

    def _initialize(self, X: np.ndarray) -> dict[str, object]:
        """Initialize weight vectors via k-means++ in feature space."""
        n = len(X)
        init_indices = _kmeanspp_feature_space(self._gram, self.n_clusters, self._rng)
        center_weights = np.zeros((self.n_clusters, n))
        for k, idx in enumerate(init_indices):
            center_weights[k, idx] = 1
        return {
            "center_weights": center_weights,
            "assignments": self._assign_kernel(self._gram, center_weights),
            "prev_assignments": None,
        }

    def _assign_kernel(self, gram: np.ndarray, center_weights: np.ndarray) -> np.ndarray:
        """Assign points using squared distance in feature space.

        Computes :math:`\\|\\varphi(X_i) - c_k\\|^2 = K_{ii} - 2(Kw_k)_i + w_k^\\top K w_k`
        for each point and cluster, then assigns to the nearest.
        """
        diag_K = np.diag(gram)
        costs = np.empty((gram.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            Kw = gram @ center_weights[k]
            costs[:, k] = diag_K - 2 * Kw + float(center_weights[k] @ Kw)
        return np.asarray(np.argmin(costs, axis=1))

    def _fit_iteration(self, X: np.ndarray, state: dict[str, object]) -> dict[str, object]:
        """One iteration: assign, handle empties, update weights, re-assign."""
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        state["prev_assignments"] = assignments.copy()
        state["assignments"] = self._assign_kernel(self._gram, center_weights)
        assignments = state["assignments"]
        assert isinstance(assignments, np.ndarray)
        n = self._gram.shape[0]
        if any(np.sum(assignments == k) == 0 for k in range(self.n_clusters)):
            # Re-seed any empty cluster at a random data point and re-assign.
            cw = center_weights.copy()
            for k in range(self.n_clusters):
                if np.sum(assignments == k) == 0:
                    cw[k] = 0
                    cw[k, self._rng.choice(n)] = 1
            state["center_weights"] = cw
            state["assignments"] = self._assign_kernel(self._gram, cw)
            assignments = state["assignments"]
            assert isinstance(assignments, np.ndarray)
        # Update each centroid to the mean of its members (uniform weights).
        new_cw = np.zeros((self.n_clusters, n))
        for k in range(self.n_clusters):
            mask = assignments == k
            n_k = int(np.sum(mask))
            if n_k > 0:
                new_cw[k, mask] = 1 / n_k
        state["center_weights"] = new_cw
        state["assignments"] = self._assign_kernel(self._gram, new_cw)
        return state

    def _additional_convergence_check(self, state: dict[str, object]) -> bool:
        """Check partition stability: stop if assignments unchanged."""
        prev = state.get("prev_assignments")
        curr = state.get("assignments")
        if isinstance(prev, np.ndarray) and isinstance(curr, np.ndarray):
            return bool(np.array_equal(prev, curr))
        return False

    def _compute_objective(self, X: np.ndarray, state: dict[str, object]) -> float:
        """Compute within-cluster sum of squared feature-space distances.

        .. math::

            J = \\frac{1}{n} \\sum_{k=1}^K \\sum_{i \\in C_k}
            \\bigl(K_{ii} - 2(Kw_k)_i + w_k^\\top K w_k\\bigr)
        """
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        diag_K = np.diag(self._gram)
        objective: float = 0
        for k in range(self.n_clusters):
            mask = assignments == k
            if not np.any(mask):
                continue
            Kw = self._gram @ center_weights[k]
            objective += float(np.sum(diag_K[mask] - 2 * Kw[mask] + center_weights[k] @ Kw))
        return objective / len(X)

    def _extract_result(
        self, state: dict[str, object], objective: float, n_iterations: int, converged: bool
    ) -> ClusterResult:
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        return ClusterResult(
            assignments=assignments,
            centers=center_weights,
            objective=objective,
            n_iterations=n_iterations,
            converged=converged,
            metadata={"center_weights": center_weights},
        )

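
# Illustrative sketch (not part of the library): with a linear kernel, the
# dual distance K_ii - 2 (K w_k)_i + w_k^T K w_k used by KernelKMeans equals
# the ordinary squared distance to the explicit centroid c_k = sum_j w_kj X_j,
# which is why the linear kernel recovers standard K-means. Checked below on
# toy data with uniform weights over an arbitrary subset.
def _check_linear_kernel_recovers_kmeans() -> None:
    rng = np.random.RandomState(0)
    X = rng.randn(8, 3)
    gram = LinearKernel().gram_matrix(X)
    members = np.array([0, 2, 5])
    w = np.zeros(len(X))
    w[members] = 1 / len(members)  # centroid weights: uniform over the cluster
    centroid = w @ X
    Kw = gram @ w
    dual = np.diag(gram) - 2 * Kw + float(w @ Kw)
    primal = np.sum((X - centroid) ** 2, axis=1)
    assert np.allclose(dual, primal)
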
class KernelGeometricExpectileClustering(BaseClusterer):
    """Kernelised geometric expectile clustering (Algorithm 2).

    In the feature space :math:`\\mathcal{H}` induced by a kernel :math:`k`,
    centroids and index vectors lie in the span of the data (Proposition 3.1):

    .. math::

        c_k = \\sum_{j=1}^n \\beta_{kj}\\,\\varphi(X_j), \\quad
        u_k = \\sum_{j=1}^n \\alpha_{kj}\\,\\varphi(X_j).

    All norms and inner products reduce to operations on the Gram matrix
    :math:`K_{ij} = k(X_i, X_j)`:

    - Squared residual norm:
      :math:`d_{ik}^2 = K_{ii} - 2(K\\beta_k)_i + \\beta_k^\\top K \\beta_k`
    - Index-residual inner product:
      :math:`p_{ik} = (K\\alpha_k)_i - \\alpha_k^\\top K \\beta_k`

    Setting :math:`r = 0` recovers kernel K-means.

    Attributes:
        n_clusters: Number of clusters.
        kernel: Kernel function.
        index_radius: Radius constraint :math:`r \\in [0, 1)`.
        n_init: Number of random restarts (best result is kept).
        max_iter: Maximum number of outer iterations.
        tol: Convergence tolerance on objective change.
        center_lr: Learning rate for gradient descent on centroid weights.
        center_steps: Number of gradient steps per centroid update.
        random_state: Random seed.

    Example:
        >>> import numpy as np
        >>> from geomexp.kernels import KernelGeometricExpectileClustering, RBFKernel
        >>> X = np.random.randn(100, 2)
        >>> model = KernelGeometricExpectileClustering(
        ...     n_clusters=3, kernel=RBFKernel(gamma=0.5), index_radius=0.5
        ... )
        >>> result = model.fit(X)
        >>> print(result.metadata["index_weights"].shape)
        (3, 100)
    """

    def __init__(
        self,
        n_clusters: int,
        kernel: Kernel,
        index_radius: float = 0.5,
        n_init: int = 10,
        max_iter: int = 100,
        tol: float = 1e-7,
        center_lr: float = 0.1,
        center_steps: int = 50,
        random_state: int | None = None,
    ) -> None:
        """Initialize kernelised geometric expectile clusterer.

        Args:
            n_clusters: Number of clusters.
            kernel: Kernel function instance.
            index_radius: Radius constraint :math:`r \\in [0, 1)` on index vectors.
            n_init: Number of independent restarts.
            max_iter: Maximum number of iterations per restart.
            tol: Convergence tolerance on objective change.
            center_lr: Learning rate for gradient descent on centroid weight vectors.
            center_steps: Number of gradient steps per centroid update.
            random_state: Random seed for reproducibility.

        Raises:
            ValueError: If parameters are invalid.
        """
        validate_index_radius(index_radius)
        validate_positive_float(center_lr, "center_lr")
        validate_positive_int(center_steps, "center_steps")
        validate_positive_int(n_init, "n_init")
        super().__init__(
            n_clusters=n_clusters, max_iter=max_iter, tol=tol, random_state=random_state
        )
        self.kernel = kernel
        self.index_radius = index_radius
        self.n_init = n_init
        self.center_lr = center_lr
        self.center_steps = center_steps

    def fit(self, X: np.ndarray) -> ClusterResult:
        """Fit kernelised geometric expectile clustering with multiple random restarts.

        The Gram matrix is computed once and reused across all ``n_init`` restarts.

        Args:
            X: Data array of shape ``(n_samples, n_features)``.

        Returns:
            ClusterResult from the best restart.
        """
        X = self._validate_input(X)
        self._gram = self.kernel.gram_matrix(X)
        self._diag_K = np.diag(self._gram)
        return _multi_restart_loop(
            self,
            X,
            self.n_init,
            self._initialize,
            self._fit_iteration,
            self._compute_objective,
            self._additional_convergence_check,
            self._extract_result,
        )

    def _initialize(self, X: np.ndarray) -> dict[str, object]:
        """Initialize centroid weights via k-means++ in feature space, index weights to zero."""
        n = len(X)
        init_indices = _kmeanspp_feature_space(self._gram, self.n_clusters, self._rng)
        center_weights = np.zeros((self.n_clusters, n))
        for k, idx in enumerate(init_indices):
            center_weights[k, idx] = 1
        return {
            "center_weights": center_weights,
            "index_weights": np.zeros((self.n_clusters, n)),
            "assignments": self._assign_by_distance(center_weights),
            "prev_assignments": None,
        }

    def _residual_quantities(
        self, center_weights_k: np.ndarray, index_weights_k: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute residual squared norms, norms, and index inner products for one cluster.

        Args:
            center_weights_k: Centroid weight vector :math:`\\beta_k` of shape ``(n,)``.
            index_weights_k: Index weight vector :math:`\\alpha_k` of shape ``(n,)``.

        Returns:
            Tuple ``(d_sq, d, p)`` each of shape ``(n,)`` where :math:`d_{ik}^2` is the
            squared residual norm, :math:`d_{ik}` the norm, and :math:`p_{ik}` the
            index-residual inner product.
        """
        Kb = self._gram @ center_weights_k
        bKb = float(center_weights_k @ Kb)
        d_sq = self._diag_K - 2 * Kb + bKb
        np.maximum(d_sq, 0, out=d_sq)
        Ka = self._gram @ index_weights_k
        return d_sq, np.sqrt(d_sq), Ka - float(index_weights_k @ Kb)

    def _assign_by_distance(self, center_weights: np.ndarray) -> np.ndarray:
        """Assign points by squared feature-space distance (used for initialisation)."""
        costs = np.empty((self._gram.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            Kb = self._gram @ center_weights[k]
            costs[:, k] = self._diag_K - 2 * Kb + float(center_weights[k] @ Kb)
        return np.asarray(np.argmin(costs, axis=1))

    def _assign_expectile(
        self, center_weights: np.ndarray, index_weights: np.ndarray
    ) -> np.ndarray:
        """Assign each point to the cluster minimising the expectile loss in feature space."""
        costs = np.empty((self._gram.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            d_sq, d, p = self._residual_quantities(center_weights[k], index_weights[k])
            costs[:, k] = 0.5 * (d_sq + d * p)
        return np.asarray(np.argmin(costs, axis=1))

    def _fit_iteration(self, X: np.ndarray, state: dict[str, object]) -> dict[str, object]:
        """One outer iteration: assign, handle empties, update centers, update indices."""
        assignments = state["assignments"]
        assert isinstance(assignments, np.ndarray)
        state["prev_assignments"] = assignments.copy()
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        state["assignments"] = self._assign_expectile(center_weights, index_weights)
        state = self._handle_empty_clusters(state)
        state = self._update_centers(state)
        state = self._update_indices(state)
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        state["assignments"] = self._assign_expectile(center_weights, index_weights)
        return state

    def _handle_empty_clusters(self, state: dict[str, object]) -> dict[str, object]:
        """Reinitialise empty clusters to random data points with zero index weights."""
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        if not any(np.sum(assignments == k) == 0 for k in range(self.n_clusters)):
            return state
        cw, iw = center_weights.copy(), index_weights.copy()
        for k in range(self.n_clusters):
            if np.sum(assignments == k) == 0:
                cw[k] = 0
                cw[k, self._rng.choice(self._gram.shape[0])] = 1
                iw[k] = 0
        state["center_weights"] = cw
        state["index_weights"] = iw
        state["assignments"] = self._assign_expectile(cw, iw)
        return state

    def _update_centers(self, state: dict[str, object]) -> dict[str, object]:
        """Update centroid weight vectors via gradient descent on the cluster-wise risk.

        For each cluster, takes ``center_steps`` gradient steps on :math:`\\beta_k`
        using the Hilbert gradient. With a single cluster member, the centroid is
        set directly.
        """
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        new_cw = center_weights.copy()
        for k in range(self.n_clusters):
            mask = assignments == k
            n_k = int(np.sum(mask))
            if n_k <= 1:
                if n_k == 1:
                    # A singleton cluster is centred exactly on its one member.
                    new_cw[k] = 0
                    new_cw[k, mask] = 1
                continue
            beta = new_cw[k].copy()
            for _ in range(self.center_steps):
                beta -= self.center_lr * self._center_gradient_weights(
                    beta, index_weights[k], mask, n_k
                )
            new_cw[k] = beta
        state["center_weights"] = new_cw
        return state

    def _center_gradient_weights(
        self, beta: np.ndarray, alpha: np.ndarray, mask: np.ndarray, n_k: int
    ) -> np.ndarray:
        """Compute the Hilbert gradient weight vector for centroid update.

        The Hilbert gradient of :math:`\\ell_{u_k}(\\varphi(X_i) - c_k)` w.r.t.
        :math:`c_k` has weight vector
        :math:`g_i = (\\beta - e_i)\\bigl(1 + \\tfrac{p_i}{2 d_i}\\bigr)
        - \\tfrac{d_i}{2}\\,\\alpha`.
        This method returns the mean over cluster members.
        """
        Kb = self._gram @ beta
        d_sq = self._diag_K - 2 * Kb + float(beta @ Kb)
        np.maximum(d_sq, 0, out=d_sq)
        d = np.sqrt(d_sq)
        Ka = self._gram @ alpha
        p = Ka - float(alpha @ Kb)
        # Points coinciding with the centroid (d ~ 0) are excluded to avoid
        # division by zero; their contribution is taken as zero.
        active = mask & (d > 1e-12)
        if not np.any(active):
            return np.zeros_like(beta)
        w = np.zeros(len(d))
        w[active] = 1 + 0.5 * p[active] / d[active]
        return (float(np.sum(w)) * beta - w - 0.5 * float(np.sum(d[active])) * alpha) / n_k

    def _update_indices(self, state: dict[str, object]) -> dict[str, object]:
        """Update index weight vectors via best-response (equations 3.2--3.3).

        For each cluster, computes :math:`\\delta_k` and normalises to obtain
        :math:`\\alpha_k^* = -r \\, \\delta_k / \\sqrt{\\delta_k^\\top K \\delta_k}`.
        """
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        new_iw = index_weights.copy()
        if self.index_radius == 0:
            state["index_weights"] = new_iw
            return state
        for k in range(self.n_clusters):
            mask = assignments == k
            if not np.any(mask):
                continue
            _, d, _ = self._residual_quantities(center_weights[k], index_weights[k])
            delta = np.zeros(self._gram.shape[0])
            delta[mask] = d[mask]
            delta -= float(np.sum(d[mask])) * center_weights[k]
            norm_delta = np.sqrt(max(float(delta @ (self._gram @ delta)), 0))
            if norm_delta > 1e-12:
                new_iw[k] = -self.index_radius * delta / norm_delta
        state["index_weights"] = new_iw
        return state

    def _compute_objective(self, X: np.ndarray, state: dict[str, object]) -> float:
        """Compute empirical risk in feature space.

        .. math::

            \\hat{\\mathcal{R}} = \\frac{1}{n} \\sum_{k=1}^K \\sum_{i \\in C_k}
            \\tfrac{1}{2}\\bigl(d_{ik}^2 + d_{ik}\\,p_{ik}\\bigr)
        """
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        objective: float = 0
        for k in range(self.n_clusters):
            mask = assignments == k
            if not np.any(mask):
                continue
            d_sq, d, p = self._residual_quantities(center_weights[k], index_weights[k])
            objective += float(np.sum(0.5 * (d_sq[mask] + d[mask] * p[mask])))
        return objective / len(X)

    def _additional_convergence_check(self, state: dict[str, object]) -> bool:
        """Check partition stability: stop if assignments unchanged."""
        prev = state.get("prev_assignments")
        curr = state.get("assignments")
        if isinstance(prev, np.ndarray) and isinstance(curr, np.ndarray):
            return bool(np.array_equal(prev, curr))
        return False

    def _extract_result(
        self, state: dict[str, object], objective: float, n_iterations: int, converged: bool
    ) -> ClusterResult:
        assignments = state["assignments"]
        center_weights = state["center_weights"]
        index_weights = state["index_weights"]
        assert isinstance(assignments, np.ndarray)
        assert isinstance(center_weights, np.ndarray)
        assert isinstance(index_weights, np.ndarray)
        return ClusterResult(
            assignments=assignments,
            centers=center_weights,
            objective=objective,
            n_iterations=n_iterations,
            converged=converged,
            metadata={"center_weights": center_weights, "index_weights": index_weights},
        )
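
# Illustrative sketch (not part of the library): with index_radius = 0 the
# index weights stay at zero, so the per-point cost reduces to 0.5 * d_ik^2
# and the method matches kernel K-means up to the factor 1/2 in the objective
# and the gradient-based (rather than closed-form) centroid update. The toy
# comparison below is a demonstration, not a formal equivalence test.
def _compare_zero_radius_with_kernel_kmeans() -> None:
    rng = np.random.RandomState(0)
    X = np.vstack([rng.randn(20, 2) - 2, rng.randn(20, 2) + 2])
    kernel = RBFKernel(gamma=0.5)
    res_km = KernelKMeans(
        n_clusters=2, kernel=kernel, n_init=3, random_state=0
    ).fit(X)
    res_gec = KernelGeometricExpectileClustering(
        n_clusters=2, kernel=kernel, index_radius=0.0, n_init=3, random_state=0
    ).fit(X)
    # On well-separated blobs both objectives should be close (the factor 2
    # undoes the 1/2 in the expectile loss); centroids agree only up to the
    # tolerance of the gradient-based update.
    print(res_km.objective, 2 * res_gec.objective)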