Высокопроизводительный SUN/ONCRPC сервер на Java NIO

от автора

В статьe о dCache рассказано о том, как использовать его в качестве NFS сервера. Но функциональной совместимости с существующими клиентами недостаточно, чтобы системой можно было пользоваться. Производительность тоже должна быть на высоте. Рабочей лошадкой NFS протокола является ONCRPC протокол. В dCache мы используем собственную реализацию, основанную на grizzly nio framework.

Немного истории для молодых

ONC RPC (Open Network Computing Remote Procedure Call) — протокол, созданный Sun Microsystems в конце 80х и опубликован в 1995г вместе с NFSv2. ONCRPC получил быстрое распространение и широко использовался, пока в начале 2000 не был вытеснен модными альтернативами, как CORBA, SOAP, а позже REST и JSON-RPC. Тем не менее, ONCRPC всё ещё используется, где простота и скорость важнее моды — в сетевых файловых системах.

Реализация

Чтобы не изобретать очередной велосипед, вначале мы использовали реализацию Remote Tea, но вскоре столкнулись с ограничениями, которые не могли легко решить: IPv6, GSSAPI, NIO. Так что велосипед пришлось изобретать, но не с нуля. Мы максимально сохранили совместимость с RemoteTea и адаптировали уже написанный код.

Grizzly-NIO

В основу мы взяли grizzly-nio, используемый в glassfish. Как и все современные NIO фраймворки, grizzly основан на обработке событий и шаблоне цепочка обязанностей. Т.е., мы описываем цепь фильтров, которые вызываются при определённом событии.

package org.glassfish.grizzly.filterchain;  import java.io.IOException;  public interface Filter {     public void onAdded(FilterChain fc);     public void onRemoved(FilterChain fc);     public void onFilterChainChanged(FilterChain fc);     public NextAction handleRead(FilterChainContext fcc) throws IOException;     public NextAction handleWrite(FilterChainContext fcc) throws IOException;     public NextAction handleConnect(FilterChainContext fcc) throws IOException;     public NextAction handleAccept(FilterChainContext fcc) throws IOException;     public NextAction handleEvent(FilterChainContext fcc, FilterChainEvent fce) throws IOException;     public NextAction handleClose(FilterChainContext fcc) throws IOException;     public void exceptionOccurred(FilterChainContext fcc, Throwable thrwbl); } 

Методы handleXXXX возвращают NextAction, который может быть StopAction или ContinueAction. Если фильтр возвращает StopAction, то обработка цепочки останавливается. В основном, нас интересуют handleRead и handleWrite, которые вызываются при чтении и записи сетевого соединения.

    @Override     public NextAction handleRead(FilterChainContext ctx) throws IOException {          Buffer messageBuffer = ctx.getMessage();         if (!isMessageArrived(messageBuffer)) {             // пришла только часть сообщения             // ждём остальную часть             return ctx.getStopAction(messageBuffer);         }         // читаем полное сообщение         ctx.setMessage(getMessage(messageBuffer));         return ctx.getInvokeAction();     } 
Боевой код

import java.io.IOException;  import java.nio.ByteOrder; import org.glassfish.grizzly.Buffer; import org.glassfish.grizzly.filterchain.BaseFilter; import org.glassfish.grizzly.filterchain.FilterChainContext; import org.glassfish.grizzly.filterchain.NextAction; import org.glassfish.grizzly.memory.BuffersBuffer;  public class RpcMessageParserTCP extends BaseFilter {      /**      * RPC fragment record marker mask      */     private final static int RPC_LAST_FRAG = 0x80000000;     /**      * RPC fragment size mask      */     private final static int RPC_SIZE_MASK = 0x7fffffff;      @Override     public NextAction handleRead(FilterChainContext ctx) throws IOException {          Buffer messageBuffer = ctx.getMessage();         if (messageBuffer == null) {             return ctx.getStopAction();         }          if (!isAllFragmentsArrived(messageBuffer)) {             return ctx.getStopAction(messageBuffer);         }          ctx.setMessage(assembleXdr(messageBuffer));          final Buffer reminder = messageBuffer.hasRemaining()                 ? messageBuffer.split(messageBuffer.position()) : null;          return ctx.getInvokeAction(reminder);     }      @Override     public NextAction handleWrite(FilterChainContext ctx) throws IOException {          Buffer b = ctx.getMessage();         int len = b.remaining() | RPC_LAST_FRAG;          Buffer marker = GrizzlyMemoryManager.allocate(4);         marker.order(ByteOrder.BIG_ENDIAN);         marker.putInt(len);         marker.flip();         marker.allowBufferDispose(true);         b.allowBufferDispose(true);         Buffer composite = GrizzlyMemoryManager.createComposite(marker, b);         composite.allowBufferDispose(true);         ctx.setMessage(composite);         return ctx.getInvokeAction();     }      private boolean isAllFragmentsArrived(Buffer messageBuffer) throws IOException {         final Buffer buffer = messageBuffer.duplicate();         buffer.order(ByteOrder.BIG_ENDIAN);          while (buffer.remaining() >= 4) {              int messageMarker = buffer.getInt();             int size = getMessageSize(messageMarker);              /*              * fragmen size bigger than we have received              */             if (size > buffer.remaining()) {                 return false;             }              /*              * complete fragment received              */             if (isLastFragment(messageMarker)) {                 return true;             }              /*              * seek to the end of the current fragment              */             buffer.position(buffer.position() + size);         }          return false;     }      private static int getMessageSize(int marker) {         return marker & RPC_SIZE_MASK;     }      private static boolean isLastFragment(int marker) {         return (marker & RPC_LAST_FRAG) != 0;     }      private Xdr assembleXdr(Buffer messageBuffer) {          Buffer currentFragment;         BuffersBuffer multipleFragments = null;          boolean messageComplete;         do {             int messageMarker = messageBuffer.getInt();              int size = getMessageSize(messageMarker);             messageComplete = isLastFragment(messageMarker);              int pos = messageBuffer.position();             currentFragment = messageBuffer.slice(pos, pos + size);             currentFragment.limit(size);              messageBuffer.position(pos + size);             if (!messageComplete & multipleFragments == null) {                 /*                  * we use composite buffer only if required                  * as they not for free.                  */                 multipleFragments = GrizzlyMemoryManager.create();             }              if (multipleFragments != null) {                 multipleFragments.append(currentFragment);             }         } while (!messageComplete);          return new Xdr(multipleFragments == null ? currentFragment : multipleFragments);     } } 

Если мы остановили цепь из-за недостатка данных, то следующий вызов handleRead будет содержать композитный буфер( состоящий из нескольких буферов).

Примитивный сервер выглядит так

    public static void main(String[] args) throws IOException {          FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();         filterChainBuilder.add(new TransportFilter());         filterChainBuilder.add(new /* здесь парсер */);         filterChainBuilder.add(new /* здесь обработчик */);          final TCPNIOTransport transport =                 TCPNIOTransportBuilder.newInstance().build();         transport.setProcessor(filterChainBuilder.build());         transport.bind(HOST, PORT);         transport.start();         System.in.read();     } 

На странице проекта можно найти много примеров. По умолчанию, grizzly создаст столько тредов, сколько на машине имеется процессоров. Этот подход хорошо зарекомендовал себя на практике. На машине с 24 ядрами, наш NFS сервер с лёгкостью обслуживает порядка тысячи клиентов.

Сам проект активно развивается, и команда разработчиков быстро реагирует как на сообщения об ошибках, так и на посылаемые патчи и рекомендации.

oncrpc4j

Весь ONCRPC код оформлен в виде простой для использования отдельной библиотеки. Поддерживаются два типичных варианта интеграции — сервис, встроенный в приложение или сервис, инициализируемый как Spring bean.

Встроенное приложение

import org.dcache.xdr.RpcDispatchable; import org.dcache.xdr.RpcCall; import org.dcache.xdr.XdrVoid; import org.dcache.xdr.OncRpcException;  public class Svcd {     private static final int DEFAULT_PORT = 1717;     private static final int PROG_NUMBER = 111017;     private static final int PROG_VERS = 1;      public static void main(String[] args) throws Exception {         RpcDispatchable dummy = new RpcDispatchable() {             @Override             public void dispatchOncRpcCall(RpcCall call)                           throws OncRpcException, IOException {                 call.reply(XdrVoid.XDR_VOID);             }         };         OncRpcSvc service = new OncRpcSvcBuilder()                 .withTCP()                 .withAutoPublish()                 .withPort(DEFAULT_PORT)                 .withSameThreadIoStrategy()                 .build();         service.register(new OncRpcProgram(PROG_NUMBER, PROG_VERS), dummy);         service.start();     } } 
Интеграция со Spring

Я не боюсь XML

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">      <bean id="my-rpc-svc" class="me.mypackage.Svcd">         <description>My RPC service</description>     </bean>       <bean id="my-rpc" class="org.dcache.xdr.OncRpcProgram">         <description>My RPC program number</description>         <constructor-arg index="0" value="1110001" />         <constructor-arg index="1" value="1" />     </bean>      <bean id="rpcsvc-builder" class="org.dcache.xdr.OncRpcSvcFactoryBean">         <description>Onc RPC service builder</description>         <property name="port" value="1717"/>         <property name="useTCP" value="true"/>     </bean>      <bean id="oncrpcsvc" class="org.dcache.xdr.OncRpcSvc" init-method="start" destroy-method="stop">         <description>My RPC service</description>         <constructor-arg ref="rpcsvc-builder"/>         <property name="programs">             <map>                 <entry key-ref="my-rpc" value-ref="my-rpc-svc"/>             </map>         </property>     </bean> </beans> 

Производительность

Как видно из графика, код на яве не только не медленнее написанного на ‘C’, но и обгоняет линуксовское ядро (из-за бага, который, надеюсь, уже починили).

To steal and contribute code

Код доступен на гитхабе под LGPL лицензией.

ссылка на оригинал статьи http://habrahabr.ru/post/199196/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *