В новой статье CleverData мы расскажем о проектировании ML-пайплайна предсказания целевого действия с помощью Yandex Cloud. Пайплайн необходим для автоматического обмена данными с CDP CleverData Join — использования информации с платформы для обучения ML-моделей и формирования прогнозов поведения каждого пользователя. На примерах рассмотрим использование API сервисов Yandex Cloud, коснемся алгоритмов обработки данных и обучения ML-модели, а также расскажем о возникших проблемах. Под катом делимся кодом.
Что такое CDP и зачем нужны предиктивные атрибуты
Платформа управления клиентскими данными (Customer Data Platform) — это инструмент, помогающий компаниям собирать, анализировать и использовать информацию о своих клиентах. CDP объединяет данные из таких источников, как сайты, мобильные приложения, CRM-системы и т.п., чтобы сформировать полное представление о портрете клиента и истории его действий.
CDP CleverData Join содержит два основных типа данных: клиентские профили и события, привязанные к этим профилям. В состав профиля входят идентификаторы клиента и атрибуты, содержащие информацию о пользователе (его имя, демографические характеристики, предпочтения и многое другое). Атрибуты событий представляют собой информацию о конкретных действиях клиентов. Например, посещения страниц сайта, факты покупки товаров, реакции на маркетинговые кампании. Но CDP — это не только база данных. Это инструмент, который помогает делать полезные выводы о клиентах и повышать эффективность маркетинга.
Основная часть данных собирается из внешних систем, интегрированных с CDP. Однако при наличии такого массива данных возникает желание получить новую, предиктивную характеристику профиля, сформированную методом машинного обучения на собранных сведениях. Знать не то, что человек уже сделал, а предугадывать его следующие шаги.
В CDP предиктивные атрибуты, как правило, используются для прогнозирования поведения клиентов, определения их потребностей и предсказания вероятности совершения определенных действий. Например, для маркетологов может быть полезно иметь прогноз того, что клиент возьмет кредит или оформит подписку. Это можно вычислить при помощи ML-модели, обученной на исторических данных о поведении клиента. Предиктивные атрибуты играют важную роль в персонализации маркетинговых кампаний, улучшении обслуживания клиентов.
Конечно же, нужно понимать, что для моделирования предиктивных атрибутов требуется:
-
иметь качественные, структурированные данные для моделирования,
-
обеспечить достаточный объем таких данных,
-
обладать качественными алгоритмами предобработки и обучения.
Не менее важным вопросом является обслуживание моделей — их надо регулярно обновлять и дообучать на свежих данных, иначе качество прогнозирования будет неизбежно падать. Построению пайплайна, который обеспечивает автоматизированное взаимодействие с CDP, и посвящена эта статья.
MVP и его цели
Мы решили построить MVP пайплайна, реализующего передачу данных для обучения, само обучение и передачу предсказаний обратно в CDP CleverData Join, используя готовые инструменты, предоставляемые Yandex Cloud.
В качестве бизнес-задачи мы для себя определили предсказание вероятности совершения клиентом целевого действия на основании данных о переходах по страницам сайта — кликстриме. Это типовая задача бинарной классификации (классификация по определенным признакам в две группы) массива клиентов. Под целевым действием подразумевается посещение нужного нам URL (например, страницы оформления покупки или кредита). Поскольку кликстрим является цепочкой переходов пользователей по ссылкам, в качестве архитектуры модели была выбрана рекуррентная нейронная сеть (вид нейронных сетей, где связи между элементами образуют направленную последовательность).
Целью проекта стало построение полностью автоматизированного пайплайна, который приносил бы ценность заказчику в течение длительного времени. Для этого нам нужно было реализовать следующие цепочки операций.
-
Предобработка новых данных для обучения и инференса (непрерывной работы нейронной сети на устройствах конечного пользователя) и преобразование их из формата хранения в CDP в формат, требуемый для ML-модели.
-
Первичное обучение модели под задачу конкретного заказчика на кликстриме его клиентов.
-
Периодическое дообучение модели на новых данных, поступающих в CDP. Дообучение на новых данных крайне важно, поскольку с течением времени модель может потерять свою актуальность, а, значит, точность прогнозов значительно уменьшится.
-
Периодический инференс (обеспечение непрерывной работы нейронной сети на устройствах конечного пользователя) для клиентов с обновлениями в кликстриме и отгрузку результатов обратно в CDP.
Архитектура компонентов MVP в Yandex Cloud
Итак, нам потребовалось разработать и развернуть в Yandex Cloud сервисы, реализующие все необходимые для построения MVP цепочки операций. После анализа требований была предложена следующая архитектура решения:
На стороне CleverData Join находятся сервисы Model Manager, отвечающие за автоматизированное создание инфраструктуры под пайплайн в Yandex Cloud, и Data Processor, обеспечивающий периодическую первичную обработку новых данных и загрузку их в бакет S3. По сути Model Manager является Control Plane, который автоматизирует операции по развертыванию компонентов в Yandex Cloud при появлении потребности создать новый предиктивный атрибут. Остальные компоненты находятся в инфраструктуре Yandex Cloud.
-
В бакете Yandex Object Storage накапливаются данные, представляющие собой первично обработанный кликстрим. Там же должны располагаться артефакты и метаданные, полученные при обучении моделей, а также результаты инференса моделей.
-
В сервисе Yandex DataSphere автоматически создаётся проект, содержащий ноутбуки для обучения и инференса. При поступлении новых данных в Yandex Object Storage должен запускаться необходимый ноутбук. В ноутбуке обучения реализованы вторичная обработка данных на кластере Yandex DataProc, обучение модели непосредственно в сервисе, загрузка артефактов модели на S3. В ноутбуке инференса — получение предсказаний по новым данным, отгруженным на S3.
-
В сервисе Yandex Cloud Functions располагаются функции, необходимые для запуска ноутбуков, и триггеры, которые привязаны к обновлению данных в Yandex Object Storage по определенным при их создании префиксам. Таким образом, после отгрузки новых данных на бакет производится, в зависимости от расположения данных, обучение/дообучение модели, инференс модели и отгрузка предсказаний в REST API CleverData Join.
Так как важно обеспечить одновременную работу нескольких моделей совершения целевого действия для разных тенантов в системе CD Join, для каждой задачи нужен свой отдельный проект в DataSphere со своими шаблонными ноутбуками, облачными функциями и триггерами. Чтобы процесс был максимально автоматизирован, необходимо, чтобы каждый проект был преднастроен:
-
в него были загружены шаблонные ноутбуки,
-
был активирован готовый Docker-образ для проекта DataSphere.
Забегая вперёд, отметим, что с автоматизацией преднастройки проекта возникли некоторые затруднения, но об этом мы расскажем позднее.
Немного о предобработке данных
Data Processor запускается по расписанию и производит первичную обработку данных и записывает результат в хранилище Yandex Object Storage.
События кликстрима хранятся в CleverData Join в формате AVRO. Предобработка происходит в два этапа.
-
На первом этапе из событий кликстрима извлекаются идентификатор клиента, метка времени и URL перехода. Ссылки очищаются от технических доменов и обрезаются до верхнеуровневых доменов. Данные сохраняются в формате Parquet.
-
Далее для каждого идентификатора клиента формируется цепочка из посещенных URL в хронологическом порядке и дельт, то есть временных интервалов, прошедших между посещениями каждого URL.
Процесс, организованный в два этапа, позволяет избегать повторной первичной обработки старых событий. Старые события хранятся в формате Parquet в S3 бакете и периодически дополняются первично обработанными событиями за новые периоды. После этого происходит пересборка цепочек для клиентов, по которым есть обновления.
Автоматизированное создание инфраструктуры
У сервисов Yandex Cloud есть API, позволяющий взаимодействовать с ними без использования UI, что критически важно для реализации управления процессом в Model Manager. В этом разделе мы пройдём по всем автоматизированным через API операциям, а именно:
-
созданию сервисного аккаунта;
-
созданию и настройке сети VPC для кластера Data Proc;
-
созданию проекта в DataSphere;
-
созданию S3 бакета и получению статического ключа доступа;
-
созданию и настройке Cloud Functions и их триггеров.
Сначала создадим сервисный аккаунт, от имени которого будет происходит взаимодействие с ресурсами Yandex Cloud:
def create_service_account(folder_id: str, name: str, iam_token: str, description: str = ''): url = 'https://iam.api.cloud.yandex.net/iam/v1/serviceAccounts' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": name, "description": description, } response = requests.post(url, headers=headers, json=data) return response sa_response = create_service_account(folder_id = folder_id, name = 'ta-service-account', iam_token = iam, description = 'ta-service-account')
Теперь, чтобы у сервисного аккаунта хватало прав на взаимодействие с необходимыми сервисами Yandex Cloud (DataSphere, Object Storage, Data Proc, Cloud Functions, VPC), выдадим ему необходимые роли. Для каталога это:
-
editor — нужна для чтения, записи и управления бакетом S3;
-
vpc.user, vpc.admin — необходимы для настройки доступа кластеру в интернет;
-
dataproc.agent — для использования кластера Data Proc;
-
dataproc.admin — для создания и удаления кластера Data Proc.
def update_access_bindings(resource_id : str, folder_id: str, roles: list = ['iam.serviceAccounts.user', 'editor', 'vpc.user', 'vpc.admin', 'dataproc.admin', 'dataproc.agent'], iam_token: str = iam): url = f'https://resource-manager.api.cloud.yandex.net/resource-manager/v1/folders/{folder_id}:updateAccessBindings' headers = {"Authorization": f"Bearer {iam_token}"} access_binding_deltas = [ { "action": "ADD", "accessBinding": { "roleId": role, "subject": { "id": resource_id, "type": "serviceAccount" } } } for role in roles ] payload = { "accessBindingDeltas": access_binding_deltas } response = requests.post(url, json=payload, headers=headers) return response response_update_access_bindings = update_access_bindings( resource_id=service_account_id, folder_id=folder_id)
Также необходимо назначить роль сервисному аккаунту, чтобы можно было им управлять. Роль должна быть назначена как на ресурс:
def update_access_bindings_1(resource_id: str, roles: list = ['datasphere.community-projects.editor', 'iam.serviceAccounts.user', 'functions.functionInvoker'], iam_token: str = iam): url = f"https://iam.api.cloud.yandex.net/iam/v1/serviceAccounts/{resource_id}:updateAccessBindings" headers = {"Authorization": f"Bearer {iam_token}"} access_binding_deltas = [ { "action": "ADD", "accessBinding": { "roleId": role, "subject": { "id": resource_id, "type": "serviceAccount" } } } for role in roles ] payload = { "accessBindingDeltas": access_binding_deltas } response = requests.post(url, json=payload, headers=headers) return response update_access_response_1 = update_access_bindings_1(resource_id=service_account_id)
Для того, чтобы кластер Data Proc мог общаться с внешним миром, нам нужно создать сеть VPC, указать шлюз и сформировать таблицу маршрутизации. В этой сети необходимо выделить подсеть для настройки доступа в интернет при помощи созданного шлюза.
Итак, создаём сеть:
def create_network(folder_id: str, network_name: str, description: str, iam_token: str = iam, ): url = 'https://vpc.api.cloud.yandex.net/vpc/v1/networks' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": network_name, "description": description, } response = requests.post(url, headers=headers, json=data) return response create_network_response = create_network(folder_id = folder_id, network_name = 'network-1', description = '' )
…, шлюз:
def create_gateway(folder_id: str, gateway_name: str, description: str, iam_token: str = iam, ): url = 'https://vpc.api.cloud.yandex.net/vpc/v1/gateways' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": gateway_name, "description": description, "sharedEgressGatewaySpec": {} } response = requests.post(url, headers=headers, json=data) return response response_create_gateway = create_gateway(folder_id = folder_id, gateway_name = 'gateway', description = '' )
…, таблицу маршрутизации:
def create_route_table(folder_id: str, route_table_name: str, description: str, gateway_id: str, network_id: str, destination_prefix: str = '0.0.0.0/0', iam_token: str = iam, ): url = 'https://vpc.api.cloud.yandex.net/vpc/v1/routeTables' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": route_table_name, "description": description, "networkId": network_id, "staticRoutes": [ { "destinationPrefix": destination_prefix, "gatewayId": gateway_id, } ] } response = requests.post(url, headers=headers, json=data) return response response_create_route_table = create_route_table(folder_id = folder_id, route_table_name = 'route-table', description = '', gateway_id = gateway_id, network_id = network_id, destination_prefix = '0.0.0.0/0' )
…, и подсеть с доступом в интернет:
def create_subnet(folder_id: str, subnet_name: str, network_id: str, route_table_id: str, description: str, ip_range: str, zone_id: str, dhcp_options: dict = {}, iam_token: str = iam, ): url = 'https://vpc.api.cloud.yandex.net/vpc/v1/subnets' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": subnet_name, "description": description, "networkId": network_id, "zoneId": zone_id, "v4CidrBlocks": [ ip_range ], "routeTableId": route_table_id, "dhcpOptions": dhcp_options } response = requests.post(url, headers=headers, json=data) return response response_create_subnet_a = create_subnet(folder_id = folder_id, subnet_name = 'subnet-ru-central1-a', network_id = network_id, route_table_id = route_table_id, description = 'default subnet for zone ru-central1-a', ip_range = '10.128.0.0/24', zone_id = 'ru-central1-a', dhcp_options = {}, )
В процессе работы мы будем создавать кластер Data Proc, на котором производится вторичная обработка данных.
При создании кластера важно не забыть передать идентификатор подсети, для которой мы ранее настраивали доступ в интернет. Нам нужно создать объект конфигурации с информацией о подкластерах, их ролях, типе и размере диска, количестве хостов. Этот объект будет передан в функцию создания кластера.
def create_cluster(folder_id: str, cluster_name: str, description: str, config_spec: dict, service_account_id: str, zone_id: str, bucket: str, ui_proxy: bool, security_group_ids: str, host_group_ids: str, deletion_protection: bool, log_group_id: str, iam_token: str = iam, ): url = 'https://dataproc.api.cloud.yandex.net/dataproc/v1/clusters' headers = {"Authorization": "Bearer {}".format(iam_token)} data = { "folderId": folder_id, "name": cluster_name, "description": description, "configSpec": config_spec, "zoneId": zone_id, "serviceAccountId": service_account_id, "uiProxy": ui_proxy, "deletionProtection": deletion_protection, } response = requests.post(url, headers=headers, json=data) return response config_spec = { "versionId": "2.0", "hadoop": { "services": [ 'HDFS', 'LIVY', 'MAPREDUCE', 'SPARK', 'TEZ', 'YARN', 'ZEPPELIN' ], "sshPublicKeys": [ public_key ], }, "subclustersSpec": [ { "name": "master", "role": "MASTERNODE", "resources": { "resourcePresetId": "s2.micro", "diskTypeId": "network-ssd", "diskSize": "137438953472" }, "subnetId": a_subnet_id, "hostsCount": "1", "assignPublicIp": False, }, { "name": "data", "role": "DATANODE", "resources": { "resourcePresetId": "s2.small", "diskTypeId": "network-hdd", "diskSize": "274877906944" }, "subnetId": a_subnet_id, "hostsCount": "1", "assignPublicIp": False, }, ] } response_create_cluster = create_cluster(folder_id = folder_id, cluster_name = 'xs-proc', description = '', config_spec = config_spec, service_account_id = service_account_id, zone_id = zone_id, bucket = '', ui_proxy = False, security_group_ids = '', host_group_ids = '', deletion_protection = False, log_group_id = '') cluster_id = response_create_cluster.json()['metadata']['clusterId']
Так как кластер будет считывать данные с s3, нужно предоставить ему Credentials для доступа к S3:
#!spark --cluster=xs-proc --session preparing_session --variables key --variables sct sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "storage.yandexcloud.net") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "") sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",key) sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",sct)
Теперь мы готовы создать проект в DataSphere, в нем будут происходить все операции, связанные с обучением и инференсом ML-моделей:
def create_project(project_name: str, community_id: str, service_account_id: str, description: str, subnet_id: str, commit_mode: str, default_folder_id: str, vm_inactivity_timeout: str, default_dedicated_spec: str, data_proc_cluster_id: str = '', security_group_ids: list = [], limits: dict = {}, early_access: bool = False, ide: str = 'JUPYTER_LAB', iam_token: str = iam): url = 'https://datasphere.api.cloud.yandex.net/datasphere/v2/projects' headers = {"Authorization": f"Bearer {iam_token}"} data = { "communityId": community_id, "name": project_name, "description": description, "settings": { "serviceAccountId": service_account_id, "subnetId": subnet_id, "dataProcClusterId": data_proc_cluster_id, "commitMode": commit_mode, "securityGroupIds": security_group_ids, "earlyAccess": early_access, "ide": ide, "defaultFolderId": default_folder_id, "vmInactivityTimeout": vm_inactivity_timeout, "defaultDedicatedSpec": default_dedicated_spec }, "limits": limits } response = requests.post(url, headers=headers, json=data) return response response_create_project = create_project(project_name = 'ta-service-project', community_id = community_id, service_account_id = service_account_id, subnet_id = a_subnet_id, description = 'ta model project', commit_mode = 'AUTO', default_folder_id = folder_id, vm_inactivity_timeout = '1800s', default_dedicated_spec = 'c1.4')
Сервисный аккаунт, который из Cloud Functions будет запускать ноутбук, должен быть в участниках проекта DataSphere. Выдадим ему соответствующую роль:
def update_access_bindings_2(resource_id : str, project_id: str, roles: list = ['datasphere.community-projects.editor'], iam_token: str = iam): url = f"https://datasphere.api.cloud.yandex.net/datasphere/v2/projects/{project_id}:updateAccessBindings" headers = {"Authorization": f"Bearer {iam_token}"} access_binding_deltas = [ { "action": "ADD", "accessBinding": { "roleId": role, "subject": { "id": resource_id, "type": "serviceAccount" } } } for role in roles ] payload = { "accessBindingDeltas": access_binding_deltas } response = requests.patch(url, json=payload, headers=headers) return response response_update_access_bindings_2 = update_access_bindings_2(resource_id=service_account_id, project_id=project_id)
Вы можете использовать для обмена данными уже существующий бакет S3. Но, если его нет, то его создание также не представляет сложности:
def create_bucket(bucket_name: str, folder_id: str, default_storage_class: str, max_size: str, anonymous_access_flags: dict, acl: dict, versioning: str, tags: list, iam_token: str = iam, ): url = 'https://storage.api.cloud.yandex.net/storage/v1/buckets' headers = {"Authorization": f"Bearer {iam_token}"} data = { "name": bucket_name, "folderId": folder_id, "defaultStorageClass": default_storage_class, "maxSize": max_size, "anonymousAccessFlags": anonymous_access_flags, "versioning": versioning, "acl": acl, "tags": tags } response = requests.post(url, headers=headers, json=data) return response bucket_name = 'test-storage-042' response_create_bucket = create_bucket(bucket_name = bucket_name, folder_id = folder_id, default_storage_class = 'STANDARD', max_size = '3221225472', anonymous_access_flags = {'read': False, 'list': False}, acl = {}, versioning = 'VERSIONING_DISABLED', tags = [])
Однако крайне важно создать статический ключ доступа к бакету. Ключ позволяет читать данные с S3 как кластеру Data Proc, так и при помощи SDK boto3. Конечно же, у сервисного аккаунта service_account_id, указанного в функции, должна быть роль в каталоге с правами чтения и записи S3 бакета, в нашем случае это editor.
def create_static_key(service_account_id: str, description: str, iam_token: str = iam, ): url = 'https://iam.api.cloud.yandex.net/iam/aws-compatibility/v1/accessKeys' headers = {"Authorization": f"Bearer {iam_token}"} data = { "serviceAccountId": service_account_id, "description": description } response = requests.post(url, headers=headers, json=data) return response response_create_static_key = create_static_key( service_account_id = service_account_id, description = '') key = response_create_static_key.json()['accessKey']['keyId'] secret = response_create_static_key.json()['secret']
Обязательно сохраняем идентификатор статического ключа и секретный ключ в защищенном месте. Мы передадим их через параметр запроса InputVariables метода execute при запуске ноутбуков.
Создаем функцию в Cloud Functions, которая будет запускать обучение:
def create_cloud_function(folder_id: str, function_name: str, description: str, iam_token: str = iam): url = 'https://serverless-functions.api.cloud.yandex.net/functions/v1/functions' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": function_name, "description": description, } response = requests.post(url, headers=headers, json=data) return response response_create_cloud_function = create_cloud_function(folder_id = folder_id, function_name = 'fit', description = 'start fit')
Далее нужно собрать версию функции, в которую передадим сам код скрипта в виде строки. Чтобы получить эту строку, нужно заархивировать код функции в zip-архив и закодировать в base64. Строка передается в переменную content:
def create_cloud_function_version(function_id: str, resources: dict, runtime: str, entrypoint: str, log_options: dict, description: str, execution_timeout: str, service_account_id: str, content: str, iam_token: str = iam ): url = 'https://serverless-functions.api.cloud.yandex.net/functions/v1/versions' headers = {"Authorization": f"Bearer {iam_token}"} data = { "functionId": function_id, "runtime": runtime, "description": description, "entrypoint": entrypoint, "resources": resources, "executionTimeout": execution_timeout, "serviceAccountId": service_account_id, "logOptions": log_options, "content": content, } response = requests.post(url, headers=headers, json=data) return response response_create_cloud_function_version = create_cloud_function_version( function_id = fit_cf_id, resources = {'memory': '134217728'}, runtime = 'python312', entrypoint = 'index.handler', log_options = {'disabled': False, 'folderId': folder_id}, description = 'the function will execute fit notebook', execution_timeout = '1s', service_account_id = service_account_id, content = str(encoded)[2:-1], iam_token = iam )
Функция и ее версия для запуска инференса добавляется полностью аналогичным образом.
Осталось создать триггеры, которые после появления новых данных в S3 запустят функции, которые в свою очередь запустят обучение, инференс или же отправку предсказаний в CleverData Join.
Мы будем отслеживать появление нового объекта на S3 с заданным префиксом, поэтому используем соответствующий event_type и передадим нужный prefix:
def create_trigger(folder_id: str, trigger_name: str, description: str, event_type: str, bucket_id: str, prefix: str, cloud_function_id: str, service_account_id: str, retry_attempts: str, interval: str, function_tag: str = '$latest', iam_token: str = iam ): url = 'https://serverless-triggers.api.cloud.yandex.net/triggers/v1/triggers' headers = {"Authorization": f"Bearer {iam_token}"} data = { "folderId": folder_id, "name": trigger_name, "description": description, "rule": { "objectStorage": { "eventType": [ event_type ], "bucketId": bucket_id, "prefix": prefix, "invokeFunction": { "functionId": cloud_function_id, "functionTag": function_tag, "serviceAccountId": service_account_id, "retrySettings": { "retryAttempts": retry_attempts, "interval": interval }, }, }, } } response = requests.post(url, headers=headers, json=data) return response response_create_fit_trigger = create_trigger( folder_id = folder_id, trigger_name = 'fit-test-trigger', description = '', event_type = 'OBJECT_STORAGE_EVENT_TYPE_CREATE_OBJECT', bucket_id = bucket_id, prefix = fitting_trigger_prefix, cloud_function_id = fit_cf_id, service_account_id = service_account_id, retry_attempts = '5', interval = '20s', function_tag = '$latest')
Поговорим немного о возврате данных в CDP. В CleverData Join имеются готовые адаптеры на пакетную загрузку данных. При этом, все операции взаимодействия с CDP также превосходно автоматизируются через API.
Мы создаем экземпляр входящего адаптера для того, чтобы передавать и записывать в атрибут клиентского профиля CDP предсказания модели:
def create_adapter_instance_input(adapter_instance_id: str, source: str, cid: str, access_token: str, value: dict={}): headers = {"Authorization": "Bearer {}".format(access_token), "x-dmpkit-onbehalf-of": cid} url = f'https://domain_name.cleverdata.ru/integration-manager/v1/adapters/1/instances/{adapter_instance_id}/inputs' data = { "value":value, "source":source, "adapterInstanceId":adapter_instance_id } response = requests.post(url, json=data, headers=headers) return response response = create_adapter_instance_input(adapter_instance_id=adapter_instance_id, source='predictions_task.json', access_token=access_token, cid=cid)
Все, что останется сделать, — реализовать в теле функции загрузку данных в файловый менеджер CleverData Join. Это тоже можно реализовать через API. Загруженные данные будут обработаны ранее созданным адаптером.
Обучение и дообучение модели
После запуска ноутбука обучения или инференса в проекте DataSphere происходит вторичная обработка с использованием кластера Dataproc. Новые сведения о событиях кликстрима добавляются в цепочки действий клиента. Далее на полученных цепочках происходит обучение модели.
Код вторичной обработки и код обучения во время первичного и повторного запуска ноутбуков немного отличается. Один и тот же код в зависимости от состояния объектов в бакете обрабатывает следующие несколько ситуаций. Рассмотрим их по отдельности.
В ситуации, когда в бакете находится только файл с событиями за текущий период, полученный в результате первичной обработки, мы имеем дело с первичным обучением модели. Следовательно, после обработки данных ноутбук выполнит последовательность первичного обучения модели. Для ноутбука инференса реализована та же логика. Если это первые данные для инференса в бакете, то модель вычислит предсказания для всех профилей.
В случае, если в бакете уже находились данные за предыдущие периоды, произойдет объединение строк и построение новых цепочек. Далее — дообучение модели.

Схема для повторного инференса выглядит схожим образом. Отличие заключается в том, что в данных за предыдущие периоды будут оставлены только события клиентов текущего периода. За счёт этого цепочки будут перестроены только для новых клиентов — именно тех, по которым значения предиктивных атрибутов ещё не сформированы.
В процессе дообучения создается новая модель-кандидат. Для того, чтобы определить, лучше ли эта модель, чем предыдущая, сравнивается AUC-ROC score новой модели на тестовой выборке с аналогичным показателем предыдущей модели, который сохраняется в бакете S3. При отсутствии увеличения метрики несколько итераций дообучения подряд — процесс дообучения заканчивается. Если же после дообучения модель показала лучшее качество, чем предыдущая лучшая модель, то происходит обновление лучшей модели (а также токенизатора, значения порога предсказания, файла с AUC-ROC score).
Ограничения в MVP версии проекта
В процессе реализации решения обнаружили некоторые ограничения, проработка которых может помочь расширить функциональность этого сервиса в будущем.
Например, в настоящее время отсутствует метод для загрузки ноутбуков в проект DataSphere. Поэтому при реализации мы изменили логику и ввели концепцию сервисного ноутбука, который однократно создается в DataSphere вручную. Это -разовая операция, нет необходимости выполнять ее для каждой новой задачи предсказания.
При заведении новой задачи выполнения целевого действия Model Manager размещает рабочие ноутбуки в S3 бакете. Сервисный ноутбук скачивает их в файловую систему DataSphere и последовательно запускает, если для задачи, соответствующей ноутбуку, есть новые первично обработанные данные.
Переход к данной концепции позволил ускорить и удешевить вторичную обработку. Теперь она сопровождается единоразовым поднятием кластера Data Proc и единой вторичной обработкой файлов всех задач клиента, под которые есть свежие данные.
Подводя итог, важно отметить, что в совокупности с концепцией сервисного ноутбука добавление метода загрузки ноутбуков в API позволит добиться еще более эффективной автоматизации развертывания пайплайна в Yandex Cloud, исключив необходимость в использовании UI.
Отключение режима запуска ноутбуков в Serverless в DataSphere
MVP изначально проектировалось в Serverless-парадигме — планировалось выделять ресурсы только в момент запуска вычислительных задач. Разумеется, мы с огромным удовольствием воспользовались Serverless-режимом запуска ноутбуков в DataSphere — это позволяло распределять рабочую нагрузку на CPU- и GPU-ресурсы в зависимости от характера нагрузки, хотя и приходилось взамен жертвовать временем на сериализацию, десериализацию переменных в процессе перехода между инстансами. Однако данный режим более не поддерживается, и, следовательно, нами был осуществлен переход к Dedicated.
Cтоит подчеркнуть, что переход от режима Serverless к Dedicated не повлиял на увеличение расходов на ресурсы вследствие простаивания инстансов, поскольку выделенная ВМ на запуск ноутбука через API после завершения выполнения инструкций моментально удаляется.
В рамках доработки текущего ограничения мы должны будем подобрать наиболее оптимальную конфигурацию ВМ при запуске в режиме Dedicated.
Ручная активация docker-образа в проекте
Пока что нет возможности взаимодействовать с ресурсом Docker-образ проекта DataSphere посредством API. Однако в ближайшем будущем этот функционал будет добавлен в API, что также обеспечит большую гибкость при развертывании пайплайна.
Для повышения функциональности ML-сервиса в CDP все перечисленные ограничения будут проработаны при разработке его продакшн-версии, в том числе некоторые из них будут усовершенствованы в сотрудничестве с Yandex Cloud.
Итоги MVP
Подводя итоги, можно с уверенностью сказать, что цели MVP были достигнуты.
-
Мы проверили гипотезу о возможности практически полностью автоматизированного развертывания инфраструктуры сервиса в облаке.
-
Построили эффективный конвейер предобработки данных.
-
Научились запускать код обучения и инференса по внешним триггерам.
-
Организовали процесс дообучения модели.
Использование ML-сервисов Yandex Cloud позволяет значительно упростить разработку решений по предиктивному анализу данных. Разработанный функционал является отличным дополнением к уже существующим широким возможностям платформы CleverData Join в части накопления знаний о едином профиле пользователя и построении его клиентского пути и позволяет обогащать данные о клиентской базе и открывает возможности использования предиктивных атрибутов для сегментации аудитории и организации адресных взаимодействий с клиентом.
ссылка на оригинал статьи https://habr.com/ru/articles/830446/
Добавить комментарий