При старте клиентское приложение асинхронно отправляет «пачку» запросов к API. Приложение имеет идентификатор clientId, на основании которого можно различить запросы одного клиента от другого. На каждый запрос на сервере выполняется код вида:
// получаем из репозитория данные клиента Client client = clientRepository.findByClientId(clientId); // если клиент не создан ранее if(client == null){ client = clientRepository.save(new Client(clientId)); } // далее обрабатываем запрос
где сущность Client имеет поле clientId, которое должно являться уникальным и имеет для этого в базе unique constraint. Так как в Spring каждый запрос будет выполнять данный код в отдельном потоке, даже если это запросы от одного и того же клиентского приложения, будет возникать ошибка вида:
integrity constraint violation: unique constraint or index violation; UK_BFJDOY2DPUSSYLQ7G1S3S1TN8 table: CLIENT
Ошибка возникает по очевидной причине: 2 или более потоков с одним clientId получают сущность client == null и начинают ее создавать, после чего при коммите получают ошибку.
Задача:
Необходимо синхронизировать запросы от одного clientId так, чтобы только первый запрос выполнил создание сущности Client, а остальные блокировались бы на момент создания и получали бы уже созданный им объект.
Решение 1
// если клиент не создан ранее if(client == null){ // выполняем синхронизацию synchronized (this){ // выполняем повторную проверку client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } }
Данное решение является работающим, но весьма дорогим, так как блокируются все запросы (потоки), которым нужно выполнить создание, даже если они будут создавать Client с разными clientId и никак друг с другом не конкурируют.
Обратите внимание, что сочетании синхронизации с аннотацией @Transactional
@Transactional public synchronized Client getOrCreateUser(String clientId){ // получаем из репозитория данные клиента Client client = clientRepository.findByClientId(clientId); // если клиент не создан ранее if(client == null){ client = clientRepository.save(new Client(clientId)); } return client; }
опять возникнет та же ошибка. Причина в том, что сначала освободится монитор (synchronized) и следующий поток войдет в синхронизированную область, а только после в прокси-объекте произойдет коммит транзакции первым потоком. Решить эту проблему просто — нужно чтобы монитор освобождался после коммита, следовательно, synchronized необходимо вызывать выше:
synchronized (this){ client = clientService.getOrCreateUser(clientId); }
Решение 2
Очень хотелось бы использовать конструкцию вида:
synchronized (clientId)
но проблема в том, что для каждого запроса будет создаваться новый объект clientId, даже если значения их эквивалентны, поэтому выполнить таким образом синхронизацию нельзя. Для того, чтобы решить проблему с разными объектами clientId необходимо использовать пул:
Client client = clientRepository.findByClientId(clientId); // если клиент не создан ранее if(client == null){ // выполняем синхронизацию synchronized (clientId.intern()){ // выполняем повторную проверку client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } }
В данном решении используется java string pool, соответственно, запросы с эквивалентным clientId, вызвав clientId.intern(), получат один и тот же объект. Менеджмент «протухания» clientId ложится в данном случае на плечи GС. Данный подход работает только для конструкции synchronized, что, к сожалению, не всегда достаточно.
Решение 3
Для того, чтобы использовать ReentrantLock, необходим пул вида:
private final ConcurrentMap<String, ReentrantLock> locks;
и тогда:
Client client = clientRepository.findByClientId(clientId); // если клиент не создан ранее if(client == null){ // выполняем синхронизацию ReentrantLock lock = locks.computeIfAbsent(clientId, (k) -> new ReentrantLock()); lock.lock(); try{ // выполняем повторную проверку client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } finally { // отпускаем лок lock.unlock(); } }
Единственной проблемой остается менеджмент «протухания» clientId, ее можно решить использованием нестандартной реализации ConcurrentMap, которая уже поддерживает expire, для примера берем guava Cache:
locks = CacheBuilder.newBuilder() .concurrencyLevel(4) .expireAfterWrite(Duration.ofMinutes(1)) .<String, ReentrantLock>build().asMap();
Решение 4
Приведенные ранее решения осуществляют синхронизацию запросов в рамках одного инстанса. Что же делать если ваш сервис крутится на N нодах и запросы могут попасть одновременно на разные? Для данной ситуации отлично подойдет в качестве решения использование библиотеки Redisson:
Client client = clientRepository.findByClientId(clientId); // если клиент не создан ранее if(client == null){ // выполняем синхронизацию RLock lock = redissonClient.getFairLock(clientId); lock.lock(); try{ // выполняем повторную проверку client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } finally { // отпускаем лок lock.unlock(); } }
Библиотека решает задачу «distributed locks», используя в качестве хранилища redis.
Заключение
Какое решение применить безусловно зависит от масштаба задачи: решения 1-3 вполне подойдут для небольших одноинстансных сервисов, решение 4 нацелено уже на распределенные сервисы. Также отдельно стоит заметить, что решение данной задачи с использованием Redisson или аналогов (например классического Zookeeper) это, безусловно, частный случай, так как они рассчитаны на куда больший круг задач для распределенных систем.
В нашем случае мы остановились на решении 4, так как наш сервис является распределенным и интеграция Redisson была наиболее простой в сравнении с аналогами.
Друзья, предлагайте в комментариях Ваши варианты решения данной задачи, буду очень рад!
Исходный код примеров доступен на GitHub.
ссылка на оригинал статьи https://habr.com/ru/company/maximatelecom/blog/434714/
Добавить комментарий