Проект на GitHub находится в стадии ранней альфы. Эта статья дополняет README проекта.
Сетевые события можно записывать и вычитывать из стороннего хранилища по своему вкусу. Каждое событие выглядит так:
public interface NetworkEvent { UUID getConnectionId(); int getSerial(); EventType getType(); byte[] getPayload();}
Они рассказывают о некотором сетевом изменении (соединение открылось, пришли данные, соединение закрылось или оборвано) и поддаются обработке через Consumer<NetworkEvent>, причём обработчики можно выстраивать в цепочки и лабиринты для фильтрации, троттлинга, распараллеливания обработки и т.п.
Если два процесса умеют записывать такие события в хранилище вычитывать записанное с другой стороны — между ними налажено взаимодействие, при том, что прямых сетевых подключений друг к другу они не производят.
Пример использования этого очевиднейшего принципа — HTTP-прокси. Один процесс — приёмник прокси, «мама», то, что слушает на порту 3128 обычно. Он соединения принимает, но вместо того, чтобы как классический прокси тут же обратиться на запрошенные адреса и передать им полученные запросы — сохраняет запросы в виде NetworkEvent и кладёт в хранилище. А в это время совсем в другом месте передатчик прокси, «папа», уже вычитывает запросы из хранилища, открывает соединения на своём сетевом стеке, получает ответы и передаёт их через NetworkEvent и хранилище «маме».
Вся суть моего прототипа в том, что транспорт (чтение и запись хранилища) пишет пользователь под свои сиюминутные нужды. Я приведу пример транспорта на основе H2SQL практически целиком. Чтение:
public class H2Retrieval extends PollingRetrievalSupport { private final String connectionUrl; private final String tableName; private final Consumer<NetworkEvent> targetConsumer; private final AtomicLong lastProcessedId = new AtomicLong(); public H2Retrieval(String databasePath, String tableName, Consumer<NetworkEvent> targetConsumer, long pollIntervalMs) { super(pollIntervalMs); this.connectionUrl = String.format( "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000", Paths.get(databasePath).toAbsolutePath() ); this.tableName = tableName; this.targetConsumer = targetConsumer; } protected void poll() { String selectSQL = String.format(""" SELECT id, connection_id, serial, event_type, payload FROM %s WHERE id > ? ORDER BY id ASC """, tableName); try (Connection conn = DriverManager.getConnection(connectionUrl); PreparedStatement pstmt = conn.prepareStatement(selectSQL)) { pstmt.setLong(1, lastProcessedId.get()); long eventId = lastProcessedId.get(); try (ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { eventId = rs.getLong("id"); NetworkEvent event = extractEvent(rs); targetConsumer.accept(event); } } finally { long finalEventId = eventId; lastProcessedId.updateAndGet(val -> Math.max(val, finalEventId)); } } catch (SQLException e) { throw new RuntimeException("Failed to poll events", e); } } private NetworkEvent extractEvent(ResultSet rs) throws SQLException { UUID connectionId = (UUID) rs.getObject("connection_id"); int serial = rs.getInt("serial"); EventType type = EventType.valueOf(rs.getString("event_type")); byte[] payload = rs.getBytes("payload"); return NetworkEvent.create(connectionId, serial, type, payload); }}
И запись:
public class H2Storage implements Consumer<NetworkEvent> { private final String connectionUrl; private final String tableName; public H2Storage(String databasePath, String tableName) { this.connectionUrl = String.format( "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000", Paths.get(databasePath).toAbsolutePath() ); this.tableName = tableName; } @Override public void accept(NetworkEvent event) { String insertSQL = String.format(""" INSERT INTO %s (connection_id, serial, event_type, payload) VALUES (?, ?, ?, ?) """, tableName); try (Connection conn = DriverManager.getConnection(connectionUrl); PreparedStatement pstmt = conn.prepareStatement(insertSQL)) { pstmt.setObject(1, event.getConnectionId()); pstmt.setInt(2, event.getSerial()); pstmt.setString(3, event.getType().name()); pstmt.setBytes(4, event.getPayload()); pstmt.executeUpdate(); } catch (SQLException e) { throw new RuntimeException("Failed to store network event", e); } }}
На этом, в общем-то, всё. Я надеюсь, что этот интерфейс не будет становиться сложнее. Он может стать даже проще, если Muxalma посредством своих цепочек преобразователей станет умнее — научится компенсировать потерю пакетов, шейпингу трафика, backpressure и т.п. Реализации этих интерфейсов достаточно, чтобы заработал HTTP-прокси на основе Netty через хранилище, избранное вами. Отслеживание судьбы HTTP-соединений и мультиплекс возьмёт на себя Муксалма.
И здесь вы можете сказать — господин хороший, не ломитесь ли вы в открытую дверь со своей поделкой из трёх с половиной классов? На это я отвечу, что хотел начать именно эту дискуссию, т.к. дверь такую сходу не нашёл, а ещё я писатель, а не читатель, и широко известное в узких кругах всеми признанное решение просто не найду.
ссылка на оригинал статьи https://habr.com/ru/articles/1044330/