Проблема сохранения контекста при асинхронном программировании в scala

от автора

В какой-то момент в проекте встает вопрос отслеживания хода выполнения операции, получения или складирование какой-то информации о ней. Для этого как нельзя лучше служит контекст операции, например, контекст клиентской сессии. Если вам интересно, как это можно сделать относительно безболезненно, прошу под кат.

В мире java, зачастую (но не всегда), каждая операция выполняется в своём потоке. И тут всё получается довольно просто, можно воспользоваться ThreadLocal объектом и получать его в любом момент выполнения операции:

class Context {   public static final ThreadLocal<Context> global = new ThreadLocal<Context>; }   //где-то в месте вызова операции Context context = new Context(...); Context.global.set(context); try {   someService.someMethod(); } finally {   Context.global.set(null); } 

В scala же, зачастую, всё не так просто, и по ходу операции поток может смениться неоднократно, например в очень асинхронных приложениях. И способ с ThreadLocal уже не подходит (как и в случае с переключением потоков в java, конечно же).
Первое, что может прийти в голову, это передавать контекст через имплиситный аргумент функции.

def foo(bar: Bar)(implicit context: Context) 

Но это будет захламлять протокол сервисов. Поломав немного голову, пришла довольно простая идея: привязать контекст к объектам сервиса, и распространять его по внутренним сервисам по мере вызова функций.
Допустим, наш контекст выглядит вот так:

//data - склад для всякой информации касательно операции class Context(val operationId: String, val data: TrieMap[String, String] = TrieMap.empty) 

Создадим трейты, которыми будем помечать контекстно зависимые объекты:

trait ContextualObject {   protected def context: Option[Context] }   //объект, способный менять свой контекст trait ChangeableContextualObject[T <: ContextualObject] extends ContextualObject {   def withContext(ctx: Option[Context]): T }   //объект с пустым контекстом trait EmptyContext {   _: ContextualObject =>     override protected val context: Option[Context] = None } 

Теперь объявим наши сервисы и реализации:

//Говорим, что наш сервис может изменять контекст trait ServiceA extends ChangeableContextualObject[ServiceA] {   def someSimpleOperation: Int     def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] }   trait ServiceAImpl extends ServiceA {     override def someSimpleOperation: Int = 1     override def someLongOperation(implicit executionContext: ExecutionContext): Future[Int] = {     Future(someSimpleOperation)       .map { res =>         //запишем какие-нибудь данные в контекст выполнения, если он присутствует         context.foreach(_.data.put("ServiceA.step1", res.toString))         res * Random.nextInt(10)       }       .map { res =>         context.foreach(_.data.put("ServiceA.step2", res.toString))         res - Random.nextInt(5)       }       .andThen {         case Success(res) => context.foreach(_.data.put("ServiceA.step3", res.toString))       }   }     //создаём сервис с нужным нам контекстом   override def withContext(ctx: Option[Context]): ServiceA = new ServiceAImpl {     ctx.foreach(_.data.put("ServiceA.withContext", "true"))     override protected def context: Option[Context] = ctx   } }   object ServiceAImpl {   def apply(): ServiceAImpl = new ServiceAImpl with EmptyContext } 

И второй сервис, который будет использовать первый:

trait ServiceB extends ChangeableContextualObject[ServiceB] {   def someOperationWithoutServiceA: Int     def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] }   /**   * При просмотре предыдущего и текущего сервиса мог возникнуть вопрос:    * почему это не класс и почему сервис А указан как абстрактный метод?   * частично ответом является примешивание EmptyContext при создании сервиса,   * но основная причина заключена в функции withContext.   * Также, как бонус, в этом случае можно использовать cake pattern при создании объекта   */ trait ServiceBImpl extends ServiceB {   self =>   protected def serviceA: ServiceA     override def someOperationWithoutServiceA: Int = 1     override def someOperationWithServiceA(implicit executionContext: ExecutionContext): Future[Boolean] = {     serviceA.someLongOperation.map {       case res if res % 2 == 0 =>         context.foreach(_.data.put("ServiceB.res", "even"))         true         case res =>         context.foreach(_.data.put("ServiceB.res", "odd"))         false     }   }     override def withContext(ctx: Option[Context]): ServiceB = new ServiceBImpl {     ctx.foreach(_.data.put("ServiceB.withContext", "true"))     override protected val context: Option[Context] = ctx     // собственно, тот факт, что мы объявили сервис А как функцию     // позволяет нам переопределить ее как lazy val,     // и этот сервис будем инициализирован с новым контекстом, только если это будет нужно.     // Это я и назвал распространением контекста     override protected lazy val serviceA: ServiceA = self.serviceA.withContext(ctx)   } }   object ServiceBImpl {   // Есть небольшой недостаток - нужно либо называть аргументы именами отличными от тех,   // что используются в классе, либо помещать их в отдельную переменную внутри функции.   // Но есть еще вариант объявлять так:   // class Builder(val serviceA: ServiceA) extends ServiceBImpl with EmptyContext   // И в месте вызова:   // new ServiceBImpl.Builder(serviceA)   // Имя, возможно, не самое удачное, но идея должна быть понятна.   def apply(a: ServiceA): ServiceBImpl = new ServiceBImpl with EmptyContext {     //  а в этом месте его можно объявить как val     override protected val serviceA: ServiceA = a   } } 

В итоге, в месте вызова мы получим следующий код:

val context = new Context("opId") val serviceBWithContext = serviceB.withContext(Some(context)) serviceBWithContext.someOperationWithoutServiceA context.data.get("ServiceB.withContext") // Some("true") context.data.get("ServiceA.withContext") // None   serviceBWithContext.someOperationWithServiceA.andThen {   case _ =>      context.data.get("ServiceA.withContext") // Some("true")     context.data.get("ServiceA.step1") // Some("1") } 

Всё довольно просто — таким образом, по ходу операции будет один и тот же контекст. Но нужно для этого всего найти какое-то реальное применение. Например, мы в ходе операции записывали важную информацию, и теперь хотим эту информацию залогировать. Самым простым вариантом было создавать логгер для каждого контекста, и при записи в лог к сообщению приписывать эту информацию в нём. Но появляется проблема логирования, которое происходит вне вашего кода (например, в сторонней библиотеке).

Для того, чтобы контекст можно было использовать вне нашего кода, сделаем ThreadLocal с нашим контекстом:

object Context {   val global: ThreadLocal[Option[Context]] = ThreadLocal.withInitial[Option[Context]](() => None)     //Запустить операцию в контексте   def runWith[T](context: Context)(operation: => T): T = {     runWith(Some(context))(operation)   }     //Запустить операцию в контексте   def runWith[T](context: Option[Context])(operation: => T): T = {     val old = global.get()     global.set(context)     // после завершения вернем старое значение на всякий случай     try operation finally global.set(old)    } } 

Например, если вы используете библиотеку logback-classic для логирования, то вы можете написать свой Layout для логирования этих параметров.

Возможная реализация

class OperationContextLayout extends LayoutBase[ILoggingEvent] {   private val separator: String = System.getProperty("line.separator")     override def doLayout(event: ILoggingEvent): String = {     val sb = new StringBuilder(256)     sb.append(event.getFormattedMessage)       .append(separator)       appendContextParams(sb)     appendStack(event, sb)     sb.toString()   }     private def appendContextParams(sb: StringBuilder): Unit = {     Context.global.get().foreach { ctx =>       sb.append("operationId=")         .append(ctx.operationId)         ctx.data.readOnlySnapshot().foreach {         case (key, value) =>           sb.append(" ").append(key).append("=").append(value)       }         sb.append(separator)     }   }     private def appendStack(event: ILoggingEvent, sb: StringBuilder): Unit = {     if (event.getThrowableProxy != null) {       val converter = new ThrowableProxyConverter       converter.setOptionList(List("full").asJava)       converter.start()         sb.append()     }   } } 

Возможный конфиг

<configuration>       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">         <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">             <layout class="operation.context.logging.OperationContextLayout" />         </encoder>     </appender>       <root level="debug">         <appender-ref ref="STDOUT" />     </root> </configuration> 

И попробуем что-нибудь залогировать:

  def runWithoutA(): Unit = {     val context = Some(createContext())     val res = serviceB.withContext(context).someOperationWithoutServiceA     Context.runWith(context) {       // Result of someOperationWithoutServiceA: '1'       // operationId=GPapC6JKmY ServiceB.withContext=true       logger.info(s"Result of someOperationWithoutServiceA: '$res'")     }   } 

  def runWithA(): Future[_] = {     val context = Some(createContext())     serviceB.withContext(context).someOperationWithServiceA.andThen {       case _ =>         Context.runWith(context) {           // someOperationWithServiceA completed           // operationId=XU1SGXPq1N ServiceB.res=even ServiceA.withContext=true ServiceB.withContext=true ServiceA.step1=1 ServiceA.step2=7 ServiceA.step3=4           logger.info("someOperationWithServiceA completed")         }     }   } 

И остался вопрос: как же быть с внешним кодом, который запускается в ExecutionContext? Но нам же никто не мешает написать враппер для него:

Возможная реализация враппера

class ContextualExecutionContext(context: Option[Context], executor: ExecutionContext) extends ExecutionContext {     override def execute(runnable: Runnable): Unit = executor.execute(() => {     Context.runWith(context)(runnable.run())   })     override def reportFailure(cause: Throwable): Unit = {     Context.runWith(context)(executor.reportFailure(cause))   }   }   object ContextualExecutionContext {   implicit class ContextualExecutionContextOps(val executor: ExecutionContext) extends AnyVal {     def withContext(context: Option[Context]): ContextualExecutionContext = new ContextualExecutionContext(context, executor)   } } 

Возможная реализация внешней системы

class SomeExternalObject {   val logger: Logger = LoggerFactory.getLogger(classOf[SomeExternalObject])     def externalCall(implicit executionContext: ExecutionContext): Future[Int] = {     Future(1).andThen {       case Success(res) => logger.debug(s"external res $res")     }   } } 

Попробуем сделать вызов в нашем ExecutionContext:

  def runExternal(): Future[_] = {     val context = Some(createContext())     implicit val executor = global.withContext(context)     // external res 1     // operationId=8Hf277SV7B     someExternalObject.externalCall   } 

Вот и вся идея. На самом деле, использование контекста не ограничивается только логированием. Можно хранить в этом контексте всё, что угодно. Например, слепок каких-то состояний, если нужно, чтобы все сервисы во время операции работали с одинаковыми данными. И так далее, и так далее.

Если есть необходимость в реализации слежения за контекстом при общении акторов, пишите в комментариях, дополню статью. Если есть идеи по поводу другой реализации, также пишете в комментариях, будет интересно почитать.

P.S. исходный код проекта, используемый в статье github.com/eld0727/scala-operation-context
P.P.S. Я уверен, что данный подход может быть применен и к других языкам, позволяющим создавать анонимные классы, и это всего лишь возможная реализация на scala.
ссылка на оригинал статьи https://habrahabr.ru/post/323682/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *