Source code for dyconnmap.cluster.mng

# -*- coding: utf-8 -*-
""" Merge Neural Gas

Merge Neural Gas (MNG) [Strickert2003]_ is similar to the original Neural Gas algorithm, but each node
additionally carries a context vector (:math:`c`); the best matching unit is determined by a
linear combination of the weight and the context vector from the previous iteration (hence the
"merge").

Similar to Neural Gas, :math:`N` nodes have their weights (:math:`w`) and context vectors (:math:`c`) randomly initialized.
When the network is presented with a new input sequence :math:`v`, the distance and the corresponding rank of each node :math:`n` is estimated.

1. The distance is computed by: :math:`d_i(n) = (1 - \\alpha) * ||v-w_i||^2 + \\alpha * ||c(n)-c_i||^2` and the context by: :math:`c(n) = (1 - \\beta) * w_{I_{n-1}} + \\beta * c_{I_{n-1}}`.

 * :math:`\\alpha` is the balancing factor between :math:`w` and :math:`c`

 * :math:`\\beta` is the merging degree between two subsequent iterations

 * :math:`I_{n-1}` denotes the index of the best matching unit from the previous iteration

2. Each node is adapted with:

 a. :math:`{\\Delta}w_i = e_w(k) * h_{\\lambda(k)} (r(d_i,d)) * (v-w_i)` and its context by
 b. :math:`{\\Delta}c_i = e_w(k) * h_{\\lambda(k)} (r(d_i,d)) * (c(n)-c_i)`

  * :math:`r(d_i, d)` denotes the rank of the :math:`i`-th node

As with Neural Gas, :math:`h_{\\lambda(k)}(r(d_i, d)) = exp(\\frac{-r(d_i, d)}{\\lambda(k)})` represents the neighborhood ranking function.
:math:`\\lambda(k)` denotes the influence of the neighborhood and is annealed as :math:`\\lambda(k) = \\lambda_0 {(\\frac{\\lambda_T}{\\lambda_0})}^{(\\frac{k}{T_{max}})}`.
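
For concreteness, a single adaptation step can be sketched in plain NumPy (a minimal
sketch with illustrative names; the learning rate and :math:`\\lambda`, which the fitted
model anneals over iterations, are held fixed here)::

    import numpy as np

    rng = np.random.RandomState(0)
    v = rng.rand(1, 4)                 # current input
    w = rng.rand(10, 4)                # prototype weights
    c = np.zeros_like(w)               # prototype context vectors
    ct = np.zeros((1, 4))              # merged context c(n)
    alpha, beta, lam, lrate = 0.1, 0.5, 1.0, 0.5

    # Merged distance between input/weights and merged-context/contexts.
    d = ((1 - alpha) * np.sum((v - w) ** 2, axis=1)
         + alpha * np.sum((ct - c) ** 2, axis=1))
    ranks = np.argsort(np.argsort(d))  # r(d_i, d); rank 0 is the winner
    h = np.exp(-ranks / lam)           # neighborhood ranking function

    # Adapt each weight and context towards the input and merged context.
    w += lrate * h[:, np.newaxis] * (v - w)
    c += lrate * h[:, np.newaxis] * (ct - c)

    # The winner's weight and context seed c(n) of the next time step.
    winner = np.argmin(d)
    ct = (1 - beta) * w[winner] + beta * c[winner]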



----

.. [Strickert2003] Strickert, M., & Hammer, B. (2003, September). Neural gas for sequences. In Proceedings of the Workshop on Self-Organizing Maps (WSOM’03) (pp. 53-57).
"""
# Author: Avraam Marimpis <avraam.marimpis@gmail.com>

import numpy as np
from sklearn.metrics.pairwise import pairwise_distances

from .cluster import BaseCluster


class MergeNeuralGas(BaseCluster):
    """ Merge Neural Gas

    Parameters
    ----------
    n_protos : int
        The number of prototypes.

    iterations : int
        The maximum number of iterations.

    merge_coeffs : list of length 2
        The merging coefficients (:math:`\\alpha` and :math:`\\beta`); defaults to [0.1, 0.0].

    epsilon : list of length 2
        The initial and final ranges of the neighborhood (:math:`\\lambda`); defaults to [10, 0.001].

    lrate : list of length 2
        The initial and final learning rates; defaults to [0.5, 0.005].

    n_jobs : int
        Number of parallel jobs (will be passed to scikit-learn).

    rng : object
        An object of type numpy.random.RandomState.

    Attributes
    ----------
    protos : array-like, shape(n_protos, n_features)
        The prototypical vectors.

    distortion : float
        The normalized distortion error.
    """

    def __init__(
        self,
        n_protos=10,
        iterations=1024,
        merge_coeffs=None,
        epsilon=None,
        lrate=None,
        n_jobs=1,
        rng=None,
    ):
        self.n_protos = n_protos
        self.iterations = iterations

        # Mutable defaults are created here instead of in the signature.
        if merge_coeffs is None:
            merge_coeffs = [0.1, 0.0]
        self.a, self.b = merge_coeffs

        if epsilon is None:
            epsilon = [10, 0.001]
        self.epsilon_i, self.epsilon_f = epsilon

        if lrate is None:
            lrate = [0.5, 0.005]
        self.lrate_i, self.lrate_f = lrate

        self.n_jobs = n_jobs
        self.protos = None
        self.context = None
        self.distortion = 0.0

        if rng is None:
            self.rng = np.random.RandomState()
        else:
            self.rng = rng

        self.__symbols = None
        self.__encoding = None
    def fit(self, data):
        """ Learn the prototypes and context vectors from the given data.

        Parameters
        ----------
        data : array-like, shape(n_samples, n_features)
            The data matrix; each row is one sample.

        Returns
        -------
        self : object
            The fitted instance.
        """
        n_samples, n_obs = data.shape

        # Initialize the weights (w) from random samples and the contexts (c)
        # with zeros.
        self.protos = data[self.rng.choice(n_samples, self.n_protos), :]  # w
        self.context = np.zeros(self.protos.shape)  # c

        # Merged context target and the previous winner's weight and context.
        ct = np.zeros((1, n_obs))
        wr = np.zeros((1, n_obs))
        cr = np.zeros((1, n_obs))

        for iteration in range(self.iterations):
            sample = data[self.rng.choice(n_samples, 1), :]

            # Context target: c(n) = (1 - beta) * w_{I_{n-1}} + beta * c_{I_{n-1}},
            # per the module docstring.
            ct = (1 - self.b) * wr + self.b * cr

            # Anneal the learning rate and the neighborhood range.
            t = iteration / float(self.iterations)
            lrate = self.lrate_i * (self.lrate_f / float(self.lrate_i)) ** t
            epsilon = self.epsilon_i * (self.epsilon_f / float(self.epsilon_i)) ** t

            # Merged distance between sample/weights and context/contexts.
            d = (1 - self.a) * pairwise_distances(
                sample, self.protos
            ) + self.a * pairwise_distances(ct, self.context)

            # Rank the prototypes by distance; rank 0 is the best matching unit.
            I = np.argsort(np.argsort(d)).ravel()
            min_id = np.where(I == 0)[0]

            H = np.exp(-I / epsilon)

            # Adapt every weight and context towards the sample and the merged
            # context, scaled by the neighborhood ranking function.
            diff_w = sample - self.protos
            diff_c = ct - self.context
            for i in range(self.n_protos):
                self.protos[i, :] += lrate * H[i] * diff_w[i, :]
                self.context[i, :] += lrate * H[i] * diff_c[i, :]

            # Keep the winner's weight and context for the next iteration.
            wr = self.protos[min_id]
            cr = self.context[min_id]

        return self
    def encode(self, data, metric="euclidean"):
        # Delegate to the shared implementation in ``BaseCluster``.
        return super().encode(data, metric, sort=False)
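

# A usage sketch (illustrative, not part of the library's examples): fit the
# model on random data and encode it with the learned codebook.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    data = rng.rand(256, 4)  # 256 samples with 4 features each

    mng = MergeNeuralGas(n_protos=8, iterations=512, rng=rng)
    mng.fit(data)

    # ``encode`` delegates to ``BaseCluster.encode``; its return value follows
    # that base-class contract.
    encoded = mng.encode(data)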