
Всем привет! Эта статья о том как связать клиентское приложение Angular2 с Rails 5 сервером используя Websocket.
Rails
Для примера нам потребуется простейший ApplicationCable::Channel:
class ChatChannel < ApplicationCable::Channel def subscribed stream_from 'chat_channel' end def index ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def create( data ) Message.create( name: data[ 'name' ], text: data[ 'text' ] ); ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def unsubscribed end end
Angular2
WebSocketService
Для начала нам потребуется сервис, который будет обеспечивать наше приложение обменом данных с сервером:
import { Injectable } from "@angular/core"; import { Subject, Observable, Subscription } from 'rxjs/Rx'; import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject"; @Injectable() export class WebSocketService { private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string; public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject(); public close():void{ this.socket.unsubscribe(); this.ws.complete(); } public sendMessage( message:string ):void{ this.ws.next( message ); } public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); } }
В данном сервисе есть 3 приватные и 2 публичные переменные, а также 3 публичные функции.
private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string;
ws — наблюдаемая переменная WebSocketSubject.
socket — переменная для подписки на ws.
url — ссылка на сокет.
public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject();
message — наблюдаемая переменная в которую транслируются все данные из сокета.
opened — наблюдаемая переменная которая следит за открытием/закрытием соединения с сокетом.
public close():void{ this.socket.unsubscribe(); this.ws.complete(); }
Функция для закрытия сокета.
public sendMessage( message:string ):void{ this.ws.next( message ); }
Функция для отправки данных в сокет.
public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); }
Эта функция открывает соединение с сокетом, записывая его объект в наблюдаемую переменную и подписывается на трансляцию из нее. При потере связи каждую секунду пытается восстановить связь.
ChannelWebsocketService
Наследуемый сервис для подписки на каналы Rails5 Action Cable:
import { Injectable } from "@angular/core"; import { Subject } from "rxjs/Subject"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChannelWebsocketService { private socketStarted: boolean; public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public identifierStr: string; public subscribed: Subject<boolean> = new Subject(); constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; } private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); }; private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; } private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); } private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); } public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); } public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); } }
В данном сервисе есть 2 приватные и 3 публичные переменные, а также 7 приватных и 2 публичные функции.
private socketStarted: boolean; private identifierStr: string;
socketStarted — переменная в которую транслируется состояние подписки на сокет.
identifierStr — специально подготовленная строка идентификатор для Rails5 Action Cable канала.
public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public subscribed: Subject<boolean> = new Subject();
observableData — наблюдаемая переменная в которую записывается сообщение из сокета для канала.
identifier — объект идентификатор для Rails5 Action Cable канала.
subscribed — наблюдаемая переменная в которую записывается состояние подписки.
constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } ... private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); }
В конструкторе этого сервиса мы вызываем 2 функции: observeMessage и observeOpened, которые отслеживают присланные сокетом данные и состояние сокета соответственно.
private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; }
encodeIdentifier — статическая приватная функция декодирует строку идентификатора, которую вернул сокет для идентификации сообщения на принадлежность к каналу.
getDataString — преобразовывает объект в формат строки, который принимает Rails5 Action Cable.
private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); };
Возвращает строку для подписки на канал Rails5 Action Cable.
private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; }
Определяет принадлежность сообщения от сокета к данному каналу.
private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); }
Подписывает на канал.
public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); }
Отправляет данные в канал Rails5 Action Cable;
public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); }
Отписывает от канала.
ChatChannelService
Cервис, унаследованный от ChannelWebsocketService для подписки на канал ChatChannel:
import { Injectable } from "@angular/core"; import { ChannelWebsocketService } from "./channel.websocket.service"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChatChannelService extends ChannelWebsocketService { constructor( websocketService: WebSocketService ){ super( websocketService ); this.identifier = { channel: 'ChatChannel' }; } }
В конструкторе этого сервиса переопределяется переменная identifier для идентификации канала.
ChatComponent
Компонент который используя ChatChannelService принимает/отправляет данные в канал.
Пример кода не привожу, он есть в GitHub, ссылка на который приведена ниже
Пример
Здесь можно скачать пример.
Для старта клиентского приложения переходим в папку «client» и вызываем:
npm install gulp
Надеюсь эта статья поможет разобраться в данном вопросе.
ссылка на оригинал статьи https://habrahabr.ru/post/318040/
Добавить комментарий