diff options
author | Minteck <contact@minteck.org> | 2022-08-21 17:31:56 +0200 |
---|---|---|
committer | Minteck <contact@minteck.org> | 2022-08-21 17:31:56 +0200 |
commit | a2df9a69dcc14cb70118cda2ded499055e7ee358 (patch) | |
tree | 6dd283e4e9452d38bce81ddaaae49b5335755842 /together/src | |
parent | 84dd0735820b16b60f600284d35183d76547a71f (diff) | |
download | pluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.tar.gz pluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.tar.bz2 pluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.zip |
m. update
Diffstat (limited to 'together/src')
-rw-r--r-- | together/src/index.ts | 413 | ||||
-rw-r--r-- | together/src/types/Session.ts | 39 | ||||
-rw-r--r-- | together/src/types/User.ts | 44 | ||||
-rw-r--r-- | together/src/types/Video.ts | 41 | ||||
-rw-r--r-- | together/src/utils/InternalAPI.ts | 10 | ||||
-rw-r--r-- | together/src/utils/PartyCodes.ts | 5 |
6 files changed, 552 insertions, 0 deletions
diff --git a/together/src/index.ts b/together/src/index.ts new file mode 100644 index 0000000..bfe0d0e --- /dev/null +++ b/together/src/index.ts @@ -0,0 +1,413 @@ +// pony pone pone pony + +import {WebSocket, WebSocketServer} from "ws"; +import {isSessionValid} from "./utils/InternalAPI"; +import User, {APIUser} from "./types/User"; +import Session from "./types/Session"; +import {VideoState} from "./types/Video"; +import {createHash} from 'crypto'; + +const wss = new WebSocketServer({port: 22666}); + +let sessions: Map<string, Session> = new Map(); +let partycodes: Map<string, string> = new Map(); + +wss.on('connection', (ws: WebSocket) => { + let user: User; + let timeoutProcess = setTimeout(() => { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "NO_IDENT", + "reason": "The client did not identify in time." + } + })); + + ws.close(); + }, 1000); + ws.on("message", async data => { + let event = JSON.parse(data.toString()) as WSEvent; + + if (event.task === "IDENTIFY") { + if(user !== undefined) { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "ALREADY_IDENT", + "reason": "This session has already identified." + } + })); + + return ws.close(); + } + clearTimeout(timeoutProcess); + + if (["", undefined, null].includes(event.payload["token"]) || typeof event.payload["token"] !== "string") { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "BAD_TOKEN", + "reason": "The token is not provided or malformed." + } + })); + + return ws.close(); + } + + let tokenValid = await isSessionValid(event.payload["token"]); + + if (!tokenValid) { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "INVALID_TOKEN", + "reason": "The token provided is not valid." + } + })); + + return ws.close(); + } + + ws.send(JSON.stringify({ + "task": "CONFIG", + "payload": { + "heartbeatInterval": 100 + } + })); + + timeoutProcess = setTimeout(() => { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "HEARTBEAT_MISS", + "reason": "The client missed a heartbeat (dead connection)." + } + })) + + ws.close(); + }, 10000); + + user = new User(ws, event.payload["token"]); + } + if (event.task == "HEARTBEAT") { + clearInterval(timeoutProcess); + + timeoutProcess = setTimeout(() => { + ws.send(JSON.stringify({ + "task": "TERMINATE", + "payload": { + "code": "HEARTBEAT_MISS", + "reason": "The client missed a heartbeat (dead connection)." + } + })) + + ws.close(); + }, 2000); + + if(ws["currentSessionId"] !== undefined) { + let session: Session = sessions.get(ws["currentSessionId"]); + + user.videoPositon = event.payload["videoPosition"]; + let index = session.users.findIndex(iuser => iuser.id == user.id); + + session.users[index].videoPositon = event.payload["videoPositon"]; + + sessions.set(session.id, session); + + let delays = {}; + + session.users.forEach(iuser => { + delays[iuser.id] = Math.floor((iuser.videoPositon * 1000) - (user.videoPositon * 1000)); + + if(session.currentVideo === null) return; + if(session.currentVideo.state !== VideoState.Playing) return; + if(delays[iuser.id] > 1000 || delays[iuser.id] < -1000) { + ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "position": user.videoPositon + } + })) + } + }); + ws.send(JSON.stringify({ + "task": "HEARTBEAT_ACK", + "payload": { + "delays": delays + } + })) + } else { + ws.send(JSON.stringify({ + "task": "HEARTBEAT_ACK", + "payload": { + "delays": [] + } + })) + } + } + if (event.task == "SESSION") { + if (event.payload["id"] == null) { + let session = new Session(); + session.users.push(user); + + sessions.set(session.id, session); + partycodes.set(session.partyCode, session.id); + + ws["currentSessionId"] = session.id; + + let safeUsers: APIUser[] = []; + + session.users.forEach(iuser => { + safeUsers.push({ + id: iuser.id + }); + }); + + ws.send(JSON.stringify({ + "task": "SESSION", + "payload": { + "code": session.partyCode, + "users": safeUsers, + "queue": session.videoQueue + } + })); + + /*let video = await user.getYoutubeVideo("wDVLrJESFNI"); + let ongoingVideo = video.toOngoingVideo(); + + session.currentVideo = ongoingVideo; + + sessions.set(session.id, session); + + ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "url": ongoingVideo.url, + "title": ongoingVideo.title, + "author": ongoingVideo.author, + "thumbnail": ongoingVideo.thumbnail, + "state": ongoingVideo.state, + "position": ongoingVideo.position + } + }));*/ + } else { + if (["", undefined, null].includes(event.payload["id"]) || typeof event.payload["id"] != "string") return ws.send(JSON.stringify({ + "task": "FAILURE", + "payload": { + "code": "BAD_CODE", + "reason": "The party code is not present or is malformed." + } + })); + + if (!partycodes.has(event.payload["id"])) return ws.send(JSON.stringify({ + "task": "FAILURE", + "payload": { + "code": "INVALID_CODE", + "reason": "This party code is not valid." + } + })); + + let sessionId: string = partycodes.get(event.payload["id"]); + let session: Session = sessions.get(sessionId); + + ws["currentSessionId"] = session.id; + session.users.push(user); + + sessions.set(session.id, session); + + let safeUsers: APIUser[] = []; + + session.users.forEach(iuser => { + safeUsers.push({ + id: iuser.id + }); + }); + + session.users.forEach(iuser => { + if (iuser.id == user.id) return; + + iuser.ws.send(JSON.stringify({ + "task": "UPDATE_USERS", + "payload": { + "users": safeUsers + } + })) + }); + + ws.send(JSON.stringify({ + "task": "SESSION", + "payload": { + "code": session.partyCode, + "users": safeUsers, + "queue": session.videoQueue + } + })); + + if(session.currentVideo != null) { + ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "id": session.currentVideo.id, + "url": session.currentVideo.url, + "title": session.currentVideo.title, + "author": session.currentVideo.author, + "duration": session.currentVideo.duration, + "duration_pretty": session.currentVideo.duration_pretty, + "thumbnail": session.currentVideo.thumbnail, + "state": session.currentVideo.state, + "position": session.currentVideo.position + } + })); + } + } + } + if(event.task == "VIDEO_UPDATE") { + let session: Session = sessions.get(ws["currentSessionId"]); + + if (session.currentVideo !== null && session.currentVideo.state === VideoState.Loading) return; + + if (session.currentVideo) { + if(event.payload["state"] === 2) { + session.currentVideo.state = VideoState.Buffering; + } else if (event.payload["state"] === 1) { + session.currentVideo.state = VideoState.Playing; + } else if (event.payload["state"] === 0) { + session.currentVideo.state = VideoState.Paused + } + + session.currentVideo.position = event.payload["position"]; + + if(event.payload["state"] === 0 && Math.floor(event.payload["position"]) >= (session.currentVideo.duration - 1)) { + console.log("aaaa gotta change (Twi is cute btw)"); + + session.currentVideo.state = VideoState.Loading; + + setTimeout(() => { + if (session.videoQueue.length === 0) session.currentVideo = null; + else session.currentVideo = (session.videoQueue.shift()).toOngoingVideo(); + + if(session.currentVideo !== null) session.currentVideo.state = VideoState.Playing; + + session.users.forEach(iuser => { + iuser.ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "id": session.currentVideo ? session.currentVideo.id : null, + "sha": session.currentVideo ? createHash("sha256").update(session.currentVideo.id).digest("hex") : null, + "url": session.currentVideo ? session.currentVideo.url : null, + "title": session.currentVideo ? session.currentVideo.title : null, + "duration": session.currentVideo ? session.currentVideo.duration : null, + "duration_pretty": session.currentVideo ? session.currentVideo.duration_pretty : null, + "author": session.currentVideo ? session.currentVideo.author : null, + "thumbnail": session.currentVideo ? session.currentVideo.thumbnail : null, + "state": session.currentVideo ? session.currentVideo.state : null, + "position": session.currentVideo ? session.currentVideo.position : null + } + })); + iuser.ws.send(JSON.stringify({ + "task": "UPDATE_QUEUE", + "payload": { + "queue": session.videoQueue, + "poster": user.id + } + })); + }); + }, 5000); + } else { + session.users.forEach(iuser => { + if (iuser.id == user.id) return; + + iuser.ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "state": event.payload["state"], + "position": event.payload["position"] + } + })) + }); + } + } + + sessions.set(session.id, session); + } + if(event.task == "UPDATE_QUEUE") { + let session: Session = sessions.get(ws["currentSessionId"]); + + if(event.payload["operation"] == "+") { + let video = await user.getYoutubeVideo(event.payload["video"]); + + session.videoQueue.push(video); + + if(session.videoQueue.length === 1 && session.currentVideo === null) { + session.currentVideo = (session.videoQueue.shift()).toOngoingVideo(); + + session.users.forEach(user => { + user.ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "id": session.currentVideo.id, + "sha": createHash("sha256").update(session.currentVideo.id).digest("hex"), + "url": session.currentVideo.url, + "title": session.currentVideo.title, + "author": session.currentVideo.author, + "thumbnail": session.currentVideo.thumbnail, + "state": session.currentVideo.state, + "position": session.currentVideo.position + } + })) + }); + } + + sessions.set(session.id, session); + } + + session.users.forEach(iuser => { + iuser.ws.send(JSON.stringify({ + "task": "UPDATE_QUEUE", + "payload": { + "queue": session.videoQueue, + "poster": user.id + } + })); + }); + } + }); + + ws.on("close", () => { + if (ws["currentSessionId"] != undefined) { + if (sessions.has(ws["currentSessionId"])) { + let session = sessions.get(ws["currentSessionId"]); + session.users = session.users.filter((iuser) => iuser.id != user.id); + sessions.set(session.id, session); + if (session.users.length == 0) { + sessions.delete(session.id); + partycodes.delete(session.partyCode); + } else { + let safeUsers: APIUser[] = []; + + session.users.forEach(iuser => { + safeUsers.push({ + id: iuser.id + }); + }); + + session.users.forEach(iuser => { + if (iuser.id == user.id) return; + + iuser.ws.send(JSON.stringify({ + "task": "UPDATE_USERS", + "payload": { + "users": safeUsers + } + })) + }); + } + } + } + }); +}); + +interface WSEvent { + task: string, + payload: object +}
\ No newline at end of file diff --git a/together/src/types/Session.ts b/together/src/types/Session.ts new file mode 100644 index 0000000..5d3e799 --- /dev/null +++ b/together/src/types/Session.ts @@ -0,0 +1,39 @@ +import {v4 as uuidv4} from 'uuid'; +import User from "./User"; +import Video, {OngoingVideo} from "./Video"; +import {generateParyCode} from "../utils/PartyCodes"; + +export default class Session { + public id: string; + public partyCode: string; + public videoQueue: Video[]; + public currentVideo: OngoingVideo; + public users: User[]; + + constructor() { + this.id = uuidv4(); + this.partyCode = generateParyCode(); + this.videoQueue = []; + this.currentVideo = null; + this.users = []; + } + + update() { + if(this.videoQueue.length === 0) this.currentVideo = null; + else this.currentVideo = (this.videoQueue.shift()).toOngoingVideo(); + + this.users.forEach(user => { + user.ws.send(JSON.stringify({ + "task": "VIDEO_UPDATE", + "payload": { + "url": this.currentVideo.url, + "title": this.currentVideo.title, + "author": this.currentVideo.author, + "thumbnail": this.currentVideo.thumbnail, + "state": this.currentVideo.state, + "position": this.currentVideo.position + } + })) + }); + } +}
\ No newline at end of file diff --git a/together/src/types/User.ts b/together/src/types/User.ts new file mode 100644 index 0000000..900acb8 --- /dev/null +++ b/together/src/types/User.ts @@ -0,0 +1,44 @@ +import {WebSocket} from "ws"; +import Video from "./Video"; +import superagent from "superagent"; + +export default class User { + public id: string; + public ws: WebSocket; + public videoPositon: number; + private apiToken: string; + + constructor(ws: WebSocket, apiToken: string) { + this.videoPositon = 0; + this.ws = ws; + this.apiToken = apiToken; + + this.getUserData(); + } + + async getUserData() { + let res = await superagent.get("https://peh-internal.minteck.org/api/me") + .set("Cookie", "PEH2_SESSION_TOKEN=" + this.apiToken) + .send(); + + this.id = res.body["id"]; + } + + async getYoutubeVideo(id: string): Promise<Video> { + let res = await superagent.get("https://peh-internal.minteck.org/api/video?id=" + id) + .set("Cookie", "PEH2_SESSION_TOKEN=" + this.apiToken) + .send(); + + return new Video(id, res.body["title"], res.body["author"], res.body["duration"], res.body["duration_pretty"], res.body["url"], res.body["poster"]); + } + + toAPIUser(): APIUser { + return { + id: this.id + } + } +} + +export interface APIUser { + id: string; +}
\ No newline at end of file diff --git a/together/src/types/Video.ts b/together/src/types/Video.ts new file mode 100644 index 0000000..9bba13f --- /dev/null +++ b/together/src/types/Video.ts @@ -0,0 +1,41 @@ +export default class Video { + public id: string; + public title: string; + public author: string; + public duration: number; + public duration_pretty: string; + public url: string; + public thumbnail: string; + + constructor(id, title, author, duration, duration_pretty, url, thumbnail) { + this.id = id; + this.title = title; + this.author = author; + this.duration = duration; + this.duration_pretty = duration_pretty + this.url = url; + this.thumbnail = thumbnail; + } + + toOngoingVideo() { + return new OngoingVideo(this.id, this.title, this.author, this.duration, this.duration_pretty, this.url, this.thumbnail); + } +} + +export class OngoingVideo extends Video { + public state: VideoState; + public position: number; + + constructor(id, title, author, duration, duration_pretty, url, thumbnail) { + super(id, title, author, duration, duration_pretty, url, thumbnail); + this.state = VideoState.Paused; + this.position = 0; + } +} + +export enum VideoState { + Paused, + Playing, + Buffering, + Loading +}
\ No newline at end of file diff --git a/together/src/utils/InternalAPI.ts b/together/src/utils/InternalAPI.ts new file mode 100644 index 0000000..b64f576 --- /dev/null +++ b/together/src/utils/InternalAPI.ts @@ -0,0 +1,10 @@ +import superagent from "superagent"; + +export async function isSessionValid(token: string): Promise<boolean> { + let res = await superagent.get("https://peh-internal.minteck.org/api/session") + .set("Cookie", "PEH2_SESSION_TOKEN=" + token) + .send(); + + if (res.text === "VALID") return true; + return false; +}
\ No newline at end of file diff --git a/together/src/utils/PartyCodes.ts b/together/src/utils/PartyCodes.ts new file mode 100644 index 0000000..1acc82e --- /dev/null +++ b/together/src/utils/PartyCodes.ts @@ -0,0 +1,5 @@ +import {randomBytes} from 'crypto'; + +export function generateParyCode(): string { + return parseInt(randomBytes(16).toString("hex"), 16).toString(36).substring(0, 8); +}
\ No newline at end of file |