Muchas veces es necesario pasarle a una red neuronal conjuntos de datos discretos, como categorías, elementos de grupos o conjuntos, un ejemplo de esto son las palabras en un texto. Como estas no pueden ser pasadas directamente a una red neuronal, es necesario convertirlas en una representacion matemática.

Estas representaciones son llamados Embeddings que básicamente son vectores densos en donde un vector se corresoponde con una palabra, permitiendo que durante el entrenamiento de una red neuronal, esta capture información semántica de las palabras utilizando estas representaciones.

import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

Para poder utilizar embeddings en una red neuronal debemos agregar soporte para la indexación de estos vectores a la clase Tensor. Se debe asegurar que durante la retropropagación los gradientes sean puestos en las mismas filas indexadas en la propagación hacia adelante. Esto se resuelve manteniendo los indices utilizados en el forward pass. Para esto se agregan las siguientes lineas de código:

El siguiente metodo agrega los indices al Tensor

def index_select(self, indices):
    if(self.autograd):
        new = Tensor(self.data[indices.data],
                      autograd=True, 
                      creators=[self],
                      creation_op='index_select')
        # se agregan los indices como atributos del tensor
        new.index_select_indices = indices
        return new
    return Tensor(self.data[indices.data])

Se agrega a backward() la logica para retropropagar el gradiente a los indices

if(self.creation_op="index_select"):
    new_grad = np.zeros_like(self.creators[0].data)
    indices_ = self.index_select_indices.data.flatten()
    grad_ = grad.data.reshape(len(indices_), -1)
    for i in range(len(indices_)):
        new_grad[indices_[i]] = new_grad[indices_[i]] + grad_[i]
    self.creators[0].backward(Tensor(new_grad))

Esta condicion realiza lo siguiente:

  1. Inicializa un nuevo gradiente new_grad del tamaño correcto (el tamaño de la matriz original que fue indexada).
  2. Luego se aplanan los indices con flatten() para poder iterar sobre ellos.
  3. Colapsa el gradiente a una lista simple de filas grad_ (la lista de indices en indices_ y la lista de vectores grad_ estarán en orden).
  4. Iterar sobre cada indice, agregarlo a la fila correcta del nuevo gradiente que estamos creando y retropropagar con backward() en el self.creator[0]

Adicionalmente, al agregar las siguientes lineas de código:

if (self.autograd):
    if grad is None:
        grad = Tensor(np.ones_like(self.data))

Es posible llamar a la función backward() sin pasar un gradiente

import numpy as np


class Tensor(object):

    def __init__(self, data,
                 autograd=False,
                 creators=None,
                 creation_op=None,
                 id=None):
        '''
        Inicializa un tensor utilizando numpy

        @data: una lista de numeros
        @creators: lista de tensores que participarion en la creacion de un nuevo tensor
        @creators_op: la operacion utilizada para combinar los tensores en el nuevo tensor
        @autograd: determina si se realizara backprop o no sobre el tensor
        @id: identificador del tensor, para poder dar seguimiento a los hijos y padres del mismo
        '''
        self.data = np.array(data)
        self.creation_op = creation_op
        self.creators = creators
        self.grad = None
        self.autograd = autograd
        self.children = {}
        # se asigna un id al tensor
        if (id is None):
            id = np.random.randint(0, 100000)
        self.id = id

        # se hace un seguimiento de cuantos hijos tiene un tensor
        # si los creadores no es none
        if (creators is not None):
            # para cada tensor padre
            for c in creators:
                # se verifica si el tensor padre posee el id del tensor hijo
                # en caso de no estar, agrega el id del tensor hijo al tensor padre
                if (self.id not in c.children):
                    c.children[self.id] = 1
                # si el tensor ya se encuentra entre los hijos del padre
                # y vuelve a aparece, se incrementa en uno
                # la cantidad de apariciones del tensor hijo
                else:
                    c.children[self.id] += 1

    def all_children_grads_accounted_for(self):
        '''
        Verifica si un tensor ha recibido la cantidad
        correcta de gradientes por cada uno de sus hijos
        '''
        # print('tensor id:', self.id)
        for id, cnt in self.children.items():
            if (cnt != 0):
                return False
        return True

    def backward(self, grad, grad_origin=None):
        '''
        Funcion que propaga recursivamente el gradiente a los creators o padres del tensor

        @grad: gradiente
        @grad_orign
        '''
        if (self.autograd):
            if grad is None:
                grad = Tensor(np.ones_like(self.data))
            if (grad_origin is not None):
                # Verifica para asegurar si se puede hacer retropropagacion
                if (self.children[grad_origin.id] == 0):
                    raise Exception("No se puede retropropagar mas de una vez")
                # o si se está esperando un gradiente, en dicho caso se decrementa
                else:
                    # el contador para ese hijo
                    self.children[grad_origin.id] -= 1

        # acumula el gradiente de multiples hijos
        if (self.grad is None):
            self.grad = grad
        else:
            self.grad += grad

        if (self.creators is not None and
                (self.all_children_grads_accounted_for() or grad_origin is None)):

            if (self.creation_op == 'neg'):
                self.creators[0].backward(self.grad.__neg__())

            if (self.creation_op == 'add'):
                # al recibir self.grad, empieza a realizar backprop
                self.creators[0].backward(self.grad, grad_origin=self)
                self.creators[1].backward(self.grad, grad_origin=self)

            if (self.creation_op == "sub"):
                self.creators[0].backward(Tensor(self.grad.data), self)
                self.creators[1].backward(Tensor(self.grad.__neg__().data), self)

            if (self.creation_op == "mul"):
                new = self.grad * self.creators[1]
                self.creators[0].backward(new, self)
                new = self.grad * self.creators[0]
                self.creators[1].backward(new, self)

            if (self.creation_op == "mm"):
                layer = self.creators[0]  # activaciones => layer
                weights = self.creators[1]  # pesos = weights
                # c0 = self.creators[0]                       # activaciones => layer
                # c1 = self.creators[1]                       # pesos = weights
                # new = self.grad.mm(c1.transpose())  # grad = delta => delta x weights.T
                new = Tensor.mm(self.grad, weights.transpose())  # grad = delta => delta x weights.T
                layer.backward(new)
                # c0.backward(new)
                # new = self.grad.transpose().mm(c0).transpose() # (delta.T x layer).T = layer.T x delta
                new = Tensor.mm(layer.transpose(), self.grad)  # layer.T x delta
                weights.backward(new)
                # c1.backward(new)

            if (self.creation_op == "transpose"):
                self.creators[0].backward(self.grad.transpose())

            if ("sum" in self.creation_op):
                dim = int(self.creation_op.split("_")[1])
                self.creators[0].backward(self.grad.expand(dim, self.creators[0].data.shape[dim]))

            if ("expand" in self.creation_op):
                dim = int(self.creation_op.split("_")[1])
                self.creators[0].backward(self.grad.sum(dim))
                
            if(self.creation_op == "index_select"):
                # new_grad es un vector de 0s que luego contendra 
                # los gradientes para cada embedding
                new_grad = np.zeros_like(self.creators[0].data)
                # se obtienen los indices en un vector unidimensional
                indices_ = self.index_select_indices.data.flatten()
                grad_ = grad.data.reshape(len(indices_), -1)
                for i in range(len(indices_)):
                    new_grad[indices_[i]] = new_grad[indices_[i]] + grad_[i]
                self.creators[0].backward(Tensor(new_grad))
            
            if (self.creation_op == "sigmoid"):
                ones = Tensor(np.ones_like(self.grad.data))
                self.creators[0].backward(self.grad * (self * (ones - self)))
                
            if (self.creation_op == "tanh"):
                ones = Tensor(np.ones_like(self.grad.data))
                self.creators[0].backward(self.grad * (ones - (self * self)))
                
            if (self.creation_op == 'relu'):
                mask = Tensor(self.data > 0)
                self.creators[0].backward(self.grad * mask)

    def __neg__(self):
        if (self.autograd):
            return Tensor(self.data * -1,
                          autograd=True,
                          creators=[self],
                          creation_op='neg')
        return Tensor(self.data * -1)

    def __add__(self, other):
        '''
        @other: un Tensor
        '''
        if (self.autograd and other.autograd):
            return Tensor(self.data + other.data,
                          autograd=True,
                          creators=[self, other],
                          creation_op='add')
        return Tensor(self.data + other.data)

    def __sub__(self, other):
        '''
        @other: un Tensor
        '''
        if (self.autograd and other.autograd):
            return Tensor(self.data - other.data,
                          autograd=True,
                          creators=[self, other],
                          creation_op='sub')
        return Tensor(self.data - other.data)

    def __mul__(self, other):
        '''
        @other: un Tensor
        '''
        if (self.autograd and other.autograd):
            return Tensor(self.data * other.data,
                          autograd=True,
                          creators=[self, other],
                          creation_op="mul")
        return Tensor(self.data * other.data)

    def sum(self, dim):
        '''
        Suma atravez de dimensiones, si tenemos una matriz 2x3 y
        aplicamos sum(0) sumara todos los valores de las filas
        dando como resultado un vector 1x3, en cambio si se aplica
        sum(1) el resultado es un vector 2x1

        @dim: dimension para la suma
        '''
        if (self.autograd):
            return Tensor(self.data.sum(dim),
                          autograd=True,
                          creators=[self],
                          creation_op="sum_" + str(dim))
        return Tensor(self.data.sum(dim))

    def expand(self, dim, copies):
        '''
        Se utiliza para retropropagar a traves de una suma sum().
        Copia datos a lo largo de una dimension
        '''

        trans_cmd = list(range(0, len(self.data.shape)))
        trans_cmd.insert(dim, len(self.data.shape))
        new_data = self.data.repeat(copies).reshape(list(self.data.shape) + [copies]).transpose(trans_cmd)

        if (self.autograd):
            return Tensor(new_data,
                          autograd=True,
                          creators=[self],
                          creation_op="expand_" + str(dim))
        return Tensor(new_data)

    def transpose(self):
        if (self.autograd):
            return Tensor(self.data.transpose(),
                          autograd=True,
                          creators=[self],
                          creation_op="transpose")

        return Tensor(self.data.transpose())

    def mm(self, x):
        if (self.autograd):
            return Tensor(self.data.dot(x.data),
                          autograd=True,
                          creators=[self, x],
                          creation_op="mm")
        return Tensor(self.data.dot(x.data))
    
    def index_select(self, indices):
        if(self.autograd):
            new = Tensor(self.data[indices.data],
                          autograd=True, 
                          creators=[self],
                          creation_op='index_select')
            # se agregan los indices como atributos del tensor
            new.index_select_indices = indices
            return new
        return Tensor(self.data[indices.data])
    
    def sigmoid(self):
        if (self.autograd):
            return Tensor(1/(1+np.exp(-self.data)),
                          autograd=True,
                          creators=[self],creation_op='sigmoid')
        return Tensor(1/(1+np.exp(-self.data)))
    
    def tanh(self):
        if (self.autograd):
            return Tensor(np.tanh(self.data),
                          autograd=True,
                          creators=[self],
                          creation_op='tanh')
        return Tensor(np.tanh(self.data))
    
    def relu(self):
        ones_and_zeros = self.data > 0
        if (self.autograd):
            return Tensor(self.data * ones_and_zeros, 
                          autograd=True, 
                          creators=[self], 
                          creation_op='relu')
        return Tensor(self.data * ones_and_zeros)
    
    def __repr__(self):
        return str(self.data.__repr__())

    def __str__(self):
        return str(self.data.__str__())

Intuicion detras de los Embeddings de bag of words

x = Tensor([[1,0,0,0],     # estoy
            [0,1,0,0],     # mal
            [0,0,1,0],     # bien
            [0,0,0,1]],    # normal
           autograd=True)
y = x.index_select(indices=Tensor([[0,2]]))
print(y)
[[[1 0 0 0]
  [0 0 1 0]]]
z = y.sum(1)
print(z)
[[1 0 1 0]]
z.backward(Tensor([1,1,1,1]))
print('Tensor z:\n',z,'\n',z.creation_op, '\n', z.grad, '\n')

print('Tensor y:\n',y,'\n',y.creation_op, '\n', y.grad, '\n')

print('Tensor x:\n',x,'\n',x.creation_op, '\n', x.grad, '\n')
Tensor z:
 [[1 0 1 0]] 
 sum_1 
 [1 1 1 1] 

Tensor y:
 [[[1 0 0 0]
  [0 0 1 0]]] 
 index_select 
 [[1 1]
 [1 1]
 [1 1]
 [1 1]] 

Tensor x:
 [[1 0 0 0]
 [0 1 0 0]
 [0 0 1 0]
 [0 0 0 1]] 
 None 
 [[1 1 1 1]
 [0 0 0 0]
 [1 1 1 1]
 [0 0 0 0]] 

# Lo que ocurre internamente en el Tensor y
aux = np.array([[1,1],[1,1],[1,1],[1,1]])
indices = np.array([0,2]).flatten()
grad = aux.reshape(len(indices), -1)
print(aux)
print(indices)
print(grad)
[[1 1]
 [1 1]
 [1 1]
 [1 1]]
[0 2]
[[1 1 1 1]
 [1 1 1 1]]
class Layer(object):

    def __init__(self):
        self.parameters = list()

    def get_parameters(self):
        return self.parameters


class Linear(Layer):

    def __init__(self, n_inputs, n_outputs):
        super().__init__()
        W = np.random.randn(n_inputs, n_outputs) * np.sqrt(2.0 / (n_inputs))
        self.weight = Tensor(W, autograd=True)
        self.bias = Tensor(np.zeros(n_outputs), autograd=True)

        self.parameters.append(self.weight)
        self.parameters.append(self.bias)

    def forward(self, input):
        return Tensor.mm(input, self.weight) + self.bias.expand(0, len(input.data))


class Tanh(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.tanh()


class Sigmoid(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.sigmoid()


class Relu(Layer):
    def __init__(self):
        super().__init__()

    def forward(self, input):
        return input.relu()


class Sequential(Layer):

    def __init__(self, layers=list()):
        super().__init__()
        self.layers = layers

    def add(self, layer):
        self.layers.append(layer)

    def forward(self, input):
        for layer in self.layers:
            input = layer.forward(input)
        return input

    def get_parameters(self):
        params = list()
        for l in self.layers:
            params += l.get_parameters()
        return params


class MSELoss(Layer):

    def __init__(self):
        super().__init__()

    def forward(self, pred, target):
        return ((pred - target) * (pred - target)).sum(0)
class SGD(object):

    def __init__(self, parameters, alpha=0.1):
        self.parameters = parameters
        self.alpha = alpha

    def zero(self):
        for p in self.parameters:
            p.grad.data *= 0

    def step(self, zero=True):
        for p in self.parameters:
            p.data = p.data - (self.alpha * p.grad.data)

            if(zero):
                p.grad.data *= 0

Definiendo una capa de Embedding

class Embedding(Layer):
    def __init__(self, vocab_size, dim):
        super().__init__()
        self.vocab_size = vocab_size
        self.dim = dim
        # Esta inicializacion randomica es la convencion 
        # para inicializar embeddings de word2vec
        weight = Tensor((np.random.rand(vocab_size, dim) - 0.5) / dim, autograd=True)
        self.weight = weight
        # se agregan los pesos a los parametros de la capa
        self.parameters.append(self.weight)
        
    def forward(self, input):
        return self.weight.index_select(input)

Prueba de la capa de Embeddings

np.random.seed(0)

data = Tensor(np.array([1,2,1,2]), autograd=True)
target = Tensor(np.array([[0],[1],[0],[1]]), autograd=True)

embed = Embedding(5,3)
model = Sequential([embed, Tanh(), Linear(3,1), Sigmoid()])
criterion = MSELoss()
optim = SGD(model.get_parameters(), alpha=0.5)

for i in range(10):
    # Predecir
    pred = model.forward(data)
    
    # Comparar
    loss = criterion.forward(pred, target)
    
    # Aprender
    loss.backward(Tensor(np.ones_like(loss.data)))
    optim.step()
    print(loss)
    
[0.98874126]
[0.6658868]
[0.45639889]
[0.31608168]
[0.2260925]
[0.16877423]
[0.13120515]
[0.10555487]
[0.08731868]
[0.07387834]