В этой статье я расскажу о своем курсовом проекте: о модуле проверки задач, который я написал для эмулятора Gym-Duckietown. Речь пойдет о тестирующей системе и об интеграции этой системы с образовательными онлайн-платформами, которые используют технологию External Grader — например, с платформой Stepik.org.
Об авторе
Меня зовут Даниил Плющенко, я студент первого (уже второго) курса магистерской программы «Программирование и анализ данных» в Питерской Вышке. В 2019 году я закончил бакалавриат «Прикладная математика и информатика» в этом же университете.
Платформа Duckietown
Duckietown — это исследовательский проект в области беспилотного транспорта. Организаторы проекта создали платформу, которая помогает внедрять новый подход к обучению в области искусственного интеллекта и робототехники. Все началось как курс в MIT в 2016 году, но постепенно распространилось по всему миру и по разным ступеням образования: от старшей школы до магистерских программ.
Платформа Duckietown состоит из двух частей. Во-первых, это уменьшенная модель городской транспортной среды с дорогами, зданиями, дорожными знаками и препятствиями. Во-вторых, это транспорт. Небольшие мобильные роботы (Duckiebots) под управлением Raspberry Pi получают информацию об окружающем мире через камеру и перевозят по дорогам жителей города — желтых резиновых уточек.
Я работал с эмулятором Duckietown. Он называется Gym-Duckietown, и это open-source проект, написанный на языке Python. Эмулятор помещает вашего бота внутрь города, изменяет его положение в зависимости от алгоритма, который вы используете (или от кнопки, которую нажали), перерисовывает картинку и записывает в логи текущую позицию бота.
Если вам интересно попробовать, рекомендую склонировать себе репозиторий и запустить manual_control.py: так ботом можно будет управлять при помощи стрелок на клавиатуре.
Скриншот из эмулятора
Эмулятор можно использовать как среду для исполнения задач. Поставим такую задачу: по заданной карте, которая состоит из одной дорожной полосы, нужно проехать один метр по прямой.
Для решения задачи можно использовать такой алгоритм:
for _ in range(25): env.step([1, 0]) env.render()
Здесь в переменной env
хранится состояние среды.
Метод step принимает на вход action
: список из двух элементов, описывающий действие бота. Первый элемент задает скорость, второй — угол поворота. Метод render
перерисовывает картинку, учитывая новую позицию бота. Количество шагов и скорость подобраны эмпирически: именно такие значения нужны, чтобы бот проехал ровно один метр по прямой.
Если вы захотите использовать этот фрагмент в manual_control.py, то вставьте его сюда. Код до этого места занимается загрузкой среды. Для простоты можно его переиспользовать, а после добавить предложенное выше решение задачи.
Тестирующая система
Хотелось бы иметь возможность проверять такие задачи автоматически: взять реализацию алгоритма управления ботом, промоделировать поездку и сообщить, корректно ли предложенный алгоритм выполняет поставленную задачу. Такая тестирующая система позволила бы использовать среду во время подготовки к соревнованиям по управлению автономным транспортом, а также в образовательных целях: выдавать обучающимся набор задач и проверять их решения автоматически. Я занимался ее разработкой во время работы над курсовым проектом и ниже расскажу о том, что у меня получилось.
Последовательность шагов при проверке решения выглядит так:
Задачи в тестирующую систему можно добавить самостоятельно или обратиться к тем, которые сделал я. У каждой задачи есть генератор условия — класс, который генерирует состояние окружающей среды. В нем указываются название карты и стартовая позиция бота внутри начальной клетки. Также задаются целевые координаты: в данном случае это точка в одном метре от начальной позиции.
class Ride1MTaskGenerator(TaskGenerator): def __init__(self, args): super().__init__(args) def generate_task(self): env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv env = env_loader( map_name="straight_road", position_on_initial_road_tile=PositionOnInitialRoadTile( x_coefficient=0.5, z_coefficient=0.5, angle=0, ), ) self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]] self.generated_task['env'] = env env.render() return self.generated_task
Здесь TrackingDuckietownEnv
и CVTaskEnv
— классы-обертки, которые используются для сохранения информации о поездке для дальнейшего анализа.
class TrackingDuckietownEnv: def __init__(self, **kwargs): self.__wrapped = DuckietownEnv(**kwargs) … … def step(self, action): obs, reward, done, misc = self.__wrapped.step(action) message = misc['Simulator']['msg'] if 'invalid pose' in message.lower(): raise InvalidPoseException(message) for t in self.trackers: t.track(self) return obs, reward, done, misc
Трекеры собирают информацию о текущем состоянии, например, о позиции бота.
CVTaskEnv
используется, если требуется решение с использованием только информации с камеры («компьютерного зрения»), а не функций эмулятора: например, если нужно узнать, насколько далеко бот располагается от центра полосы или где находится ближайший видимый объект. Обращение к функциям эмулятора может упростить решение задачи, и класс CVTaskEnv
ограничивает вызов методов эмулятора. Он используется, когда выставлен флаг is_cv_task
.
class CVTaskEnv: def __init__(self, **kwargs): self.__wrapped = TrackingDuckietownEnv(**kwargs) def __getattr__(self, item): if item in self.__wrapped.overriden_methods: return self.__wrapped.__getattribute__(item) ALLOWED_FOR_CV_TASKS = [ 'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped', 'road_tile_size', 'trip_statistics' ] if item in ALLOWED_FOR_CV_TASKS: return self.__wrapped.__getattr__(item) else: raise AttributeError(item + " call is not allowed in CV tasks")
После того как исполнение решения завершено — при условии, что оно не было прервано по тайм-ауту, — информация о поездке пропускается через последовательность чекеров. Если все чекеры отработали успешно, задача считается решенной правильно. Иначе выводится поясняющий вердикт — например, бот врезался, съехал с дороги и т. д.
Вот один из стандартных чекеров. Он проверяет, что бот в конце поездки вернулся в начальную точку. Это может быть полезно, если, например, в задаче требуется доехать до определенной точки, а затем вернуться обратно.
class SameInitialAndFinalCoordinatesChecker(Checker): def __init__(self, maximum_deviation=0.1, **kwargs): super().__init__(**kwargs) self.maximum_deviation = maximum_deviation def check(self, generated_task, trackers, **kwargs): trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics)) trip_data = trip_statistics.trip_data if len(trip_data) == 0: return True initial_coordinates = trip_data[0].position.coordinates final_coordinates = trip_data[-1].position.coordinates return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation
Такой тестирующей системой можно пользоваться в ручном режиме, то есть вручную запускать проверку, а затем визуально изучать вердикт. Если бы мы хотели, к примеру, запустить онлайн-курс по автономному транспорту на Stepik.org, нам понадобилась бы интеграция с платформой. Об этом и пойдет речь в следующей части статьи.
Интеграция с онлайн-платформами
Для тестирования задач часто используют технологию External Grader, которая была разработана платформой edX.
При использовании External Grader образовательная платформа не занимается проверкой задач самостоятельно, а формирует очередь посылок, которые отправляет на другое устройство. Функционал подключения к очереди реализован в проекте xqueue-watcher. Xqueue-watcher извлекает посылки, и затем они тестируются модулем проверки (в котором обычно происходят более нетривиальные действия, чем сравнение текста/чисел). После этого вердикт проверки отправляется обратно на сторону образовательной платформы.
Рассмотрим подробнее момент подключения к очереди. После того как образовательная платформа предоставит данные для подключения, их нужно будет добавить в конфигурационные файлы, а в методе grade реализовать непосредственно запуск проверки. Более подробные инструкции можно найти здесь и здесь.
Xqueue-watcher вызывает endpoint get_submission, который извлекает посылку из очереди, если это возможно. После этого она отправляется на тестирование. Затем xqueue-watcher вызывает put_result для возврата вердикта.
Запустить xqueue-watcher можно так:
make requirements && python -m xqueue_watcher -d conf.d/
Допустим, мы хотим воспользоваться технологией External Grader, но не хотим запускать курс на онлайн-платформе. Xqueue-watcher реализован в предположении, что есть некоторое файловое хранилище, куда выгружаются файлы с решениями (платформы такое хранилище имеют). Мы можем модифицировать Xqueue, чтобы файловое хранилище стало не нужно, и подобные системы можно будет запускать, в общем-то, даже у себя на ноутбуке.
Для начала нужно научиться поддерживать саму очередь посылок. Функционал очереди предоставляет проект xqueue.
Картинка взята из документации.
Запустить его можно так:
apt-get install libaio1 libaio-dev apt-get install libmysqlclient-dev pip3 install -r requirements.txt python3 manage.py migrate python3 manage.py runserver $xqueue_address
Может понадобиться создать файл ~/edx/edx.log
По умолчанию xqueue отдает xqueue-watcher’у не содержимое посылок, то есть файлы с решением задачи, а ссылки на эти файлы в файловом хранилище. Чтобы не зависеть от файлового хранилища, можно сделать так, чтобы пересылались сами файлы, и хранить их на той же машине, с которой запущен xqueue-watcher.
Вот как нужно изменить исходный код, чтобы этого достигнуть:
Реализацию метода _upload в файле lms_interface.py заменим на эту:
def _upload(file_to_upload, path, name): ''' Upload file using the provided keyname. Returns: URL to access uploaded file ''' full_path = os.path.join(path, name) return default_storage.save(full_path, file_to_upload)
Если никакое файловое хранилище не было подключено, то данный метод сохранит файл с решением по пути $queue_name/$file_to_upload_hash.
В реализации get_sumbission в файле ext_interface.py вместо этой строки напишем:
xqueue_files = json.loads(submission.urls) for xqueue_file in xqueue_files.keys(): with open(xqueue_files[xqueue_file], 'r') as f: xqueue_files[xqueue_file] = f.readlines()
Передадим не ссылки (пути) на файлы, а их содержимое.
Каждое решение исполняется в «одноразовом» докер-контейнере с ограниченными ресурсами, то есть для исполнения каждого решения создается отдельный контейнер, который удаляется после окончания тестирования. Чтобы создавать такие контейнеры и исполнять команды в них, используется portainer-api (по факту в качестве обертки над Docker API).
Итог
В этой статье я рассказал о том, как была создана тестирующая система и задачи по автономному транспорту, а также об интеграции данной системы с образовательными онлайн-платформами, которые используют технологию External Grader. Надеюсь, что в скором времени будет запущен курс, использующий эту систему, а часть про интеграцию с онлайн-платформами пригодится желающим создать собственный офлайн- или онлайн-курс.
ссылка на оригинал статьи https://habr.com/ru/company/hsespb/blog/510868/
Добавить комментарий