import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, InputSpec
from tensorflow.keras import initializers
import numpy as np
from ..architectures import create_autoencoder_model
from ..architectures import create_convolutional_autoencoder_model_2d
from ..architectures import create_convolutional_autoencoder_model_3d
from sklearn.cluster import KMeans


class DeepEmbeddedClustering(Layer):
"""
Deep embedded lustering layer.
Arguments
---------
number_of_clusters : integer
Specifies which axis to normalize.
initial_cluster_weights : list
Initial clustering weights.
alpha : scalar
Parameter.
Returns
-------
Keras layer
A keras layer
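
    Example
    -------
    >>> # A minimal sketch (illustrative, not from the original docs):
    >>> # attach the layer to a 32-dimensional encoding.
    >>> import tensorflow as tf
    >>> encoded = tf.keras.Input(shape=(32,))
    >>> soft_assignments = DeepEmbeddedClustering(number_of_clusters=10)(encoded)
    >>> # soft_assignments has shape (None, 10): one soft probability per cluster.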
"""
def __init__(self, number_of_clusters=10, initial_cluster_weights=None, alpha=1.0, **kwargs):
super(DeepEmbeddedClustering, self).__init__(**kwargs)
self.number_of_clusters = number_of_clusters
self.initial_cluster_weights = initial_cluster_weights
self.alpha = alpha
self.input_spec = InputSpec(ndim=2)
    def build(self, input_shape):
        if len(input_shape) != 2:
            raise ValueError("input_shape is not of length 2.")
        self.input_spec = InputSpec(dtype=K.floatx(), shape=(None, input_shape[1]))
        self.clusters = self.add_weight(shape=(self.number_of_clusters, input_shape[1]),
                                        initializer=initializers.glorot_uniform(),
                                        name='clusters')
        if self.initial_cluster_weights is not None:
            self.set_weights(self.initial_cluster_weights)
            self.initial_cluster_weights = None
        self.built = True
def call(self, inputs, mask=None):
# Uses Student t-distribution (same as t-SNE)
# inputs are the variable containing the data, shape=(number_of_samples, number_of_features)
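        # Soft assignment (Eq. 1 of Xie et al., "Unsupervised Deep Embedding
        # for Clustering Analysis"):
        #   q_ij = (1 + ||z_i - mu_j||^2 / alpha)^(-(alpha + 1)/2),
        # then normalized so each row (sample) sums to one.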
q = 1.0 / (1.0 + (K.sum(K.square(K.expand_dims(inputs, axis = 1)
- self.clusters), axis=2) / self.alpha))
q **= ((self.alpha + 1.0) / 2.0)
q = K.transpose(K.transpose(q) / K.sum(q, axis=1))
return(q)
def compute_output_shape(self, input_shape):
return(input_shape[0], self.number_of_clusters)
    def get_config(self):
        config = {"number_of_clusters": self.number_of_clusters, "alpha": self.alpha}
        base_config = super(DeepEmbeddedClustering, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


class DeepEmbeddedClusteringModel(object):
"""
Deep embedded clustering with and without convolutions.
Arguments
---------
number_of_units_per_layer : integer
Autoencoder number of units per layer.
number_of_clusters : integer
Number of clusters.
alpha : scalar
Parameter
initializer : string
Initializer for autoencoder.
Returns
-------
Keras model
A keras clustering model.
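
    Example
    -------
    >>> # A minimal sketch on random data (the layer sizes are illustrative):
    >>> import numpy as np
    >>> dec = DeepEmbeddedClusteringModel(
    ...     number_of_units_per_layer=(784, 500, 500, 2000, 10),
    ...     number_of_clusters=10)
    >>> x = np.random.rand(1000, 784)
    >>> dec.pretrain(x, epochs=1)
    >>> dec.compile(optimizer='sgd', loss='kld')
    >>> labels = dec.fit(x, max_number_of_iterations=140)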
"""
def __init__(self, number_of_units_per_layer=None, number_of_clusters=10, alpha=1.0,
initializer='glorot_uniform', convolutional=False, input_image_size=None):
super(DeepEmbeddedClusteringModel, self).__init__()
self.number_of_units_per_layer = number_of_units_per_layer
self.number_of_clusters = number_of_clusters
self.alpha = alpha
self.initializer = initializer
self.convolutional = convolutional
self.input_image_size = input_image_size
self.autoencoder = None
self.encoder = None
        if self.convolutional:
            if self.input_image_size is None:
                raise ValueError("Need to specify the input image size for CNN.")
if len(self.input_image_size) == 3: # 2-D
self.autoencoder, self.encoder = create_convolutional_autoencoder_model_2d(
input_image_size=self.input_image_size,
number_of_filters_per_layer=self.number_of_units_per_layer)
else:
self.autoencoder, self.encoder = create_convolutional_autoencoder_model_3d(
input_image_size=self.input_image_size,
number_of_filters_per_layer=self.number_of_units_per_layer)
else:
self.autoencoder, self.encoder = create_autoencoder_model(
self.number_of_units_per_layer, initializer=self.initializer)
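        # Attach the clustering layer to the encoder's latent output; the
        # convolutional variant keeps a second reconstruction head so the
        # autoencoder loss can regularize clustering.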
clustering_layer = DeepEmbeddedClustering(
self.number_of_clusters, name="clustering")(self.encoder.output)
        if self.convolutional:
self.model = Model(inputs=self.encoder.input, outputs=[clustering_layer, self.autoencoder.output])
else:
self.model = Model(inputs=self.encoder.input, outputs=clustering_layer)
def pretrain(self, x, optimizer='adam', epochs=200, batch_size=256):
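        # Pretrain the autoencoder end-to-end on a mean-squared reconstruction
        # loss; the encoder half then provides the initial embedding for k-means.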
self.autoencoder.compile(optimizer=optimizer, loss='mse')
self.autoencoder.fit(x, x, batch_size=batch_size, epochs=epochs)
def load_weights(self, weights):
self.model.load_weights(weights)
    def extract_features(self, x):
        return(self.encoder.predict(x, verbose=0))
    def predict_cluster_labels(self, x):
        if self.convolutional:
            cluster_probabilities, _ = self.model.predict(x, verbose=0)
        else:
            cluster_probabilities = self.model.predict(x, verbose=0)
        return(cluster_probabilities.argmax(1))
def target_distribution(self, q):
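        # Auxiliary target distribution (Eq. 3 of Xie et al.):
        #   p_ij = (q_ij^2 / f_j) / sum_j' (q_ij'^2 / f_j'),  f_j = sum_i q_ij.
        # Squaring sharpens high-confidence assignments; dividing by the soft
        # cluster frequency f_j prevents large clusters from dominating.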
weight = q**2 / q.sum(0)
p = (weight.T / weight.sum(1)).T
return(p)
def compile(self, optimizer='sgd', loss='kld', loss_weights=None):
self.model.compile(optimizer=optimizer, loss=loss, loss_weights=loss_weights)
    def fit(self, x, max_number_of_iterations=20000, batch_size=256, tolerance=1e-3, update_interval=140):
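        # Alternate between (i) refreshing the auxiliary target distribution p
        # every `update_interval` iterations and (ii) minimizing KL(p || q)
        # (plus the reconstruction loss in the convolutional case) with
        # minibatch updates.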
# Initialize clusters using k-means
kmeans = KMeans(n_clusters=self.number_of_clusters, n_init=20)
current_prediction = kmeans.fit_predict(self.encoder.predict(x))
previous_prediction = np.copy(current_prediction)
self.model.get_layer(name='clustering').set_weights([kmeans.cluster_centers_])
# Deep clustering
loss = 100000000
index = 0
index_array = np.arange(x.shape[0])
for i in range(max_number_of_iterations):
if i % update_interval == 0:
                if self.convolutional:
q, _ = self.model.predict(x, verbose=0)
else:
q = self.model.predict(x, verbose=0)
p = self.target_distribution(q)
                # Stopping criterion: fraction of points whose cluster
                # assignment changed since the last update.
                current_prediction = q.argmax(1)
                delta_label = (np.sum(current_prediction != previous_prediction).astype(np.float32) /
                               current_prediction.shape[0])
                previous_prediction = np.copy(current_prediction)
                print("Iteration", i, "(out of", max_number_of_iterations,
                      "): loss =", loss, ", delta_label =", delta_label)
                if i > 0 and delta_label < tolerance:
                    break
batch_indices = index_array[index * batch_size:min((index + 1) * batch_size, x.shape[0])]
            if self.convolutional:
                if len(self.input_image_size) == 3:  # 2-D
                    loss = self.model.train_on_batch(x=x[batch_indices,:,:,:],
                                                     y=[p[batch_indices,:], x[batch_indices,:,:,:]])
                else:  # 3-D
                    loss = self.model.train_on_batch(x=x[batch_indices,:,:,:,:],
                                                     y=[p[batch_indices,:], x[batch_indices,:,:,:,:]])
else:
loss = self.model.train_on_batch(x=x[batch_indices,:], y=p[batch_indices,:])
if (index + 1) * batch_size <= x.shape[0]:
index += 1
else:
index = 0
return(current_prediction)