Изучай observable, создавая observable

от автора

Эта статья — перевод оригинальной статьи Ben Lesh “Learning Observable By Building Observable”. Также я веду телеграм канал “Frontend по-флотски”, где рассказываю про интересные вещи из мира разработки интерфейсов ​

Вступление

Это повторение старой статьи, написанной мною в 2016 году, и доклада, с которым я выступал несколько раз. Я хочу немного модернизировать контент и, надеюсь, упростить его. Цель — помочь людям понять, что такое observable. Не только observable из RxJS, но и любой observable (да, их больше одного) как тип.

Observables — просто функции

Чтобы понять это, я хочу разместить рядом два примера. Один является observable из RxJS, а другой — просто функцией. Оба этих примера имеют одинаковый результат:

RxJS Observable

Это пример простого observable, созданного с помощью RxJS, который выдает три числа и завершается. Если вы уже знакомы с RxJS, это эквивалент (1, 2, 3).

import { Observable } from 'rxjs';   const source = new Observable<number>((subscriber) => {   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   subscriber.complete(); });   // Usage console.log('start'); source.subscribe({   next: console.log,   complete: () => console.log('done'), }); console.log('stop');

Функция “observable”

/**  * A simple object with a `next` and `complete` callback on it.  */ interface Observer<T> {   next: (value: T) => void;   complete: () => void; }   /**  * A function that takes a simple object with callbacks  * and does something them.  */ const source = (subscriber: Observer<number>) => {   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   subscriber.complete(); };   // Usage console.log('start'); source({   next: console.log,   complete: () => console.log('done'), }); console.log('stop');

Вывод (обоих!)

"start" 1 2 3 "done" "stop"

Я хочу, чтобы вы обратили внимание на сходство. В обоих случаях вы передаете объект с помощью метода next и complete. В обоих случаях вы вызываете next и complete в теле функции. В обоих случаях тело функции не выполняется до тех пор, пока вы не вызовете source.subscribe() или просто вызовете функцию напрямую как source() в другом примере. Это потому, что observables — это просто специализированные функции.

Почему бы просто не использовать функции? Что «особенного» в observable?

Отлично подмечено. Если бы вы были очень осторожны, вы, вероятно, могли бы использовать обычные функции для некоторых из этих случаев. Но проблема в том, что версия с функциями не совсем «безопасна». На основе всего лишь одного примера, если семантика observable такова, что next никогда не должен вызываться после завершения, нам понадобятся некоторые гарантии для этого.

const source = function (subscriber: Observer<number>) {   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   subscriber.complete();   subscriber.next(4); // Упс, этого не должно быть };

В примере выше будет написано «done», а затем сразу 4. Такого быть вообще не должно! Поэтому мы хотим предоставить способ гарантировать, что следующий метод нашего подписчика не будет вызван после завершения. Это можно сделать, заключив нашу функцию в класс.

/**  * A class used to wrap a user-provided Observer. Since the  * observer is just a plain objects with a couple of callbacks on it,  * this type will wrap that to ensure `next` does nothing if called after  * `complete` has been called, and that nothing happens if `complete`  * is called more than once.  */ class SafeSubscriber<T> {   closed = false;     constructor(private destination: Observer<T>) {}     next(value: T) {     // Check to see if this is "closed" before nexting.     if (!this.closed) {       this.destination.next(value);     }   }     complete() {     // Make sure we're not completing an already "closed" subscriber.     if (!this.closed) {       // We're closed now.       this.closed = true;       this.destination.complete();     }   } }   /**  * A class to wrap our function, to ensure that when the function is  * called with an observer, that observer is wrapped with a SafeSubscriber  */ class Observable<T> {   constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {}     subscribe(observer: Observer<T>): void {     // We can wrap our observer in a "safe subscriber" that     // does the work of making sure it's not closed.     const subscriber = new SafeSubscriber(observer);     this._wrappedFunc(subscriber);   } }   // Usage // Now 4 won't be nexted after we complete. const source = new Observable((subscriber) => {   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   subscriber.complete();   subscriber.next(4); // this does nothing. });

Обработка Partial Observers

Другой возможный сценарий — «partial» observer. Другими словами, observable, у которого есть только метод next или complete (или, возможно, метод ошибки, но мы вернемся к этому позже). Теперь мы можем легко разобраться с этим сценарием имея наш тип observable, указанный выше, потому что мы можем реализовать это в нашем SafeSubscriber:

class SafeSubscriber<T> {   closed = false;     constructor(private destination: Partial<Observer<T>>) {}     next(value: T) {     if (!this.closed) {       this.destination.next?.(value); // Note the ?. check here.     }   }     complete() {     if (!this.closed) {       this.closed = true;       this.destination.complete?.(); // And here.     }   } }

Уведомления об ошибке

Уведомить нашего подписчика об ошибке так же просто, как добавить дополнительный обработчик к нашим Observer и SafeSubscriber. Семантика очень похожа на complete выше. И ошибка, и complete считаются прекращением наблюдения.

Observer просто немного изменится, чтобы появился обработчик ошибок:

interface Observer<T> {   next: (value: T) => void;   complete: () => void;   error: (err: any) => void; }

После этого мы можем добавить в наш SafeSubscriber error метод:

class SafeSubscriber<T> {   closed = false;     constructor(private destination: Partial<Observer<T>>) {}     next(value: T) {     if (!this.closed) {       this.destination.next?.(value);     }   }     complete() {     if (!this.closed) {       this.closed = true;       this.destination.complete?.();     }   }     error(err: any) {     if (!this.closed) {       this.closed = true;       this.destination.error?.(err);     }   } }

Источники данных и кейс для отмены подписки

Основной вариант использования observable — обертывание асинхронного источника данных, например WebSocket.

Чтобы сделать что-то подобное, вы можете использовать наш самодельный observable созданный выше, например:

const helloSocket = new Observable<string>((subscriber) => {   // Open a socket.   const socket = new WebSocket('wss://echo.websocket.org');     socket.onopen = () => {     // Once it's open, send some text.     socket.send('Hello, World!');   };     socket.onmessage = (e) => {     // When it echoes the text back (in the case of this particular server)     // notify the consumer.     subscriber.next(e.data);   };     socket.onclose = (e) => {     // Oh! we closed!     if (e.wasClean) {       // ...because the server said it was done.       subscriber.complete();     } else {       // ...because of something bad. Maybe we lost network or something.       subscriber.error(new Error('Socket closed dirty!'));     }   }; });   // Start the websocket and log the echoes helloSocket.subscribe({   next: console.log,   complete: () => console.log('server closed'),   error: console.error, });

Но теперь у нас появилась проблема. Пользователь, который подписывается на наш observable, не имеет возможности отменить его и закрыть сокет. Нам нужен способ прекратить работу. Если бы мы просто использовали функцию, мы могли бы вернуть функцию, содержащую нашу логику прекращения подписки.

const source = (subscriber: Observer<string>) => {   const socket = new WebSocket('wss://echo.websocket.org');     socket.onopen = () => {     socket.send('Hello, World!');   };     socket.onmessage = (e) => {     subscriber.next(e.data);   };     socket.onclose = (e) => {     if (e.wasClean) {       subscriber.complete();     } else {       subscriber.error(new Error('Socket closed dirty!'));     }   };     return () => {     if (socket.readyState <= WebSocket.OPEN) {       socket.close();     }   }; };   const teardown = source({   next: console.log,   complete: () => console.log('done'),   error: console.error, });   // Decide you really don't want to keep the socket open. teardown();

Еще одна проблема, которая возникает не обязательно связанная с WebSocket, но и с другими ситуациями, — это ситуации, когда автор observable решает, что он столкнулся с ошибкой или состоянием завершения, и он хочет уведомить пользователя, а затем удалить его. Было бы неплохо иметь единое место для этого, чтобы при вызове subscriber.error или subscriber.complete удаление выполнялось как можно быстрее.

Мы можем добиться всего этого с помощью некоторых изменений SafeSubscriber и добавления типа Subscription.

/**  * Our subscription type. This is to manage teardowns.  */ class Subscription {   private teardowns = new Set<() => void>();     add(teardown: () => void) {     this.teardowns.add(teardown);   }     unsubscribe() {     for (const teardown of this.teardowns) {       teardown();     }     this.teardowns.clear();   } }   class SafeSubscriber<T> {   closed = false;     constructor(     private destination: Partial<Observer<T>>,     private subscription: Subscription,   ) {     // Make sure that if the subscription is unsubscribed,     // we don't let any more notifications through this subscriber.     subscription.add(() => (this.closed = true));   }     next(value: T) {     if (!this.closed) {       this.destination.next?.(value);     }   }     complete() {     if (!this.closed) {       this.closed = true;       this.destination.complete?.();       this.subscription.unsubscribe();     }   }     error(err: any) {     if (!this.closed) {       this.closed = true;       this.destination.error?.(err);       this.subscription.unsubscribe();     }   } }   class Observable<T> {   constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {}     subscribe(observer: Observer<T>) {     const subscription = new Subscription();     const subscriber = new SafeSubscriber(observer, subscription);     subscription.add(this._wrappedFunc(subscriber));     return subscription;   } }   const helloSocket = new Observable<string>((subscriber) => {   const socket = new WebSocket('wss://echo.websocket.org');     socket.onopen = () => {     socket.send('Hello, World!');   };     socket.onmessage = (e) => {     subscriber.next(e.data);   };     socket.onclose = (e) => {     if (e.wasClean) {       subscriber.complete();     } else {       subscriber.error(new Error('Socket closed dirty!'));     }   };     return () => {     if (socket.readyState <= WebSocket.OPEN) {       socket.close();     }   }; });   const subscription = helloSocket.subscribe({   next: console.log,   complete: () => console.log('server closed'),   error: console.error, });   // Later, we can unsubscribe! subscription.unsubscribe();

Это всё! Observables — это просто специализированные функции!

Я знаю, что это было очень МНОГО кода, надеюсь, пошаговое руководство помогло вам понять, что входит в тип observable (например, тип из rxjs), и помогло немного прояснить его. Это уж точно не волшебство.

Тогда я бы посоветовал вам спросить себя: «Для чего нужны observables?» и, честно говоря, самый короткий ответ: вы можете использовать observables для всего, для чего вы можете использовать функцию. Самая большая разница — это небольшой набор гарантий в отношении обратных вызовов и то, что, возможно, наиболее важно, в отношении прекращения работы.

Важно отметить, что описанная выше реализация НЕ является чем-то, что вы должны воссоздавать или использовать в продакшене. Я бы рекомендовал использовать observable из RxJS, поскольку он охватывает гораздо больше ситуаций, чтобы помочь сохранить ваш код надежным и безопасным при составлении сложных потоков данных.

ссылка на оригинал статьи https://habr.com/ru/post/568064/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *