Building an RNN-LSTM completely from scratch (no libraries!)

In this post, we are going to build an RNN-LSTM completely from scratch using only numpy (coding like it’s 1999). LSTMs belong to the family of recurrent neural networks, which are very useful for learning sequential data such as text, time series, or video. While a traditional feedforward network consists of an input layer, hidden layers, and an output layer, with weights, biases, and activation functions between them, an RNN additionally incorporates a hidden state layer that is connected to itself (hence recurrent).

Hence, between layer_1 and layer_2 we do not merely pass the weighted input plus bias; we also include the previous hidden state. If we unroll the network over time, we can think of it as a chain of feedforward networks whose hidden layers are connected across time steps.
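
To make the recurrence concrete, here is a minimal, purely illustrative sketch of a single recurrent step in numpy. The weight names W_xh and W_hh and the sizes are made up for this example and are not part of the model we build below:

import numpy as np

#purely illustrative sizes and names
input_size, hidden_size = 4, 3
W_xh = np.random.random((hidden_size, input_size))   #input-to-hidden weights
W_hh = np.random.random((hidden_size, hidden_size))  #hidden-to-hidden weights (the recurrence)

h = np.zeros(hidden_size)                            #initial hidden state
for x in np.eye(input_size):                         #a toy one-hot input sequence
    h = np.tanh(np.dot(W_xh, x) + np.dot(W_hh, h))   #the new state depends on the input AND the previous state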

As the hidden state massively increases the number of backpropagation steps, we run into the vanishing gradient problem: the magnitude of the gradient shrinks with each propagation step, so the weights of the layers closest to the output are affected far more than the weights of the earliest layers. To update the weights of every layer correctly, we need to preserve the gradient's magnitude as we backpropagate through the network.
The solution to this problem is to replace the RNN cell (the plain hidden state) with an LSTM cell, which uses an input, forget, and output gate as well as a cell state.
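
A quick way to see the problem numerically: the gradient that reaches an early time step contains a product of per-step factors, and if each factor is smaller than one, the signal shrinks exponentially. The 0.25 below is simply the maximum slope of the sigmoid and is used purely for illustration:

import numpy as np

#purely illustrative: repeated multiplication by a per-step factor smaller than one
per_step_factor = 0.25                      #e.g. the maximum slope of the sigmoid
steps = np.arange(1, 21)
gradient_magnitude = per_step_factor**steps
print(gradient_magnitude[:5])               #shrinks exponentially: 0.25, 0.0625, 0.015625, ...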

A gate can be understood as a layer that performs a small series of matrix operations. For better intuition, it might help to think of the LSTM cell as a mini neural network of its own. Each gate has its own weight matrix, which makes the LSTM cell fully differentiable; hence, we can compute the derivatives of the gates and update the weight matrices via gradient descent.

In an ideal scenario, the LSTM cell memorizes the data that is relevant and, whenever a new input arrives, forgets any long-term information that is no longer needed. The cell state represents the long-term memory and stores the relevant information from all time steps. The hidden state acts as working memory and is analogous to the hidden state in a plain RNN. The forget gate decides whether to keep or discard information that is stored in the cell state, whereas the input gate determines how much of the current time step's information should be written to the cell state. The output gate, on the other hand, acts as a kind of attention mechanism and decides which part of the data the network should focus on. The whole process of learning what information to keep and what to forget happens (magically) by adjusting the weights via gradient descent.
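
Written out, one LSTM step boils down to four matrix multiplications followed by element-wise combinations. The stand-alone sketch below mirrors the forwardProp method we implement later; the weight names Wf, Wi, Wc, and Wo are placeholders for this illustration:

import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

#one LSTM step as plain gate arithmetic (illustrative names, same logic as the LSTM class below)
def lstm_step(x, h_prev, cs_prev, Wf, Wi, Wc, Wo):
    z = np.hstack((h_prev, x))        #gate input: previous hidden state stacked with the current input
    f = sigmoid(np.dot(Wf, z))        #forget gate: what to drop from the cell state
    i = sigmoid(np.dot(Wi, z))        #input gate: how much new information to admit
    c = np.tanh(np.dot(Wc, z))        #candidate cell content
    cs = f * cs_prev + i * c          #updated long-term memory (cell state)
    o = sigmoid(np.dot(Wo, z))        #output gate: which part of the memory to expose
    h = o * np.tanh(cs)               #new hidden state (working memory)
    return cs, h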

LSTM networks can be applied to almost any kind of sequential data. One of the most popular use cases is natural language processing, which we will also tackle in this post using only numpy. First, we define the RNN class. In a second step, we define the LSTM cell, which replaces the hidden state layer of the previously defined RNN. After that, we write the data loading functions we need to train our model. In the last step, we train the model using backpropagation and produce an output sample from a model that is built and trained without any additional libraries.

The RNN class below includes the sigmoid activation function, forward and backward propagation, as well as the RMSprop weight update routine. Since we want to capture the sequential dependence of the data through time, we shift our output by one time step (illustrated in the toy snippet below). The learning rate hyperparameter controls the convergence speed of the weight updates. The comments in the code snippets are deliberately detailed and should make each step easier to follow.
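
As a toy illustration of that one-step shift (not part of the model code): the target at position t is simply the input element at position t+1.

#toy illustration only - the target sequence is the input sequence shifted by one time step
text = list("hello world")
inputs, targets = text[:-1], text[1:]   #the target at position t is the input at position t+1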

import numpy as np

class RecurrentNeuralNetwork:
    #arguments: input size, output size (expected output shifted by one time step), number of recurrences (num of words), array of expected outputs, learning rate
    
    def __init__ (self, input, output, recurrences, expected_output, learning_rate):
        #initial input 
        self.x = np.zeros(input)
        #input size 
        self.input = input
        #expected output (shifted by one time step)
        self.y = np.zeros(output)
        #output size
        self.output = output
        #weight matrix for interpreting results from hidden state cell (num words x num words matrix)
        #random initialization is crucial here
        self.w = np.random.random((output, output))
        #matrix used in RMSprop in order to decay the learning rate
        self.G = np.zeros_like(self.w)
        #length of the recurrent network - number of recurrences i.e num of inputs
        self.recurrences = recurrences
        #learning rate 
        self.learning_rate = learning_rate
        #array for storing inputs
        self.ia = np.zeros((recurrences+1,input))
        #array for storing cell states
        self.ca = np.zeros((recurrences+1,output))
        #array for storing outputs
        self.oa = np.zeros((recurrences+1,output))
        #array for storing hidden states
        self.ha = np.zeros((recurrences+1,output))
        #forget gate 
        self.af = np.zeros((recurrences+1,output))
        #input gate
        self.ai = np.zeros((recurrences+1,output))
        #cell state
        self.ac = np.zeros((recurrences+1,output))
        #output gate
        self.ao = np.zeros((recurrences+1,output))
        #array of expected output values
        self.expected_output = np.vstack((np.zeros(expected_output.shape[0]), expected_output.T))
        #declare LSTM cell (input, output, amount of recurrence, learning rate)
        self.LSTM = LSTM(input, output, recurrences, learning_rate)
    
    #activation function. simple nonlinearity, squashes incoming floats into values between 0 and 1
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    #the derivative of the sigmoid function. We need it to compute gradients for backpropagation
    def dsigmoid(self, x):
        return self.sigmoid(x) * (1 - self.sigmoid(x))    
    
    #Here is where the magic happens: we apply a series of matrix operations to our input and compute an output
    def forwardProp(self):
        for i in range(1, self.recurrences+1):
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            cs, hs, f, inp, c, o = self.LSTM.forwardProp()
            #store the results of the LSTM cell's forward propagation
            self.ca[i] = cs #cell state
            self.ha[i] = hs #hidden state
            self.af[i] = f #forget gate
            self.ai[i] = inp #input gate
            self.ac[i] = c #candidate cell state
            self.ao[i] = o #output gate
            self.oa[i] = self.sigmoid(np.dot(self.w, hs)) #apply the weight matrix to the hidden state and activate
            self.x = self.expected_output[i-1]
        return self.oa
   
    
    def backProp(self):
        #Backpropagation of our weight matrices (Both in our Recurrent network + weight matrices inside LSTM cell)
        #start with an empty error value 
        totalError = 0
        #initialize matrices for gradient updates
        #First, these are RNN level gradients
        #cell state
        dfcs = np.zeros(self.output)
        #hidden state,
        dfhs = np.zeros(self.output)
        #weight matrix
        tu = np.zeros((self.output,self.output))
        #Next, these are LSTM level gradients
        #forget gate
        tfu = np.zeros((self.output, self.input+self.output))
        #input gate
        tiu = np.zeros((self.output, self.input+self.output))
        #cell unit
        tcu = np.zeros((self.output, self.input+self.output))
        #output gate
        tou = np.zeros((self.output, self.input+self.output))
        #loop backwards through recurrences
        for i in range(self.recurrences, -1, -1):
            #error = calculatedOutput - expectedOutput
            error = self.oa[i] - self.expected_output[i]
            #calculate update for weight matrix
            #Compute the outer product of (error * derivative of the output) with the hidden state
            tu += np.dot(np.atleast_2d(error * self.dsigmoid(self.oa[i])).T, np.atleast_2d(self.ha[i]))
            #Now propagate the error back to exit of LSTM cell
            #1. error * RNN weight matrix
            error = np.dot(error, self.w)
            #2. set input values of LSTM cell for recurrence i (horizontal stack of array output, hidden + input)
            self.LSTM.x = np.hstack((self.ha[i-1], self.ia[i]))
            #3. set cell state of LSTM cell for recurrence i (pre-updates)
            self.LSTM.cs = self.ca[i]
            #Finally, call the LSTM cell's backprop and retrieve the gradient updates
            #Compute the gradient updates for forget, input, cell unit, and output gates + cell states + hiddens states
            fu, iu, cu, ou, dfcs, dfhs = self.LSTM.backProp(error, self.ca[i-1], self.af[i], self.ai[i], self.ac[i], self.ao[i], dfcs, dfhs)
            #Accumulate the gradients by calculating the total error (not necessary, only used to measure training progress)
            totalError += np.sum(error)
            #forget gate
            tfu += fu
            #input gate
            tiu += iu
            #cell state
            tcu += cu
            #output gate
            tou += ou
        #update LSTM matrices with average of accumulated gradient updates    
        self.LSTM.update(tfu/self.recurrences, tiu/self.recurrences, tcu/self.recurrences, tou/self.recurrences) 
        #update weight matrix with average of accumulated gradient updates  
        self.update(tu/self.recurrences)
        #return total error of this iteration
        return totalError
    
    def update(self, u):
        #Implementation of RMSprop for the RNN-level weight matrix
        #We scale each update by a running average of squared gradients, which adapts the effective learning rate
        self.G = 0.95 * self.G + 0.1 * u**2  
        self.w -= self.learning_rate/np.sqrt(self.G + 1e-8) * u
        return
    
    #We define a sample function which produces output once we have trained our model:
    #if we feed in an input observed at time t, the model produces the output expected
    #at time t+1
    def sample(self):
        #loop through recurrences - start at 1 so the 0th entry of all outputs will be an array of 0's
        for i in range(1, self.recurrences+1):
            #set input for LSTM cell, combination of input (previous output) and previous hidden state
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            #run forward prop on the LSTM cell, retrieve cell state and hidden state
            cs, hs, f, inp, c, o = self.LSTM.forwardProp()
            #one-hot encode the current input and store it
            maxI = np.argmax(self.x)
            self.x = np.zeros_like(self.x)
            self.x[maxI] = 1
            self.ia[i] = self.x
            #store cell states
            self.ca[i] = cs
            #store hidden state
            self.ha[i] = hs
            #forget gate
            self.af[i] = f
            #input gate
            self.ai[i] = inp
            #cell state
            self.ac[i] = c
            #output gate
            self.ao[i] = o
            #calculate output by multiplying hidden state with weight matrix
            self.oa[i] = self.sigmoid(np.dot(self.w, hs))
            #compute new input
            maxI = np.argmax(self.oa[i])
            newX = np.zeros_like(self.x)
            newX[maxI] = 1
            self.x = newX
        #return all outputs    
        return self.oa

In the next step, we define the LSTM cell, which is a small neural network in itself and is therefore constructed similarly to the RNN class above.

class LSTM:
    # LSTM cell (input, output, amount of recurrence, learning rate)
    def __init__ (self, input, output, recurrences, learning_rate):
        #input (previous hidden state stacked with the current input)
        self.x = np.zeros(input+output)
        #combined input size
        self.input = input + output
        #output 
        self.y = np.zeros(output)
        #output size
        self.output = output
        #cell state initialized with the size of the prediction
        self.cs = np.zeros(output)
        #how often to perform recurrence
        self.recurrences = recurrences
        #balance the rate of training (learning rate)
        self.learning_rate = learning_rate
        #init weight matrices for our gates
        #forget gate
        self.f = np.random.random((output, input+output))
        #input gate
        self.i = np.random.random((output, input+output))
        #cell state
        self.c = np.random.random((output, input+output))
        #output gate
        self.o = np.random.random((output, input+output))
        #RMSprop cache for the forget gate weights
        self.Gf = np.zeros_like(self.f)
        #RMSprop cache for the input gate weights
        self.Gi = np.zeros_like(self.i)
        #RMSprop cache for the cell state weights
        self.Gc = np.zeros_like(self.c)
        #RMSprop cache for the output gate weights
        self.Go = np.zeros_like(self.o)
    
    #activation function. simple nonlinearity, squashes incoming floats into values between 0 and 1
    #same as in rnn class
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    #The derivative of the sigmoid function. We need it to compute gradients for backpropagation
    def dsigmoid(self, x):
        return self.sigmoid(x) * (1 - self.sigmoid(x))
    
    #Within the LSTM cell we are going to use a tanh activation function.
    #Long story short: this leads to stronger gradients (the derivatives are larger), which is useful as our data is centered around 0.
    def tangent(self, x):
        return np.tanh(x)
    
    #derivative of tanh, needed for computing gradients
    def dtangent(self, x):
        return 1 - np.tanh(x)**2
    
    #Here is where the magic happens: we apply a series of matrix operations to our input and compute an output
    def forwardProp(self):
        f = self.sigmoid(np.dot(self.f, self.x))
        self.cs *= f
        i = self.sigmoid(np.dot(self.i, self.x))
        c = self.tangent(np.dot(self.c, self.x))
        self.cs += i * c
        o = self.sigmoid(np.dot(self.o, self.x))
        self.y = o * self.tangent(self.cs)
        return self.cs, self.y, f, i, c, o
    
   
    def backProp(self, e, pcs, f, i, c, o, dfcs, dfhs):
        #error = error + hidden state derivative. we clip the value between -6 and 6 to keep the gradients stable
        e = np.clip(e + dfhs, -6, 6)
        #multiply the error by the activated cell state to compute the output gate derivative
        do = self.tangent(self.cs) * e
        #output gate update = (output derivative * tanh derivative of the output gate) outer product with the input
        ou = np.dot(np.atleast_2d(do * self.dtangent(o)).T, np.atleast_2d(self.x))
        #derivative of the cell state = error * output gate * tanh derivative of the cell state + incoming cell state derivative
        #compute the gradients and update them in reverse order!
        dcs = np.clip(e * o * self.dtangent(self.cs) + dfcs, -6, 6)
        #derivative of the candidate cell = derivative of the cell state * input gate
        dc = dcs * i
        #cell update = (candidate derivative * tanh derivative of the candidate) outer product with the input
        cu = np.dot(np.atleast_2d(dc * self.dtangent(c)).T, np.atleast_2d(self.x))
        #derivative of the input gate = derivative of the cell state * candidate cell
        di = dcs * c
        #input gate update = (input derivative * sigmoid derivative of the input gate) outer product with the input
        iu = np.dot(np.atleast_2d(di * self.dsigmoid(i)).T, np.atleast_2d(self.x))
        #derivative of the forget gate = derivative of the cell state * previous cell state
        df = dcs * pcs
        #forget gate update = (forget derivative * sigmoid derivative of the forget gate) outer product with the input
        fu = np.dot(np.atleast_2d(df * self.dsigmoid(f)).T, np.atleast_2d(self.x))
        #derivative of the previous cell state = derivative of the cell state * forget gate
        dpcs = dcs * f
        #derivative of the previous hidden state = sum of the gate derivatives propagated back through the gate
        #weight matrices, truncated to the hidden state part of the concatenated input
        dphs = np.dot(dc, self.c)[:self.output] + np.dot(do, self.o)[:self.output] + np.dot(di, self.i)[:self.output] + np.dot(df, self.f)[:self.output]
        #return update gradients for forget, input, cell, output, cell state, hidden state
        return fu, iu, cu, ou, dpcs, dphs
            
    def update(self, fu, iu, cu, ou):
        #Update the RMSprop caches for the forget, input, cell, and output gate weights
        self.Gf = 0.9 * self.Gf + 0.1 * fu**2 
        self.Gi = 0.9 * self.Gi + 0.1 * iu**2   
        self.Gc = 0.9 * self.Gc + 0.1 * cu**2   
        self.Go = 0.9 * self.Go + 0.1 * ou**2   
        
        #Update our gates using our gradients
        self.f -= self.learning_rate/np.sqrt(self.Gf + 1e-8) * fu
        self.i -= self.learning_rate/np.sqrt(self.Gi + 1e-8) * iu
        self.c -= self.learning_rate/np.sqrt(self.Gc + 1e-8) * cu
        self.o -= self.learning_rate/np.sqrt(self.Go + 1e-8) * ou
        return

That’s it for the definition of our RNN-LSTM network. Now we merely load some sequential data, in this case a research paper of mine, and try to predict the next word given the current one. The preprocessed text data is exported and stored as a plain text file, which we read from disk below.

def read_text_file():
    #Open the text file and return separate input and output variables (series of words)
    with open("main.txt", "r") as text_file:
        data = text_file.read()
    text = list(data)
    outputSize = len(text)
    data = list(set(text))
    uniqueWords, dataSize = len(data), len(data) 
    returnData = np.zeros((uniqueWords, dataSize))
    for i in range(0, dataSize):
        returnData[i][i] = 1
    returnData = np.append(returnData, np.atleast_2d(data), axis=0)
    output = np.zeros((uniqueWords, outputSize))
    for i in range(0, outputSize):
        index = np.where(np.asarray(data) == text[i])
        output[:,i] = returnData[0:-1,index[0]].astype(float).ravel()  
    return returnData, uniqueWords, output, outputSize, data

#Write the predicted output (series of words) to disk
def export_to_textfile(output, data):
    finalOutput = np.zeros_like(output)
    prob = np.zeros_like(output[0])
    outputText = ""
    print(len(data))
    print(output.shape[0])
    for i in range(0, output.shape[0]):
        for j in range(0, output.shape[1]):
            prob[j] = output[i][j] / np.sum(output[i])
        outputText += np.random.choice(data, p=prob)    
    with open("output.txt", "w") as text_file:
        text_file.write(outputText)
    return

Finally, we can put all the pieces together: read the text file from disk, train our model, and take a look at the predicted output. Whenever the error falls within a given threshold (or every tenth iteration), the model runs a forward propagation step and produces an output sample that is written to disk.

print("Initialize Hyperparameters")
iterations = 10000
learningRate = 0.001
print("Reading Inputa/Output data from disk")
returnData, numCategories, expectedOutput, outputSize, data = read_text_file()
print("Reading from disk done. Proceeding with training.")
#Initialize the RNN using our hyperparameters and data
RNN = RecurrentNeuralNetwork(numCategories, numCategories, outputSize, expectedOutput, learningRate)

#training time!
for i in range(1, iterations):
    #Predict the next word for each word in the sequence
    RNN.forwardProp()
    #update all our weights using our error
    error = RNN.backProp()
    #report the error for this iteration
    print("Reporting error on iteration ", i, ": ", error)
    #once the error falls within the threshold (or every 10th iteration), sample from the model
    if (error > -10 and error < 10) or i % 10 == 0:
        #We provide a random seed word
        seed = np.zeros_like(RNN.x)
        maxI = np.argmax(np.random.random(RNN.x.shape))
        seed[maxI] = 1
        RNN.x = seed  
        #and predict the upcoming one
        output = RNN.sample()
        print(output)    
        #finally, we store it to disk
        export_to_textfile(output, data)
        print("Done Writing")
print("Train/Prediction routine complete.")

The amount of code produced in this post is far beyond what I expected in the first place. Nevertheless, building fundamental models like the RNN-LSTM from scratch helps to clarify their idea and inner workings, which are easily overlooked.
