Coins classifier Neural Network: Head or Tail?

From the author

Home of this article.

The overall goal of this series is to build a coin classifier that can scan your pocket change and find rare or valuable coins. This is the second article in the series, so let me remind you what happened earlier.

In the previous step we built a rather large dataset of image pairs, downloaded from the online coin site meshok.ru. The images were uploaded by people we do not know, and although each pair is supposed to show the coin's head in one image and its tail in the other, we cannot rule out pairs with two heads and no tail, or vice versa. We also do not know yet which image shows the head and which shows the tail, and this may matter when we feed the data to our final classifier.

So let's write a program that distinguishes heads from tails. It is a rather simple task for a convolutional neural network that uses transfer learning.

As before, we are going to use the Google Colab environment to take advantage of the free GPU it gives us access to. We will store the data on Google Drive, so the first thing we need is to let Colab access the Drive:

from google.colab import drive
drive.mount("/content/drive/", force_remount=True)

Next, we install EfficientNet. This is the pretrained network (remember I mentioned transfer learning?) that we use as a starting point, rather than training a network from scratch.

!pip install -q efficientnet

import efficientnet.tfkeras as efn

Next, I usually have a large "import" section. Please note that some of the imported modules are not really used, so feel free to delete them:

import os
import re
import sys
import random
from os import listdir
from os.path import isfile, join

import numpy as np
import pandas as pd
import cv2
import seaborn as sns
import matplotlib.pyplot as plt  # needed below for plt.imshow / plt.plot
from mpl_toolkits.mplot3d import Axes3D
from sklearn.manifold import TSNE
from sklearn.neighbors import NearestNeighbors

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adamax
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications import InceptionResNetV2, Xception, NASNetLarge
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Dense, Activation, Dropout, Flatten,
                                     Lambda, concatenate, BatchNormalization, GlobalAveragePooling2D)
from tensorflow.keras.callbacks import LambdaCallback, ModelCheckpoint
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import Sequence

Let's check which version of TensorFlow is in use. This step is important, because Google is known to upgrade the versions preinstalled in Colab without warning:

import tensorflow as tf
print(tf.__version__)
tf.test.gpu_device_name()

The output in my case was:

2.4.0
'/device:GPU:0'

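Then we do some additional initialization: we set the directory where our project lives and the files in which weights are stored during training:

working_path = "/content/drive/My Drive/02_avers_or_revers/"

best_weights_filepath = working_path + "models/01_avers_or_revers.h5"
# The last-epoch weights go to their own file so that they do not overwrite the best checkpoint.
# (The "_last" file name is an assumption; any name different from the one above will do.)
last_weights_filepath = working_path + "models/01_avers_or_revers_last.h5"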

We only need to train once; there is no reason to retrain on every run. So we use a boolean flag: if it is False, training has already been done and the weights are stored in files, so instead of retraining we can simply load them:

bDoTraining = True

We are going to scale images down to 256×256, use a batch size of 8 during training, and so on. Here are the constants we will need; the names are self-explanatory. We also split our data into training images (used to tune the network's weights), validation images (used to measure performance on data the net never saw) and the rest (testing data, used to test the result).

IMAGE_SIZE = 256
input_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)

BATCH_SIZE = 8

embedding_model = 0
alpha = 0.4

TRAINING_IMAGES_PERCENT = 0.6
VALIDATION_IMAGES_PERCENT = 0.2

IMAGE_ROTATION_ANGLE = 180

We have two classes for our classifier to distinguish between:

# Class name corresponds to a folder.
# Image path is "images" + class name + image name
arrClasses = ["head", "tail"]

Let's load the data by reading the contents of the "head" and "tail" folders:

if(bDoTraining):
  pdLabels = pd.get_dummies(arrClasses)
  arrLabeledData = []

  for cls in arrClasses:
    arrImageNames = [f for f in listdir(working_path + "images/" + cls) if isfile(join(working_path, "images/", cls, f))]
    arrLabeledData.append(
    {
      'class':cls,
      'image_names':arrImageNames
    })
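
To make it clearer what `pd.get_dummies(arrClasses)` is used for, here is a minimal pure-Python sketch of the same one-hot encoding. The helper name `one_hot_labels` is hypothetical and not part of the article's code; it only illustrates what the label vector for each class looks like.

```python
def one_hot_labels(classes):
    """Map each class name to a one-hot list, in the order of `classes`.

    Hypothetical helper for illustration; the article uses pd.get_dummies.
    """
    return {name: [1 if other == name else 0 for other in classes]
            for name in classes}


labels = one_hot_labels(["head", "tail"])
print(labels["head"])  # [1, 0]
print(labels["tail"])  # [0, 1]
```

In the article's code, `pdLabels[strLabel].to_list()` plays the role of `labels[name]` here.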

Function to load images:

def loadImage(path):
    img = cv2.imread(str(path))
    #img = rotate_bound(img, angle)
    img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)   # OpenCV loads images as BGR
    img = img.astype(np.float32)/255.            # scale pixel values to [0, 1]
    img = img.reshape(input_shape)

    return img

For a class entry (class name plus its list of file names), get the min and max indexes of its training subset or of its validation subset. Everything after the validation range is the test subset:

def getClassMinMax(cls, bIsTrain):
  nLen = len(cls['image_names'])
  if(bIsTrain):
    nMinIdx = 0
    nMaxIdx = nLen * TRAINING_IMAGES_PERCENT
  else:
    # +1 because callers use random.randint(), whose upper bound is inclusive
    nMinIdx = nLen * TRAINING_IMAGES_PERCENT + 1
    nMaxIdx = nLen * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT)

  return int(nMinIdx), int(nMaxIdx)
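
The 60 / 20 / 20 split can also be written with half-open ranges, which makes it easier to check that the three subsets never overlap. This is only a sketch: `split_ranges` is a hypothetical helper, and it uses integer percentages rather than the float constants from the article.

```python
def split_ranges(n_images, train_percent=60, valid_percent=20):
    """Return (train, valid, test) index ranges over n_images items.

    Hypothetical helper: uses half-open ranges and integer percentages,
    unlike getClassMinMax in the article, which returns inclusive bounds.
    """
    train_end = n_images * train_percent // 100
    valid_end = n_images * (train_percent + valid_percent) // 100
    return (range(0, train_end),
            range(train_end, valid_end),
            range(valid_end, n_images))


train_idx, valid_idx, test_idx = split_ranges(10)
print(list(train_idx))  # [0, 1, 2, 3, 4, 5]
print(list(valid_idx))  # [6, 7]
print(list(test_idx))   # [8, 9]
```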

It is always a good idea to make sure everything works as intended, so let’s test image loading:

if(bDoTraining):
  nClassIdx = np.random.randint(len(arrLabeledData))
  cls = arrLabeledData[nClassIdx]

  nMinIdx, nMaxIdx = getClassMinMax(cls, False)
  nImageIdx = random.randint(nMinIdx, nMaxIdx)

  img = loadImage(join(working_path, "images/", cls['class'], cls['image_names'][nImageIdx]))
  print(cls['class'])
  plt.imshow(img)
  plt.show()

To make our dataset more diverse (augmentation), we might want to add noise to the images:

def add_noise(img):
    '''Add random noise to an image'''
    VARIABILITY = 40
    deviation = VARIABILITY*random.random() / 255.
    noise = np.random.normal(0, deviation, img.shape)
    img += noise
    # np.clip returns the clipped array, so the result has to be assigned
    img = np.clip(img, 0., 1.)
    return img
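
Here is a small self-contained sketch of the same idea that can be run without any image files. It is a variant, not the article's function: `add_noise_clipped` is a hypothetical name, it returns a noisy copy instead of changing its input, and it accepts an optional NumPy random generator so that results can be reproduced.

```python
import numpy as np


def add_noise_clipped(img, max_deviation=40 / 255.0, rng=None):
    """Return a noisy copy of img (floats in [0, 1]), clipped back to [0, 1].

    Hypothetical variant of add_noise: the input array is left untouched.
    """
    if rng is None:
        rng = np.random.default_rng()
    deviation = max_deviation * rng.random()
    noisy = img + rng.normal(0.0, deviation, img.shape)
    # np.clip returns a new array; without using its result nothing is clipped.
    return np.clip(noisy, 0.0, 1.0)


img = np.full((4, 4, 3), 0.5, dtype=np.float32)
noisy = add_noise_clipped(img, rng=np.random.default_rng(7))
print(noisy.shape)                             # (4, 4, 3)
print(noisy.min() >= 0.0, noisy.max() <= 1.0)  # True True
```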

We will need the ImageDataGenerator to produce augmented images:

if(bDoTraining):
  datagen = ImageDataGenerator(
    samplewise_center=True,
    rotation_range=IMAGE_ROTATION_ANGLE,
    width_shift_range=0.1,
    height_shift_range=0.1,
    zoom_range=0.1 #[1, 1.2],
    #preprocessing_function=add_noise
  )

The following function gets an image by index from the data we loaded earlier and augments it with the image data generator we just created:

def getImage(cClass, nImageIdx, datagen):
  image_name = cClass['image_names'][nImageIdx]

  #angle = random.randint(-180, 180)
  img = loadImage(join(working_path, "images/", cClass['class'], image_name))

  arrImg = img_to_array(img)
  arrImg = datagen.random_transform(arrImg) # augmentation
  arrImg = add_noise(arrImg)

  return np.array(arrImg, dtype="float32")

Again, we need to make sure everything works, so let’s see what this function returns:

if(bDoTraining):
  nClassIdx = np.random.randint(len(arrLabeledData))
  cls = arrLabeledData[nClassIdx]

  img = getImage(cls, 0, datagen)

  print(cls['class'])
  plt.imshow(img) #, cmap='gray')
  plt.show()

If we are training and for some reason want to start over, we need to delete the network we have saved so far:

def deleteSavedNet(best_weights_filepath):
    if(os.path.isfile(best_weights_filepath)):
        os.remove(best_weights_filepath)
        print("deleteSavedNet():File removed")
    else:
        print("deleteSavedNet():No file to remove")

As we train our network, it accumulates a "history". It is a good idea to be able to show it as a chart, because this often reveals whether training can be improved:

def plotHistory(history, strParam1, strParam2):
    plt.plot(history.history[strParam1], label=strParam1)
    plt.plot(history.history[strParam2], label=strParam2)
    #plt.title('strParam1')
    #plt.ylabel('Y')
    #plt.xlabel('Epoch')
    plt.legend(loc="best")
    plt.show()

def plotFullHistory(history):
    # Expected key order: loss, accuracy, val_loss, val_accuracy
    arrHistory = list(history.history)
    plotHistory(history, arrHistory[0], arrHistory[2])   # loss vs. val_loss
    plotHistory(history, arrHistory[1], arrHistory[3])   # accuracy vs. val_accuracy
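
Picking history keys by position works as long as the keys keep the same order. A slightly more defensive option is to pair each metric with its validation counterpart by name. The sketch below assumes only that validation metrics carry the usual "val_" prefix; `paired_metrics` is a hypothetical helper and the numbers are made up.

```python
def paired_metrics(history_dict):
    """Return (train_key, val_key) pairs found in a Keras-style history dict.

    Hypothetical helper: pairs each metric with its "val_" counterpart by
    name, so it does not depend on the order of the keys.
    """
    return [(key, "val_" + key)
            for key in history_dict
            if not key.startswith("val_") and "val_" + key in history_dict]


# Made-up numbers, only to show the shape of history.history.
example = {
    "loss": [0.9, 0.5], "accuracy": [0.6, 0.8],
    "val_loss": [1.0, 0.7], "val_accuracy": [0.5, 0.7],
}
print(paired_metrics(example))
# [('loss', 'val_loss'), ('accuracy', 'val_accuracy')]
```

Each returned pair could then be passed to `plotHistory(history, train_key, val_key)`.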

Now a function that creates the model. It loads EfficientNet without its last layers (the classifier) and attaches our own classifier, the one we are going to train:

def createModel(nL2, optimizer):
  global embedding_model

  inputs = keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))

  # Freeze the pretrained base. `trainable` has to be set on the model object,
  # not on its output tensor, otherwise the base network is not frozen.
  base_net = efn.EfficientNetB0(weights='imagenet', include_top=False)
  base_net.trainable = False
  model_b0 = base_net(inputs)

  model_concat = model_b0 #layers.concatenate([model_b0, model_vgg16]) #, model_x]) #model_b0

  model_classifier = layers.Flatten(name="Flatten")(model_concat)

  model_classifier = layers.Dense(32, kernel_regularizer=regularizers.l2(nL2), activation='relu', name="Dense128")(model_classifier)
  model_classifier = layers.LeakyReLU(alpha=0.1, name="LeakyReLU")(model_classifier)
  model_classifier = layers.Dropout(0.4, name="Dropout")(model_classifier)
  base_model = layers.Dense(len(arrClasses), activation="softmax", kernel_regularizer=regularizers.l2(nL2), name="DenseEmbedding")(model_classifier)

  embedding_model = keras.Model(inputs=inputs, outputs=base_model, name="embedding_model")
  embedding_model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=["accuracy"])

  return embedding_model

The following class produces the batches of images (and labels) used during training. The Sequence class it inherits from is the standard Keras way of feeding data (if you don't want to use tf.data); it is convenient and parallelizes well:

class MyImageDataGenerator(Sequence):

  def __init__(self, bIsTrain):
    self.batch_size = BATCH_SIZE
    self.bIsTrain = bIsTrain

    # Use the size of the smallest class (capped at 10000) to derive the number of steps
    nNumOfTrainSamples = 10000
    for cls in arrLabeledData:
      nMin, nMax = getClassMinMax(cls, True)
      nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin)

    if(self.bIsTrain):
      self.STEP_SIZE = nNumOfTrainSamples // BATCH_SIZE
    else:
      nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT)
      self.STEP_SIZE = nNumOfValidSamples // BATCH_SIZE

    if(self.STEP_SIZE < 100):
      self.STEP_SIZE = 100

    print("STEP_SIZE: ", self.STEP_SIZE, " (bIsTrain: ", bIsTrain, ")")

  def __len__(self):
    return self.STEP_SIZE

  def __getitem__(self, idx):
    arrBatchImages = []
    arrBatchLabels = []

    for i in range(self.batch_size):
      # Pick a class at random first, then a random image inside its train or validation range
      arrClassIdx = np.random.randint(len(arrLabeledData))
      cls = arrLabeledData[arrClassIdx]

      nMinIdx, nMaxIdx = getClassMinMax(cls, self.bIsTrain)
      nImageIdx = random.randint(nMinIdx, nMaxIdx)

      img = getImage(cls, nImageIdx, datagen)
      strLabel = cls['class']

      arrBatchImages.append(img)
      arrBatchLabels.append(pdLabels[strLabel].to_list())

    return np.array(arrBatchImages), np.array(arrBatchLabels)
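
The sampling logic of this class can be tried out without Keras or any images. The sketch below is a stand-in, not the article's class: `BalancedBatchSampler` is a hypothetical name, and it returns (class name, image index) pairs instead of pixel data. Because a class is chosen first and an image second, both classes appear about equally often even if one folder holds more images than the other.

```python
import random


class BalancedBatchSampler:
    """Stand-in for the batch-picking logic, without Keras or images.

    Hypothetical class: each item of a batch is drawn by choosing a class
    uniformly at random, then an index inside that class's allowed range.
    """

    def __init__(self, index_ranges, batch_size, steps, seed=None):
        self.index_ranges = index_ranges  # {class_name: (min_idx, max_idx)}
        self.batch_size = batch_size
        self.steps = steps
        self.rng = random.Random(seed)

    def __len__(self):
        # Number of batches per epoch.
        return self.steps

    def __getitem__(self, idx):
        # idx is ignored, as in the article: every batch is sampled afresh.
        batch = []
        for _ in range(self.batch_size):
            name = self.rng.choice(sorted(self.index_ranges))
            lo, hi = self.index_ranges[name]
            batch.append((name, self.rng.randint(lo, hi)))  # inclusive bounds
        return batch


sampler = BalancedBatchSampler({"head": (0, 59), "tail": (0, 29)},
                               batch_size=8, steps=100, seed=7)
print(len(sampler), len(sampler[0]))  # 100 8
```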

We will need two objects of this class, one for training and one for validation:

if(bDoTraining):
  gen_train = MyImageDataGenerator(True)
  gen_valid = MyImageDataGenerator(False)

As usual, we need a function to show an image obtained this way:

def ShowImg(img, label):
  print(label)

  fig = plt.figure()
  fig.add_subplot(1, 1, 1)
  plt.imshow(img) #, cmap='gray')
  plt.show()
  plt.close()

And (again, as usual) we want to test the result:

if(bDoTraining):
  (images, labels) = gen_valid.__getitem__(0) #next(gen_train)

  # Show only the first image of the batch
  for i, img in enumerate(images):
    ShowImg(img, labels[i])
    break

We want to be able to stop training at any time and later resume from where we left off, so we need to save the weights at the end of each epoch. To do that, we create a list of callbacks and use it during training:

def getCallbacks(monitor, mode):
  # Saves the weights only when the monitored metric improves
  checkpoint = ModelCheckpoint(best_weights_filepath, monitor=monitor, save_best_only=True, save_weights_only=True, mode=mode, verbose=1)

  # Saves the weights at the end of every epoch, whether or not they are the best so far
  save_model_at_epoch_end_callback = LambdaCallback(on_epoch_end=lambda epoch, logs: embedding_model.save_weights(last_weights_filepath))

  callbacks_list = [checkpoint, save_model_at_epoch_end_callback]  # , early]

  return callbacks_list
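
The "save only when the monitored value improves" policy requested with `save_best_only=True` can be illustrated in a few lines of plain Python. This is a sketch of the policy only, under the assumption that a save happens on strict improvement; `best_epochs` is a hypothetical helper and does not reproduce Keras internals. The accuracy values are made up.

```python
def best_epochs(metric_per_epoch, mode="max"):
    """Return the epochs at which a 'save best only' policy would save.

    Hypothetical helper illustrating the policy only; it is not how Keras
    implements ModelCheckpoint internally.
    """
    if mode == "max":
        better = lambda new, old: new > old
    else:
        better = lambda new, old: new < old

    best = None
    saved_at = []
    for epoch, value in enumerate(metric_per_epoch):
        if best is None or better(value, best):
            best = value
            saved_at.append(epoch)
    return saved_at


# Made-up validation accuracies for five epochs.
print(best_epochs([0.70, 0.82, 0.80, 0.85, 0.85]))  # [0, 1, 3]
```

With `monitor="val_accuracy"` the mode is "max"; for a loss one would use "min".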

Also, we need to be able to load the model (to continue training or to do testing):

def loadModel(embedding_model, bBest):
  if(bBest):
    path = best_weights_filepath
    strMessage = "load best model"
  else:
    path = last_weights_filepath
    strMessage = "load last model"

  if(os.path.isfile(path)):
    embedding_model.load_weights(path)
    print(strMessage, ": File loaded")
  else:
    print(strMessage, ": No file to load")

  return embedding_model

The following function does actual training:

def trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning = False):
  global embedding_model
  global history
  global arrImages
  global arrLabels

  # Start from scratch unless we are continuing an earlier run
  if(bCumulativeLearning == False):
    deleteSavedNet(best_weights_filepath)

  random.seed(7)

  embedding_model = createModel(nL2, optimizer)
  print("Model created")

  callbacks_list = getCallbacks("val_accuracy", 'max')

  if(bCumulativeLearning == True):
    loadModel(embedding_model, False)

  # The number of steps is derived from the smallest class (capped at 10000 samples)
  nNumOfTrainSamples = 10000
  for cls in arrLabeledData:
    nMin, nMax = getClassMinMax(cls, True)
    nNumOfTrainSamples = min(nNumOfTrainSamples, nMax - nMin)

  STEP_SIZE_TRAIN = nNumOfTrainSamples // BATCH_SIZE
  if(STEP_SIZE_TRAIN < 100):
    STEP_SIZE_TRAIN = 100

  nNumOfValidSamples = int(nNumOfTrainSamples * VALIDATION_IMAGES_PERCENT / TRAINING_IMAGES_PERCENT)
  STEP_SIZE_VALID = nNumOfValidSamples // BATCH_SIZE
  if(STEP_SIZE_VALID < 100):
    STEP_SIZE_VALID = 100

  print(STEP_SIZE_TRAIN, STEP_SIZE_VALID)
  print("Available metrics: ", embedding_model.metrics_names)

  history = embedding_model.fit(gen_train,
    validation_data=gen_valid, verbose=0,
    epochs=EPOCHS, steps_per_epoch=STEP_SIZE_TRAIN,
    validation_steps=STEP_SIZE_VALID, callbacks=callbacks_list)

  print(nL2)
  plotFullHistory(history)

  # TBD: here, return best model, not last one
  return embedding_model

As you can see, it does some initialization and then calls Keras's "fit" function.
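
The step-count arithmetic inside that function is easy to check in isolation. Below is a minimal sketch; `steps_per_epoch` is a hypothetical helper, and the class sizes in the example are made up.

```python
def steps_per_epoch(class_sizes, batch_size, min_steps=100, cap=10000):
    """Steps per epoch derived from the smallest class, with a lower bound.

    Hypothetical helper mirroring the arithmetic in trainNetwork:
    class_sizes holds the number of usable samples per class.
    """
    n_samples = min([cap] + list(class_sizes))
    return max(n_samples // batch_size, min_steps)


print(steps_per_epoch([3000, 1200], batch_size=8))  # 150
print(steps_per_epoch([500, 400], batch_size=8))    # 100
```

The lower bound of 100 steps means that with small classes an epoch draws more samples than the class contains, which is acceptable here because every draw is randomly augmented.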

Here is another data generator. This one reads the images that we use AFTER the network has been trained. It does no augmentation and builds no one-hot labels, as we are dealing with the test set (or pretending it is test data); class names are passed along only so that predictions can be checked later.

def data_generator_simple(arrAllImageNames, arrAllImageClasses):
  arrImages = []
  arrImageNames = []
  arrImageClasses = []

  for nImageIdx in range(len(arrAllImageNames)):
    strClass = arrAllImageClasses[nImageIdx]
    strImageName = arrAllImageNames[nImageIdx]

    #angle = random.randint(0, 90)
    img = loadImage(join(working_path, "images/", strClass, strImageName)) #, angle)
    arrImg = np.array(img_to_array(img), dtype="float32")

    arrImages.append(arrImg)
    arrImageNames.append(strImageName)
    arrImageClasses.append(strClass)

    if len(arrImages) == BATCH_SIZE:
      yield np.array(arrImages), arrImageNames, arrImageClasses
      arrImages = []
      arrImageNames = []
      arrImageClasses = []

  # A generator ends by returning; raising StopIteration here would become
  # a RuntimeError in Python 3.7+. The last, incomplete batch is yielded as well.
  if arrImages:
    yield np.array(arrImages), arrImageNames, arrImageClasses
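
The batching pattern itself is independent of images. Here is a minimal sketch with plain lists; `batches` is a hypothetical helper. It also shows how a generator should finish: by simply returning, since under PEP 479 (the default behavior from Python 3.7) a `StopIteration` raised inside a generator body is converted into a `RuntimeError`.

```python
def batches(items, batch_size):
    """Yield lists of up to batch_size items, including a final partial batch.

    Hypothetical helper: the generator ends by falling off the end of the
    function, without raising StopIteration explicitly.
    """
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # leftover items that did not fill a whole batch
        yield batch


print(list(batches(["a", "b", "c", "d", "e"], 2)))
# [['a', 'b'], ['c', 'd'], ['e']]
```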

As usual, we want a helper that displays an image loaded by this generator:

def ShowImgSimple(img, label):
  print(label)

  fig = plt.figure()
  fig.add_subplot(1, 1, 1)
  plt.imshow(img, cmap='gray')
  plt.show()
  plt.close()

Using the generator above, we can load all test images (everything after the training and validation ranges of each class) and run prediction on them:

def getAllTestImages():
  global embedding_model

  arrAllImageNames = []
  arrAllImageClasses = []

  for cClass in arrLabeledData:
    nFirstTestIdx = int(len(cClass['image_names']) * (TRAINING_IMAGES_PERCENT + VALIDATION_IMAGES_PERCENT))
    for nIdx in range(nFirstTestIdx, len(cClass['image_names'])):
      arrAllImageNames.append(cClass['image_names'][nIdx])
      arrAllImageClasses.append(cClass['class'])

  test_preds  = []
  test_file_names = []
  test_class_names = []

  for imgs, fnames, classes in data_generator_simple(arrAllImageNames, arrAllImageClasses):
    predicts = embedding_model.predict(imgs)
    predicts = predicts.tolist()
    test_preds += predicts
    test_file_names += fnames
    test_class_names += classes
  test_preds = np.array(test_preds)

  return test_preds, test_file_names, test_class_names

We can also compute the accuracy over all our predictions; misclassified images are printed and shown along the way:

def getAccuracy(test_preds, test_file_names, test_class_names):
  nTotalSuccess = 0

  for i, arrPredictedProbabilities in enumerate(test_preds):
    nPredictedClassIdx = arrPredictedProbabilities.argmax()
    gt_class = test_class_names[i]
    predicted_class = arrClasses[nPredictedClassIdx]
    if(predicted_class == gt_class):
      nTotalSuccess += 1
    else:
      print("GT: ", gt_class, "; Pred: ", predicted_class, "; Probabilities: ", arrPredictedProbabilities[0], ", ", arrPredictedProbabilities[1])
      img = loadImage(join(working_path, "images/", gt_class, test_file_names[i]))
      plt.imshow(img)
      plt.show()

  # max(..., 1) avoids a division by zero when there are no predictions
  nSuccess = nTotalSuccess / max(len(test_preds), 1)

  return nSuccess
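
Stripped of image loading and plotting, the accuracy computation comes down to an argmax per row and a comparison with the ground truth. The sketch below uses plain Python lists and made-up probabilities; `accuracy` is a hypothetical helper.

```python
def accuracy(probabilities, true_classes, class_names):
    """Fraction of rows whose highest-probability class matches the label.

    Hypothetical helper: probabilities is a list of per-class probability
    lists, ordered like class_names.
    """
    if not probabilities:
        return 0.0
    hits = 0
    for probs, truth in zip(probabilities, true_classes):
        predicted = class_names[probs.index(max(probs))]  # argmax
        hits += predicted == truth
    return hits / len(probabilities)


# Made-up predictions for four images; the third one is wrong.
preds = [[0.9, 0.1], [0.3, 0.7], [0.6, 0.4], [0.2, 0.8]]
truth = ["head", "tail", "tail", "tail"]
print(accuracy(preds, truth, ["head", "tail"]))  # 0.75
```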

Finally, here is the function that STARTS the training and then evaluates the best model on the test set. It has the somewhat confusing name "test":

def test(EPOCHS, nL2, optimizer, learning_rate, bCumulativeLearning):
  global embedding_model

  embedding_model = trainNetwork(EPOCHS, nL2, optimizer, bCumulativeLearning)
  print("loading best model")
  embedding_model = loadModel(embedding_model, True)

  test_preds, test_file_names, test_class_names = getAllTestImages()

  # print("test_preds[0], test_file_names[0], test_class_names[0]: ", test_preds[0], test_file_names[0], test_class_names[0])

  nSuccess = getAccuracy(test_preds, test_file_names, test_class_names)

  print(">>> Accuracy on test set:", nSuccess, "<<<")

We can now call this function and therefore start training:

opt = tf.keras.optimizers.Adam(0.0002) ##Adamax(lr=0.0001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0)
nL2 = 0.4

if(bDoTraining):
  EPOCHS = 50
  # Passed to test(), which does not use it; the actual rate is the one given to Adam above
  learning_rate=0.001

  np.random.seed(7)
  test(EPOCHS, nL2, opt, learning_rate, bCumulativeLearning=False)

  embedding_model = loadModel(embedding_model, True)
  embedding_model.save(best_weights_filepath)    # A full model is saved

After training is complete, we can run predictions image by image. The loop below picks a class at random, goes through the index range returned by getClassMinMax(cls, False) and shows every misclassified image:

if(bDoTraining):
  nClassIdx = np.random.randint(len(arrLabeledData))
  cls = arrLabeledData[nClassIdx]

  nMinIdx, nMaxIdx = getClassMinMax(cls, False)

  for i, nImageIdx in enumerate(range(nMinIdx, nMaxIdx)):
    print(i+1, "of", nMaxIdx - nMinIdx)
    img = loadImage(join(working_path, "images/", cls['class'], cls['image_names'][nImageIdx]))

    arrImg = img_to_array(img)
    arrImg = np.array(arrImg, dtype="float32")

    # The model expects a batch, so add a leading batch dimension of 1
    test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))

    nIdx = test_preds.argmax()
    if(nClassIdx != nIdx):
      print("GT: ", cls['class'], "; Pred: ", arrClasses[nIdx])
      plt.imshow(img)
      plt.show()

OK, our model is trained and tested on a test data set. Now we can actually USE it: we can load a HUGE set of images and classify them (see the comments in the code). Note that this code relies on the image file naming convention from the previous step, and that it deletes the source files as it goes:

# Same as above in the "test" section, but this time we process images from the output folder.
# "/content/drive/My Drive/01_Output/" is the output of the previous step. Remember, we got pairs of images
# and now need to figure out which ones are avers (head) and which ones are revers (tail).
images_source_path = "/content/drive/My Drive/01_Output/"

# We will save images under new names (with a "head" or "tail" suffix) in this folder
images_dest_path = working_path + "images_processed/"
os.makedirs(images_dest_path, exist_ok=True)

arrSourceImageNames = [f for f in listdir(images_source_path) if isfile(join(images_source_path, f))]

# Create the model and load its weights (the ones we got during training)
embedding_model = createModel(nL2, opt)
embedding_model = loadModel(embedding_model, True)

# Coin name -> flag: see below for details
dictNames = {}
# Source file name -> path of the saved copy; needed by the cleanup pass
dictDestPaths = {}

# File names look like 123496110_07_03.
# Here 123496110 is the file root name, 07 is the number of a coin in that image (some images contain >1 coins),
# and 03 is the number of the image of that coin.
#    (Say, we have 169860023_000.jpg, 169860023_001.jpg, 169860023_002.jpg, one coin per image, tail-tail-head.
#    Then at step 1 we will get 169860023_00_00, 169860023_00_01, and 169860023_00_02)
# We append _head or _tail: 169860023_00_00_tail(.png)
# In dictNames we keep pairs 169860023_00 + flag. Flag == 0 if no heads, no tails, 1 if heads / no tails,
# 2 if tails / no heads and 3 if it has both.
# When scanning is complete, we delete files that have flag != 3
def getCoinName(image_name):
  arrImageNameParts = image_name.split("_")  # ['169860023', '000', '00']
  # We do not need "000" here, as it is just the number of an image in a group of images for that coin.
  # We need the name (169860023) of course, plus the number of a coin (00).
  # NOTE: this comment and the example above disagree on which field is the coin number;
  # check the index against your own file names.
  return arrImageNameParts[0] + "_" + arrImageNameParts[2]

nTotal = len(arrSourceImageNames)
for i, file_name in enumerate(arrSourceImageNames):
  image_path = join(images_source_path, file_name)
  img = loadImage(image_path)

  arrImg = img_to_array(img)
  arrImg = np.array(arrImg, dtype="float32")

  # For the image, predict its class
  test_preds = embedding_model.predict(arrImg.reshape(1, IMAGE_SIZE, IMAGE_SIZE, 3))
  nIdx = test_preds.argmax()
  #print(i+1, "of", nTotal, ": ", arrClasses[nIdx])

  # Split the image name
  word_list = file_name.split(".")  # ['0_000_00', 'png']
  image_name = word_list[0]

  dest_path = images_dest_path + image_name + "_" + arrClasses[nIdx] + ".png"
  plt.imsave(dest_path, img)
  dictDestPaths[file_name] = dest_path

  # Now we need to move the source file to trash, but make it zero size first so it doesn't take space there
  open(image_path, 'w').close()  # overwrite and make the file blank instead
  os.remove(image_path)

  if(i%100 == 0):
    print(i, " of ", nTotal)

  # Set bit 1 for a head, bit 2 for a tail
  coin_name = getCoinName(image_name)
  nFlag = 1 if arrClasses[nIdx] == "head" else 2
  dictNames[coin_name] = dictNames.get(coin_name, 0) | nFlag

#print(dictNames)

# Now we need to delete all files for which dictNames[coin_name] != 3
print("Deleting files that do not have both head and tail")

nDeleted = 0
for i, file_name in enumerate(arrSourceImageNames):
  # The saved copy carries the "_head" / "_tail" suffix, so look its path up instead of reusing file_name
  image_path = dictDestPaths[file_name]
  coin_name = getCoinName(file_name.split(".")[0])

  if (coin_name not in dictNames) or (dictNames[coin_name] != 3):
    open(image_path, 'w').close()  # overwrite and make the file blank instead
    os.remove(image_path)
    nDeleted += 1

  if(i%100 == 0):
    print(i, " of ", nTotal)

print("Deleted", nDeleted)
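
The pairing logic can be tested on its own, without a model or files on disk. The sketch below assumes the classification results are already available as (coin name, side) pairs; `coins_with_both_sides` and `with_side_suffix` are hypothetical helpers, not functions from the article.

```python
import os

HEAD, TAIL = 1, 2  # bit flags; HEAD | TAIL == 3 means both sides were seen


def coins_with_both_sides(classified):
    """Return the set of coin names that have at least one head and one tail.

    Hypothetical helper: classified is a list of (coin_name, side) pairs,
    with side being "head" or "tail".
    """
    flags = {}
    for coin_name, side in classified:
        bit = HEAD if side == "head" else TAIL
        flags[coin_name] = flags.get(coin_name, 0) | bit
    return {name for name, value in flags.items() if value == (HEAD | TAIL)}


def with_side_suffix(file_name, side):
    """Turn e.g. '169860023_00_00.png' into '169860023_00_00_tail.png'."""
    root, _ext = os.path.splitext(file_name)
    return root + "_" + side + ".png"


pairs = [("A", "head"), ("A", "tail"), ("B", "tail"), ("B", "tail")]
print(coins_with_both_sides(pairs))                      # {'A'}
print(with_side_suffix("169860023_00_00.png", "tail"))   # 169860023_00_00_tail.png
```

Using OR on bit flags means the order of the images does not matter, and seeing the same side twice changes nothing.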

As a result, we have files whose names carry a "_head" or "_tail" suffix, and coins that do not have both a head and a tail image are removed.


Link to the original article: https://habr.com/ru/articles/540324/