
Автор статьи: Рустем Галиев
IBM Senior DevOps Engineer & Integration Architect
Привет, Хабр! На связи Рустем, IBM Senior DevOps Engineer & Integration Architect.
Сегодня мы будем работать с открытым набором данных по рентгенографии грудной клетки которые, использовали для этого исследования, с предварительно обученной моделью MobileNet_v2 для классификации изображений TensorFlow и переносом обучения для создания классификатора пневмонии, который работает с рентгенограммами грудной клетки.
Целью этой статьи является не столько получение навыков классификации изображений, а сколько понимание того, насколько легко вы можете создать соответствующую модель.
Я надеюсь, вам понравится это!
Загрузка модулей и данных
Я знаю, вы хотите сразу приступить к разработке методов обнаружения пневмонии, но сначала давайте импортируем некоторые необходимые модули:
import numpy as np import os import pathlib import matplotlib.pylab as plt import tensorflow as tf import tensorflow_hub as hub import tensorflow_datasets as tfds tfds.disable_progress_bar() from tqdm import tqdm AUTOTUNE = tf.data.experimental.AUTOTUNE
Теперь давайте загрузим данные в память.
data_dir = pathlib.Path('tflite/images') image_count_train = len(list(data_dir.glob('train/*/*.jpeg'))) image_count_test = len(list(data_dir.glob('test/*/*.jpeg'))) image_count_val = len(list(data_dir.glob('val/*/*.jpeg'))) BATCH_SIZE = 32 IMG_HEIGHT = 224 IMG_WIDTH = 224 IMG_SHAPE = (IMG_HEIGHT, IMG_WIDTH, 3) STEPS_PER_EPOCH = np.ceil(image_count_train/BATCH_SIZE) EPOCHS = 10 SAVED_MODEL = "pneumonia_saved_model" (image_count_test, image_count_train, image_count_val)
Теперь, когда у нас есть все изображения BATCH_SIZE для обучения, мы настроим классы для обучения (метки):
CLASS_NAMES = np.array([item.name for item in data_dir.glob('train/*') if item.name != "LICENSE.txt"]) num_classes = len(CLASS_NAMES) CLASS_NAMES
Изображения находятся в папках train, test или val. Например, чтобы увидеть изображение, вам нужно выполнить:
pneumonia = list(data_dir.glob('train/PNEUMONIA/*.jpeg')) for image_path in pneumonia[:3]: Image.open(str(image_path))
Создание наборов данных
Теперь, когда все загружено, мы можем создать итераторы, которые будут давать каждому набор изображений для обучения. TensorFlow предлагает отличные утилиты для этой задачи в наборах данных TensorFlow:
test_ds = tf.data.Dataset.list_files(str(data_dir/'test/*/*')) train_ds = tf.data.Dataset.list_files(str(data_dir/'train/*/*')) val_ds = tf.data.Dataset.list_files(str(data_dir/'val/*/*')) for f in test_ds.take(5): print(f.numpy()) print('Datasets loaded')
На данном этапе мы видим, что у нас есть итераторы для каждой группы: обучение, тестирование и проверка. Теперь нам нужно сделать несколько вещей:
Мы должны декодировать каждое изображение в каналы RGB.
Мы должны изменить размер каждого изображения до наших предопределенных размеров.
Для каждого изображения мы должны вычислить метку, дающую 1 одному классу и 0 другому.
Мы должны применить каждый из этих шагов к итераторам, чтобы мы могли перебирать изображения с измененными размерами с их метками.
Для этих задач мы будем использовать следующие ютилити методы:
def get_label(file_path): parts = tf.strings.split(file_path, os.path.sep) return parts[-2] == CLASS_NAMES[0] def decode_img(img): img = tf.image.decode_jpeg(img, channels=3) img = tf.image.convert_image_dtype(img, tf.float32) return tf.image.resize(img, [IMG_WIDTH, IMG_HEIGHT]) def process_path(file_path): label = get_label(file_path) img = tf.io.read_file(file_path) img = decode_img(img) return img, label def format_image(image, label): image = tf.image.resize(image, IMAGE_SIZE) / 255.0 return image, label print('Utility methods loaded!')
Применив это к нашим итераторам, теперь мы можем итерировать тестовые примеры, чтобы они имели соответствующие шейпы:
train_examples = train_ds.map(process_path, num_parallel_calls=AUTOTUNE) test_examples = test_ds.map(process_path, num_parallel_calls=AUTOTUNE) validation_examples = val_ds.map(process_path, num_parallel_calls=AUTOTUNE) for image, label in test_examples.take(5): print("Image shape: ", image.numpy().shape) print("Label: ", label.numpy()) print('Check the shapes!')
Интересно отметить, что теперь каждая итерация набора данных выдает тензорное изображение и тензорную метку.
Теперь, на этом шаге, нам нужно пакетировать каждый набор данных, добавить кэш для повышения производительности и выполнить предварительную выборку по мере необходимости! Этот метод обычно повышает производительность пакетной обработки в 10 раз и взят непосредственно из учебника Google:
def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000): if cache: if isinstance(cache, str): ds = ds.cache(cache) else: ds = ds.cache() ds = ds.shuffle(buffer_size=shuffle_buffer_size) ds = ds.repeat() ds = ds.batch(BATCH_SIZE) ds = ds.prefetch(buffer_size=AUTOTUNE) return ds train_examples_dataset = prepare_for_training(train_examples) test_examples_dataset = prepare_for_training(test_examples) validation_examples_dataset = prepare_for_training(validation_examples)
Можем сделать вызов
image_batch, label_batch = next(iter(test_examples_dataset))
Приступим к определению модели!
Обучение модели
Теперь, когда у нас есть итератор, все дело в модели!
Все, что нам нужно сделать, это поместить линейный классификатор поверх слоя feature_extractor_layer с помощью модуля Hub.
Для скорости мы начинаем с необучаемого feature_extractor_layer, но вы также можете включить тонкую настройку для большей точности.
Модули-концентраторы для TensorFlow 1.x здесь не будут работать, поэтому мы можем использовать один из следующих вариантов:
module_selection = ("mobilenet_v2", 224, 1280) #or use ["(\"mobilenet_v2\", 224, 1280)", "(\"inception_v3\", 299, 2048)"] {type:"raw", allow-input: true} handle_base, pixels, FV_SIZE = module_selection MODULE_HANDLE ="https://tfhub.dev/google/tf2-preview/{}/feature_vector/4".format(handle_base) IMAGE_SIZE = (pixels, pixels) print("Using {} with input size {} and output dimension {}".format(MODULE_HANDLE, IMAGE_SIZE, FV_SIZE))
Обратите внимание, что мы используем вектор признаков, а не полную модель. Это потому, что мы не хотим тонкой настройки (для избежания проблем со временем). Однако, если вы хотите выполнить точную настройку, загрузите полную модель (она находится в TensorFlow Hub).
Загрузите модуль TFHub:
feature_extractor = hub.KerasLayer(MODULE_HANDLE, input_shape=IMAGE_SIZE + (3,), output_shape=[FV_SIZE], trainable=False) feature_extractor.trainable = False print("Building model with", MODULE_HANDLE) model = tf.keras.Sequential([ feature_extractor, tf.keras.layers.Dense(num_classes, activation='softmax')]) model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) model.summary()
Как видите, большинство наших параметров не поддаются обучению (см. параметр из MobileNet), поэтому обучение должно быть быстрым. Для обучения нам потребуется запустить следующее:
hist = model.fit(train_examples_dataset, epochs=EPOCHS, steps_per_epoch=image_count_train/BATCH_SIZE, validation_steps=np.floor(image_count_val/BATCH_SIZE), validation_data=validation_examples_dataset) tf.saved_model.save(model, SAVED_MODEL)
Мы можем проверить, что модель имеет правильную подпись, загрузив ее снова и показав информацию:
loaded = tf.saved_model.load(SAVED_MODEL) print(list(loaded.signatures.keys())) infer = loaded.signatures["serving_default"] print(infer.structured_input_signature) print(infer.structured_outputs)
Мы используем следующую команду, чтобы проверить, можете ли вы также использовать интерфейс командной строки TensorFlow для проверки подписи (вне Python):
saved_model_cli show --dir $1 --tag_set serve --signature_def serving_default
Это было невероятно легко; с трансферным обучением мы можем легко, в четыре строки кода, проделать работу целых исследовательских групп!
Теперь, когда у нас есть рабочая модель, давайте перейдем к работе по ее оптимизации.
Преобразование классификатора пневмонии TensorFlow в TensorFlow Lite с помощью квантования
мы можем легко преобразовать модель из обычного TensorFlow в TensorFlow Lite с помощью Python Converter API. Этот шаг необходим для запуска наших моделей на периферийных и мобильных устройствах.
Квантование с помощью конвертера TensorFlow Lite
Теперь, когда у нас есть сохраненный объект SavedModel, первое, что вам нужно сделать, чтобы преобразовать его в модель TensorFlow Lite, — создать экземпляр преобразователя:
import numpy as np import os import pathlib import matplotlib.pylab as plt import tensorflow as tf import tensorflow_hub as hub import tensorflow_datasets as tfds tfds.disable_progress_bar() from tqdm import tqdm AUTOTUNE = tf.data.experimental.AUTOTUNE SAVED_MODEL = "pneumonia_saved_model" converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL)
Помните, что мы можем создать конвертер из моделей SavedModel, ConcreteFunction или Keras!
Квантование после обучения
Простейшая форма квантования после обучения квантует от плавающей запятой до 8-битной точности. Этот метод включен в качестве опции в конвертере TensorFlow Lite. При выводе вес преобразуются из 8-битной точности в числа с плавающей запятой и вычисляются с использованием ядер с плавающей запятой. Это преобразование выполняется один раз и кэшируется для уменьшения задержки.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
Эта оптимизация была сделана путем размышления между оптимизацией размера и задержки. Если бы мы хотели оптимизировать только размер, мы могли бы сделать следующее:
converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]
Точно так же мы можем преобразовать нашу модель, и она будет квантована:
tflite_model = converter.convert() tflite_model_file = 'converted_model.tflite' with open(tflite_model_file, "wb") as f: f.write(tflite_model) print('Done quantizing')
Это было удивительно просто и быстро, хотя квантование модели звучит сложно и красиво на бэкэнде, но ее весьма легко реализовать.
Проверка уменьшения размера
Давайте проверим, что квантованная модель действительно меньше:
from pathlib import Path saved_model = Path(SAVED_MODEL) full_model_size = sum(f.stat().st_size for f in saved_model.glob('**/*') if f.is_file() )/(1024*1024) print(f'Full model size {full_model_size} MB') converted_model = Path(tflite_model_file) converted_model_size = converted_model.stat().st_size / (1024*1024) print(f'Converted model size {converted_model_size} MB')

Мы видим, что за одно простое квантование мы увеличили размер почти на 80%
Мы можем добиться дальнейшего улучшения задержки, сокращения пикового использования памяти и доступа к аппаратным ускорителям только для целых чисел, убедившись, что вся математика модели квантована. Для этого нам нужно измерить динамический диапазон активаций и входов с репрезентативным набором данных. Таким образом, вы просто создадите генератор входных данных и предоставите его вашему конвертеру:
Для этого сначала вернём наш тестовый набор данных:
def get_label(file_path): parts = tf.strings.split(file_path, os.path.sep) return parts[-2] == CLASS_NAMES[0] @tf.autograph.experimental.do_not_convert def decode_img(img): img = tf.image.decode_jpeg(img, channels=3) img = tf.image.convert_image_dtype(img, tf.float32) return tf.image.resize(img, [IMG_WIDTH, IMG_HEIGHT]) @tf.autograph.experimental.do_not_convert def process_path(file_path): label = get_label(file_path) img = tf.io.read_file(file_path) img = decode_img(img) return img, label def format_image(image, label): image = tf.image.resize(image, IMAGE_SIZE) / 255.0 return image, label def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000): if cache: if isinstance(cache, str): ds = ds.cache(cache) else: ds = ds.cache() ds = ds.shuffle(buffer_size=shuffle_buffer_size) ds = ds.repeat() ds = ds.batch(BATCH_SIZE) ds = ds.prefetch(buffer_size=AUTOTUNE) return ds data_dir = pathlib.Path('tflite/images') BATCH_SIZE = 32 IMG_HEIGHT = 224 IMG_WIDTH = 224 IMG_SHAPE = (IMG_HEIGHT, IMG_WIDTH, 3) CLASS_NAMES = np.array([item.name for item in data_dir.glob('train/*') if item.name != "LICENSE.txt"]) test_ds = tf.data.Dataset.list_files(str(data_dir/'test/*/*')) test_examples = test_ds.map(process_path, num_parallel_calls=AUTOTUNE) test_examples_dataset = prepare_for_training(test_examples)
А теперь давайте определим репрезентативный набор данных. Это будет настроено на то, чтобы преобразователь выполнял некоторые выводы по мере квантования, чтобы поддерживать как можно большую точность, а также преобразовывать все возможные веса и активации в INT8:
def representative_data_gen(): for image_batch, label_batch in test_examples_dataset.take(1): for image in image_batch: yield [[image]] len(list(representative_data_gen())) converter.representative_dataset = representative_data_gen
Результирующая модель будет полностью квантована, но для удобства по-прежнему будет принимать входные и выходные данные с плавающей запятой.
Операции, которые не имеют квантованных реализаций, автоматически останутся с плавающей запятой. Это позволяет выполнять преобразование гладко, но может ограничивать развертывание ускорителями, поддерживающими float.
Полноцелочисленное квантование (необязательно, просто для знания)
Чтобы преобразователь выдавал только целочисленные операции, можно указать:
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
Однако имейте в виду, что если преобразователь не может найти поддерживаемую INT8-совместимую операцию с вашей моделью, он не будет работать. Этот шаг обычно необязателен, но в некоторых случаях, например при развертывании на TPU, он необходим, поскольку это оборудование поддерживает только операции INT8.
Преобразование и проверка модели
Наконец, давайте преобразуем нашу модель:
tflite_model = converter.convert() tflite_model_file = 'converted_model_int8.tflite' with open(tflite_model_file, "wb") as f: f.write(tflite_model) print('Done quantizing with Representative Dataset')
Сравнение размеров
Можно подумать, что новая переделанная модель меньше, но это не всегда так. Преобразование в операции INT8 в значительной степени сосредоточено на требованиях к памяти и скорости:
from pathlib import Path quantized_weights = Path('converted_model.tflite') weights_quantized_size = quantized_weights.stat().st_size/(1024*1024) print(f'Quantized for weights model size {weights_quantized_size} MB') weights_and_activations_model = Path('converted_model_int8.tflite') weights_and_activations_model_size = weights_and_activations_model.stat().st_size/(1024*1024) print(f'Quantized for weights and activations size {weights_and_activations_model_size} MB')

И мы видим, что оба размера одинаковы. На следующем этапе мы рассмотрим, что происходит со скоростью!
Протестируем модель TensorFlow Lite с помощью интерпретатора Python.
Теперь, когда у нас есть наши квантованные модели, мы можем протестировать их и проверить их точность!
Во-первых, давайте загрузим квантованную модель весов. Для этого нам нужно выделить тензоры для прогнозов:
weights_tflite_model_file = 'converted_model.tflite' interpreter = tf.lite.Interpreter(model_path=weights_tflite_model_file) interpreter.allocate_tensors() input_index = interpreter.get_input_details()[0]["index"] output_index = interpreter.get_output_details()[0]["index"]
Теперь давайте создадим простую партию из 15 изображений (из соображений производительности) и проверим ее показатели:
import time start_time = time.time() predictions = [] test_labels, test_imgs = [], [] debug = 0 image_batch, label_batch = next(iter(test_examples_dataset)) for img, label in zip(image_batch, label_batch): debug += 1 if debug % 5 == 1: print(f'I am treating image {debug} with label {label}') if debug == 15: break interpreter.set_tensor(input_index, np.array([img])) interpreter.invoke() predictions.append(interpreter.get_tensor(output_index)) test_labels.append(label.numpy()) test_imgs.append(img) print(f'Predictions calculated in {time.time() - start_time} seconds')

Теперь у нас есть все прогнозы. Рассчитаем точность, чувствительность и специфичность:
ok_value = 0 wrong_value = 0 true_positives = 0 total = 0 true_negatives = 0 false_positives = 0 false_negatives = 0 for predictions_array, true_label in zip(predictions, test_labels): predicted_label = np.argmax(predictions_array) if predicted_label == true_label: ok_value += 1 if CLASS_NAMES[int(true_label)] == 'NORMAL': true_negatives += 1 else: true_positives += 1 else: wrong_value += 1 if CLASS_NAMES[predicted_label] == 'NORMAL': false_negatives +=1 else: false_positives += 1 total += 1 print(f'Accuracy: {(true_positives + true_negatives) / total} \n ') print(f'Sensitivity: {true_positives/ (true_positives + false_negatives)} \n ') print(f'Specificity: {true_negatives / (true_negatives + false_positives)}')

Наша модель очень хороша: ее, наверное, можно было бы улучшить, но получить такой результат за 30 минут — это очень хорошо!
Теперь давайте проверим квантованную модель весов и активаций:
weights_tflite_model_file = 'converted_model_int8.tflite' interpreter = tf.lite.Interpreter(model_path=weights_tflite_model_file) interpreter.allocate_tensors() input_index = interpreter.get_input_details()[0]["index"] output_index = interpreter.get_output_details()[0]["index"]
Как и раньше, давайте создадим простую партию из 15 изображений (опять же, из соображений производительности) и проверим ее метрики:
import time start_time = time.time() predictions = [] test_labels, test_imgs = [], [] debug = 0 image_batch, label_batch = next(iter(test_examples_dataset)) for img, label in zip(image_batch, label_batch): debug += 1 if debug % 5 == 1: print(f'I am treating image {debug} with label {label}') if debug == 15: break interpreter.set_tensor(input_index, np.array([img])) interpreter.invoke() predictions.append(interpreter.get_tensor(output_index)) test_labels.append(label.numpy()) test_imgs.append(img) print(f'Predictions calculated in {time.time() - start_time} seconds')

Теперь у нас есть все прогнозы. Рассчитаем точность, чувствительность и специфичность:
ok_value = 0 wrong_value = 0 true_positives = 0 total = 0 true_negatives = 0 false_positives = 0 false_negatives = 0 for predictions_array, true_label in zip(predictions, test_labels): predicted_label = np.argmax(predictions_array) if predicted_label == true_label: ok_value += 1 if CLASS_NAMES[int(true_label)] == 'NORMAL': true_negatives += 1 else: true_positives += 1 else: wrong_value += 1 if CLASS_NAMES[predicted_label] == 'NORMAL': false_negatives +=1 else: false_positives += 1 total += 1 print(f'Accuracy: {(true_positives + true_negatives) / total} \n ') print(f'Sensitivity: {true_positives/ (true_positives + false_negatives)} \n ') print(f'Specificity: {true_negatives / (true_negatives + false_positives)}')

Мы видим, что, хотя модель с оптимизацией весов немного меньше и лучше, модель INT8 намного быстрее. Идея квантования INT8 состоит в том, чтобы потерять точность (немного) для увеличения скорости.
Резюмируя:
Мы загрузили набор рентгеновских данных из репозитория лаборатории.
Мы создали данные наборы данных с помощью наборов данных TensorFlow, которые создают итераторы из наших изображений.
Мы узнали, как адаптировать эти наборы данных, сопоставив несколько методов, которые позволяли переформатировать изображения до 224 x 224 x 3 и возвращали правильную метку, причем все в пакетном режиме.
Мы создали, обучили и сохранили нашу модель с передачей обучения, используя MobileNet v2 и простой слой softmax над ним.
Мы квантовали сохраненную модель классификации пневмонии для весов в качестве оптимизации после обучения.
Мы подтвердили, что мы увеличиваем размер на 80%, просто делая это.
Мы узнали о квантовании активаций с репрезентативным набором данных и о квантовании с полным целым числом.
Мы оценили обе модели, чтобы проверить производительность.
Так как статья подготовлена в преддверии старта курса Machine Learning. Professional, хочу пригласить всех на бесплатный урок курса, где преподаватели OTUS расскажут какие подходы к ансамблированию сегодня существуют в машинном обучении, как устроены такие популярные техники ансамблирования как Bagging, Random Forest и Gradient Boosting. Когда и как их стоит применять для решения ML-задач.
ссылка на оригинал статьи https://habr.com/ru/company/otus/blog/703186/
Добавить комментарий