File

lib/core/src/rcc-websocket/rcc-websocket.class.ts

Index

Properties
Methods

Constructor

constructor(config: RccWebSocketConfig)
Parameters :
Name Type Optional
config RccWebSocketConfig No

Properties

Protected channel
Type : string
Public data$
Type : Observable<any>
Protected encryptionHandler
Type : EncryptionHandler
Protected message$
Type : Observable<any>
Protected messageCount
Type : number
Default value : 0
Protected outgoingReceipts
Default value : new Set<any>()
Protected pendingReceipts
Default value : new Set<any>()
Public ready
Type : Promise<any>
Protected receivedIds
Default value : new Set<string>()
Protected rejectReady
Type : function
Protected remoteDone
Type : Promise<any>
Protected resolveReady
Type : function
Public status
Default value : new BehaviorSubject<string>('closed')
Protected subscriptions
Type : Subscription[]
Default value : []
Protected ws
Type : WebSocketSubject<any>

Methods

Async close
close()
Returns : any
Async done
done()
Returns : any
isNoDuplicate
isNoDuplicate(message: any)
Parameters :
Name Type Optional
message any No
Returns : boolean
Async open
open()
Returns : any
Async send
send(data: any)
Parameters :
Name Type Optional
data any No
Returns : Promise<any>
Async sendEnryptedMessage
sendEnryptedMessage(message: any)
Parameters :
Name Type Optional
message any No
Returns : Promise<any>
Async sendReceipt
sendReceipt(id: string | number)
Parameters :
Name Type Optional
id string | number No
Returns : Promise<any>
import	{
			BehaviorSubject,
			firstValueFrom,			
			Observable,
			Subscription,
		}						from 'rxjs'

import	{
			webSocket,
			WebSocketSubject,
		}						from 'rxjs/webSocket'

import	{
			filter,
			pluck,
			tap,
			share,
			throwIfEmpty
		}						from 'rxjs/operators'

import	{
			decrypt,
			EncryptionHandler,
			randomString
		}						from '../encryption'

import	{
			throwError,
			log
		}						from '../utils'

import	{
			RccWebSocketConfig,
			assertRccWebSocketConfig
		}						from './rcc-websocket.commons'

/*
	TODO: Revisit this whole class; it should probably be replaced with a commonly used protocol.
*/



/*
	TODO: Replace the rxjs WebSocketSubject with native WebSockets.
*/


/**
 * Message channel on top of an rxjs `WebSocketSubject`.
 *
 * Outgoing payloads are passed through the configured `EncryptionHandler`
 * before being written to the socket; incoming messages are piped through
 * `decrypt(...)`. Every `data` message carries an id; the receiving side
 * answers with a `receipt` message for that id, and {@link send} only
 * resolves once the matching receipt has arrived.
 *
 * Typical lifecycle: `open()` -> `send(...)` / subscribe to `data$` ->
 * `done()` -> `close()`.
 */
export class RccWebSocket {

	protected ws 				: WebSocketSubject<any>
	protected channel			: string
	protected encryptionHandler	: EncryptionHandler
	/** Subscriptions created by open(); released again in close(). */
	protected subscriptions 	: Subscription[] 			= []
	protected resolveReady		: (...args: any[]) => any
	protected rejectReady		: (...args: any[]) => any
	/** Number of data messages sent so far; appended to the random part of each message id. */
	protected messageCount		= 0

	/** Ids of data messages already seen; used to drop duplicates. */
	protected receivedIds		= new Set<string>()
	/** Promises for receipts we are still waiting for (one per unacknowledged send()). */
	protected pendingReceipts	= new Set<any>()
	/** Promises for receipts we are currently sending out. */
	protected outgoingReceipts	= new Set<any>()

	/** Settles once a `{done: true}` payload arrives on data$, or data$ errors/completes. Set by open(), cleared by close(). */
	protected remoteDone		: Promise<any>

	/** All successfully decrypted incoming messages (data and receipts). */
	protected message$			: Observable<any>

	/** Payloads of incoming data messages, duplicates removed. A receipt is sent for every incoming data message. */
	public	data$				: Observable<any>
	/** Resolves when open() has seen a matching 'joined' message; rejects if open() fails before that. */
	public	ready				: Promise<any>
	/** 'open' or 'closed', driven by the socket's open/close observers. */
	public	status				= new BehaviorSubject<string>('closed')

	/**
	 * Sets up the socket subject and the derived streams. No connection is
	 * made here; call {@link open} to connect and join the channel.
	 *
	 * @param config validated with `assertRccWebSocketConfig`; `url`, `channel` and `encryptionHandler` are read from it.
	 */
	constructor(config: RccWebSocketConfig){

		assertRccWebSocketConfig(config)

		const url				=	config.url
		const openObserver		=	{ next: () => this.status.next('open') }
		const closeObserver		=	{ next: () => this.status.next('closed') }

		this.ws					=	webSocket({url, openObserver, closeObserver})
		this.channel			=	config.channel
		this.encryptionHandler	=	config.encryptionHandler

		this.message$			=	this.ws.pipe(
										decrypt(this.encryptionHandler),
										filter( message => !!message ), // dropping messages that could not be decrypted
										// NOTE(review): this logs decrypted messages to the console - confirm this is wanted outside of debugging.
										tap( m => console.log('WS received: ', m)),
										share()
									)

		this.data$				=	this.message$
									.pipe(
										filter(	message => message.type === 'data'),
										// Acknowledge every data message, duplicates included (the receipt goes out before the duplicate filter).
										// The receipt is sent fire-and-forget, so failures are logged here instead of surfacing as unhandled rejections.
										tap(	message => {
											void this.sendReceipt(message.id)
											.catch( (e: unknown) => console.error('RccWebSocket: failed to send receipt', e) )
										}),
										filter(	message => this.isNoDuplicate(message) ),
										pluck('data'),
										share()
									)

		this.ready				=	new Promise( (resolve, reject) => { this.resolveReady = resolve; this.rejectReady = reject })

		// Mark rejections of `ready` as handled, so a failed open() does not additionally
		// raise an unhandled rejection when nobody is waiting on `ready`.
		// Everyone who awaits `ready` still receives the rejection.
		this.ready.catch( () => {} )

	}



	/**
	 * Connects the socket, joins the configured channel and waits until a
	 * 'joined' message for this channel reports at least two participants.
	 * Resolves `ready` on success.
	 *
	 * @throws whatever the socket stream fails with before the 'joined' message arrives
	 *         (`ready` is rejected with the same reason, so pending senders do not hang).
	 */
	async open(){

		// If data$ errors or completes without items, consider the remote peer done.
		// Guard against null/primitive payloads: a throwing predicate would end up in the catch below
		// and wrongly mark the remote peer as done.
		this.remoteDone			=	firstValueFrom(this.data$.pipe( filter( data => !!(data && data.done) ))).catch(() => {})

		this.subscriptions.push(

			// Keeps the connection subscribed until close() and surfaces unexpected errors or an early close.
			// close() unsubscribes first, so a deliberate close does not trigger `complete` here.
			this.ws.subscribe({
				next: 		()		=> { },
				error:		(e:any)	=> { throw e },
				complete:	()		=> { throw new Error("RccWebSocket: connection closed before transmission was complete.") }
			})

		)

		// Subscribe before sending the join request, so the answer cannot be missed.
		// NOTE(review): loose comparison on `channel` kept from the original - tighten once the message format is confirmed.
		const peersPresent		=	firstValueFrom( this.ws.pipe( filter(
										(message: any) => 	message
															&& 	message.type 	=== 'joined'
															&&	message.count	>= 2
															&& 	message.channel == this.channel
									)))

		this.ws.next({type: 'join', channel: this.channel})

		try {
			await peersPresent
		} catch(e) {
			// Without this, `ready` would stay pending forever and every send() would hang.
			this.rejectReady(e)
			throw e
		}

		this.resolveReady()

	}



	/**
	 * Sends `data` to the remote peer and resolves once the matching receipt has arrived.
	 *
	 * @param data payload to transmit
	 * @throws if encryption/sending fails, or the message stream ends before the receipt arrives
	 */
	async send(data:any): Promise<any> {
		// Uses ids to make sure the other side got the data and can send a receipt.
		// Since there is no canonical way to hash json data, we'll use a random string as id.
		// Cannot use the json data itself as the receipt, because then anyone could just send back the cipher as receipt.

		this.messageCount++

		const id		=	randomString(20)+this.messageCount // random part plus a running counter
		const message	=	{type: 'data', data, id }

		// Start listening for the receipt before the message goes out.
		// NOTE(review): loose comparison on `id` kept from the original - tighten once the message format is confirmed.
		const receipt	=	firstValueFrom( this.message$.pipe(
								filter(
									(incoming: any) => 	incoming
														&& 	incoming.type 	=== 'receipt'
														&&	incoming.id		== id
								),
								throwIfEmpty( () => new Error("RccWebSocket.send: connection closed before a receipt was received.") )
							))

		// If sending fails below, nobody awaits `receipt` anymore; keep a later rejection from being reported as unhandled.
		receipt.catch( () => {} )

		this.pendingReceipts.add(receipt)

		try {

			await this.sendEnryptedMessage(message)

			// NOTE(review): this logs the plain payload to the console - confirm this is wanted outside of debugging.
			console.log('WS sent: ', data)

			await receipt

		} finally {
			// Always clean up, otherwise a failed send leaves a stale entry that done() keeps waiting on.
			this.pendingReceipts.delete(receipt)
		}

	}



	/**
	 * Sends a receipt for the data message with the given id.
	 *
	 * @param id id of the received data message
	 * @throws if `id` is missing (undefined, null or empty string), or encryption/sending fails
	 */
	async sendReceipt(id:string|number): Promise<any>{

		// Explicit check instead of truthiness, so that the numeric id 0 is not treated as missing.
		if(id === undefined || id === null || id === '') throwError(new Error("RccWebSocket.sendReceipt: missing id"))

		const message	=	{ type: 'receipt', id }
		const outgoing 	=	this.sendEnryptedMessage(message)

		this.outgoingReceipts.add(outgoing)

		try {
			await outgoing
		} finally {
			this.outgoingReceipts.delete(outgoing)
		}
	}



	/**
	 * Waits for `ready`, encrypts `message` with the configured handler and writes the result to the socket.
	 *
	 * @param message message object to encrypt and send
	 */
	async sendEnryptedMessage(message:any): Promise<any>{

		await this.ready

		const cipher = await this.encryptionHandler.encrypt(message)

		this.ws.next(cipher)

	}


	/**
	 * Records the message id and reports whether it was seen for the first time.
	 *
	 * @param message incoming message; only `message.id` is read
	 * @returns `true` if the id had not been recorded before, `false` for a duplicate
	 */
	isNoDuplicate(message:any): boolean {

		const seenBefore = this.receivedIds.has(message.id)

		this.receivedIds.add(message.id)

		return !seenBefore
	}


	/**
	 * Returns the promise tracking the remote peer's completion.
	 *
	 * @throws if no channel is open (open() was not called, or close() already ran)
	 */
	protected requireRemoteDone(): Promise<any> {
		return this.remoteDone || throwError(new Error("RccWebSocket.done() no open channel."))
	}


	/**
	 * Finishes the transmission in an orderly way: waits for all receipts still
	 * pending, tells the remote peer we are done, waits for the remote peer to be
	 * done as well and finally for all receipts we are still sending out.
	 *
	 * @throws if no channel is open, or any of the awaited transmissions fails
	 */
	async done(){

		// Fail fast: without open() `ready` never settles and the send() below would hang forever.
		this.requireRemoteDone()

		await Promise.all(this.pendingReceipts)

		await this.send({done:true})

		// Checked again, because close() may have been called while we were waiting.
		await this.requireRemoteDone()

		await Promise.all(this.outgoingReceipts)

	}



	/**
	 * Releases the subscriptions created by open(), completes the socket subject
	 * and resolves once `status` reports 'closed'.
	 */
	async close(){

		console.warn('websocket close()')

		this.subscriptions.forEach( sub => sub.unsubscribe() )
		this.subscriptions	= []

		this.remoteDone		= undefined

		// `status` is a BehaviorSubject: if the socket is already closed this resolves right away.
		const completion	= firstValueFrom(this.status.pipe(
								filter( status => status === 'closed' ),
								throwIfEmpty( () => new Error("RccWebSocket.close: status stream ended before the socket was closed.") )
							))

		this.ws.complete()

		await completion

		console.log('websocket closed')

	}

}

results matching ""

    No results matching ""