summaryrefslogtreecommitdiff
path: root/together/src
diff options
context:
space:
mode:
authorMinteck <contact@minteck.org>2022-08-21 17:31:56 +0200
committerMinteck <contact@minteck.org>2022-08-21 17:31:56 +0200
commita2df9a69dcc14cb70118cda2ded499055e7ee358 (patch)
tree6dd283e4e9452d38bce81ddaaae49b5335755842 /together/src
parent84dd0735820b16b60f600284d35183d76547a71f (diff)
downloadpluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.tar.gz
pluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.tar.bz2
pluralconnect-a2df9a69dcc14cb70118cda2ded499055e7ee358.zip
m. update
Diffstat (limited to 'together/src')
-rw-r--r--together/src/index.ts413
-rw-r--r--together/src/types/Session.ts39
-rw-r--r--together/src/types/User.ts44
-rw-r--r--together/src/types/Video.ts41
-rw-r--r--together/src/utils/InternalAPI.ts10
-rw-r--r--together/src/utils/PartyCodes.ts5
6 files changed, 552 insertions, 0 deletions
diff --git a/together/src/index.ts b/together/src/index.ts
new file mode 100644
index 0000000..bfe0d0e
--- /dev/null
+++ b/together/src/index.ts
@@ -0,0 +1,413 @@
+// pony pone pone pony
+
+import {WebSocket, WebSocketServer} from "ws";
+import {isSessionValid} from "./utils/InternalAPI";
+import User, {APIUser} from "./types/User";
+import Session from "./types/Session";
+import {VideoState} from "./types/Video";
+import {createHash} from 'crypto';
+
+const wss = new WebSocketServer({port: 22666});
+
+let sessions: Map<string, Session> = new Map();
+let partycodes: Map<string, string> = new Map();
+
+wss.on('connection', (ws: WebSocket) => {
+ let user: User;
+ let timeoutProcess = setTimeout(() => {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "NO_IDENT",
+ "reason": "The client did not identify in time."
+ }
+ }));
+
+ ws.close();
+ }, 1000);
+ ws.on("message", async data => {
+ let event = JSON.parse(data.toString()) as WSEvent;
+
+ if (event.task === "IDENTIFY") {
+ if(user !== undefined) {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "ALREADY_IDENT",
+ "reason": "This session has already identified."
+ }
+ }));
+
+ return ws.close();
+ }
+ clearTimeout(timeoutProcess);
+
+ if (["", undefined, null].includes(event.payload["token"]) || typeof event.payload["token"] !== "string") {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "BAD_TOKEN",
+ "reason": "The token is not provided or malformed."
+ }
+ }));
+
+ return ws.close();
+ }
+
+ let tokenValid = await isSessionValid(event.payload["token"]);
+
+ if (!tokenValid) {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "INVALID_TOKEN",
+ "reason": "The token provided is not valid."
+ }
+ }));
+
+ return ws.close();
+ }
+
+ ws.send(JSON.stringify({
+ "task": "CONFIG",
+ "payload": {
+ "heartbeatInterval": 100
+ }
+ }));
+
+ timeoutProcess = setTimeout(() => {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "HEARTBEAT_MISS",
+ "reason": "The client missed a heartbeat (dead connection)."
+ }
+ }))
+
+ ws.close();
+ }, 10000);
+
+ user = new User(ws, event.payload["token"]);
+ }
+ if (event.task == "HEARTBEAT") {
+ clearInterval(timeoutProcess);
+
+ timeoutProcess = setTimeout(() => {
+ ws.send(JSON.stringify({
+ "task": "TERMINATE",
+ "payload": {
+ "code": "HEARTBEAT_MISS",
+ "reason": "The client missed a heartbeat (dead connection)."
+ }
+ }))
+
+ ws.close();
+ }, 2000);
+
+ if(ws["currentSessionId"] !== undefined) {
+ let session: Session = sessions.get(ws["currentSessionId"]);
+
+ user.videoPositon = event.payload["videoPosition"];
+ let index = session.users.findIndex(iuser => iuser.id == user.id);
+
+ session.users[index].videoPositon = event.payload["videoPositon"];
+
+ sessions.set(session.id, session);
+
+ let delays = {};
+
+ session.users.forEach(iuser => {
+ delays[iuser.id] = Math.floor((iuser.videoPositon * 1000) - (user.videoPositon * 1000));
+
+ if(session.currentVideo === null) return;
+ if(session.currentVideo.state !== VideoState.Playing) return;
+ if(delays[iuser.id] > 1000 || delays[iuser.id] < -1000) {
+ ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "position": user.videoPositon
+ }
+ }))
+ }
+ });
+ ws.send(JSON.stringify({
+ "task": "HEARTBEAT_ACK",
+ "payload": {
+ "delays": delays
+ }
+ }))
+ } else {
+ ws.send(JSON.stringify({
+ "task": "HEARTBEAT_ACK",
+ "payload": {
+ "delays": []
+ }
+ }))
+ }
+ }
+ if (event.task == "SESSION") {
+ if (event.payload["id"] == null) {
+ let session = new Session();
+ session.users.push(user);
+
+ sessions.set(session.id, session);
+ partycodes.set(session.partyCode, session.id);
+
+ ws["currentSessionId"] = session.id;
+
+ let safeUsers: APIUser[] = [];
+
+ session.users.forEach(iuser => {
+ safeUsers.push({
+ id: iuser.id
+ });
+ });
+
+ ws.send(JSON.stringify({
+ "task": "SESSION",
+ "payload": {
+ "code": session.partyCode,
+ "users": safeUsers,
+ "queue": session.videoQueue
+ }
+ }));
+
+ /*let video = await user.getYoutubeVideo("wDVLrJESFNI");
+ let ongoingVideo = video.toOngoingVideo();
+
+ session.currentVideo = ongoingVideo;
+
+ sessions.set(session.id, session);
+
+ ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "url": ongoingVideo.url,
+ "title": ongoingVideo.title,
+ "author": ongoingVideo.author,
+ "thumbnail": ongoingVideo.thumbnail,
+ "state": ongoingVideo.state,
+ "position": ongoingVideo.position
+ }
+ }));*/
+ } else {
+ if (["", undefined, null].includes(event.payload["id"]) || typeof event.payload["id"] != "string") return ws.send(JSON.stringify({
+ "task": "FAILURE",
+ "payload": {
+ "code": "BAD_CODE",
+ "reason": "The party code is not present or is malformed."
+ }
+ }));
+
+ if (!partycodes.has(event.payload["id"])) return ws.send(JSON.stringify({
+ "task": "FAILURE",
+ "payload": {
+ "code": "INVALID_CODE",
+ "reason": "This party code is not valid."
+ }
+ }));
+
+ let sessionId: string = partycodes.get(event.payload["id"]);
+ let session: Session = sessions.get(sessionId);
+
+ ws["currentSessionId"] = session.id;
+ session.users.push(user);
+
+ sessions.set(session.id, session);
+
+ let safeUsers: APIUser[] = [];
+
+ session.users.forEach(iuser => {
+ safeUsers.push({
+ id: iuser.id
+ });
+ });
+
+ session.users.forEach(iuser => {
+ if (iuser.id == user.id) return;
+
+ iuser.ws.send(JSON.stringify({
+ "task": "UPDATE_USERS",
+ "payload": {
+ "users": safeUsers
+ }
+ }))
+ });
+
+ ws.send(JSON.stringify({
+ "task": "SESSION",
+ "payload": {
+ "code": session.partyCode,
+ "users": safeUsers,
+ "queue": session.videoQueue
+ }
+ }));
+
+ if(session.currentVideo != null) {
+ ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "id": session.currentVideo.id,
+ "url": session.currentVideo.url,
+ "title": session.currentVideo.title,
+ "author": session.currentVideo.author,
+ "duration": session.currentVideo.duration,
+ "duration_pretty": session.currentVideo.duration_pretty,
+ "thumbnail": session.currentVideo.thumbnail,
+ "state": session.currentVideo.state,
+ "position": session.currentVideo.position
+ }
+ }));
+ }
+ }
+ }
+ if(event.task == "VIDEO_UPDATE") {
+ let session: Session = sessions.get(ws["currentSessionId"]);
+
+ if (session.currentVideo !== null && session.currentVideo.state === VideoState.Loading) return;
+
+ if (session.currentVideo) {
+ if(event.payload["state"] === 2) {
+ session.currentVideo.state = VideoState.Buffering;
+ } else if (event.payload["state"] === 1) {
+ session.currentVideo.state = VideoState.Playing;
+ } else if (event.payload["state"] === 0) {
+ session.currentVideo.state = VideoState.Paused
+ }
+
+ session.currentVideo.position = event.payload["position"];
+
+ if(event.payload["state"] === 0 && Math.floor(event.payload["position"]) >= (session.currentVideo.duration - 1)) {
+ console.log("aaaa gotta change (Twi is cute btw)");
+
+ session.currentVideo.state = VideoState.Loading;
+
+ setTimeout(() => {
+ if (session.videoQueue.length === 0) session.currentVideo = null;
+ else session.currentVideo = (session.videoQueue.shift()).toOngoingVideo();
+
+ if(session.currentVideo !== null) session.currentVideo.state = VideoState.Playing;
+
+ session.users.forEach(iuser => {
+ iuser.ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "id": session.currentVideo ? session.currentVideo.id : null,
+ "sha": session.currentVideo ? createHash("sha256").update(session.currentVideo.id).digest("hex") : null,
+ "url": session.currentVideo ? session.currentVideo.url : null,
+ "title": session.currentVideo ? session.currentVideo.title : null,
+ "duration": session.currentVideo ? session.currentVideo.duration : null,
+ "duration_pretty": session.currentVideo ? session.currentVideo.duration_pretty : null,
+ "author": session.currentVideo ? session.currentVideo.author : null,
+ "thumbnail": session.currentVideo ? session.currentVideo.thumbnail : null,
+ "state": session.currentVideo ? session.currentVideo.state : null,
+ "position": session.currentVideo ? session.currentVideo.position : null
+ }
+ }));
+ iuser.ws.send(JSON.stringify({
+ "task": "UPDATE_QUEUE",
+ "payload": {
+ "queue": session.videoQueue,
+ "poster": user.id
+ }
+ }));
+ });
+ }, 5000);
+ } else {
+ session.users.forEach(iuser => {
+ if (iuser.id == user.id) return;
+
+ iuser.ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "state": event.payload["state"],
+ "position": event.payload["position"]
+ }
+ }))
+ });
+ }
+ }
+
+ sessions.set(session.id, session);
+ }
+ if(event.task == "UPDATE_QUEUE") {
+ let session: Session = sessions.get(ws["currentSessionId"]);
+
+ if(event.payload["operation"] == "+") {
+ let video = await user.getYoutubeVideo(event.payload["video"]);
+
+ session.videoQueue.push(video);
+
+ if(session.videoQueue.length === 1 && session.currentVideo === null) {
+ session.currentVideo = (session.videoQueue.shift()).toOngoingVideo();
+
+ session.users.forEach(user => {
+ user.ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "id": session.currentVideo.id,
+ "sha": createHash("sha256").update(session.currentVideo.id).digest("hex"),
+ "url": session.currentVideo.url,
+ "title": session.currentVideo.title,
+ "author": session.currentVideo.author,
+ "thumbnail": session.currentVideo.thumbnail,
+ "state": session.currentVideo.state,
+ "position": session.currentVideo.position
+ }
+ }))
+ });
+ }
+
+ sessions.set(session.id, session);
+ }
+
+ session.users.forEach(iuser => {
+ iuser.ws.send(JSON.stringify({
+ "task": "UPDATE_QUEUE",
+ "payload": {
+ "queue": session.videoQueue,
+ "poster": user.id
+ }
+ }));
+ });
+ }
+ });
+
+ ws.on("close", () => {
+ if (ws["currentSessionId"] != undefined) {
+ if (sessions.has(ws["currentSessionId"])) {
+ let session = sessions.get(ws["currentSessionId"]);
+ session.users = session.users.filter((iuser) => iuser.id != user.id);
+ sessions.set(session.id, session);
+ if (session.users.length == 0) {
+ sessions.delete(session.id);
+ partycodes.delete(session.partyCode);
+ } else {
+ let safeUsers: APIUser[] = [];
+
+ session.users.forEach(iuser => {
+ safeUsers.push({
+ id: iuser.id
+ });
+ });
+
+ session.users.forEach(iuser => {
+ if (iuser.id == user.id) return;
+
+ iuser.ws.send(JSON.stringify({
+ "task": "UPDATE_USERS",
+ "payload": {
+ "users": safeUsers
+ }
+ }))
+ });
+ }
+ }
+ }
+ });
+});
+
+interface WSEvent {
+ task: string,
+ payload: object
+} \ No newline at end of file
diff --git a/together/src/types/Session.ts b/together/src/types/Session.ts
new file mode 100644
index 0000000..5d3e799
--- /dev/null
+++ b/together/src/types/Session.ts
@@ -0,0 +1,39 @@
+import {v4 as uuidv4} from 'uuid';
+import User from "./User";
+import Video, {OngoingVideo} from "./Video";
+import {generateParyCode} from "../utils/PartyCodes";
+
+export default class Session {
+ public id: string;
+ public partyCode: string;
+ public videoQueue: Video[];
+ public currentVideo: OngoingVideo;
+ public users: User[];
+
+ constructor() {
+ this.id = uuidv4();
+ this.partyCode = generateParyCode();
+ this.videoQueue = [];
+ this.currentVideo = null;
+ this.users = [];
+ }
+
+ update() {
+ if(this.videoQueue.length === 0) this.currentVideo = null;
+ else this.currentVideo = (this.videoQueue.shift()).toOngoingVideo();
+
+ this.users.forEach(user => {
+ user.ws.send(JSON.stringify({
+ "task": "VIDEO_UPDATE",
+ "payload": {
+ "url": this.currentVideo.url,
+ "title": this.currentVideo.title,
+ "author": this.currentVideo.author,
+ "thumbnail": this.currentVideo.thumbnail,
+ "state": this.currentVideo.state,
+ "position": this.currentVideo.position
+ }
+ }))
+ });
+ }
+} \ No newline at end of file
diff --git a/together/src/types/User.ts b/together/src/types/User.ts
new file mode 100644
index 0000000..900acb8
--- /dev/null
+++ b/together/src/types/User.ts
@@ -0,0 +1,44 @@
+import {WebSocket} from "ws";
+import Video from "./Video";
+import superagent from "superagent";
+
+export default class User {
+ public id: string;
+ public ws: WebSocket;
+ public videoPositon: number;
+ private apiToken: string;
+
+ constructor(ws: WebSocket, apiToken: string) {
+ this.videoPositon = 0;
+ this.ws = ws;
+ this.apiToken = apiToken;
+
+ this.getUserData();
+ }
+
+ async getUserData() {
+ let res = await superagent.get("https://peh-internal.minteck.org/api/me")
+ .set("Cookie", "PEH2_SESSION_TOKEN=" + this.apiToken)
+ .send();
+
+ this.id = res.body["id"];
+ }
+
+ async getYoutubeVideo(id: string): Promise<Video> {
+ let res = await superagent.get("https://peh-internal.minteck.org/api/video?id=" + id)
+ .set("Cookie", "PEH2_SESSION_TOKEN=" + this.apiToken)
+ .send();
+
+ return new Video(id, res.body["title"], res.body["author"], res.body["duration"], res.body["duration_pretty"], res.body["url"], res.body["poster"]);
+ }
+
+ toAPIUser(): APIUser {
+ return {
+ id: this.id
+ }
+ }
+}
+
+export interface APIUser {
+ id: string;
+} \ No newline at end of file
diff --git a/together/src/types/Video.ts b/together/src/types/Video.ts
new file mode 100644
index 0000000..9bba13f
--- /dev/null
+++ b/together/src/types/Video.ts
@@ -0,0 +1,41 @@
+export default class Video {
+ public id: string;
+ public title: string;
+ public author: string;
+ public duration: number;
+ public duration_pretty: string;
+ public url: string;
+ public thumbnail: string;
+
+ constructor(id, title, author, duration, duration_pretty, url, thumbnail) {
+ this.id = id;
+ this.title = title;
+ this.author = author;
+ this.duration = duration;
+ this.duration_pretty = duration_pretty
+ this.url = url;
+ this.thumbnail = thumbnail;
+ }
+
+ toOngoingVideo() {
+ return new OngoingVideo(this.id, this.title, this.author, this.duration, this.duration_pretty, this.url, this.thumbnail);
+ }
+}
+
+export class OngoingVideo extends Video {
+ public state: VideoState;
+ public position: number;
+
+ constructor(id, title, author, duration, duration_pretty, url, thumbnail) {
+ super(id, title, author, duration, duration_pretty, url, thumbnail);
+ this.state = VideoState.Paused;
+ this.position = 0;
+ }
+}
+
+export enum VideoState {
+ Paused,
+ Playing,
+ Buffering,
+ Loading
+} \ No newline at end of file
diff --git a/together/src/utils/InternalAPI.ts b/together/src/utils/InternalAPI.ts
new file mode 100644
index 0000000..b64f576
--- /dev/null
+++ b/together/src/utils/InternalAPI.ts
@@ -0,0 +1,10 @@
+import superagent from "superagent";
+
+export async function isSessionValid(token: string): Promise<boolean> {
+ let res = await superagent.get("https://peh-internal.minteck.org/api/session")
+ .set("Cookie", "PEH2_SESSION_TOKEN=" + token)
+ .send();
+
+ if (res.text === "VALID") return true;
+ return false;
+} \ No newline at end of file
diff --git a/together/src/utils/PartyCodes.ts b/together/src/utils/PartyCodes.ts
new file mode 100644
index 0000000..1acc82e
--- /dev/null
+++ b/together/src/utils/PartyCodes.ts
@@ -0,0 +1,5 @@
+import {randomBytes} from 'crypto';
+
+export function generateParyCode(): string {
+ return parseInt(randomBytes(16).toString("hex"), 16).toString(36).substring(0, 8);
+} \ No newline at end of file