Muxalma — обмен пакетами данных через общее хранилище

от автора

Проект на GitHub находится в стадии ранней альфы. Эта статья дополняет README проекта.

Сетевые события можно записывать и вычитывать из стороннего хранилища по своему вкусу. Каждое событие выглядит так:

public interface NetworkEvent {    UUID getConnectionId();    int getSerial();    EventType getType();    byte[] getPayload();}

Они рассказывают о некотором сетевом изменении (соединение открылось, пришли данные, соединение закрылось или оборвано) и поддаются обработке через Consumer<NetworkEvent>, причём обработчики можно выстраивать в цепочки и лабиринты для фильтрации, троттлинга, распараллеливания обработки и т.п.

Если два процесса умеют записывать такие события в хранилище вычитывать записанное с другой стороны — между ними налажено взаимодействие, при том, что прямых сетевых подключений друг к другу они не производят.

Пример использования этого очевиднейшего принципа — HTTP-прокси. Один процесс — приёмник прокси, «мама», то, что слушает на порту 3128 обычно. Он соединения принимает, но вместо того, чтобы как классический прокси тут же обратиться на запрошенные адреса и передать им полученные запросы — сохраняет запросы в виде NetworkEvent и кладёт в хранилище. А в это время совсем в другом месте передатчик прокси, «папа», уже вычитывает запросы из хранилища, открывает соединения на своём сетевом стеке, получает ответы и передаёт их через NetworkEvent и хранилище «маме».

Вся суть моего прототипа в том, что транспорт (чтение и запись хранилища) пишет пользователь под свои сиюминутные нужды. Я приведу пример транспорта на основе H2SQL практически целиком. Чтение:

public class H2Retrieval extends PollingRetrievalSupport {    private final String connectionUrl;    private final String tableName;    private final Consumer<NetworkEvent> targetConsumer;    private final AtomicLong lastProcessedId = new AtomicLong();    public H2Retrieval(String databasePath, String tableName, Consumer<NetworkEvent> targetConsumer, long pollIntervalMs) {        super(pollIntervalMs);        this.connectionUrl = String.format(                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",                Paths.get(databasePath).toAbsolutePath()        );        this.tableName = tableName;        this.targetConsumer = targetConsumer;    }    protected void poll() {        String selectSQL = String.format("""                SELECT id, connection_id, serial, event_type, payload                 FROM %s                 WHERE id > ?                 ORDER BY id ASC                """, tableName);        try (Connection conn = DriverManager.getConnection(connectionUrl);             PreparedStatement pstmt = conn.prepareStatement(selectSQL)) {            pstmt.setLong(1, lastProcessedId.get());            long eventId = lastProcessedId.get();            try (ResultSet rs = pstmt.executeQuery()) {                while (rs.next()) {                    eventId = rs.getLong("id");                    NetworkEvent event = extractEvent(rs);                    targetConsumer.accept(event);                }            } finally {                long finalEventId = eventId;                lastProcessedId.updateAndGet(val -> Math.max(val, finalEventId));            }        } catch (SQLException e) {            throw new RuntimeException("Failed to poll events", e);        }    }    private NetworkEvent extractEvent(ResultSet rs) throws SQLException {        UUID connectionId = (UUID) rs.getObject("connection_id");        int serial = rs.getInt("serial");        EventType type = EventType.valueOf(rs.getString("event_type"));        byte[] payload = rs.getBytes("payload");        return NetworkEvent.create(connectionId, serial, type, payload);    }}

И запись:

public class H2Storage implements Consumer<NetworkEvent> {    private final String connectionUrl;    private final String tableName;    public H2Storage(String databasePath, String tableName) {        this.connectionUrl = String.format(                "jdbc:h2:%s;AUTO_SERVER=TRUE;DB_CLOSE_DELAY=-1;LOCK_TIMEOUT=10000",                Paths.get(databasePath).toAbsolutePath()        );        this.tableName = tableName;    }    @Override    public void accept(NetworkEvent event) {        String insertSQL = String.format("""            INSERT INTO %s (connection_id, serial, event_type, payload)             VALUES (?, ?, ?, ?)            """, tableName);        try (Connection conn = DriverManager.getConnection(connectionUrl);             PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {            pstmt.setObject(1, event.getConnectionId());            pstmt.setInt(2, event.getSerial());            pstmt.setString(3, event.getType().name());            pstmt.setBytes(4, event.getPayload());            pstmt.executeUpdate();        } catch (SQLException e) {            throw new RuntimeException("Failed to store network event", e);        }    }}

На этом, в общем-то, всё. Я надеюсь, что этот интерфейс не будет становиться сложнее. Он может стать даже проще, если Muxalma посредством своих цепочек преобразователей станет умнее — научится компенсировать потерю пакетов, шейпингу трафика, backpressure и т.п. Реализации этих интерфейсов достаточно, чтобы заработал HTTP-прокси на основе Netty через хранилище, избранное вами. Отслеживание судьбы HTTP-соединений и мультиплекс возьмёт на себя Муксалма.

И здесь вы можете сказать — господин хороший, не ломитесь ли вы в открытую дверь со своей поделкой из трёх с половиной классов? На это я отвечу, что хотел начать именно эту дискуссию, т.к. дверь такую сходу не нашёл, а ещё я писатель, а не читатель, и широко известное в узких кругах всеми признанное решение просто не найду.

ссылка на оригинал статьи https://habr.com/ru/articles/1044330/