Many-to-One RNN with TensorFlow 2/Keras

Ulf Hamster 5 min.
python tensorflow2 keras rnn recurrent neural network

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

Load packages

%%capture
!pip install tensorflow-gpu==2.0.0-beta1
import tensorflow as tf
print(tf.__version__)
print("GPU" if tf.test.is_gpu_available() else "CPU")
tf.random.set_seed(42)  # tensorflow seed
2.0.0-beta1
GPU
import numpy as np
#np.random.seed(42)  # numpy seed

Toy Dataset

Let's dream up some variable-length sequences (seq). The $i$-th sequence has an observation $x_{i,t}$ at time step $t$; each observation is a list of features.

Each sequence is associated with one output, or target. This output can be a scalar or a vector of numbers.

X = []
y = []

# Sequence 1
seq = ((1, 0, 0), (0, 1, 0))
out = (3, 9)
X.append(seq)
y.append(out)

# Sequence 2
seq = ((0, 0, 1), (0, 0, 0), (0, 1, 1))
out = (7, 6)
X.append(seq)
y.append(out)

# Sequence 3
seq = ((0, 0, 1), (1, 1, 0), (1, 0, 0), (1, 1, 1), (0, 1, 0))
out = (5, 1)
X.append(seq)
y.append(out)

# Sequence 4
seq = ((1, 0, 1))  # note: a single observation -- the outer parentheses do not create a nested tuple
out = (6, 6)
X.append(seq)
y.append(out)

# Sequence 5
seq = ((1, 1, 0), (0, 0, 0), (1, 0, 0))
out = (4, 8)
X.append(seq)
y.append(out)

# Sequence 6
seq = ((0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1))
out = (2, 1)
X.append(seq)
y.append(out)

# Sequence 7
seq = ((0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1))
out = (2, 7)
X.append(seq)
y.append(out)
np.c_[y, X]
array([[3, 9, ((1, 0, 0), (0, 1, 0))],
       [7, 6, ((0, 0, 1), (0, 0, 0), (0, 1, 1))],
       [5, 1, ((0, 0, 1), (1, 1, 0), (1, 0, 0), (1, 1, 1), (0, 1, 0))],
       [6, 6, (1, 0, 1)],
       [4, 8, ((1, 1, 0), (0, 0, 0), (1, 0, 0))],
       [2, 1, ((0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1))],
       [2, 7, ((0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1))]], dtype=object)

Padding variable-length sequences of data points

tf2/Keras' pad_sequences is geared towards sequences with a single feature per time step. The following function, pad_sequences_n_feat, pads sequences whose time steps carry multiple features.

def pad_sequences_n_feat(sequences, padding='pre', maxlen=None, value=None):
    # use the len of the longest sequence
    if maxlen is None:
        maxlen = max([len(seq) for seq in sequences])
    # use a zero vector, e.g. (0, 0, 0), as the default padding value
    if value is None:
        value = [0 for _ in range(len(sequences[0][0]))]
    # loop over all sequences
    padded = []
    for seq in sequences:
        # ensure that a single observation is processed as a list of observations
        if not isinstance(seq[0], (list, tuple)):
            seq = [seq]
        # convert tuple to list
        seq = list(seq)
        # Padding
        if padding == 'pre':
            seq.reverse()
        while len(seq) < maxlen:
            seq.append(value)
        if padding == 'pre':
            seq.reverse()
        # Truncation (not implemented)
        # sequence is done
        padded.append(seq)
    # dataset is done
    return padded
Xp = pad_sequences_n_feat(X)
Xp
[[[0, 0, 0], [0, 0, 0], [0, 0, 0], (1, 0, 0), (0, 1, 0)],
 [[0, 0, 0], [0, 0, 0], (0, 0, 1), (0, 0, 0), (0, 1, 1)],
 [(0, 0, 1), (1, 1, 0), (1, 0, 0), (1, 1, 1), (0, 1, 0)],
 [[0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0], (1, 0, 1)],
 [[0, 0, 0], [0, 0, 0], (1, 1, 0), (0, 0, 0), (1, 0, 0)],
 [[0, 0, 0], (0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1)],
 [[0, 0, 0], (0, 0, 0), (0, 1, 1), (0, 1, 0), (0, 0, 1)]]
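
As a quick usage sketch (an addition, not in the original run): the same helper also supports post-padding and an explicit maxlen, and the expected result follows directly from the function's logic.

# optional usage sketch: post-padding appends the zero vector at the end
Xp_post = pad_sequences_n_feat(X, padding='post', maxlen=6)
print(Xp_post[0])
# expected: [(1, 0, 0), (0, 1, 0), [0, 0, 0], [0, 0, 0], [0, 0, 0], [0, 0, 0]]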
# Convert into 3D tensor (samples, time-steps, features)
n_samples = 7
n_timesteps = 5
n_features = 3

# convert to tensor
Xp = tf.convert_to_tensor(Xp)
Xp = tf.reshape(Xp, [n_samples, n_timesteps, n_features])   # [7, 5, 3]
y = tf.convert_to_tensor(y)

# Data Splitting
X_train, y_train = Xp[:4], y[:4]
X_test, y_test = Xp[4:], y[4:]


X_train, y_train, X_test, y_test
(<tf.Tensor: id=7, shape=(4, 5, 3), dtype=int32, numpy=
 array([[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         [1, 0, 0],
         [0, 1, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 1],
         [0, 0, 0],
         [0, 1, 1]],
 
        [[0, 0, 1],
         [1, 1, 0],
         [1, 0, 0],
         [1, 1, 1],
         [0, 1, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         [0, 0, 0],
         [1, 0, 1]]], dtype=int32)>,
 <tf.Tensor: id=11, shape=(4, 2), dtype=int32, numpy=
 array([[3, 9],
        [7, 6],
        [5, 1],
        [6, 6]], dtype=int32)>,
 <tf.Tensor: id=15, shape=(3, 5, 3), dtype=int32, numpy=
 array([[[0, 0, 0],
         [0, 0, 0],
         [1, 1, 0],
         [0, 0, 0],
         [1, 0, 0]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 1, 1],
         [0, 1, 0],
         [0, 0, 1]],
 
        [[0, 0, 0],
         [0, 0, 0],
         [0, 1, 1],
         [0, 1, 0],
         [0, 0, 1]]], dtype=int32)>,
 <tf.Tensor: id=19, shape=(3, 2), dtype=int32, numpy=
 array([[4, 8],
        [2, 1],
        [2, 7]], dtype=int32)>)
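
A short sanity check (added here) confirms the split: 4 training and 3 test sequences, each with 5 time steps and 3 features.

# shapes of the train/test split
print(X_train.shape, y_train.shape)  # (4, 5, 3) (4, 2)
print(X_test.shape, y_test.shape)    # (3, 5, 3) (3, 2)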

Modeling

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN

model = Sequential([
    SimpleRNN(units=4, use_bias=True, 
              input_shape=(n_timesteps, n_features),  # (5, 3)
              return_sequences=False),  # False => Many-to-One RNN: only the last hidden state is passed on
    Dense(units=2, use_bias=True)  # 2x output neurons because y is bivariate
])

model.compile(optimizer='adam', loss='mse')
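
As a quick sanity check (an addition to the original notebook), model.summary() should report 32 parameters for the SimpleRNN layer and 10 for the Dense layer.

model.summary()
# parameter counts derived from the layer shapes above:
#   SimpleRNN: (3 inputs + 4 recurrent) * 4 units + 4 biases = 32
#   Dense:      4 inputs * 2 units + 2 biases                = 10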

Training

Let's train (overfit) our model on our 4 training samples.

model.fit(X_train, y_train)
Train on 4 samples
4/4 [==============================] - 0s 87ms/sample - loss: 41.1231
<tensorflow.python.keras.callbacks.History at 0x7fc591a1c1d0>
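
By default fit runs a single epoch. To actually overfit the four samples one could train for more epochs, for example like this (a sketch, not executed in this notebook):

# not executed here: train longer to drive the training MSE towards zero
history = model.fit(X_train, y_train, epochs=500, verbose=0)
print(history.history['loss'][-1])  # final training loss after 500 epochs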
print("RNN cell: 3x inputs to 4x hidden neurons")
tf.print(model.layers[0].weights[0])

print("\nRNN cell: 4x4 recurrent kernel of the 4x hidden neurons")
tf.print(model.layers[0].weights[1])

print("\nRNN cell: 4x bias for the hidden neurons")
tf.print(model.layers[0].weights[2])
RNN cell: 3x inputs to 4x hidden neurons
[[0.305709809 -0.110234246 -0.271408647 -0.0667655915]
 [-0.862493098 0.340946704 0.443611056 0.688633204]
 [-0.505745113 -0.51353991 -0.352093071 0.410685927]]

RNN cell: 4x4 recurrent kernel of the 4x hidden neurons
[[0.10498514 -0.443927079 0.413701177 -0.787657559]
 [-0.623301506 0.656780958 0.314388126 -0.287483305]
 [-0.534211934 -0.282273203 -0.739219487 -0.299787849]
 [-0.562198937 -0.541136682 0.427469343 0.454493076]]

RNN cell: 4x bias for the hidden neurons
[0.00100002962 -0.00100002973 0.000999985612 -0.00100002962]
print("Final Layer: 4x hidden/recurrent neurons to 2x output neurons")
tf.print(model.layers[1].weights[0])

print("\nFinal Layer: 2x bias")
tf.print(model.layers[1].weights[1])
Final Layer: 4x hidden/recurrent neurons to 2x output neurons
[[0.481735617 0.256092459]
 [-0.96623075 -0.31471023]
 [0.0222753081 -0.243491784]
 [-0.85256803 -0.95625937]]

Final Layer: 2x bias
[0.00100002927 0.00100002927]

Predict

After a single epoch of training the predictions are far off, of course.

y_pred = model.predict(X_test)
y_pred
array([[-0.34660533, -0.40281075],
       [-0.6561756 , -0.789058  ],
       [-0.6561756 , -0.789058  ]], dtype=float32)
tf.print(y_test)
[[4 8]
 [2 1]
 [2 7]]
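
To see how the many-to-one prediction comes about, we can recompute the forward pass by hand from the weights printed above (an added sketch; SimpleRNN uses a tanh activation by default and the Dense layer is linear).

# added sketch: recompute the many-to-one forward pass with the trained weights
W_x, W_h, b = [w.numpy() for w in model.layers[0].weights]   # (3, 4), (4, 4), (4,)
W_out, b_out = [w.numpy() for w in model.layers[1].weights]  # (4, 2), (2,)

x = tf.cast(X_test, tf.float32).numpy()                      # (3, 5, 3)
h = np.zeros((x.shape[0], W_h.shape[0]), dtype=np.float32)   # initial hidden state
for t in range(x.shape[1]):
    h = np.tanh(x[:, t, :] @ W_x + h @ W_h + b)              # one RNN step per time step
y_manual = h @ W_out + b_out       # only the final hidden state feeds the Dense layer
print(y_manual)                    # should match y_pred up to floating-point rounding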

Links