Creating a Tensorflow Dataset for an image recognition task
Use case
You have a folder called data, in which there are two additional folders, train and test. The train folder contains hundreds to millions of jpeg files, each with a descriptive label in the file name. The test folder contains samples for which we do not yet know the label (we will use our trained model to estimate that). These are jpeg images without any descriptive label in the file name.
Problem:
We use a GPU (or TPU) to train Deep Neural Networks. To use such 'accelerated hardware' most efficiently, you should feed it data fast enough to keep it busy. If your data are stored as thousands of individual files (i.e. images), you may not be utilizing your GPU at maximum throughput.
Solution:
You should split your data across several larger files, and stream from multiple files in parallel, to maintain efficient high throughput for GPU computation.
Tensorflow has strategies for this scenario:
- First, we batch the numerous small files into TFRecord files
- Then, we use the power of tf.data.Dataset to read from multiple files in parallel.
The TFRecord file format is a record-oriented binary format. If your input data are on disk, or you are working with large data, TensorFlow recommends using the TFRecord format. It can have a significant impact on the performance of your input pipeline: binary data takes less space on disk, takes less time to copy and can be read more efficiently from disk.
import tensorflow as tf
import matplotlib.pyplot as plt
from tqdm import tqdm
from matplotlib.image import imread
import numpy as np
import os, json, random, requests, glob, math
tf.data.experimental.AUTOTUNE, or simply AUTO, is used in the tf.data.Dataset API as a strategy for efficient use of your hardware. It prompts the tf.data runtime to tune the value dynamically at runtime. This will be used here in the following contexts:
- Set and optimize the number of parallel calls for functions that apply transformations to imagery (cropping, resizing, etc), using dataset.map calls
- Set and optimize the number of parallel calls for functions that load data into memory while models train, using dataset.interleave calls
- Set and optimize the buffer size for functions that load data into memory asynchronously while models train, using dataset.prefetch calls. These use a background thread and an internal buffer to prefetch elements from the input dataset ahead of the time they are requested.
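As a minimal sketch of the map and prefetch contexts listed above, the toy pipeline below uses a range of integers in place of images. The doubling function and the batch size of 4 are illustrative assumptions, not part of the workflow in this post.

```python
import tensorflow as tf

AUTO = tf.data.experimental.AUTOTUNE

# A toy dataset of the integers 0..7 stands in for a list of images
toy_ds = tf.data.Dataset.range(8)

# Let the tf.data runtime choose how many elements to transform in parallel
toy_ds = toy_ds.map(lambda x: x * 2, num_parallel_calls=AUTO)

# Group into batches, then let the runtime choose the prefetch buffer size
toy_ds = toy_ds.batch(4).prefetch(AUTO)

toy_batches = [batch.numpy().tolist() for batch in toy_ds]
print(toy_batches)
```

Neither call changes what the dataset contains; AUTO only affects how the work is scheduled.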
You could familiarize yourself with these topics by following this guide and then this one
os.environ["TF_DETERMINISTIC_OPS"] = "1"
SEED=42
np.random.seed(SEED)
AUTO = tf.data.experimental.AUTOTUNE
In the example below, we have a folder of more than 250,000 images of cats and dogs. Each jpeg file has either 'cat' or 'dog' in the file name. This workflow would also apply to situations where you have more than 2 classes, as I will explain in a later post (illustrated using a different data set).
Creating TFRecords of images and discrete labels
Discrete labels in this sense means two things:
- There is only one label per image (known as single-label classification, to distinguish from multi-label classification where there are many labels per image)
- There is no overlap between labels, such that each label can be represented by a discrete number, i.e. an integer, counting up from zero.
We first have to define some variables:
- How many individual TFRecords per dataset, called shards, do we wish to make? (16)
- How large are the square images we will use, in pixels? (160)
- What are the classes? (cat and dog, as a list of binary strings). Note that the class labels must be binary strings (b'binary string') rather than regular strings ('string')
SHARDS = 16
TARGET_SIZE=160
CLASSES = [b'cat', b'dog']
Next we need to define the number of images in the train directory, nb_images, which is used to define how large each shard will be. Later, nb_images will be used for defining the number of model training and validation steps per epoch - more on that later.
nb_images=len(tf.io.gfile.glob('data/train/*.jpg'))
shared_size = math.ceil(1.0 * nb_images / SHARDS)
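To make the shard arithmetic concrete, here is a small worked sketch. The image count of 25,000 is a hypothetical value chosen for illustration (the real count comes from the glob above), and the variable called shard_size here plays the role of shared_size in the code above.

```python
import math

SHARDS = 16
nb_images = 25000  # hypothetical count, for illustration only

# Round up so that SHARDS files are enough to hold every image
shard_size = math.ceil(nb_images / SHARDS)

# Batching by shard_size gives full shards plus one smaller final shard
full_shards, remainder = divmod(nb_images, shard_size)
shard_sizes = [shard_size] * full_shards + ([remainder] if remainder else [])

print(shard_size, len(shard_sizes), shard_sizes[-1])
```

Because the size is rounded up, the final shard is usually a little smaller than the others.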
To create a TFRecord, we need a function that reads an image from a file, and strips the class name from the file name. This will be different for each dataset and therefore the below function would need modification for each new dataset. In the below, the label is extracted by first splitting the path on the file separator (label = tf.strings.split(img_path, sep='/')), then taking the last part (the file name) and splitting it on the dot (label = tf.strings.split(label[-1], sep='.')), then finally taking just the first item of the resulting list of file name parts, label[0]. The image is simply read in as a binary string of bytes, then decoded from the jpeg format into a tensor of pixel values using the handy utility tf.image.decode_jpeg.
def read_image_and_label(img_path):
    bits = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(bits)
    label = tf.strings.split(img_path, sep='/')
    label = tf.strings.split(label[-1], sep='.')
    return image,label[0]
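The same two-step split can be sketched with ordinary Python strings, which makes it easy to check the logic against your own file names before building the pipeline. The helper name label_from_path and the file name pattern 'cat.1234.jpg' are assumptions made for illustration.

```python
def label_from_path(img_path):
    # Step 1: drop the directories, keeping only the file name
    file_name = img_path.split('/')[-1]
    # Step 2: keep everything before the first dot
    return file_name.split('.')[0]

example_label = label_from_path('data/train/cat.1234.jpg')
print(example_label)
```

If your labels sit elsewhere in the file name (for example after an underscore), this is the part you would adapt.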
Neural nets often use imagery that is not at full resolution, due to memory limitations on GPUs. In this course, downsizing and cropping of imagery will be a fairly common practice. The function below carries out a resizing to the desired TARGET_SIZE (keeping track of which dimension is the largest), then crops to square
def resize_and_crop_image(image, label):
    # tf.shape gives (rows, columns, channels); the target is square, so the logic is symmetric
    w = tf.shape(image)[0]
    h = tf.shape(image)[1]
    tw = TARGET_SIZE
    th = TARGET_SIZE
    resize_crit = (w * th) / (h * tw)
    # scale so that the smaller dimension matches the target size
    image = tf.cond(resize_crit < 1,
                    lambda: tf.image.resize(image, [w*tw/w, h*tw/w]), # if true
                    lambda: tf.image.resize(image, [w*th/h, h*th/h]) # if false
                    )
    nw = tf.shape(image)[0]
    nh = tf.shape(image)[1]
    # crop the central TARGET_SIZE x TARGET_SIZE window
    image = tf.image.crop_to_bounding_box(image, (nw - tw) // 2, (nh - th) // 2, tw, th)
    return image, label
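The resize-then-crop arithmetic can be followed with plain numbers. This sketch mirrors the logic above for a hypothetical 200 x 300 image; the helper name resize_then_crop_plan is an assumption, and truncating to whole pixels with int() is a simplification of what happens inside TensorFlow.

```python
def resize_then_crop_plan(dim0, dim1, target):
    # Scale so that the smaller dimension becomes exactly `target`
    if dim0 / dim1 < 1:
        new0, new1 = target, dim1 * target / dim0
    else:
        new0, new1 = dim0 * target / dim1, target
    new0, new1 = int(new0), int(new1)
    # Offsets of the central target x target window
    offsets = ((new0 - target) // 2, (new1 - target) // 2)
    return (new0, new1), offsets

resized, offsets = resize_then_crop_plan(200, 300, 160)
print(resized, offsets)
```

Only the longer dimension is cropped, so the aspect ratio of the image content is preserved.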
A TFRecord stores each image as just a string of bytes, so the following function makes sure the resized pixels get encoded back into jpeg format (with no chroma subsampling) before they are written
def recompress_image(image, label):
    image = tf.cast(image, tf.uint8)
    image = tf.image.encode_jpeg(image, optimize_size=True, chroma_downsampling=False)
    return image, label
You should stuff data in a protocol buffer called Example. An Example protocol buffer contains Features. A feature is a protocol to describe the data and can have one of three types: bytes (images), float (floating point labels), and int64 (discrete labels).
def _bytestring_feature(list_of_bytestrings):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=list_of_bytestrings))
def _int_feature(list_of_ints): # int64
    return tf.train.Feature(int64_list=tf.train.Int64List(value=list_of_ints))
def _float_feature(list_of_floats): # float32
    return tf.train.Feature(float_list=tf.train.FloatList(value=list_of_floats))
We serialize the protocol buffer to a string and write it to a TFRecord file.
def to_tfrecord(img_bytes, label):
    class_num = np.argmax(np.array(CLASSES)==label)
    feature = {
        "image": _bytestring_feature([img_bytes]), # one image in the list
        "class": _int_feature([class_num]), # one class in the list
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))
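The line that turns a label into an integer is compact, so here is a small sketch of what it does on its own. The helper name class_index and the label b'bird' are assumptions used only for illustration.

```python
import numpy as np

CLASSES = [b'cat', b'dog']

def class_index(label):
    # The comparison gives one boolean per class; argmax returns the position of the first True
    return int(np.argmax(np.array(CLASSES) == label))

cat_index = class_index(b'cat')
dog_index = class_index(b'dog')
# A label that is not in CLASSES matches nothing, and argmax over all-False returns 0
unknown_index = class_index(b'bird')
print(cat_index, dog_index, unknown_index)
```

Note the last case: an unexpected label is silently mapped to the first class, so it may be worth checking that every label extracted from your file names appears in CLASSES.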
Get a list of files and shuffle them, then create a mapping to link them to the jpeg files so they can be read on the fly
dataset = tf.data.Dataset.list_files('data/train/*.jpg', seed=10000) # This also shuffles the images
dataset = dataset.map(read_image_and_label)
Next, create a new mapping to the resize and crop function
dataset = dataset.map(resize_and_crop_image, num_parallel_calls=AUTO)
Finally, a mapping to the jpeg recompression function, and then set the batch size, which dictates how many images per shard
dataset = dataset.map(recompress_image, num_parallel_calls=AUTO)
dataset = dataset.batch(shared_size)
Okay, now the dataset is set up, we can begin writing the data to TFRecords. The following loop loads image files, resizes them to a common size and then stores them across SHARDS TFRecord files. It reads from files in parallel and disregards the order of the data in favour of reading speed. Selecting each (random) pair of image and label sequentially, we call the to_tfrecord function we defined earlier to create the example record, then that is serialized to string and appended to the file. When the requisite number of images for a shard has been reached, a new shard is created.
for shard, (image, label) in enumerate(dataset):
    shard_size = image.numpy().shape[0]
    filename = "cat_dog" + "{:02d}-{}.tfrec".format(shard, shard_size)
    with tf.io.TFRecordWriter(filename) as out_file:
        for i in range(shard_size):
            example = to_tfrecord(image.numpy()[i],label.numpy()[i])
            out_file.write(example.SerializeToString())
    print("Wrote file {} containing {} records".format(filename, shard_size))
Preparing for model training
We now need some functions to read those TFRecords in and use them during model training. We wish to parallelize the data loading step as much as possible; individual images and labels are small, but there are many thousands of them, so we need to ensure both a fast and a consistent rate of data throughput. We achieve this by interleaving the contents of the datasets. The number of datasets to overlap can be specified by the cycle_length argument (set to 16 here). This is also where we see many of the benefits of tf.data.experimental.AUTOTUNE, or simply AUTO, which is used in the tf.data.Dataset API to tune the number of parallel calls and the prefetch buffer size dynamically at runtime.
def get_batched_dataset(filenames):
    option_no_order = tf.data.Options()
    option_no_order.experimental_deterministic = False
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.with_options(option_no_order)
    dataset = dataset.interleave(tf.data.TFRecordDataset, cycle_length=16, num_parallel_calls=AUTO)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)
    dataset = dataset.cache() # This dataset fits in RAM
    dataset = dataset.repeat()
    dataset = dataset.shuffle(2048)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True) # drop_remainder will be needed on TPU
    dataset = dataset.prefetch(AUTO) #
    return dataset
We call that function for both training and validation subsets, which we will define below
def get_training_dataset():
    return get_batched_dataset(training_filenames)
def get_validation_dataset():
    return get_batched_dataset(validation_filenames)
The following function reads an individual example (an image, label pair) from a TFRecord. It extracts the tf.train.Example protocol buffer message from a record in a TFRecord-format file. Each tf.train.Example record contains one or more “features”, and the input pipeline typically converts these features into tensors.
def read_tfrecord(example):
    features = {
        "image": tf.io.FixedLenFeature([], tf.string), # tf.string = bytestring (not text string)
        "class": tf.io.FixedLenFeature([], tf.int64), # shape [] means scalar
    }
    # decode the TFRecord
    example = tf.io.parse_single_example(example, features)
    image = tf.image.decode_jpeg(example['image'], channels=3)
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.reshape(image, [TARGET_SIZE,TARGET_SIZE, 3])
    class_label = tf.cast(example['class'], tf.int32)
    return image, class_label
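To see the write and read halves working together without needing any images, here is a minimal round-trip sketch. The payloads b"first" and b"second", the file name toy.tfrec and the temporary directory are placeholders assumed for illustration; the feature names match the ones used above.

```python
import os
import tempfile
import tensorflow as tf

toy_path = os.path.join(tempfile.mkdtemp(), "toy.tfrec")

# Write two tiny records; the byte strings stand in for encoded jpeg data
with tf.io.TFRecordWriter(toy_path) as writer:
    for payload, class_num in [(b"first", 0), (b"second", 1)]:
        feature = {
            "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[payload])),
            "class": tf.train.Feature(int64_list=tf.train.Int64List(value=[class_num])),
        }
        example = tf.train.Example(features=tf.train.Features(feature=feature))
        writer.write(example.SerializeToString())

# Read them back using the same feature description as read_tfrecord
schema = {
    "image": tf.io.FixedLenFeature([], tf.string),
    "class": tf.io.FixedLenFeature([], tf.int64),
}
records = []
for raw in tf.data.TFRecordDataset(toy_path):
    parsed = tf.io.parse_single_example(raw, schema)
    records.append((parsed["image"].numpy(), int(parsed["class"].numpy())))

print(records)
```

The feature description used for reading has to match the names and types used for writing; a mismatch raises an error at parse time.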
Here we set the batch size. This is a hyperparameter (i.e. set by you, not by model training) and its value is dictated (for the most part) by GPU memory considerations. We are using small imagery, so we can fit a relatively large batch into memory at once. We'll go for something relatively high (> say, 20)
BATCH_SIZE = 32
This bit of code just makes sure that the dataset will be read correctly from the TFRecord files. We find them all using glob pattern matching (using 'cat*.tfrec') to form an input dataset, then use the .take() command to grab 1 batch. Print the labels out to screen - they should be integers. We also print the image dimensions out to ensure they are correct
training_filenames=tf.io.gfile.glob('cat*.tfrec')
train_ds = get_training_dataset()
for imgs,lbls in train_ds.take(1):
    print(lbls)
    print(imgs.shape)
We wrote all of our images out to TFRecords - we didn't split them into train and validation sets first. That gives us more flexibility to assign train/validation subsets (called splits) here. Below we define VALIDATION_SPLIT, which is the proportion of the total data that will be used for validation. The rest will be used for training. We grab the filenames (again). These are already shuffled, but we can shuffle yet again to ensure the images really do get drawn as randomly as possible from the deck. Then we define train and validation file lists based on the split.
VALIDATION_SPLIT = 0.19
filenames=tf.io.gfile.glob('cat*.tfrec')
random.shuffle(filenames)
split = int(len(filenames) * VALIDATION_SPLIT)
training_filenames = filenames[split:]
validation_filenames = filenames[:split]
During model training, one epoch provides the model an opportunity to 'see' the entire dataset. So the number of steps per epoch is essentially the number of unique samples of your dataset divided by the batch size.
validation_steps = int(nb_images // len(filenames) * len(validation_filenames)) // BATCH_SIZE
steps_per_epoch = int(nb_images // len(filenames) * len(training_filenames)) // BATCH_SIZE
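A worked example may help to check these numbers by hand. The sketch below repeats the same arithmetic with a hypothetical count of 25,000 images spread over 16 TFRecord files; both values are assumptions made for illustration.

```python
BATCH_SIZE = 32
VALIDATION_SPLIT = 0.19
nb_images = 25000  # hypothetical count, for illustration only
num_files = 16     # one file per shard

# Number of whole files assigned to validation; the rest go to training
split = int(num_files * VALIDATION_SPLIT)
num_validation_files = split
num_training_files = num_files - split

# Approximate images per file, then whole batches per epoch
images_per_file = nb_images // num_files
steps_per_epoch = images_per_file * num_training_files // BATCH_SIZE
validation_steps = images_per_file * num_validation_files // BATCH_SIZE

print(num_training_files, num_validation_files, steps_per_epoch, validation_steps)
```

Because the split is made on whole files, the realized validation fraction (3 of 16 files here) is only approximately equal to VALIDATION_SPLIT.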
Model training using TFRecords
To demonstrate training using this workflow, we choose a simple (so-called vanilla) model that we construct using a few convolutional filter blocks of increasing size, interspersed with MaxPooling layers, and finally a global pooling and a classifier head layer. This is very similar in design to dozens of examples you can find online using toy datasets such as this. This isn't necessarily the most optimal or powerful model for this or any other dataset, but it'll do fine for demonstration (and is actually likely to be close to optimal considering the relative simplicity of the data/problem)
model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(kernel_size=3, filters=16, padding='same', activation='relu', input_shape=[TARGET_SIZE,TARGET_SIZE, 3]),
    tf.keras.layers.Conv2D(kernel_size=3, filters=32, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=2),
    tf.keras.layers.Conv2D(kernel_size=3, filters=64, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=2),
    tf.keras.layers.Conv2D(kernel_size=3, filters=128, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=2),
    tf.keras.layers.Conv2D(kernel_size=3, filters=256, padding='same', activation='relu'),
    tf.keras.layers.GlobalAveragePooling2D(),
    tf.keras.layers.Dense(1,'sigmoid')
])
You must .compile your model before you train, giving it an optimizer, loss function and a metric to keep track of. The options below are fairly standard - we use binary_crossentropy - 'binary' because we only have two classes (otherwise you would choose 'categorical') and 'crossentropy' because this is a classification problem, in which the loss compares predicted class probabilities with the true labels
model.compile(optimizer='Adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])
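To give a feel for what this loss measures, here is a small hand-computed sketch of binary cross-entropy on made-up labels and predicted probabilities. The function below is an illustrative re-implementation of the textbook formula, not the Keras code, and it omits the clipping that libraries apply to avoid taking the log of zero.

```python
import math

def binary_crossentropy(y_true, y_pred):
    """Mean of -[y*log(p) + (1-y)*log(1-p)] over all samples."""
    losses = [-(y * math.log(p) + (1 - y) * math.log(1 - p))
              for y, p in zip(y_true, y_pred)]
    return sum(losses) / len(losses)

y_true = [0, 1, 1, 0]  # made-up labels: 0 = cat, 1 = dog

# Predictions that lean the right way give a low loss
confident_loss = binary_crossentropy(y_true, [0.1, 0.9, 0.8, 0.2])
# Predictions of 0.5 everywhere carry no information and give log(2)
unsure_loss = binary_crossentropy(y_true, [0.5, 0.5, 0.5, 0.5])

print(round(confident_loss, 4), round(unsure_loss, 4))
```

A model that has learned nothing sits near log(2), roughly 0.693, so training losses well below that value suggest the model is picking up on the classes.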
Call .fit() to train the model
model.fit(get_training_dataset(), steps_per_epoch=steps_per_epoch, epochs=10,
          validation_data=get_validation_dataset(), validation_steps=validation_steps)
Model validation
This little function will convert the integer label into a string label
get_label = lambda x : 'cat' if (x==0) else 'dog'
Below we call the validation dataset (ds=get_validation_dataset()) and plot one (ds.take(1)) batch. The figure shows 8 x 4 example images, their actual labels and their model-predicted values
fig = plt.figure(figsize=(12,28))
cnt=1
ds=get_validation_dataset()
for imgs,lbls in ds.take(1):
    # predict_classes was removed from Keras in TensorFlow 2.6; threshold the sigmoid output instead
    predicted_classes=(model.predict(imgs) > 0.5).astype('int32')
    for img,lbl,cl in zip(imgs,lbls,predicted_classes):
        fig.add_subplot(8,4, cnt)
        plt.title('obs: {} / est: {}'.format(get_label(lbl),get_label(cl[0])))
        plt.imshow(img)
        plt.axis('off')
        cnt=cnt+1
We need a new function for model evaluation. This version creates batches, but doesn't repeat them (no dataset.repeat() command)
def get_eval_dataset(filenames):
    option_no_order = tf.data.Options()
    option_no_order.experimental_deterministic = False
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.with_options(option_no_order)
    dataset = dataset.interleave(tf.data.TFRecordDataset, cycle_length=16, num_parallel_calls=AUTO)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)
    dataset = dataset.cache() # This dataset fits in RAM
    dataset = dataset.shuffle(2048)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True) # drop_remainder will be needed on TPU
    dataset = dataset.prefetch(AUTO) #
    return dataset
def get_validation_eval_dataset():
    return get_eval_dataset(validation_filenames)
To get a global sense of the skill of the model, call .evaluate on the entire validation set, which will use the model for prediction on all validation images, then compare the predicted versus observed labels for each, using the metrics you specified when you compiled the model before training (we used accuracy). Print the mean accuracy in percent to screen.
loss, accuracy = model.evaluate(get_validation_eval_dataset())
print('Test Mean Accuracy: ', round((accuracy)*100, 2))
Model deployment
Apply to test (unseen) sample imagery - here I have limited to BATCH_SIZE number of images just for illustration
test_filenames = glob.glob('data/test1/*.jpg')[:BATCH_SIZE]
len(test_filenames)
For prediction on raw imagery (rather than pre-processed tensors in the TFRecords file), we need a resizing and converting and normalizing function
def preprocess_image(image):
    image = tf.image.resize(image, (TARGET_SIZE, TARGET_SIZE))
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = image/255.
    return image
Test using one image
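im = preprocess_image(imread(test_filenames[13]))
plt.imshow(im)
# predict_classes was removed from Keras in TensorFlow 2.6; threshold the sigmoid output instead
predicted_classes=(model.predict(np.expand_dims(im,axis=0)) > 0.5).astype('int32')
get_label(predicted_classes.squeeze())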
Test on a whole batch
imgs = []
predicted_classes = []
for f in test_filenames:
    im = preprocess_image(imread(f))
    imgs.append(im)
    # threshold the sigmoid output at 0.5 to get the class (0 = cat, 1 = dog)
    predicted_classes.append(int(model.predict(np.expand_dims(im,axis=0)).squeeze() > 0.5))
Make a similar plot as before, but this time we only have the model predicted class, not the ground truth class
fig = plt.figure(figsize=(12,28))
cnt=1
for img,cl in zip(imgs,predicted_classes):
    fig.add_subplot(8,4, cnt)
    plt.title('est: {}'.format(get_label(cl)))
    plt.imshow(img)
    plt.axis('off')
    cnt=cnt+1