File

lib/core/src/rcc-rtc/rcc-rtc.class.ts

Index

Properties
Methods

Constructor

constructor(config: RccRtcConfig)
Parameters :
Name Type Optional
config RccRtcConfig No

Properties

Public data$
Type : Observable<any>
Protected encryptionHandler
Type : EncryptionHandler
Protected incoming$
Type : Observable<any>
Protected messageNum
Type : number
Default value : 0
Protected outgoingReceipts
Default value : new Set<any>()
Protected pendingReceipts
Default value : new Set<any>()
Public ready
Type : Promise<void>
Protected rejectReady
Type : function
Protected rejectRemoteDone
Type : function
Protected remoteDone
Type : Promise<any>
Protected resolveReady
Type : function
Protected resolveRemoteDone
Type : function
Protected rtcDataChannel
Type : RTCDataChannel
Protected rtcPeerConnection
Type : RTCPeerConnection
Protected signal
Type : RccWebsocketSignal
Public signalReliable
Default value : false
Protected stopIncoming
Default value : new Subject<any>()
Protected stunServers
Type : string[]
Protected subscriptions
Type : Subscription[]
Default value : []
Protected userCancel
Type : UserCancel

Methods

Async cancel
cancel()
Returns : any
Async close
close()
Returns : any
Async done
done()
Returns : any
Async open
open(config: literal type)
Parameters :
Name Type Optional
config literal type No
Returns : any
Async send
send(data: any)
Parameters :
Name Type Optional
data any No
Returns : any
Async sendEncryptedData
sendEncryptedData(data: any)
Parameters :
Name Type Optional
data any No
Returns : any
Async sendReceipt
sendReceipt(id: number)
Parameters :
Name Type Optional
id number No
Returns : any
import	{
			log,
			timeoutPromise,
			throwError,
		}							from '../utils'

import	{
			EmptyError,
			firstValueFrom,
			fromEvent,
			lastValueFrom,
			Observable,
			Subject,
			Subscription,
		}							from 'rxjs'

import	{
			map,
			tap,
			filter,
			pluck,
			takeWhile,
			share,
			takeUntil,
		}							from 'rxjs/operators'

import 	{
			RccWebSocket,
			RccWebSocketConfig,
			assertRccWebSocketConfig
		}							from '../rcc-websocket'

import	{
			EncryptionHandler,
			decrypt
		}							from '../encryption'

import	{
			UserCancel,
			UserCanceledError
		}							from '../user-cancel'

import	{
			RccRtcConfig,
			assertRccRtcConfig
		}							from './rcc-rtc.commons'


export class RccWebsocketSignal {

	protected rccWebSocket 		: RccWebSocket
	protected closing			: boolean = false
	protected timeout			: number

	public iceCandidate$		: Observable<RTCIceCandidate>
	public offer$				: Observable<RTCSessionDescriptionInit>
	public answer$				: Observable<RTCSessionDescriptionInit>
	public randomValue$			: Observable<number>

	public ready				: Promise<void>
	public isReliable			= true

	constructor(config: RccWebSocketConfig, timeout = 1000){

		assertRccWebSocketConfig(config)

		this.rccWebSocket 	= 	new RccWebSocket(config)		
		this.timeout		=	timeout
		this.ready 			= 	this.rccWebSocket.ready

		this.iceCandidate$ 	= 	this.rccWebSocket.data$.pipe(pluck('iceCandidate'),	filter( x => x !== undefined))
		this.offer$ 		= 	this.rccWebSocket.data$.pipe(pluck('offer'), 		filter( x => x !== undefined))
		this.answer$		= 	this.rccWebSocket.data$.pipe(pluck('answer'), 		filter( x => x !== undefined))
		this.randomValue$	= 	this.rccWebSocket.data$.pipe(pluck('randomValue'), 	filter( x => x !== undefined))

	}

	async open(){
		return await this.rccWebSocket.open()		
	}

	async done(){		
		return await this.rccWebSocket.done()
	}

	async close(){
		await this.rccWebSocket.close()
	}


	async send(data:any) {

		try {

			return	await	Promise.race([
								this.rccWebSocket.send(data), 
								timeoutPromise(this.timeout, 'timeout rccWebsocketSignal.send()'+JSON.stringify(data) )
							])

		} catch(e){
			this.isReliable = false
			throwError(e)
		}
	}

	async sendIceCandidate(iceCandidate: RTCIceCandidate ){
		return await this.send({iceCandidate})
	}

	async sendOffer(offer: RTCSessionDescriptionInit){
		return await this.send({offer})		
	}

	async sendAnswer(answer: RTCSessionDescriptionInit){
		return await this.send({answer})
	}

	async sendRandomValue(randomValue: number){
		return await this.send({randomValue})
	}


}














export class RccRtc {

	protected signal				: RccWebsocketSignal
	protected encryptionHandler		: EncryptionHandler
	protected rtcPeerConnection		: RTCPeerConnection
	protected rtcDataChannel		: RTCDataChannel
	protected stunServers			: string[]

	protected resolveReady			: (...argy:any[]) => any
	protected rejectReady			: (...argy:any[]) => any
	protected userCancel			: UserCancel

	protected resolveRemoteDone		: (...args: any[]) => any
	protected rejectRemoteDone		: (...args: any[]) => any

	protected remoteDone			: Promise<any>

	protected pendingReceipts		= new Set<any>()
	protected outgoingReceipts		= new Set<any>()

	protected messageNum			: number			= 0
	protected incoming$				: Observable<any>
	protected stopIncoming			= new Subject<any>()

	protected subscriptions			: Subscription[] = []


	public ready					: Promise<void>
	public data$					: Observable<any>
	public signalReliable			= false

	constructor(config: RccRtcConfig){

		assertRccRtcConfig(config)

		const {stunServers, ...webSocketConfig} = config

		this.signal				= 	new RccWebsocketSignal(webSocketConfig)
		this.stunServers		= 	config.stunServers
		this.encryptionHandler	=	config.encryptionHandler

		this.rtcPeerConnection 	= 	new RTCPeerConnection({iceServers: [{urls:this.stunServers}]})
		this.rtcDataChannel		= 	this.rtcPeerConnection.createDataChannel("rcc", {negotiated: true, id: 1 })

		this.incoming$ 			= 	fromEvent(this.rtcDataChannel, 'message')
									.pipe( 
										takeUntil(this.stopIncoming),
										map( (event:MessageEvent) => event.data),
										decrypt(this.encryptionHandler),
										filter(	x => !!x), //just drop messages, that could not be decrypted. or should this throw an error?
										share()
									)

		this.data$				=	this.incoming$
									.pipe( 
										tap( message => message.id && this.sendReceipt(message.id).catch( () => {} ) ),
										pluck('data'),		
										filter( x => !!x),
										share()
									)								


		this.ready				=	new Promise( (resolve, reject) => { this.resolveReady = resolve; this.rejectReady = reject })
		this.userCancel			=	new UserCancel('RccRtc: user canceled') 

	}

	

	async open(config: {timeoutPeer: number, timeoutConnection:number }){		

		///timeout should not be configurable like this. Things will fuck up, as soon as client and peer have slightly different settings
		console.warn('timeout for .open() needs rework!')

		this.remoteDone 			= 	firstValueFrom(this.data$.pipe( filter( data => data.done)))
										.catch( e => {})		//  regard as done, even if an error occured, such as EmptyError


		const channelOpen			=	new Promise( (resolve) => this.rtcDataChannel.addEventListener('open', resolve, {once:true}))


		const localIceCandidate$	= 	fromEvent(this.rtcPeerConnection, 'icecandidate')
										.pipe( 
											pluck('candidate'),
											takeWhile( candidate => candidate !== null, true),											
											takeUntil( this.userCancel )
										)


		const remoteIceCandidate$	=	this.signal.iceCandidate$
										.pipe(
											takeWhile( candidate => candidate !== null),
											takeUntil( this.userCancel )																						
										)

		const localIceDone			=	lastValueFrom(localIceCandidate$)
										.catch( e => {})		//  regard as done, even if an error occured, such as EmptyError

		const remoteIceDone			=	lastValueFrom(remoteIceCandidate$)
										.catch( e => {})		//	regard as done, even if an error occured, such as EmptyError
										


		this.subscriptions.push(
			localIceCandidate$.subscribe(	(iceCandidate : RTCIceCandidate) => this.signal.sendIceCandidate(iceCandidate).catch( () => {} ) ), 			//TODO: handle errors?
			remoteIceCandidate$.subscribe( 	(iceCandidate : RTCIceCandidate) => this.rtcPeerConnection.addIceCandidate(iceCandidate).catch( () => {} ) ),	//TODO: handle errors?
		)

		await 	Promise.race([
					this.signal.open(),
					this.userCancel,
					timeoutPromise( config.timeoutPeer , "timeout peer" )
				])

		const determineRoles		=	async () => {


											const localRandomValue	= 	Math.random()
											const nextRandomValue 	= 	firstValueFrom(this.signal.randomValue$)
																		.catch( () => Promise.reject("RccRtc.open(): Never received random value."))
											
											await this.signal.sendRandomValue(localRandomValue)

											const remoteRandomValue	= 	await nextRandomValue

											return 	localRandomValue > remoteRandomValue 
													?	'primary' 
													:	'secondary'

										}

		const role 					= 	await 	Promise.race([
													determineRoles(), 
													this.userCancel,
													timeoutPromise(2000, 'timeout determine roles')
												])
		


		const prepareAsPrimary		=	async () => {

											const offer 		= await this.rtcPeerConnection.createOffer()

											await this.rtcPeerConnection.setLocalDescription(offer)

											const nextAnswer	= firstValueFrom(this.signal.answer$)

											await this.signal.sendOffer(offer)

											const remoteAnswer	= await nextAnswer

											await this.rtcPeerConnection.setRemoteDescription(remoteAnswer)

										}

		const prepareAsSecondary	=	async () => {

											const nextOffer		= firstValueFrom(this.signal.offer$)
											
											const remoteOffer	= await nextOffer

											await this.rtcPeerConnection.setRemoteDescription(remoteOffer)

											const answer		= await this.rtcPeerConnection.createAnswer() 

											await this.rtcPeerConnection.setLocalDescription(answer)

											await this.signal.sendAnswer(answer)

										}


		const establishRtcChannel	= 	async () => {											

											role == 'primary'
											?	await prepareAsPrimary()
											:	await prepareAsSecondary()


											await localIceDone
											await remoteIceDone

											await channelOpen 		

										}

		let error
										
		try { 

			await 	Promise.race([
						establishRtcChannel(), 
						this.userCancel,
						timeoutPromise(config.timeoutConnection, "timeout rtc channel")
					]) 

		} 
		catch(e){ error = e }


		const timoutFinalize 		= 	error
										?	config.timeoutConnection*2
										:	2000

									// If we had a non timeout error We have to wait a bit: Our peer may have not encountered an error as we did, 
									// but will eventually run into the timeout we set. So we have to wait at least for that amount of time.


		try{

			this.signal.isReliable || throwError("Signal not reliable.")

			await	 Promise.race([
						this.signal.done(),
						this.userCancel,
						timeoutPromise(timoutFinalize, 'timeout signal.done()')
					])

			this.signalReliable = this.signal.isReliable //this.signal.isReliable can change during .done()!

		}
		catch(e){

			console.log('try...', e)

			this.signalReliable = false

		}

		this.signal.close().catch( () => {})
		// If we got to this point an error is rather obscure and of no further significance 
		// for the things to come.
		// Maybe still log it somewhere?

		error
		?	throwError(error)
		:	this.resolveReady()

	}




	async send(data: any){		

		console.log('webrtc send:', data)

		this.messageNum ++ //TODO: For some reason it was important that ids are never the same, or was it? Why not some random number?

		const id 				= 	this.messageNum
		const combined			= 	{id, data}
		const receipt			= 	firstValueFrom( this.incoming$.pipe( filter(
										(message:any) =>	message.receipt
															&&
															message.receipt == id
									)))

		this.pendingReceipts.add(receipt)		

		await this.sendEncryptedData(combined)

		await receipt

		this.pendingReceipts.delete(receipt)		

	}


	async sendReceipt(id: number){

		const outgoing = this.sendEncryptedData({receipt: id})		

		this.outgoingReceipts.add(outgoing)

		await outgoing

		this.outgoingReceipts.delete(outgoing)
	}



	async sendEncryptedData(data:any){

		// Maybe this is over the top, since WebRTC is encrypted anyway.
		// Adding an extra layer of encryption does not neccessarly improve the security, 
		// but might screw things up instead. Then again, we do have a shared seceret already (via encryptionHandler, 
		// through the QR code scan), why rely on another key negotiation?

		await this.ready

		const cipher = await this.encryptionHandler.encrypt(data)
		
		this.rtcDataChannel.send(cipher)

	}


	async done(){

		await Promise.all(this.pendingReceipts)

		await this.send({done:true})

		await (this.remoteDone || Promise.reject(new Error("RccRtc.done() no open channel.")))

		this.stopIncoming.next(null)

		await Promise.all(this.outgoingReceipts)

	}

	async cancel(){

		this.userCancel.cancel()
		this.close()

	}


	async close(){

		this.remoteDone = undefined

		await this.signal.close()

		//clean up subscriptions
		this.subscriptions.forEach( sub => sub.unsubscribe() )


		//clean up rtcDataChannel

		if(
				this.rtcDataChannel
			&&	this.rtcDataChannel.readyState != 'closed'
		){

			const channelClosed = new Promise( (resolve) => this.rtcDataChannel.addEventListener('close', resolve, {once:true}))
			
			this.rtcDataChannel.close()

			await channelClosed			

			this.rtcDataChannel		= null

			console.log('rtcDataChannel closed')
		}


		//clean up rtcPeerConnection
		if(	
				this.rtcPeerConnection
			&&	this.rtcPeerConnection.signalingState != 'closed'
		){

			timeoutPromise(5000).catch( ()=>{
				this.rtcPeerConnection.close()
				this.rtcPeerConnection	= null

			})

			console.log('rtcPeerConnection closed')
		}


	}
	

}

results matching ""

    No results matching ""