
Heart arrhythmia detection using Deep Learning

How to combine CNNs with LSTMs for time series classification using TensorFlow / Keras

A common problem that Deep Learning has been helping to solve lately is time series classification. A classic approach to this kind of problem is to generate features from the signals we have and train a machine learning model on them. Handcrafting those features, however, can take a great chunk of your project schedule. One way around that is to combine a Convolutional Neural Network (CNN) with a Long Short-Term Memory (LSTM) recurrent neural network; this architecture has proven effective at reducing the amount of time spent on feature engineering. In this article we’re going to train a couple of models to detect irregular heart rhythms. The idea is to show, step by step, how to build a sequence model for time series classification. If you want a deeper understanding of this problem, I highly recommend this article from the Stanford ML Group.

Processing the dataset

For this experiment we’ll be using the MIT-BIH Arrhythmia Database, which contains 48 half-hour excerpts of two-channel ambulatory ECG recordings obtained from 47 subjects. The dataset is annotated by cardiologists; all the labels and a full explanation of how the data was collected can be found here. For the purpose of this experiment, let’s consider only two classes: normal beats and abnormal beats. Annotations in the invalid list below are discarded, symbols in the abnormal list are labeled as abnormal, and any symbol not on either list is marked as normal.
invalid_beat = [
    "[", "!", "]", "x", "(", ")", "p", "t", 
    "u", "`", "'", "^", "|", "~", "+", "s", 
    "T", "*", "D", "=", '"', "@"
]

abnormal_beats = [
    "L", "R", "B", "A", "a", "J", "S", "V", 
    "r", "F", "e", "j", "n", "E", "/", "f", "Q", "?"
]
Each record is composed of a signal and its annotations. Both files are read with the Python WFDB library. A record can be read using:
record = wfdb.rdrecord(filename)
annotation = wfdb.rdann(filename, "atr")
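For reference, here’s a quick sketch (not part of the original code) of the fields we’ll use from these two objects, assuming the database was downloaded into a mit-bih-arrhythmia-database-1.0.0 folder and using record 100 as an example:
import wfdb

# Record 100 is one of the MIT-BIH records; adjust the path to wherever
# the database was downloaded
record = wfdb.rdrecord("mit-bih-arrhythmia-database-1.0.0/100")
annotation = wfdb.rdann("mit-bih-arrhythmia-database-1.0.0/100", "atr")

print(record.p_signal.shape)   # raw two-channel signal, shape (n_samples, 2)
print(record.fs)               # sampling frequency (360 Hz for MIT-BIH)
print(annotation.sample[:5])   # sample indices where each beat is annotated
print(annotation.symbol[:5])   # the corresponding beat symbols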
The annotation object holds one symbol per beat. Each sample in our model will have a beat label as target and the sequence surrounding that beat as input (3 seconds on each side). Now let’s define two functions that will help us process the records: one simply classifies each beat, given the lists above, as normal vs. abnormal; the other builds a sample for our 1D CNN/LSTM model.
import numpy as np

# Function to classify beats as normal (0) / abnormal (1);
# invalid beats return None so they can be filtered out later
def classify_beat(symbol):
    if symbol in abnormal_beats:
        return 1
    elif symbol == "N" or symbol == ".":
        return 0
    return None

# Function to return the sequence surrounding a beat,
# with window_sec seconds on each side
def get_sequence(signal, beat_loc, window_sec, fs):
    window_one_side = window_sec * fs
    beat_start = beat_loc - window_one_side
    beat_end = beat_loc + window_one_side + 1
    # Discard beats too close to the edges of the recording
    if beat_start >= 0 and beat_end < signal.shape[0]:
        sequence = signal[beat_start:beat_end, 0]
        return sequence.reshape(1, -1, 1)
    else:
        return np.array([])
To better understand the second function, we need to know how the input for a CNN or LSTM model is formatted. You have to feed in a three-dimensional array with the shape (batch_size, sequence_size, number_of_features). In this case, each sample is (1, sequence_size, 1) since we have only one feature. Later, you’ll see that for the CNN with LSTM we also need to define another dimension (subsequences). Next, the following code builds a list of sequences (inputs), a list of labels (targets), and a map for each patient. The idea is to stratify the training and validation split based on each patient’s rate of abnormal beats.
import wfdb
from sklearn.preprocessing import StandardScaler

all_sequences = []
all_labels = []
window_sec = 3
subject_map = []
# records is the list of MIT-BIH record names (e.g. "100", "101", ...)
for subject in records:
    record = wfdb.rdrecord(f'mit-bih-arrhythmia-database-1.0.0/{subject}')
    annotation = wfdb.rdann(f'mit-bih-arrhythmia-database-1.0.0/{subject}', 'atr')
    atr_symbol = annotation.symbol
    atr_sample = annotation.sample
    fs = record.fs
    # Normalizing by mean and standard deviation
    scaler = StandardScaler()
    signal = scaler.fit_transform(record.p_signal)
    subject_labels = []
    for i, i_sample in enumerate(atr_sample):
        label = classify_beat(atr_symbol[i])
        sequence = get_sequence(signal, i_sample, window_sec, fs)
        if label is not None and sequence.size > 0:
            all_sequences.append(sequence)
            subject_labels.append(label)

    # Fraction of abnormal beats for this subject (labels are 0/1)
    abnormal_percentage = sum(subject_labels) / len(subject_labels)
    subject_map.append({
        "subject": subject,
        "percentage": abnormal_percentage,
        "num_seq": len(subject_labels),
        "start": len(all_labels),
        "end": len(all_labels) + len(subject_labels)
    })
    all_labels.extend(subject_labels)
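The training and validation sets used below (X_train, y_train, X_val, y_val) are built by splitting at the patient level with subject_map. Here is a minimal sketch of one way to do that; the abnormal-rate threshold and the 25% validation size are assumptions for illustration, not values from the original experiment:
from sklearn.model_selection import train_test_split

# Flag subjects with a high rate of abnormal beats so the patient-level
# split is roughly stratified (the 0.1 threshold is an arbitrary assumption)
high_abnormal = [1 if s["percentage"] > 0.1 else 0 for s in subject_map]
train_subjects, val_subjects = train_test_split(
    subject_map, test_size=0.25, stratify=high_abnormal, random_state=42
)

def build_arrays(subjects):
    X, y = [], []
    for s in subjects:
        X.extend(all_sequences[s["start"]:s["end"]])
        y.extend(all_labels[s["start"]:s["end"]])
    # Stack the (1, sequence_size, 1) samples into (n_samples, sequence_size, 1)
    return np.vstack(X), np.array(y)

X_train, y_train = build_arrays(train_subjects)
X_val, y_val = build_arrays(val_subjects)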

Training the models

Let’s try different models to train our classifier. The main purpose here is to show how to set up the Keras layers; the architectures themselves aren’t carefully tuned. The first one is a CNN-only model.
from keras.models import Sequential
from keras.layers import Conv1D, Flatten, Dense, Dropout
from keras.optimizers import Adam

sequence_size = X_train.shape[1]
n_features = 1

model = Sequential([
    Conv1D(
        filters=8,
        kernel_size=4,
        strides=1,
        input_shape=(sequence_size, n_features),
        padding="same",
        activation="relu"
    ),
    Flatten(),
    Dropout(0.5),
    Dense(
        1,
        activation="sigmoid",
        name="output",
    )
])

optimizer = Adam(learning_rate=0.001)
# Compiling the model
model.compile(
    optimizer=optimizer,
    loss="binary_crossentropy",
    metrics=["accuracy"]
)
You can inspect the resulting architecture by calling model.summary(). To train the model, let’s call the fit method from Keras:
model.fit(
    X_train, 
    y_train, 
    batch_size=128,
    epochs=15,
    validation_data=(X_val, y_val)
)
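After training, you can double-check the validation metrics with Keras’s standard evaluate call (a quick sketch):
# Evaluate loss and accuracy on the validation set
val_loss, val_acc = model.evaluate(X_val, y_val, batch_size=128)
print(f"Validation accuracy: {val_acc:.2f}")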
With this architecture, we reach an accuracy of about 0.82 on the validation set. That doesn’t necessarily mean a great model, especially because the loss stops improving over the epochs. Another common architecture for sequential data is a CNN combined with an LSTM: the CNN works as a feature extraction layer over short subsequences, and the LSTM interprets the resulting sequence of extracted features.
Building a model using the Sequential API doesn’t change much from what was done above.
sequence_size = X_train.shape[1]
n_features = 1 
n_subsequences = 4
subsequence_size = int(sequence_size / n_subsequences)

# Reshaping to be (samples, subsequences, sequence, feature)
X_train = X_train.reshape(-1, n_subsequences, subsequence_size, n_features)
X_val = X_val.reshape(-1, n_subsequences, subsequence_size, n_features)

from keras.layers import TimeDistributed, LSTM

cnn_lstm_model = Sequential([
    TimeDistributed(
        Conv1D(
            filters=8,
            kernel_size=4,
            strides=1,
            padding="same",
            activation="relu"
        ),
        input_shape=(n_subsequences, subsequence_size, n_features)
    ),
    TimeDistributed(Flatten()),
    # The LSTM receives one feature vector per subsequence,
    # so it doesn't need its own input_shape
    LSTM(units=1),
    Dense(
        1,
        activation="sigmoid",
        name="output",
    )
])

optimizer = Adam(learning_rate=0.001)
# Compiling the model
cnn_lstm_model.compile(
    optimizer=optimizer,
    loss="binary_crossentropy",
    metrics=["accuracy"]
)
cnn_lstm_model.summary()
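Training works just like before; the fit call isn’t shown in the original code, but assuming the same batch size and number of epochs it would be:
cnn_lstm_model.fit(
    X_train,
    y_train,
    batch_size=128,
    epochs=15,
    validation_data=(X_val, y_val)
)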
With this architecture, the accuracy on the validation set was about 0.8. As a next step, you can try to improve these models by reducing overfitting with dropout layers or kernel regularization. The code for this experiment is in the repo for this post.
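As an illustration of that next step, here is a sketch (not part of the original code) of the CNN-only model with an L2 kernel regularizer added to the convolutional layer:
from keras.regularizers import l2

regularized_model = Sequential([
    Conv1D(
        filters=8,
        kernel_size=4,
        strides=1,
        input_shape=(sequence_size, n_features),
        padding="same",
        activation="relu",
        kernel_regularizer=l2(0.01)  # penalize large convolution weights
    ),
    Flatten(),
    Dropout(0.5),  # randomly drop units during training to fight overfitting
    Dense(1, activation="sigmoid", name="output")
])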