# Building an RNN-LSTM completely from scratch (no libraries!)

In this post, we are going to build an RNN-LSTM completely from scratch, using only numpy (coding like it’s 1999). LSTMs belong to the family of recurrent neural networks, which are very useful for learning sequential data such as text, time series, or video. While a traditional feedforward network consists of an input layer, a hidden layer, an output layer, and the weights, biases, and activation functions between them, an RNN additionally incorporates a hidden state layer that is connected to itself (hence, recurrent).

Hence, between layer_1 and layer_2 we do not merely pass the weighted input plus bias, but also include the previous hidden state. If we unroll the RNN over time, we can think of it as a chain of feedforward networks that are connected to each other, one per time step.
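To make the unrolling concrete, here is a minimal sketch of a vanilla RNN stepped through time with numpy. All names and sizes (`W_xh`, `W_hh`, hidden size 5) are illustrative, not the classes defined later in this post:

```python
import numpy as np

def rnn_unrolled(xs, W_xh, W_hh, b_h):
    # the SAME weights are reused at every time step;
    # the hidden state h carries information forward
    h = np.zeros(W_hh.shape[0])
    hidden_states = []
    for x in xs:  # one "feedforward step" per time step
        h = np.tanh(W_xh @ x + W_hh @ h + b_h)
        hidden_states.append(h)
    return hidden_states

rng = np.random.default_rng(0)
xs = [rng.standard_normal(4) for _ in range(3)]  # 3 time steps, input size 4
W_xh = rng.standard_normal((5, 4)) * 0.1         # hidden size 5
W_hh = rng.standard_normal((5, 5)) * 0.1
b_h = np.zeros(5)
hs = rnn_unrolled(xs, W_xh, W_hh, b_h)
```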

As the hidden state massively increases the number of backpropagation steps, we run into the problem of vanishing gradients. Since the magnitude of the gradient decreases with each propagation step, the weights of the later layers are affected far more than the weights of the first layers (the distant time steps). To update the weights of every layer correctly, we need to maintain the gradient's magnitude as we backpropagate through the network.
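A tiny numerical toy makes this visible: repeatedly multiplying a gradient by a recurrent factor smaller than one shrinks it geometrically. The factor 0.5 below is made up purely for illustration:

```python
# stand-in for the repeated multiplication by W_hh * tanh'(...) during
# backpropagation through time, along a single dimension
grad = 1.0
factor = 0.5
magnitudes = []
for step in range(20):
    grad *= factor
    magnitudes.append(abs(grad))
# after 20 steps the gradient is orders of magnitude smaller
```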
This problem is solved by replacing the RNN cell (the hidden state) with an LSTM cell, which uses an input, a forget, and an output gate as well as a cell state.

A gate can be understood as a layer that merely performs a series of matrix operations. For a better understanding, it might be helpful to think of the LSTM cell as a mini single-layer neural network. Each gate has its own weight matrix, which makes the LSTM cell as a whole fully differentiable. Hence, we can compute the derivatives of the gates and update their weight matrices via gradient descent.
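As a sketch, a single gate is nothing more than a sigmoid applied to a matrix-vector product over the concatenated previous hidden state and current input. The names and sizes below are illustrative:

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

rng = np.random.default_rng(1)
h_prev = rng.standard_normal(3)        # previous hidden state
x_t = rng.standard_normal(3)           # current input
W = rng.standard_normal((3, 6)) * 0.1  # one weight matrix per gate
# the gate outputs values in (0, 1) that later scale another vector elementwise
gate = sigmoid(W @ np.hstack((h_prev, x_t)))
```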

$$
\begin{aligned}
&\text{Gating variables} \\
&\mathbf{g}^i = \sigma(\mathbf{W}^i \mathbf{h}_{t-1} + \mathbf{I}^i \mathbf{x}_t) \\
&\mathbf{g}^f = \sigma(\mathbf{W}^f \mathbf{h}_{t-1} + \mathbf{I}^f \mathbf{x}_t) \\
&\mathbf{g}^o = \sigma(\mathbf{W}^o \mathbf{h}_{t-1} + \mathbf{I}^o \mathbf{x}_t) \\
&\text{Candidate (memory) state} \\
&\mathbf{g}^c = \tanh(\mathbf{W}^c \mathbf{h}_{t-1} + \mathbf{I}^c \mathbf{x}_t) \\
&\text{Cell \& hidden state} \\
&\mathbf{m}_t = \mathbf{g}^f \odot \mathbf{m}_{t-1} + \mathbf{g}^i \odot \mathbf{g}^c \\
&\mathbf{h}_t = \mathbf{g}^o \odot \tanh(\mathbf{m}_t)
\end{aligned}
$$

In an ideal scenario, the LSTM cell memorizes the data that is relevant and forgets any long-term information that is no longer needed whenever a new input arrives. The cell state represents the long-term memory and stores the relevant information from all time steps. The hidden state acts as working memory and is analogous to the hidden state in a vanilla RNN. The forget gate merely decides whether to keep or to discard information that is stored in the cell state, whereas the input gate determines how much of the information at the current time step should be written to the cell state. The output gate, on the other hand, acts as an attention mechanism and decides which part of the data the network should focus on. The whole process of learning what information to keep and what to forget happens (magically) by adjusting the weights via gradient descent.
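A hand-computed step with extreme gate values (made up purely for illustration) shows these roles directly: a forget gate near 0 wipes an entry of the old cell state, an input gate near 1 writes the candidate into it, and the output gate selects what the hidden state exposes:

```python
import numpy as np

c_prev = np.array([5.0, -3.0])  # old long-term memory
g_f = np.array([0.0, 1.0])      # forget the first entry, keep the second
g_i = np.array([1.0, 0.0])      # write the candidate only into the first entry
g_c = np.array([0.7, 0.9])      # candidate values
g_o = np.array([1.0, 1.0])      # expose everything

c_t = g_f * c_prev + g_i * g_c  # new cell state
h_t = g_o * np.tanh(c_t)        # new hidden (working) memory
```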

LSTM networks can be applied to almost any kind of sequential data. One of the most popular use cases is natural language processing, which we will also tackle in this post using nothing but numpy. First, we define the RNN class. In a second step, we define the LSTM cell, which replaces the hidden state layer of the previously defined RNN. After that, we write the data loading functions that we need to train our model. In the last step, we train the model via backpropagation and produce an output sample from a model that is built and trained without any additional libraries.

The RNN class below includes the sigmoid activation function, forward propagation and backpropagation, as well as the RMSprop weight update routine. Since we want to capture the sequential dependence of the data through time, we shift our output by one time step. The learning rate hyperparameter controls the convergence speed of the weight updates. The comments in the code snippets below are fairly detailed and should contribute to a better understanding of each step.
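The one-time-step shift means that the input at time t is paired with the token at time t+1 as its target. A minimal sketch (the token list is made up for illustration):

```python
tokens = ["the", "model", "predicts", "the", "next", "word"]
inputs = tokens[:-1]   # what the network sees at time t
targets = tokens[1:]   # what it should predict: the token at t+1
pairs = list(zip(inputs, targets))
```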

```python
import numpy as np

class RecurrentNeuralNetwork:
    #input (at t), expected output (shifted by one time step), num of words (num of recurrences),
    #array of expected outputs, learning rate
    def __init__(self, input, output, recurrences, expected_output, learning_rate):
        #initial input
        self.x = np.zeros(input)
        #input size
        self.input = input
        #expected output (shifted by one time step)
        self.y = np.zeros(output)
        #output size
        self.output = output
        #weight matrix for interpreting results from the hidden state cell (num words x num words)
        #random initialization is crucial here
        self.w = np.random.random((output, output))
        #matrix used in RMSprop to decay the learning rate
        self.G = np.zeros_like(self.w)
        #length of the recurrent network - number of recurrences, i.e. number of inputs
        self.recurrences = recurrences
        #learning rate
        self.learning_rate = learning_rate
        #array for storing inputs
        self.ia = np.zeros((recurrences+1, input))
        #array for storing cell states
        self.ca = np.zeros((recurrences+1, output))
        #array for storing outputs
        self.oa = np.zeros((recurrences+1, output))
        #array for storing hidden states
        self.ha = np.zeros((recurrences+1, output))
        #forget gate
        self.af = np.zeros((recurrences+1, output))
        #input gate
        self.ai = np.zeros((recurrences+1, output))
        #cell state
        self.ac = np.zeros((recurrences+1, output))
        #output gate
        self.ao = np.zeros((recurrences+1, output))
        #array of expected output values
        self.expected_output = np.vstack((np.zeros(expected_output.shape[0]), expected_output.T))
        #declare the LSTM cell (input, output, number of recurrences, learning rate)
        self.LSTM = LSTM(input, output, recurrences, learning_rate)

    #activation function: a simple nonlinearity that squashes incoming floats into values between 0 and 1
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    #the derivative of the sigmoid function; we need it to compute gradients for backpropagation
    def dsigmoid(self, x):
        return self.sigmoid(x) * (1 - self.sigmoid(x))

    #Here is where the magic happens: we apply a series of matrix operations to our input and compute an output
    def forwardProp(self):
        for i in range(1, self.recurrences+1):
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            cs, hs, f, inp, c, o = self.LSTM.forwardProp()
            #store the results of the forward propagation
            self.ca[i] = cs   #cell state
            self.ha[i] = hs   #hidden state
            self.af[i] = f    #forget gate
            self.ai[i] = inp  #input gate
            self.ac[i] = c    #candidate cell state
            self.ao[i] = o    #output gate
            self.oa[i] = self.sigmoid(np.dot(self.w, hs))  #activate weight * hidden state
            self.x = self.expected_output[i-1]
        return self.oa

    def backProp(self):
        #backpropagation through our weight matrices (both in the recurrent network and inside the LSTM cell)
        totalError = 0
        #First, the RNN-level gradients
        #cell state
        dfcs = np.zeros(self.output)
        #hidden state
        dfhs = np.zeros(self.output)
        #weight matrix
        tu = np.zeros((self.output, self.output))
        #Next, the LSTM-level gradients
        #forget gate
        tfu = np.zeros((self.output, self.input+self.output))
        #input gate
        tiu = np.zeros((self.output, self.input+self.output))
        #cell unit
        tcu = np.zeros((self.output, self.input+self.output))
        #output gate
        tou = np.zeros((self.output, self.input+self.output))
        #loop backwards through the recurrences
        for i in range(self.recurrences, -1, -1):
            #error = calculatedOutput - expectedOutput
            error = self.oa[i] - self.expected_output[i]
            #calculate the update for the weight matrix:
            #outer product of (error * derivative of the output) and the hidden state
            tu += np.dot(np.atleast_2d(error * self.dsigmoid(self.oa[i])).T, np.atleast_2d(self.ha[i]))
            #Now propagate the error back to the exit of the LSTM cell
            #1. error * RNN weight matrix
            error = np.dot(error, self.w)
            #2. set the input values of the LSTM cell for recurrence i (horizontal stack of hidden state and input)
            self.LSTM.x = np.hstack((self.ha[i-1], self.ia[i]))
            #3. set the cell state of the LSTM cell for recurrence i (pre-update)
            self.LSTM.cs = self.ca[i]
            #compute the gradient updates for the forget, input, cell unit, and output gates + cell and hidden states
            fu, iu, cu, ou, dfcs, dfhs = self.LSTM.backProp(error, self.ca[i-1], self.af[i], self.ai[i], self.ac[i], self.ao[i], dfcs, dfhs)
            #accumulate the total error (not necessary, only used to measure training progress)
            totalError += np.sum(error)
            #accumulate the gradients
            #forget gate
            tfu += fu
            #input gate
            tiu += iu
            #cell state
            tcu += cu
            #output gate
            tou += ou
        self.LSTM.update(tfu/self.recurrences, tiu/self.recurrences, tcu/self.recurrences, tou/self.recurrences)
        self.update(tu/self.recurrences)
        return totalError

    def update(self, u):
        #vanilla implementation of RMSprop:
        #we decay our learning rate to increase convergence speed
        self.G = 0.9 * self.G + 0.1 * u**2
        self.w -= self.learning_rate/np.sqrt(self.G + 1e-8) * u
        return

    #We define a sample function which produces an output once we have trained our model:
    #if we feed it an input observed at time t, the model produces an output that can be
    #observed at time t+1
    def sample(self):
        #loop through the recurrences - start at 1 so the 0th entry of all outputs remains an array of zeros
        for i in range(1, self.recurrences+1):
            #set the input of the LSTM cell: a combination of the input (previous output) and the previous hidden state
            self.LSTM.x = np.hstack((self.ha[i-1], self.x))
            #run forward prop on the LSTM cell, retrieve the cell state and hidden state
            cs, hs, f, inp, c, o = self.LSTM.forwardProp()
            #store the input as a one-hot vector
            maxI = np.argmax(self.x)
            self.x = np.zeros_like(self.x)
            self.x[maxI] = 1
            self.ia[i] = self.x
            #store the cell state
            self.ca[i] = cs
            #store the hidden state
            self.ha[i] = hs
            #forget gate
            self.af[i] = f
            #input gate
            self.ai[i] = inp
            #cell state
            self.ac[i] = c
            #output gate
            self.ao[i] = o
            #calculate the output by multiplying the hidden state with the weight matrix
            self.oa[i] = self.sigmoid(np.dot(self.w, hs))
            #compute the new input
            maxI = np.argmax(self.oa[i])
            newX = np.zeros_like(self.x)
            newX[maxI] = 1
            self.x = newX
        #return all outputs
        return self.oa
```


In the next step, we define the LSTM cell, which is a neural network in itself and is hence constructed similarly to the RNN.

```python
class LSTM:
    #LSTM cell (input, output, number of recurrences, learning rate)
    def __init__(self, input, output, recurrences, learning_rate):
        #input (combination of the previous hidden state and the current input)
        self.x = np.zeros(input+output)
        #input size
        self.input = input + output
        #output
        self.y = np.zeros(output)
        #output size
        self.output = output
        #cell state, initialized to the size of the prediction
        self.cs = np.zeros(output)
        #how often to perform the recurrence
        self.recurrences = recurrences
        #balance the rate of training (learning rate)
        self.learning_rate = learning_rate
        #init the weight matrices of our gates
        #forget gate
        self.f = np.random.random((output, input+output))
        #input gate
        self.i = np.random.random((output, input+output))
        #cell state
        self.c = np.random.random((output, input+output))
        #output gate
        self.o = np.random.random((output, input+output))
        #matrices used in RMSprop
        self.Gf = np.zeros_like(self.f)
        self.Gi = np.zeros_like(self.i)
        self.Gc = np.zeros_like(self.c)
        self.Go = np.zeros_like(self.o)

    #activation function: squashes incoming floats into values between 0 and 1
    #same as in the RNN class
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    #the derivative of the sigmoid function; we need it to compute gradients for backpropagation
    def dsigmoid(self, x):
        return self.sigmoid(x) * (1 - self.sigmoid(x))

    #Within the LSTM cell we also use a tanh activation function.
    #Long story short: it leads to stronger gradients (the derivatives are higher),
    #which is useful as our data is centered around 0
    def tangent(self, x):
        return np.tanh(x)

    #the derivative of tanh
    def dtangent(self, x):
        return 1 - np.tanh(x)**2

    #Here is where the magic happens: we apply a series of matrix operations to our input and compute an output
    def forwardProp(self):
        f = self.sigmoid(np.dot(self.f, self.x))
        self.cs *= f
        i = self.sigmoid(np.dot(self.i, self.x))
        c = self.tangent(np.dot(self.c, self.x))
        self.cs += i * c
        o = self.sigmoid(np.dot(self.o, self.x))
        self.y = o * self.tangent(self.cs)
        return self.cs, self.y, f, i, c, o

    def backProp(self, e, pcs, f, i, c, o, dfcs, dfhs):
        #error = error + hidden state derivative; we clip the value between -6 and 6 to prevent exploding gradients
        e = np.clip(e + dfhs, -6, 6)
        #multiply the error by the activated cell state to compute the output derivative
        do = self.tangent(self.cs) * e
        #output update = (output derivative * derivative of the activated output gate) * input
        ou = np.dot(np.atleast_2d(do * self.dsigmoid(o)).T, np.atleast_2d(self.x))
        #derivative of the cell state = error * output * derivative of the cell state + derivative from the next step
        #compute the gradients and update them in reverse order!
        dcs = np.clip(e * o * self.dtangent(self.cs) + dfcs, -6, 6)
        #derivative of the cell unit = derivative of the cell state * input gate
        dc = dcs * i
        #cell update = (cell derivative * derivative of the activated cell unit) * input
        cu = np.dot(np.atleast_2d(dc * self.dtangent(c)).T, np.atleast_2d(self.x))
        #derivative of the input gate = derivative of the cell state * cell unit
        di = dcs * c
        #input update = (input derivative * derivative of the activated input gate) * input
        iu = np.dot(np.atleast_2d(di * self.dsigmoid(i)).T, np.atleast_2d(self.x))
        #derivative of the forget gate = derivative of the cell state * previous cell state
        df = dcs * pcs
        #forget update = (forget derivative * derivative of the activated forget gate) * input
        fu = np.dot(np.atleast_2d(df * self.dsigmoid(f)).T, np.atleast_2d(self.x))
        #derivative of the previous cell state = derivative of the cell state * forget gate
        dpcs = dcs * f
        #derivative of the previous hidden state = sum of the gate derivatives
        #propagated back through their weight matrices (hidden part of the stacked input)
        dphs = np.dot(dc, self.c)[:self.output] + np.dot(do, self.o)[:self.output] + np.dot(di, self.i)[:self.output] + np.dot(df, self.f)[:self.output]
        #return the update gradients for forget, input, cell, output, cell state, and hidden state
        return fu, iu, cu, ou, dpcs, dphs

    def update(self, fu, iu, cu, ou):
        #RMSprop caches for the forget, input, cell, and output gradients
        self.Gf = 0.9 * self.Gf + 0.1 * fu**2
        self.Gi = 0.9 * self.Gi + 0.1 * iu**2
        self.Gc = 0.9 * self.Gc + 0.1 * cu**2
        self.Go = 0.9 * self.Go + 0.1 * ou**2

        #update our gates using our gradients
        self.f -= self.learning_rate/np.sqrt(self.Gf + 1e-8) * fu
        self.i -= self.learning_rate/np.sqrt(self.Gi + 1e-8) * iu
        self.c -= self.learning_rate/np.sqrt(self.Gc + 1e-8) * cu
        self.o -= self.learning_rate/np.sqrt(self.Go + 1e-8) * ou
        return
```

That’s it for the definition of our RNN-LSTM network. Now we merely load some sequential data (in this case, a research paper of mine) and try to predict the next word given the current one. We export the preprocessed text data and store it as a plain text file.

```python
def read_text_file():
    #open the text file and return a separated input and output variable (series of words)
    with open("main.txt", "r") as text_file:
        data = text_file.read()
    text = list(data)
    outputSize = len(text)
    data = list(set(text))
    uniqueWords, dataSize = len(data), len(data)
    #one-hot encoding: the identity-like part maps each unique token to a unit vector
    returnData = np.zeros((uniqueWords, dataSize))
    for i in range(0, dataSize):
        returnData[i][i] = 1
    returnData = np.append(returnData, np.atleast_2d(data), axis=0)
    #build the expected output: the one-hot vector of each token in the sequence
    output = np.zeros((uniqueWords, outputSize))
    for i in range(0, outputSize):
        index = np.where(np.asarray(data) == text[i])
        output[:, i] = returnData[0:-1, index[0]].astype(float).ravel()
    return returnData, uniqueWords, output, outputSize, data

#write the predicted output (series of words) to disk
def export_to_textfile(output, data):
    prob = np.zeros_like(output[0])
    outputText = ""
    print(len(data))
    print(output.shape[0])
    for i in range(0, output.shape[0]):
        #normalize each output row into a probability distribution and sample a token from it
        for j in range(0, output.shape[1]):
            prob[j] = output[i][j] / np.sum(output[i])
        outputText += np.random.choice(data, p=prob)
    with open("output.txt", "w") as text_file:
        text_file.write(outputText)
    return
```
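To see what `read_text_file` effectively constructs, here is a minimal one-hot encoding sketch over a toy vocabulary (the tokens and variable names are made up purely for illustration):

```python
import numpy as np

# each unique token maps to one row of an identity matrix,
# just like the identity-style block read_text_file builds for the real text
vocab = ["a", "b", "c"]
one_hot = np.eye(len(vocab))

# encode the sequence "b a c" as stacked one-hot vectors
sequence = ["b", "a", "c"]
encoded = np.stack([one_hot[vocab.index(tok)] for tok in sequence])
```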

Finally, we can put all the pieces together: read the text file from disk, train our model, and take a look at the predicted output. Whenever the error falls within a given threshold, our model performs a forward propagation step and predicts an output sample.

```python
print("Initialize Hyperparameters")
iterations = 10000
learningRate = 0.001
returnData, numCategories, expectedOutput, outputSize, data = read_text_file()
print("Reading from disk done. Proceeding with training.")
#initialize the RNN using our hyperparameters and data
RNN = RecurrentNeuralNetwork(numCategories, numCategories, outputSize, expectedOutput, learningRate)

#training time!
for i in range(1, iterations):
    #predict the next word of each word
    RNN.forwardProp()
    #update all our weights using our error
    error = RNN.backProp()
    print("Reporting error on iteration ", i, ": ", error)
    #once the error falls within a given threshold (or every 10th iteration), draw a sample
    if (error > -10 and error < 10) or i % 10 == 0:
        #we provide a seed word
        seed = np.zeros_like(RNN.x)
        maxI = np.argmax(np.random.random(RNN.x.shape))
        seed[maxI] = 1
        RNN.x = seed
        #and predict the upcoming ones
        output = RNN.sample()
        print(output)
        #finally, we store the sample to disk
        export_to_textfile(output, data)
        print("Done Writing")
print("Train/Prediction routine complete.")
```

The lines of code that I produced in this post are far from what I expected in the first place. Nevertheless, building fundamental models like the RNN-LSTM from scratch helps to clarify their idea and functionality, which is otherwise easily overlooked.