lib/core/src/rcc-websocket/rcc-websocket.class.ts
Properties |
|
Methods |
|
constructor(config: RccWebSocketConfig)
|
||||||
Parameters :
|
Protected channel |
Type : string
|
Public data$ |
Type : Observable<any>
|
Protected encryptionHandler |
Type : EncryptionHandler
|
Protected message$ |
Type : Observable<any>
|
Protected messageCount |
Type : number
|
Default value : 0
|
Protected outgoingReceipts |
Default value : new Set<any>()
|
Protected pendingReceipts |
Default value : new Set<any>()
|
Public ready |
Type : Promise<any>
|
Protected receivedIds |
Default value : new Set<string>()
|
Protected rejectReady |
Type : function
|
Protected remoteDone |
Type : Promise<any>
|
Protected resolveReady |
Type : function
|
Public status |
Default value : new BehaviorSubject<string>('closed')
|
Protected subscriptions |
Type : Subscription[]
|
Default value : []
|
Protected ws |
Type : WebSocketSubject<any>
|
Async close |
close()
|
Returns :
any
|
Async done |
done()
|
Returns :
any
|
isNoDuplicate | ||||||
isNoDuplicate(message: any)
|
||||||
Parameters :
Returns :
boolean
|
Async open |
open()
|
Returns :
any
|
Async send | ||||||
send(data: any)
|
||||||
Parameters :
Returns :
Promise<any>
|
Async sendEnryptedMessage | ||||||
sendEnryptedMessage(message: any)
|
||||||
Parameters :
Returns :
Promise<any>
|
Async sendReceipt | ||||||
sendReceipt(id: string | number)
|
||||||
Parameters :
Returns :
Promise<any>
|
import {
BehaviorSubject,
firstValueFrom,
Observable,
Subscription,
} from 'rxjs'
import {
webSocket,
WebSocketSubject,
} from 'rxjs/webSocket'
import {
filter,
pluck,
tap,
share,
throwIfEmpty
} from 'rxjs/operators'
import {
decrypt,
EncryptionHandler,
randomString
} from '../encryption'
import {
throwError,
log
} from '../utils'
import {
RccWebSocketConfig,
assertRccWebSocketConfig
} from './rcc-websocket.commons'
/*
This whole thing should be revisited and probably be replaced with a commonly used protocol
*/
/*
REPLACE rxjs with native websockets
*/
export class RccWebSocket {
protected ws : WebSocketSubject<any>
protected channel : string
protected encryptionHandler : EncryptionHandler
protected subscriptions : Subscription[] = []
protected resolveReady : (...args: any[]) => any
protected rejectReady : (...args: any[]) => any
protected messageCount = 0
protected receivedIds = new Set<string>()
protected pendingReceipts = new Set<any>()
protected outgoingReceipts = new Set<any>()
protected remoteDone : Promise<any>
protected message$ : Observable<any>
public data$ : Observable<any>
public ready : Promise<any>
public status = new BehaviorSubject<string>('closed')
constructor(config: RccWebSocketConfig){
assertRccWebSocketConfig(config)
const url = config.url
const openObserver = {next: () => this.status.next('open') }
const closeObserver = {next: () => this.status.next('closed') }
const closingObserver = {next: () => this.status.next('closed') }
this.ws = webSocket({url, openObserver, closeObserver})
this.channel = config.channel
this.encryptionHandler = config.encryptionHandler
this.message$ = this.ws.pipe(
decrypt(this.encryptionHandler),
filter( message => !!message ), //dropping messages that could not be decrypted
tap( m => console.log('WS received: ', m)),
share()
)
this.data$ = this.message$
.pipe(
filter( message => message.type == 'data'),
tap( message => this.sendReceipt(message.id) ),
filter( message => this.isNoDuplicate(message) ),
pluck('data'),
share()
)
this.ready = new Promise( (resolve, reject) => { this.resolveReady = resolve; this.rejectReady = reject })
}
async open(){
this.remoteDone = firstValueFrom(this.data$.pipe( filter( data => data.done ))).catch(() => {}) //If data$ errors or completes without items, consider the remote peer done.
this.subscriptions.push(
this.ws.subscribe({
next: (x:any) => { },
error: (e:any) => { throw e },
complete: () => { throw new Error("RccWebSocket: connection closed before transmission was complete.") }
})
)
const peers_present = firstValueFrom( this.ws.pipe( filter(
(message: any) => message
&& message.type == 'joined'
&& message.count >= 2
&& message.channel == this.channel
)))
this.ws.next({type: 'join', channel: this.channel})
await peers_present
this.resolveReady()
}
async send(data:any): Promise<any> {
// Uses ids to make sure the other side got the data and can send a receipt,
// Since there is no cononical way to hash json data, we'll use a random string as id
// Cannot use the json data itself as the receipt, because then anyone could just send back the cipher as receipt
this.messageCount++
const id = randomString(20)+this.messageCount //ids are random and unique
const message = {type: 'data', data, id }
const receipt = firstValueFrom( this.message$.pipe( filter(
(message: any) => message
&& message.type == 'receipt'
&& message.id == id
), throwIfEmpty(()=> 'RECIPET MGHNA')))
this.pendingReceipts.add(receipt)
await this.sendEnryptedMessage(message)
console.log('WS sent: ', data)
await receipt
this.pendingReceipts.delete(receipt)
}
async sendReceipt(id:string|number): Promise<any>{
id || throwError(new Error("RccWebSocket.sendReceipt: missing id"))
const message = { type: 'receipt', id}
const outgoing = this.sendEnryptedMessage(message)
this.outgoingReceipts.add(outgoing)
await outgoing
this.outgoingReceipts.delete(outgoing)
}
async sendEnryptedMessage(message:any): Promise<any>{
await this.ready
const cipher = await this.encryptionHandler.encrypt(message)
this.ws.next(cipher)
}
isNoDuplicate(message:any): boolean {
const isDuplicate = this.receivedIds.has(message.id)
this.receivedIds.add(message.id)
return !isDuplicate
}
async done(){
console.log( Math.floor(Date.now()/1000) % (60*5) )
console.log('done', 1)
await Promise.all(this.pendingReceipts)
console.log('done', 2)
await this.send({done:true})
console.log('done', 3)
await (this.remoteDone || throwError("RccWebSocket.done() no open channel.") )
console.log('done', 4)
await Promise.all(this.outgoingReceipts)
console.log('done', 5)
}
async close(){
console.warn('websocket close()')
this.subscriptions.forEach( sub => sub.unsubscribe() )
this.remoteDone = undefined
const completion = firstValueFrom(this.status.pipe( filter( status => status == 'closed' ), throwIfEmpty( () => 'MUUP') ))
this.ws.complete()
await completion
console.log('websocket closed')
}
}