Angular2 + Websocket + RxJS + Rails5

от автора

image

Всем привет! Эта статья о том как связать клиентское приложение Angular2 с Rails 5 сервером используя Websocket.

Rails

Для примера нам потребуется простейший ApplicationCable::Channel:

class ChatChannel < ApplicationCable::Channel   def subscribed     stream_from 'chat_channel'   end    def index     ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }   end    def create( data )     Message.create( name: data[ 'name' ], text: data[ 'text' ] );     ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) }   end    def unsubscribed   end end

Angular2

WebSocketService

Для начала нам потребуется сервис, который будет обеспечивать наше приложение обменом данных с сервером:

import { Injectable }                           from "@angular/core"; import { Subject, Observable, Subscription }    from 'rxjs/Rx'; import { WebSocketSubject }                     from "rxjs/observable/dom/WebSocketSubject";  @Injectable() export class WebSocketService {      private ws: WebSocketSubject<Object>;     private socket: Subscription;     private url: string;      public message: Subject<Object> = new Subject();     public opened: Subject<boolean> = new Subject();      public close():void{         this.socket.unsubscribe();         this.ws.complete();     }      public sendMessage( message:string ):void{         this.ws.next( message );     }      public start( url: string ):void{         let self = this;          this.url = url;          this.ws = Observable.webSocket( this.url );          this.socket = this.ws.subscribe( {                          next: ( data:MessageEvent ) => {                 if( data[ 'type' ] == 'welcome' ){                     self.opened.next( true );                 }                 this.message.next( data );             },             error: () => {                  self.opened.next( false );                 this.message.next( { type: 'closed' } );                  self.socket.unsubscribe();                  setTimeout( () => {                     self.start( self.url );                 }, 1000 );              },             complete: () => {                 this.message.next( { type: 'closed' } );             }                      } );              }      } 

В данном сервисе есть 3 приватные и 2 публичные переменные, а также 3 публичные функции.

 private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string; 

ws — наблюдаемая переменная WebSocketSubject.
socket — переменная для подписки на ws.
url — ссылка на сокет.

 public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject(); 

message — наблюдаемая переменная в которую транслируются все данные из сокета.
opened — наблюдаемая переменная которая следит за открытием/закрытием соединения с сокетом.

 public close():void{     this.socket.unsubscribe();     this.ws.complete(); } 

Функция для закрытия сокета.

 public sendMessage( message:string ):void{     this.ws.next( message ); } 

Функция для отправки данных в сокет.

 public start( url: string ):void{     let self = this;      this.url = url;      this.ws = Observable.webSocket( this.url );      this.socket = this.ws.subscribe( {          next: ( data:MessageEvent ) => {             if( data[ 'type' ] == 'welcome' ){                 self.opened.next( true );             }             this.message.next( data );         },         error: () => {              self.opened.next( false );             this.message.next( { type: 'closed' } );              self.socket.unsubscribe();              setTimeout( () => {                 self.start( self.url );             }, 1000 );          },         complete: () => {             this.message.next( { type: 'closed' } );         }      } );  } 

Эта функция открывает соединение с сокетом, записывая его объект в наблюдаемую переменную и подписывается на трансляцию из нее. При потере связи каждую секунду пытается восстановить связь.

ChannelWebsocketService

Наследуемый сервис для подписки на каналы Rails5 Action Cable:

 import { Injectable }              from "@angular/core"; import { Subject }                 from "rxjs/Subject";  import { WebSocketService }        from "./websocket.service";  @Injectable() export class ChannelWebsocketService {      private socketStarted: boolean;      public observableData: Subject<Object> = new Subject();     public identifier:Object = {};     public identifierStr: string;     public subscribed: Subject<boolean> = new Subject();      constructor( private websocketService: WebSocketService ){          this.observeOpened();         this.observeMessage();      }      private static encodeIdentifier( identifier:string ):Object{          return JSON.parse( identifier );      }      private static getDataString( parameters:Object ):string{          let first = true,             result = '';          for ( let key in parameters ){              if( first ){                  first = false;                 result +=  `\"${ key }\":\"${ parameters[ key ] }\"`;              } else {                  result += `, \"${ key }\":\"${ parameters[ key ] }\"`;              }          }          return `{ ${ result } }`;      }      private getSubscribeString():string{          this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );          return JSON.stringify( {             command: 'subscribe',             identifier: this.identifierStr         } );      };      private isThisChannel( data:Object ):boolean {          if( data[ 'identifier' ] ){              let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );              if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){                  return true;              }          }          return false;      }      private observeMessage(){          let self = this;          this.websocketService.message.subscribe( ( data: Object ) => {              if( self.isThisChannel( data ) ){                  if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){                      this.subscribed.next( true );                  } else if ( data[ 'message' ] ){                      this.observableData.next( data[ 'message' ] );                  }              }          } );      }      private observeOpened(){          let self = this;          this.websocketService.opened.subscribe( ( data: boolean ) => {              self.socketStarted = data;              if( data ){                 self.subscribe();             }          } );      }          private subscribe(){          this.websocketService.sendMessage( this.getSubscribeString() );      }           public send( data: Object ){          this.websocketService.sendMessage( JSON.stringify( {             command:'message',             identifier: this.identifierStr,             data: ChannelWebsocketService.getDataString( data )         } ) );      }      public unsubscribe(){          this.websocketService.sendMessage( JSON.stringify( {             command: 'unsubscribe',             identifier: this.identifierStr } ) );         this.subscribed.next( false );      }  } 

В данном сервисе есть 2 приватные и 3 публичные переменные, а также 7 приватных и 2 публичные функции.

 private socketStarted: boolean; private identifierStr: string; 

socketStarted — переменная в которую транслируется состояние подписки на сокет.
identifierStr — специально подготовленная строка идентификатор для Rails5 Action Cable канала.

 public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public subscribed: Subject<boolean> = new Subject(); 

observableData — наблюдаемая переменная в которую записывается сообщение из сокета для канала.
identifier — объект идентификатор для Rails5 Action Cable канала.
subscribed — наблюдаемая переменная в которую записывается состояние подписки.

 constructor( private websocketService: WebSocketService ){      this.observeOpened();     this.observeMessage();  }  ...  private observeMessage(){      let self = this;      this.websocketService.message.subscribe( ( data: Object ) => {          if( self.isThisChannel( data ) ){              if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){                  this.subscribed.next( true );              } else if ( data[ 'message' ] ){                  this.observableData.next( data[ 'message' ] );              }          }      } );  }  private observeOpened(){      let self = this;      this.websocketService.opened.subscribe( ( data: boolean ) => {          self.socketStarted = data;          if( data ){             self.subscribe();         }      } );  } 

В конструкторе этого сервиса мы вызываем 2 функции: observeMessage и observeOpened, которые отслеживают присланные сокетом данные и состояние сокета соответственно.

 private static encodeIdentifier( identifier:string ):Object{      return JSON.parse( identifier );  }  private static getDataString( parameters:Object ):string{      let first = true,         result = '';      for ( let key in parameters ){          if( first ){              first = false;             result +=  `\"${ key }\":\"${ parameters[ key ] }\"`;          } else {              result += `, \"${ key }\":\"${ parameters[ key ] }\"`;          }      }      return `{ ${ result } }`;  } 

encodeIdentifier — статическая приватная функция декодирует строку идентификатора, которую вернул сокет для идентификации сообщения на принадлежность к каналу.
getDataString — преобразовывает объект в формат строки, который принимает Rails5 Action Cable.

 private getSubscribeString():string{      this.identifierStr = ChannelWebsocketService.getDataString( this.identifier );      return JSON.stringify( {         command: 'subscribe',         identifier: this.identifierStr     } );  }; 

Возвращает строку для подписки на канал Rails5 Action Cable.

 private isThisChannel( data:Object ):boolean {      if( data[ 'identifier' ] ){          let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] );          if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){              return true;          }      }      return false;  } 

Определяет принадлежность сообщения от сокета к данному каналу.

  private subscribe(){      this.websocketService.sendMessage( this.getSubscribeString() );  } 

Подписывает на канал.

 public send( data: Object ){      this.websocketService.sendMessage( JSON.stringify( {         command:'message',         identifier: this.identifierStr,         data: ChannelWebsocketService.getDataString( data )     } ) );  } 

Отправляет данные в канал Rails5 Action Cable;

 public unsubscribe(){      this.websocketService.sendMessage( JSON.stringify( {         command: 'unsubscribe',         identifier: this.identifierStr } ) );     this.subscribed.next( false );  } 

Отписывает от канала.

ChatChannelService

Cервис, унаследованный от ChannelWebsocketService для подписки на канал ChatChannel:

 import { Injectable }                           from "@angular/core";  import { ChannelWebsocketService }    from "./channel.websocket.service"; import { WebSocketService }               from "./websocket.service";  @Injectable() export class ChatChannelService extends ChannelWebsocketService {      constructor( websocketService: WebSocketService ){          super( websocketService );          this.identifier = {             channel: 'ChatChannel'         };      }  } 

В конструкторе этого сервиса переопределяется переменная identifier для идентификации канала.

ChatComponent

Компонент который используя ChatChannelService принимает/отправляет данные в канал.
Пример кода не привожу, он есть в GitHub, ссылка на который приведена ниже

Пример

Здесь можно скачать пример.

Для старта клиентского приложения переходим в папку «client» и вызываем:

npm install gulp 

Надеюсь эта статья поможет разобраться в данном вопросе.
ссылка на оригинал статьи https://habrahabr.ru/post/318040/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *