How to handle sequences of data with neural networks
import os
import sys

# Add the project root to the import path so that lightdlf_old can be imported
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

By combining several layers it is possible to learn from time series.

The recurrent "layer" is built from three linear layers, and its forward() method takes both the previous hidden state and the next input to be processed from the training set.
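
Before looking at the class, here is a minimal numpy sketch of what a single recurrent step computes (the sizes are made up and independent of the lightdlf layers): the new hidden state mixes the current input with the previous hidden state, and the prediction is read from that new hidden state.

import numpy as np

n_inputs, n_hidden, n_output = 4, 3, 5
x_t = np.random.randn(1, n_inputs)            # current input (batch of 1)
h_prev = np.zeros((1, n_hidden))              # previous hidden state

w_ih = np.random.randn(n_inputs, n_hidden)    # input  -> hidden
w_hh = np.random.randn(n_hidden, n_hidden)    # hidden -> hidden
w_ho = np.random.randn(n_hidden, n_output)    # hidden -> output

h_t = np.tanh(x_t.dot(w_ih) + h_prev.dot(w_hh))   # new hidden state
y_t = h_t.dot(w_ho)                               # prediction for this timestep
print(h_t.shape, y_t.shape)                       # (1, 3) (1, 5)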

import numpy as np   # used by init_hidden() below

from lightdlf_old.cpu.core import Tensor
from lightdlf_old.cpu.layers import Layer, Linear, Embedding, CrossEntropyLoss, Sigmoid, Tanh
from lightdlf_old.cpu.optimizers import SGD

class RNNCell(Layer):
    def __init__(self, n_inputs, n_hidden, n_output, activation='sigmoid'):
        super().__init__()

        self.n_inputs = n_inputs
        self.n_hidden = n_hidden
        self.n_output = n_output

        if(activation == 'sigmoid'):
            self.activation = Sigmoid()
        elif(activation == 'tanh'):
            self.activation = Tanh()
        else:
            raise Exception("Non-linearity not found")

        self.w_ih = Linear(n_inputs=n_inputs, n_outputs=n_hidden)
        self.w_hh = Linear(n_inputs=n_hidden, n_outputs=n_hidden)
        self.w_ho = Linear(n_inputs=n_hidden, n_outputs=n_output)

        self.parameters += self.w_ih.get_parameters()
        self.parameters += self.w_hh.get_parameters()
        self.parameters += self.w_ho.get_parameters()

    def forward(self, input, hidden):
        from_prev_hidden = self.w_hh.forward(hidden)      # contribution of the previous hidden state
        from_actual_input = self.w_ih.forward(input)      # contribution of the current input
        combined = from_actual_input + from_prev_hidden
        new_hidden = self.activation.forward(combined)    # new hidden state
        output = self.w_ho.forward(new_hidden)            # prediction for this timestep
        return output, new_hidden

    def init_hidden(self, batch_size=1):
        return Tensor(np.zeros((batch_size, self.n_hidden)), autograd=True)

RNNs have a state vector that is passed from iteration to iteration. This state vector is the hidden vector, which is both an input parameter and an output of the forward() function.

RNNs have several weight matrices:

  • w_ih: maps input vectors to hidden vectors (processes the input data)
  • w_hh: maps hidden vectors to other hidden vectors (updates each hidden vector based on the previous one)
  • w_ho: an optional matrix that learns to make predictions from the hidden vector

The input dimension of w_ih and the output dimension of w_ho are fixed by the data (here the embedding size and the vocabulary size, respectively); every other dimension is configurable through the n_hidden parameter.

Finally, the activation parameter defines the non-linear function used as the activation at each step or timestep.
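
As a quick sanity check, the cell can be stepped through a short sequence by hand, threading the hidden state from one call to the next. This is only a sketch: it assumes the imports from the cell above, and the vocabulary size 10 and the sequence [3, 1, 4, 1] are made up for illustration.

import numpy as np

demo_embed = Embedding(vocab_size=10, dim=16)
demo_cell = RNNCell(n_inputs=16, n_hidden=16, n_output=10, activation='sigmoid')

sequence = np.array([[3, 1, 4, 1]])            # shape (batch=1, timesteps=4)
hidden = demo_cell.init_hidden(batch_size=1)   # zeroed initial hidden state

for t in range(sequence.shape[1]):
    word = Tensor(sequence[:, t], autograd=True)
    rnn_input = demo_embed.forward(input=word)   # (1, 16) embedding for this step
    output, hidden = demo_cell.forward(input=rnn_input, hidden=hidden)

print(output.data.shape)   # (1, 10): one score per word in the toy vocabulary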

Testing the recurrent neural network

import sys,random,math
from collections import Counter
import numpy as np

f = open('datasets/en/qa1_single-supporting-fact_train.txt','r')
raw = f.readlines()
f.close()

tokens = list()
for line in raw[0:1000]:
    # drop the line number and keep only the lowercase words
    tokens.append(line.lower().replace("\n","").split(" ")[1:])

new_tokens = list()
for line in tokens:
    # left-pad every sentence with '-' so that all of them have length 6
    new_tokens.append(['-'] * (6 - len(line)) + line)

tokens = new_tokens

vocab = set()
for sent in tokens:
    for word in sent:
        vocab.add(word)

vocab = list(vocab)

word2index = {}
for i,word in enumerate(vocab):
    word2index[word]=i
    
def words2indices(sentence):
    idx = list()
    for word in sentence:
        idx.append(word2index[word])
    return idx

indices = list()
for line in tokens:
    idx = list()
    for w in line:
        idx.append(word2index[w])
    indices.append(idx)

data = np.array(indices)                                        # one row per sentence, one column per word index
embed = Embedding(vocab_size=len(vocab), dim=16)                # 16-dimensional word embeddings
model = RNNCell(n_inputs=16, n_hidden=16, n_output=len(vocab))  # predicts the next word

criterion = CrossEntropyLoss()
params = model.get_parameters() + embed.get_parameters()
optim = SGD(parameters=params, alpha=0.05)

for iter in range(1000):
    batch_size = 100
    total_loss = 0

    hidden = model.init_hidden(batch_size=batch_size)

    # forward the first 5 words of each sentence, carrying the hidden state along
    for t in range(5):
        input = Tensor(data[0:batch_size,t], autograd=True)
        rnn_input = embed.forward(input=input)
        output, hidden = model.forward(input=rnn_input, hidden=hidden)

    # the target is the 6th word of each sentence
    target = Tensor(data[0:batch_size,t+1], autograd=True)
    loss = criterion.forward(output, target)
    loss.backward(grad=None)
    optim.step()
    total_loss += loss.data
    if(iter % 200 == 0):
        p_correct = (target.data == np.argmax(output.data,axis=1)).mean()
        print("Loss:",total_loss / (len(data)/batch_size),"% Correct:",p_correct)
    
Loss: 0.4703201325613545 % Correct: 0.0
Loss: 0.17965988247202694 % Correct: 0.22
Loss: 0.16504169330685436 % Correct: 0.32
Loss: 0.1514492684363077 % Correct: 0.34
Loss: 0.13746893243465 % Correct: 0.37
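
After training, one way to inspect what the network has learned is to run a single sentence through the cell, in the same way as the training loop above, and compare the predicted sixth word with the real one. This is a sketch that reuses the model, embed, data and vocab objects already defined in this notebook.

batch_size = 1
hidden = model.init_hidden(batch_size=batch_size)
for t in range(5):
    input = Tensor(data[0:batch_size,t], autograd=True)
    rnn_input = embed.forward(input=input)
    output, hidden = model.forward(input=rnn_input, hidden=hidden)

target = Tensor(data[0:batch_size,t+1], autograd=True)

# reconstruct the context words and compare prediction vs. ground truth
ctx = ""
for idx in data[0][0:-1]:
    ctx += vocab[idx] + " "
print("Context:", ctx)
print("True:", vocab[target.data[0]])
print("Pred:", vocab[output.data.argmax()])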