import numpy as np
import pytest
from scipy.linalg import eigh
+from scipy.spatial.distance import cdist
+from sklearn.neighbors import NearestNeighbors

logging.basicConfig(level=logging.INFO, format="%(message)s")

@@ -161,6 +163,200 @@ def linear_discriminant_analysis(
        raise AssertionError


+def locally_linear_embedding(
+    features: np.ndarray, dimensions: int, n_neighbors: int = 12, reg: float = 1e-3
+) -> np.ndarray:
+    """
+    Locally Linear Embedding (LLE).
+
+    For more details, see: https://en.wikipedia.org/wiki/Nonlinear_dimensionality_reduction#Locally_linear_embedding
+
+    Parameters:
+    * features: the features extracted from the dataset (shape: [n_features, n_samples])
+    * dimensions: target dimension for the embedding
+    * n_neighbors: number of neighbors to consider for each point
+    * reg: regularization constant for the local covariance matrices
+
+    >>> test_locally_linear_embedding()
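+
+    Example (illustrative toy data; the doctest checks only the output shape):
+
+    >>> toy = np.array([[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0]])
+    >>> locally_linear_embedding(toy, 2, n_neighbors=2).shape
+    (2, 4)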
| 180 | + """ |
| 181 | + if not features.any(): |
| 182 | + logging.error("Dataset empty") |
| 183 | + raise AssertionError |
| 184 | + |
| 185 | + # Transpose to have shape [n_samples, n_features] for easier processing |
| 186 | + X = features.T.astype(np.float64) # Ensure float64 to avoid dtype issues |
| 187 | + n_samples, n_features = X.shape |
| 188 | + |
| 189 | + # Find k-nearest neighbors |
| 190 | + knn = NearestNeighbors(n_neighbors=n_neighbors + 1) |
| 191 | + knn.fit(X) |
| 192 | + distances, indices = knn.kneighbors(X) |
| 193 | + |
| 194 | + # Remove the first index (point itself) |
| 195 | + indices = indices[:, 1:] |
| 196 | + |
| 197 | + # Create weight matrix W |
| 198 | + W = np.zeros((n_samples, n_samples)) |
| 199 | + |
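+    # Row i of W holds the barycentric weights that best reconstruct x_i from
+    # its nearest neighbors (Roweis & Saul, 2000); all other entries stay zero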
+    for i in range(n_samples):
+        # Neighbors of sample i (the point itself is already excluded)
+        neighbors = indices[i]
+        # Center the neighborhood on x_i
+        Z = X[neighbors] - X[i]
+        # Local Gram (covariance) matrix of the centered neighbors
+        C = np.dot(Z, Z.T)
+
+        # Regularize C so the solve below is well conditioned, scaling by the
+        # trace when it is informative
+        trace = np.trace(C)
+        reg_value = reg * trace if trace > 0 else reg
+        np.fill_diagonal(C, C.diagonal() + reg_value)
+
+        # Solve C w = 1 for the reconstruction weights
+        try:
+            w = np.linalg.solve(C, np.ones(n_neighbors))
+        except np.linalg.LinAlgError:
+            # Fall back to the pseudoinverse if C is singular
+            w = np.linalg.pinv(C).dot(np.ones(n_neighbors))
+
+        # Normalize the weights so they sum to one
+        w /= np.sum(w)
+        W[i, neighbors] = w
+
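+    # For an embedding Y (rows y_i), the reconstruction error
+    # sum_i |y_i - sum_j W[i, j] * y_j|^2 equals tr(Y^T M Y) for the cost
+    # matrix M built below; its minimizers are the bottom eigenvectors of M,
+    # with the constant eigenvector (eigenvalue ~0) discarded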
+    # Cost matrix M = (I - W)^T (I - W)
+    identity = np.eye(n_samples)
+    M = (identity - W).T.dot(identity - W)
+
+    # eigh returns eigenvalues in ascending order
+    eigenvalues, eigenvectors = eigh(M)
+
+    # Skip the first (near-zero) eigenvalue and keep the next `dimensions`
+    idx = np.argsort(eigenvalues)[1 : dimensions + 1]
+    embedding = eigenvectors[:, idx].T
+
+    logging.info("Locally Linear Embedding computed")
+    return embedding
+
+
+def multidimensional_scaling(
+    features: np.ndarray, dimensions: int, metric: bool = True
+) -> np.ndarray:
+    """
+    Multidimensional Scaling (MDS).
+
+    For more details, see: https://en.wikipedia.org/wiki/Multidimensional_scaling
+
+    Parameters:
+    * features: the features extracted from the dataset (shape: [n_features, n_samples])
+    * dimensions: target dimension for the embedding
+    * metric: if True, use classical (metric) MDS; if False, use a simple
+      iterative stress-minimizing MDS (a full non-metric MDS would also apply
+      isotonic regression to the dissimilarities)
+
+    >>> test_multidimensional_scaling()
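+
+    Example (illustrative toy data; the doctest checks only the output shape):
+
+    >>> toy = np.array([[0.0, 1.0, 2.0], [0.0, 1.0, 2.0]])
+    >>> multidimensional_scaling(toy, 1).shape
+    (1, 3)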
| 258 | + """ |
| 259 | + if not features.any(): |
| 260 | + logging.error("Dataset empty") |
| 261 | + raise AssertionError |
| 262 | + |
| 263 | + # Transpose to have shape [n_samples, n_features] |
| 264 | + X = features.T |
| 265 | + n_samples = X.shape[0] |
| 266 | + |
| 267 | + if metric: |
| 268 | + # Classical MDS |
| 269 | + # Compute distance matrix |
| 270 | + D = cdist(X, X, metric='euclidean') |
| 271 | + D_squared = D ** 2 |
| 272 | + |
+        # Double centering: B = -1/2 * H * D^2 * H converts squared distances
+        # into the Gram matrix of the centered configuration
+        H = np.eye(n_samples) - np.ones((n_samples, n_samples)) / n_samples
+        B = -0.5 * H.dot(D_squared).dot(H)
+
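+        # If Y satisfies Y @ Y.T == B, the top eigenpairs of B yield
+        # coordinates Y = V_d * sqrt(L_d) whose pairwise distances reproduce D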
+        # Eigendecomposition of B; keep the largest `dimensions` eigenpairs
+        eigenvalues, eigenvectors = eigh(B)
+        idx = np.argsort(eigenvalues)[::-1][:dimensions]
+        eigenvalues = eigenvalues[idx]
+        eigenvectors = eigenvectors[:, idx]
+
+        # Embedding; clip tiny negative eigenvalues caused by round-off
+        # before taking the square root
+        embedding = eigenvectors * np.sqrt(np.maximum(eigenvalues, 0))
+
+    else:
+        # Simple stress-minimizing MDS via gradient descent, starting from a
+        # random configuration (very basic implementation)
+        rng = np.random.RandomState(42)
+        embedding = rng.randn(n_samples, dimensions)
+
+        # Target distances in the original space
+        D_original = cdist(X, X, metric="euclidean")
+
+        for iteration in range(100):
+            D_embedded = cdist(embedding, embedding, metric="euclidean")
+
+            # Raw stress (loss function), logged for monitoring
+            stress = np.sum((D_original - D_embedded) ** 2)
+            if iteration % 25 == 0:
+                logging.debug("MDS iteration %d, stress %.4f", iteration, stress)
+
+            # Gradient of the stress with respect to each embedded point:
+            # dS/dy_i = 2 * sum_j (D_emb[i, j] - D_orig[i, j]) * (y_i - y_j) / D_emb[i, j]
+            grad = np.zeros_like(embedding)
+            for i in range(n_samples):
+                for j in range(n_samples):
+                    if i != j:
+                        diff = embedding[i] - embedding[j]
+                        dist = np.linalg.norm(diff)
+                        if dist > 1e-10:
+                            grad[i] += (
+                                2
+                                * (D_embedded[i, j] - D_original[i, j])
+                                * (diff / dist)
+                            )
+
+            # Small fixed step, averaged over samples
+            embedding -= 0.01 * grad / n_samples
+
| 316 | + logging.info("Multidimensional Scaling computed") |
| 317 | + return embedding.T # Transpose back to match original format |
| 318 | + |
| 319 | + |
+def test_locally_linear_embedding() -> None:
+    """Test function for Locally Linear Embedding"""
+    features = np.array(
+        [[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0], [3.0, 4.0, 5.0, 6.0]]
+    )
+    dimensions = 2
+
+    try:
+        embedding = locally_linear_embedding(features, dimensions, n_neighbors=2)
+        assert embedding.shape[0] == dimensions
+        assert embedding.shape[1] == features.shape[1]
+    except Exception as e:
+        logging.error(f"LLE test failed: {e}")
+        raise
+
+
+def test_multidimensional_scaling() -> None:
+    """Test function for Multidimensional Scaling"""
+    features = np.array(
+        [[1.0, 2.0, 3.0, 4.0], [2.0, 3.0, 4.0, 5.0], [3.0, 4.0, 5.0, 6.0]]
+    )
+    dimensions = 2
+
+    try:
+        # Metric (classical) MDS
+        embedding_metric = multidimensional_scaling(features, dimensions, metric=True)
+        assert embedding_metric.shape[0] == dimensions
+        assert embedding_metric.shape[1] == features.shape[1]
+
+        # Iterative stress-minimizing MDS (metric=False branch)
+        embedding_nonmetric = multidimensional_scaling(
+            features, dimensions, metric=False
+        )
+        assert embedding_nonmetric.shape[0] == dimensions
+        assert embedding_nonmetric.shape[1] == features.shape[1]
+    except Exception as e:
+        logging.error(f"MDS test failed: {e}")
+        raise
+
+
def test_linear_discriminant_analysis() -> None:
    # Create dummy dataset with 2 classes and 3 features
    features = np.array([[1, 2, 3, 4, 5], [2, 3, 4, 5, 6], [3, 4, 5, 6, 7]])