Spring: Реализация TaskExecutor c поддержкой транзакций

от автора

Spring, позаботившись о разработчиках, предлагает удобный и простой фасад для взаимодействия с менеджером транзакций. Однако всегда ли стандартного механизма будет достаточно для реализации изощрённых архитектурных идей? Очевидно — нет.

В этом посте пойдёт речь о возможностях Spring —

  • взглянем на примеры стандартного управления транзакциями с помощью аннотаций,
  • поймём — когда решить задачу с помощью аннотаций не получится,
  • и, судя по заголовку статьи, дадим пример реализации транзакционного исполнения кода в новом потоке, создавемых с помощью Spring TaskExecutor.


Наиболее общеупотребимо задание транзакций с помощью аннотации @Transactional. Что бы воспользоваться этим механизмом достаточно сконфигурировать предпочитаемый TransactionManager и включить обработку аннотаций. В случае конфигурации с помощью XML файлов это выглядит примерно так:

    .....      <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">         <property name="dataSource" ref="dataSource" />         <property name="entityManagerFactory" ref="entityManagerFactory" />         <property name="jpaDialect" ref="jpaDialect"/>     </bean>     <tx:annotation-driven transaction-manager="transactionManager"/>     .... 

Использовать далее это так же просто как «навесить» аннотацию @Transactional на реализацию метода интерфейса или над всем классом-реализацией.

@Service public class FooServiceImpl implements FooService {     @Autowired     private FooDao fooDao;      @Transactional     @Override     public void update(Foo entity)  {         fooDao.merge(entity);     } } 

Однако, что бы эффективно использовать этот механизм нужно помнить о нескольких, невсегда очевидных тонкостях:

  • Класс должен быть объявлен как bean (c помощью аннотаций, code-based или в xml конфигурации контейнера)
  • Класс должен реализовывать интерфейс, иначе контейнеру будет сложно создать прокси-объект, с помощью которого и выполняется управление транзакцией.
  • Вызов транзакционных методов из другого метода того же класса не приведёт к созданию транзакции! (следствие из предыдущего пункта)

Подобный механизм позволяет не писать код управления транзакцией каждый раз!

Однако, могут ли быть ситуации, когда этого механизма будет недостаточно? или возможен ли контекст, в котором аннотациями обойтись не удастся? Боюсь, ответ очевиден. К примеру, мы можем захотеть выполнить часть кода в одной транзакции, а другую часть – во второй (тут скорее архитектурно верным будет разделить метод на два). У читателей, думаю, есть и свои примеры.

Более реалистичен пример, когда часть кода нужно выполнить асинхронно:

@Service public class FooServiceImpl implements FooService {     @Autowired     private TaskExecutor taskExecutor;     @Autowired     private FooDao fooDao;      @Transactional     @Override     public void update(Foo entity)  {         fooDao.merge(entity);         taskExecutor.run(new Runnable() {             public void run() {                 someLongTimeOperation(entity);             }         });     }          @Transactional     @Override     public void someLongTimeOperation(Foo entity)  {         // тут набор ресурсоёмких операций     } } 

Что же получается: до старта метода update() создаётся транзакция, затем выполняются операции из тела, а по выходу из метода транзакция закрывается. Но в нашем случае создаётся новый поток, в котором будет исполнен код. И весьма очевидно, что на момент выхода из метода update() и сопутствующего уничтожения транзакции, выполнение кода во втором запущенном потоке может/будет продолжаться. Как итог, по завершению метода, во втором потоке получим исключение и вся транзакция «ролбэкнится».

К предудщему примеру добавим ручное создание транзакции:

@Service public class FooServiceImpl implements FooService {     @Autowired     private TaskExecutor taskExecutor;     @Autowired     private FooDao fooDao;      @Transactional     @Override     public void update(final Foo entity)  {        fooDao.merge(entity);        final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);        taskExecutor.execute(new Runnable() {             @Override             public void run() {                 transactionTemplate.execute(new TransactionCallbackWithoutResult() {                     @Override                     protected void doInTransactionWithoutResult(TransactionStatus status) {                         someLongTimeOperation(entity);                     }                 });             }         });          }      @Transactional     @Override     public void someLongTimeOperation(Foo entity)  {         // тут набор ресурсоёмких операций     } } 

Теперь someLongTimeOperation() исполняется асинхронно и в выделенной транзакции. Однако, хочется обощённой реализации, что бы не дублировать громоздкий код ручного управления.

Что ж… вот и она:

public interface TransactionalAsyncTaskExecutor extends AsyncTaskExecutor {     void execute(Runnable task, Integer propagation, Integer isolationLevel); } 

public class DelegatedTransactionalAsyncTaskExecutor implements InitializingBean, TransactionalAsyncTaskExecutor {     private PlatformTransactionManager transactionManager;     private AsyncTaskExecutor delegate;     private TransactionTemplate sharedTransactionTemplate;      public DelegatedTransactionalAsyncTaskExecutor() {     }      public DelegatedTransactionalAsyncTaskExecutor(PlatformTransactionManager transactionManager, AsyncTaskExecutor delegate) {         this.transactionManager = transactionManager;         this.delegate = delegate;     }      @Override     public void execute(final Runnable task, Integer propagation, Integer isolationLevel) {         final TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);         transactionTemplate.setPropagationBehavior(propagation);         transactionTemplate.setIsolationLevel(isolationLevel);         delegate.execute(new Runnable() {             @Override             public void run() {                 transactionTemplate.execute(new TransactionCallbackWithoutResult() {                     @Override                     protected void doInTransactionWithoutResult(TransactionStatus status) {                         task.run();                     }                 });             }         });     }      @Override     public void execute(final Runnable task) {         execute(task, TransactionDefinition.PROPAGATION_REQUIRED, TransactionDefinition.ISOLATION_DEFAULT);     }      @Override     public void execute(final Runnable task, long startTimeout) {         final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();         delegate.execute(new Runnable() {             @Override             public void run() {                 transactionTemplate.execute(new TransactionCallbackWithoutResult() {                     @Override                     protected void doInTransactionWithoutResult(TransactionStatus status) {                         task.run();                     }                 });             }         }, startTimeout);      }      @Override     public Future<?> submit(final Runnable task) {         final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();         return delegate.submit(new Runnable() {             @Override             public void run() {                 transactionTemplate.execute(new TransactionCallbackWithoutResult() {                     @Override                     protected void doInTransactionWithoutResult(TransactionStatus status) {                         task.run();                     }                 });             }         });     }      @Override     public <T> Future<T> submit(final Callable<T> task) {         final TransactionTemplate transactionTemplate = getSharedTransactionTemplate();         return delegate.submit(new Callable<T>() {             @Override             public T call() throws Exception {                 return transactionTemplate.execute(new TransactionCallback<T>() {                     @Override                     public T doInTransaction(TransactionStatus status) {                         T result = null;                         try {                             result = task.call();                         } catch (Exception e) {                             e.printStackTrace();                             status.setRollbackOnly();                         }                         return result;                     }                 });             }         });     }      public PlatformTransactionManager getTransactionManager() {         return transactionManager;     }      public void setTransactionManager(PlatformTransactionManager transactionManager) {         this.transactionManager = transactionManager;     }      public AsyncTaskExecutor getDelegate() {         return delegate;     }      public void setDelegate(AsyncTaskExecutor delegate) {         this.delegate = delegate;     }      public TransactionTemplate getSharedTransactionTemplate() {         return sharedTransactionTemplate;     }      public void setSharedTransactionTemplate(TransactionTemplate sharedTransactionTemplate) {         this.sharedTransactionTemplate = sharedTransactionTemplate;     }      @Override     public void afterPropertiesSet() {         if (transactionManager == null) {             throw new IllegalArgumentException("Property 'transactionManager' is required");         }         if (delegate == null) {             delegate = new SimpleAsyncTaskExecutor();         }         if (sharedTransactionTemplate == null) {             sharedTransactionTemplate = new TransactionTemplate(transactionManager);         }     } } 

Это реализация есть обёртка, в итоге делигирующая вызовы к любому TaskExecutor коих несколько в составе Spring. При этом каждый вызов «завёрнут» в транзакцию. Вручную управлять транзакциями в Spring можно используя TransactionTemplate, а вот EntityManager#getTransaction() выдаёт исключение.

Ну и наконец около практический пример в действии:

Конфигурируем TaskExecutor:

    <bean id="transactionalTaskExecutor" class="ru.habrahabr.support.spring.DelegatedTransactionalAsyncTaskExecutor">         <property name="delegate">             <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">                 <property name="threadNamePrefix" value="Habrahabr example - "/>                 <property name="threadGroupName" value="Habrahabr examples Group"/>                 <property name="corePoolSize" value="10"/>                 <property name="waitForTasksToCompleteOnShutdown" value="true"/>             </bean>         </property>         <property name="transactionManager" ref="transactionManager"/>     </bean> 

Пример сервиса:

@Service public class FooServiceImpl implements FooService {     @Autowired     private TransactionalAsyncTaskExecutor trTaskExecutor;     @Autowired     private FooDao fooDao;      @Transactional     @Override     public void update(Foo entity)  {         fooDao.merge(entity);                                   // Выполнится в транзакции созданной Spring'ом (tr_1).         trTaskExecutor.run(new Runnable() {       // Запустится новый поток и новая транзакция (tr_2), метод run() выполнится паралельно текущему потоку и в рамках транзакции tr_2.             public void run() {                 someLongTimeOperation();             }         });     }                                                                             // Выход из метода и вместе с этим tr_1 завершится. Обрабока tr_2 осуществится с помощью TransactionTemplate.           @Transactional     @Override     public void someLongTimeOperation(Foo entity)  {         // тут набор ресурсоёмких операций     } } 

Таким образом, имеем вполне обобщённую реализацию обёртки для TaskExecutor, позволяющую избежать дублирования кода создания транзакций.

ссылка на оригинал статьи http://habrahabr.ru/post/228953/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *