From df4d87f0e640fa298610916992e1f73bed2910c2 Mon Sep 17 00:00:00 2001 From: Ari Archer Date: Tue, 26 Jul 2022 15:28:04 +0300 Subject: [PATCH] Add autotyping support Signed-off-by: Ari Archer --- requirements.txt | 1 + src/main.py | 325 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 316 insertions(+), 10 deletions(-) diff --git a/requirements.txt b/requirements.txt index 20f89b3..0c4138b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ aiohttp requests discord-webhook DateTime +guacamole-keysyms diff --git a/src/main.py b/src/main.py index 2e35c1c..afc7a72 100644 --- a/src/main.py +++ b/src/main.py @@ -18,6 +18,7 @@ from warnings import filterwarnings as filter_warnings import aiohttp # type: ignore import discord_webhook as dw # type: ignore import requests # type: ignore +from guacamole_keysyms import KeyIdentifiers, UnshiftedKeyCodes # type: ignore CONFIG_FILE: str = "config.json" CONFIG: Dict[str, Any] = { @@ -71,19 +72,20 @@ CONFIG: Dict[str, Any] = { "emperor palpatine", "mr. ware", ], + "keys": {}, } RANDOM: SystemRandom = SystemRandom() def gen_key() -> str: _key: list[str] = list( - f"{hex(RANDOM.randint(0, 123456789))}{uuid4().hex}{RANDOM.randint(0, 123456789101112)}" + f"{hex(RANDOM.randint(0, 123456789))}:{uuid4().hex}:{RANDOM.randint(0, 123456789101112)}" ) RANDOM.shuffle(_key) return "".join(_key) -GUAC_CACHE: Dict[str, Dict[Any, Any]] = {"guac": {}, "unguac": {}} +GUAC_CACHE: Dict[str, Dict[Any, Any]] = {"guac": {}, "unguac": {}, "guac-keys": {}} AUTH: Dict[str, Any] = {"users": set(), "key": ""} STATE: Dict[str, Any] = {"run": True, "vm": "", "chatlog": []} VOTE_STATES: Dict[int, str] = { @@ -92,6 +94,230 @@ VOTE_STATES: Dict[int, str] = { 2: "Reset vote ended", 3: "Timeout for reset", } +GUAC_KEYS_SPECIAL_MAPPING: Dict[str, Dict[str, UnshiftedKeyCodes]] = { + "escape": { + "n": UnshiftedKeyCodes.ENTER, + "e": KeyIdentifiers.ESCAPE, + "c": UnshiftedKeyCodes.CTRL, + "a": UnshiftedKeyCodes.ALT, + "b": UnshiftedKeyCodes.BACKSPACE, + "w": UnshiftedKeyCodes.LEFT_WINDOW_KEY, + ")": ord(")"), + }, + "arrow": { + "l": UnshiftedKeyCodes.LEFT_ARROW, + "u": UnshiftedKeyCodes.UP_ARROW, + "r": UnshiftedKeyCodes.RIGHT_ARROW, + "d": UnshiftedKeyCodes.DOWN_ARROW, + }, +} + + +def parse_guac_keys(keys: str) -> Union[List[Tuple[int, int]], str]: + _cache_name: str = "guac-keys" + + if len(GUAC_CACHE[_cache_name]) > CONFIG["max-cache"]: + GUAC_CACHE[_cache_name].clear() + elif keys in GUAC_CACHE[_cache_name]: + return GUAC_CACHE[_cache_name][keys] + + results: List[Tuple[int, int]] = [] + max_ip: int = len(keys) + ip: int = 0 + + special_chars: Tuple[str, ...] = ("^", "\\", "~", "[", "(", "!", "{", "|") + + while ip < max_ip: + char: str = keys[ip] + + def check_inc_ip(msg: str, /, ip: int = ip) -> Optional[str]: + ip += 1 + + if ip > max_ip: + print(f"IP overflow: {msg}", file=sys.stderr) + return msg + + return None + + if char in special_chars: + match char: + case "^": + ip += 1 + if ( + ret := check_inc_ip("^ is missing a control character") + ) is not None: + return ret + char = keys[ip] + + results.extend( + ( + (UnshiftedKeyCodes.CTRL, 1), + (ord(char), 1), + (ord(char), 0), + (UnshiftedKeyCodes.CTRL, 0), + ) + ) + + case "\\": + ip += 1 + if ( + ret := check_inc_ip("\\ is missing a special key character") + ) is not None: + return ret + char = keys[ip] + + if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]: + return f"Invalid special key char: {char!r}" + + results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 1)) + + case "~": + ip += 1 + if ( + ret := check_inc_ip("^ is missing a special key character") + ) is not None: + return ret + char = keys[ip] + + if char not in GUAC_KEYS_SPECIAL_MAPPING["arrow"]: + return f"Invalid arrow key char: {char!r}" + + results.extend( + ( + (GUAC_KEYS_SPECIAL_MAPPING["arrow"][char], 1), + (GUAC_KEYS_SPECIAL_MAPPING["arrow"][char], 0), + ) + ) + + case "[": + _f_key: str = "" + + while char != "]": + _f_key += char + + ip += 1 + if (ret := check_inc_ip("No F escape end")) is not None: + return ret + char = keys[ip] + + _f_key = _f_key[1:] + + if ( + f_key := getattr(UnshiftedKeyCodes, f"F{_f_key}", None) + ) is None: + return f"Invalid F key: {_f_key!r}" + + results.extend(((f_key, 1), (f_key, 0))) + + case "(": + _ascii_keys: str = "" + + while char != ")": + _ascii_keys += char + + ip += 1 + if (ret := check_inc_ip("No ASCII keys end")) is not None: + return ret + char = keys[ip] + + _ascii_keys = _ascii_keys[1:] + + results.extend( + tuple((ord(c), state) for c in _ascii_keys for state in (1, 0)) + ) + + case "\\": + ip += 1 + if ( + ret := check_inc_ip("! is missing a special key character") + ) is not None: + return ret + char = keys[ip] + + if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]: + return f"Invalid special key char: {char!r}" + + results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 1)) + + case "!": + ip += 1 + if ( + ret := check_inc_ip("! is missing a special key character") + ) is not None: + return ret + char = keys[ip] + + if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]: + return f"Invalid special key char: {char!r}" + + results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 0)) + + case "{": + _repeat_ammount: str = "" + _repeat_ammount_ip: int = ip + _repeat_hit_group: bool = False + + while char != "}": + if char == ":" and not _repeat_hit_group: + _repeat_hit_group = True + + if ( + not char.isnumeric() + and ip != _repeat_ammount_ip + and char != ":" + and not _repeat_hit_group + ): + return f"Invalid character in repeat: {char!r}" + + _repeat_ammount += char + + ip += 1 + if (ret := check_inc_ip("No repeat end")) is not None: + return ret + char = keys[ip] + + _repeat_ammount = _repeat_ammount[1:] + + repeat_groups: List = _repeat_ammount.split(":") + + if len(repeat_groups) < 2: + repeat_groups.append(1) + + repeat_groups = list(map(int, repeat_groups)) + + repeat_group: List[Tuple[int, int]] = results[-repeat_groups[0] :] + + for _ in range(repeat_groups[1]): + results.extend(repeat_group) + + case "|": + ip += 1 + if ( + ret := check_inc_ip("| is missing a special key character") + ) is not None: + return ret + char = keys[ip] + + if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]: + return f"Invalid special key char: {char!r}" + + results.extend( + (GUAC_KEYS_SPECIAL_MAPPING["escape"][char], state) + for state in (1, 0) + ) + else: + results.extend( + ( + (ord(char), 1), + (ord(char), 0), + ) + ) + + ip += 1 + + GUAC_CACHE["guac-keys"][keys] = results + + return results def paste(content: str, no_content_msg: str) -> Union[str, Tuple[None, str]]: @@ -794,6 +1020,80 @@ class CommandParser: "chat", f"@{user} {args[0]!r} is now marked as not an impersonator" ) + @staticmethod + def cmd_turn(user: str, args: List[str]) -> str: + """Auth command, takes turn + Syntax: turn""" + + return guac_msg("turn") + + @staticmethod + def cmd_keys(user: str, args: List[str]) -> Union[Tuple[str, ...], str]: + """Auth command, types a supplied key combo + Syntax: keys """ + + if not args: + return guac_msg(f"@{user} ??? What, what do I type, like? Huh?") + + keys: Union[List[Tuple[int, int]], str] = parse_guac_keys(" ".join(args)) + + if type(keys) is str: + return guac_msg("chat", f"@{user} {keys}") + + return tuple(guac_msg("key", str(code), str(state)) for code, state in keys) # type: ignore + + @staticmethod + def cmd_endturn(user: str, args: List[str]) -> str: + """Auth command, ends turn + Syntax: endturn""" + + return guac_msg("turn", "0") + + @staticmethod + def cmd_skeys(user: str, args: List[str]) -> str: + """Auth command, lists the keys saved + Syntax: skeys""" + + pid = paste( + "\n".join( + f"* {key_name} ({key})" for key_name, key in CONFIG["keys"].items() + ), + f"@{user} No key combos saved so... what do we do?", + ) + + if pid[0] is None: + return pid[1] + + return guac_msg("chat", f"@{user} Here's a list of keys: {pid}") + + @staticmethod + def cmd_skey(user: str, args: List[str]) -> str: + """Auth command, save a key combo + Syntax: skey """ + + if len(args) < 2: + return guac_msg( + "chat", f"@{user} I need both the name of the key combo and the content" + ) + + CONFIG["keys"][args[0]] = " ".join(args[1:]) + save_config() + + return guac_msg("chat", f"@{user} Key combo {args[0]!r} saved") + + @classmethod + def cmd_ikey(cls, user: str, args: List[str]) -> Union[str, Tuple[str, ...]]: + """Auth command, invoke a key combo + Syntax: ikey """ + + if not args: + return guac_msg("chat", f"@{user} Gimme the key combo name") + + if args[0] not in CONFIG["keys"]: + return guac_msg("chat", f"@{user} Coun't find {args[0]!r} :shrug:") + + return cls.cmd_keys(user, [CONFIG["keys"][args[0]]]) + class MessageParser: @staticmethod @@ -801,7 +1101,7 @@ class MessageParser: return guac_msg("nop") @classmethod - def type_chat(cls, content: List[str]) -> str: + def type_chat(cls, content: List[str]) -> Union[str, Tuple[str, ...]]: str_msg: str = " ".join(content[1:]) user: str = content[0].strip() @@ -1029,17 +1329,22 @@ async def main() -> int: parsed_msg: Optional[List[str]] = unguac_msg(msg.data) if parsed_msg is None: - ws.send_str( + ws.send_str( # type: ignore guac_msg("chat", f"The guac parser failed on message: {msg!r}") ) continue - await ws.send_str( - ( - getattr(MessageParser, f"type_{parsed_msg[0]}", None) - or MessageParser.type_nop - )(parsed_msg[1:]) - ) + result: Union[str, Tuple[str]] = ( + getattr(MessageParser, f"type_{parsed_msg[0]}", None) + or MessageParser.type_nop + )(parsed_msg[1:]) + + if type(result) is str: + result = (result,) + + for send_msg in result: + await asyncio.sleep(0.08) + await ws.send_str(send_msg) if not STATE["run"]: log("Run state was set to false")