lib/core/src/rcc-rtc/rcc-rtc.class.ts
Properties |
|
Methods |
|
constructor(config: RccRtcConfig)
|
||||||
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:167
|
||||||
Parameters :
|
Public data$ |
Type : Observable<any>
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:166
|
Protected encryptionHandler |
Type : EncryptionHandler
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:141
|
Protected incoming$ |
Type : Observable<any>
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:159
|
Protected messageNum |
Type : number
|
Default value : 0
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:158
|
Protected outgoingReceipts |
Default value : new Set<any>()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:156
|
Protected pendingReceipts |
Default value : new Set<any>()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:155
|
Public ready |
Type : Promise<void>
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:165
|
Protected rejectReady |
Type : function
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:147
|
Protected rejectRemoteDone |
Type : function
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:151
|
Protected remoteDone |
Type : Promise<any>
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:153
|
Protected resolveReady |
Type : function
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:146
|
Protected resolveRemoteDone |
Type : function
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:150
|
Protected rtcDataChannel |
Type : RTCDataChannel
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:143
|
Protected rtcPeerConnection |
Type : RTCPeerConnection
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:142
|
Protected signal |
Type : RccWebsocketSignal
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:140
|
Public signalReliable |
Default value : false
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:167
|
Protected stopIncoming |
Default value : new Subject<any>()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:160
|
Protected stunServers |
Type : string[]
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:144
|
Protected subscriptions |
Type : Subscription[]
|
Default value : []
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:162
|
Protected userCancel |
Type : UserCancel
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:148
|
Async cancel |
cancel()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:449
|
Returns :
any
|
Async close |
close()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:457
|
Returns :
any
|
Async done |
done()
|
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:435
|
Returns :
any
|
Async open | ||||||
open(config: literal type)
|
||||||
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:207
|
||||||
Parameters :
Returns :
any
|
Async send | ||||||
send(data: any)
|
||||||
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:381
|
||||||
Parameters :
Returns :
any
|
Async sendEncryptedData | ||||||
sendEncryptedData(data: any)
|
||||||
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:419
|
||||||
Parameters :
Returns :
any
|
Async sendReceipt | ||||||
sendReceipt(id: number)
|
||||||
Defined in lib/core/src/rcc-rtc/rcc-rtc.class.ts:406
|
||||||
Parameters :
Returns :
any
|
import {
log,
timeoutPromise,
throwError,
} from '../utils'
import {
EmptyError,
firstValueFrom,
fromEvent,
lastValueFrom,
Observable,
Subject,
Subscription,
} from 'rxjs'
import {
map,
tap,
filter,
pluck,
takeWhile,
share,
takeUntil,
} from 'rxjs/operators'
import {
RccWebSocket,
RccWebSocketConfig,
assertRccWebSocketConfig
} from '../rcc-websocket'
import {
EncryptionHandler,
decrypt
} from '../encryption'
import {
UserCancel,
UserCanceledError
} from '../user-cancel'
import {
RccRtcConfig,
assertRccRtcConfig
} from './rcc-rtc.commons'
export class RccWebsocketSignal {
protected rccWebSocket : RccWebSocket
protected closing : boolean = false
protected timeout : number
public iceCandidate$ : Observable<RTCIceCandidate>
public offer$ : Observable<RTCSessionDescriptionInit>
public answer$ : Observable<RTCSessionDescriptionInit>
public randomValue$ : Observable<number>
public ready : Promise<void>
public isReliable = true
constructor(config: RccWebSocketConfig, timeout = 1000){
assertRccWebSocketConfig(config)
this.rccWebSocket = new RccWebSocket(config)
this.timeout = timeout
this.ready = this.rccWebSocket.ready
this.iceCandidate$ = this.rccWebSocket.data$.pipe(pluck('iceCandidate'), filter( x => x !== undefined))
this.offer$ = this.rccWebSocket.data$.pipe(pluck('offer'), filter( x => x !== undefined))
this.answer$ = this.rccWebSocket.data$.pipe(pluck('answer'), filter( x => x !== undefined))
this.randomValue$ = this.rccWebSocket.data$.pipe(pluck('randomValue'), filter( x => x !== undefined))
}
async open(){
return await this.rccWebSocket.open()
}
async done(){
return await this.rccWebSocket.done()
}
async close(){
await this.rccWebSocket.close()
}
async send(data:any) {
try {
return await Promise.race([
this.rccWebSocket.send(data),
timeoutPromise(this.timeout, 'timeout rccWebsocketSignal.send()'+JSON.stringify(data) )
])
} catch(e){
this.isReliable = false
throwError(e)
}
}
async sendIceCandidate(iceCandidate: RTCIceCandidate ){
return await this.send({iceCandidate})
}
async sendOffer(offer: RTCSessionDescriptionInit){
return await this.send({offer})
}
async sendAnswer(answer: RTCSessionDescriptionInit){
return await this.send({answer})
}
async sendRandomValue(randomValue: number){
return await this.send({randomValue})
}
}
export class RccRtc {
protected signal : RccWebsocketSignal
protected encryptionHandler : EncryptionHandler
protected rtcPeerConnection : RTCPeerConnection
protected rtcDataChannel : RTCDataChannel
protected stunServers : string[]
protected resolveReady : (...argy:any[]) => any
protected rejectReady : (...argy:any[]) => any
protected userCancel : UserCancel
protected resolveRemoteDone : (...args: any[]) => any
protected rejectRemoteDone : (...args: any[]) => any
protected remoteDone : Promise<any>
protected pendingReceipts = new Set<any>()
protected outgoingReceipts = new Set<any>()
protected messageNum : number = 0
protected incoming$ : Observable<any>
protected stopIncoming = new Subject<any>()
protected subscriptions : Subscription[] = []
public ready : Promise<void>
public data$ : Observable<any>
public signalReliable = false
constructor(config: RccRtcConfig){
assertRccRtcConfig(config)
const {stunServers, ...webSocketConfig} = config
this.signal = new RccWebsocketSignal(webSocketConfig)
this.stunServers = config.stunServers
this.encryptionHandler = config.encryptionHandler
this.rtcPeerConnection = new RTCPeerConnection({iceServers: [{urls:this.stunServers}]})
this.rtcDataChannel = this.rtcPeerConnection.createDataChannel("rcc", {negotiated: true, id: 1 })
this.incoming$ = fromEvent(this.rtcDataChannel, 'message')
.pipe(
takeUntil(this.stopIncoming),
map( (event:MessageEvent) => event.data),
decrypt(this.encryptionHandler),
filter( x => !!x), //just drop messages, that could not be decrypted. or should this throw an error?
share()
)
this.data$ = this.incoming$
.pipe(
tap( message => message.id && this.sendReceipt(message.id).catch( () => {} ) ),
pluck('data'),
filter( x => !!x),
share()
)
this.ready = new Promise( (resolve, reject) => { this.resolveReady = resolve; this.rejectReady = reject })
this.userCancel = new UserCancel('RccRtc: user canceled')
}
async open(config: {timeoutPeer: number, timeoutConnection:number }){
///timeout should not be configurable like this. Things will fuck up, as soon as client and peer have slightly different settings
console.warn('timeout for .open() needs rework!')
this.remoteDone = firstValueFrom(this.data$.pipe( filter( data => data.done)))
.catch( e => {}) // regard as done, even if an error occured, such as EmptyError
const channelOpen = new Promise( (resolve) => this.rtcDataChannel.addEventListener('open', resolve, {once:true}))
const localIceCandidate$ = fromEvent(this.rtcPeerConnection, 'icecandidate')
.pipe(
pluck('candidate'),
takeWhile( candidate => candidate !== null, true),
takeUntil( this.userCancel )
)
const remoteIceCandidate$ = this.signal.iceCandidate$
.pipe(
takeWhile( candidate => candidate !== null),
takeUntil( this.userCancel )
)
const localIceDone = lastValueFrom(localIceCandidate$)
.catch( e => {}) // regard as done, even if an error occured, such as EmptyError
const remoteIceDone = lastValueFrom(remoteIceCandidate$)
.catch( e => {}) // regard as done, even if an error occured, such as EmptyError
this.subscriptions.push(
localIceCandidate$.subscribe( (iceCandidate : RTCIceCandidate) => this.signal.sendIceCandidate(iceCandidate).catch( () => {} ) ), //TODO: handle errors?
remoteIceCandidate$.subscribe( (iceCandidate : RTCIceCandidate) => this.rtcPeerConnection.addIceCandidate(iceCandidate).catch( () => {} ) ), //TODO: handle errors?
)
await Promise.race([
this.signal.open(),
this.userCancel,
timeoutPromise( config.timeoutPeer , "timeout peer" )
])
const determineRoles = async () => {
const localRandomValue = Math.random()
const nextRandomValue = firstValueFrom(this.signal.randomValue$)
.catch( () => Promise.reject("RccRtc.open(): Never received random value."))
await this.signal.sendRandomValue(localRandomValue)
const remoteRandomValue = await nextRandomValue
return localRandomValue > remoteRandomValue
? 'primary'
: 'secondary'
}
const role = await Promise.race([
determineRoles(),
this.userCancel,
timeoutPromise(2000, 'timeout determine roles')
])
const prepareAsPrimary = async () => {
const offer = await this.rtcPeerConnection.createOffer()
await this.rtcPeerConnection.setLocalDescription(offer)
const nextAnswer = firstValueFrom(this.signal.answer$)
await this.signal.sendOffer(offer)
const remoteAnswer = await nextAnswer
await this.rtcPeerConnection.setRemoteDescription(remoteAnswer)
}
const prepareAsSecondary = async () => {
const nextOffer = firstValueFrom(this.signal.offer$)
const remoteOffer = await nextOffer
await this.rtcPeerConnection.setRemoteDescription(remoteOffer)
const answer = await this.rtcPeerConnection.createAnswer()
await this.rtcPeerConnection.setLocalDescription(answer)
await this.signal.sendAnswer(answer)
}
const establishRtcChannel = async () => {
role == 'primary'
? await prepareAsPrimary()
: await prepareAsSecondary()
await localIceDone
await remoteIceDone
await channelOpen
}
let error
try {
await Promise.race([
establishRtcChannel(),
this.userCancel,
timeoutPromise(config.timeoutConnection, "timeout rtc channel")
])
}
catch(e){ error = e }
const timoutFinalize = error
? config.timeoutConnection*2
: 2000
// If we had a non timeout error We have to wait a bit: Our peer may have not encountered an error as we did,
// but will eventually run into the timeout we set. So we have to wait at least for that amount of time.
try{
this.signal.isReliable || throwError("Signal not reliable.")
await Promise.race([
this.signal.done(),
this.userCancel,
timeoutPromise(timoutFinalize, 'timeout signal.done()')
])
this.signalReliable = this.signal.isReliable //this.signal.isReliable can change during .done()!
}
catch(e){
console.log('try...', e)
this.signalReliable = false
}
this.signal.close().catch( () => {})
// If we got to this point an error is rather obscure and of no further significance
// for the things to come.
// Maybe still log it somewhere?
error
? throwError(error)
: this.resolveReady()
}
async send(data: any){
console.log('webrtc send:', data)
this.messageNum ++ //TODO: For some reason it was important that ids are never the same, or was it? Why not some random number?
const id = this.messageNum
const combined = {id, data}
const receipt = firstValueFrom( this.incoming$.pipe( filter(
(message:any) => message.receipt
&&
message.receipt == id
)))
this.pendingReceipts.add(receipt)
await this.sendEncryptedData(combined)
await receipt
this.pendingReceipts.delete(receipt)
}
async sendReceipt(id: number){
const outgoing = this.sendEncryptedData({receipt: id})
this.outgoingReceipts.add(outgoing)
await outgoing
this.outgoingReceipts.delete(outgoing)
}
async sendEncryptedData(data:any){
// Maybe this is over the top, since WebRTC is encrypted anyway.
// Adding an extra layer of encryption does not neccessarly improve the security,
// but might screw things up instead. Then again, we do have a shared seceret already (via encryptionHandler,
// through the QR code scan), why rely on another key negotiation?
await this.ready
const cipher = await this.encryptionHandler.encrypt(data)
this.rtcDataChannel.send(cipher)
}
async done(){
await Promise.all(this.pendingReceipts)
await this.send({done:true})
await (this.remoteDone || Promise.reject(new Error("RccRtc.done() no open channel.")))
this.stopIncoming.next(null)
await Promise.all(this.outgoingReceipts)
}
async cancel(){
this.userCancel.cancel()
this.close()
}
async close(){
this.remoteDone = undefined
await this.signal.close()
//clean up subscriptions
this.subscriptions.forEach( sub => sub.unsubscribe() )
//clean up rtcDataChannel
if(
this.rtcDataChannel
&& this.rtcDataChannel.readyState != 'closed'
){
const channelClosed = new Promise( (resolve) => this.rtcDataChannel.addEventListener('close', resolve, {once:true}))
this.rtcDataChannel.close()
await channelClosed
this.rtcDataChannel = null
console.log('rtcDataChannel closed')
}
//clean up rtcPeerConnection
if(
this.rtcPeerConnection
&& this.rtcPeerConnection.signalingState != 'closed'
){
timeoutPromise(5000).catch( ()=>{
this.rtcPeerConnection.close()
this.rtcPeerConnection = null
})
console.log('rtcPeerConnection closed')
}
}
}