Add autotyping support

Signed-off-by: Ari Archer <ari.web.xyz@gmail.com>
This commit is contained in:
Ari Archer 2022-07-26 15:28:04 +03:00
parent f3fbad7e5a
commit df4d87f0e6
Signed by untrusted user who does not match committer: ari
GPG key ID: A50D5B4B599AF8A2
2 changed files with 316 additions and 10 deletions

View file

@ -4,3 +4,4 @@ aiohttp
requests
discord-webhook
DateTime
guacamole-keysyms

View file

@ -18,6 +18,7 @@ from warnings import filterwarnings as filter_warnings
import aiohttp # type: ignore
import discord_webhook as dw # type: ignore
import requests # type: ignore
from guacamole_keysyms import KeyIdentifiers, UnshiftedKeyCodes # type: ignore
CONFIG_FILE: str = "config.json"
CONFIG: Dict[str, Any] = {
@ -71,19 +72,20 @@ CONFIG: Dict[str, Any] = {
"emperor palpatine",
"mr. ware",
],
"keys": {},
}
RANDOM: SystemRandom = SystemRandom()
def gen_key() -> str:
_key: list[str] = list(
f"{hex(RANDOM.randint(0, 123456789))}{uuid4().hex}{RANDOM.randint(0, 123456789101112)}"
f"{hex(RANDOM.randint(0, 123456789))}:{uuid4().hex}:{RANDOM.randint(0, 123456789101112)}"
)
RANDOM.shuffle(_key)
return "".join(_key)
GUAC_CACHE: Dict[str, Dict[Any, Any]] = {"guac": {}, "unguac": {}}
GUAC_CACHE: Dict[str, Dict[Any, Any]] = {"guac": {}, "unguac": {}, "guac-keys": {}}
AUTH: Dict[str, Any] = {"users": set(), "key": ""}
STATE: Dict[str, Any] = {"run": True, "vm": "", "chatlog": []}
VOTE_STATES: Dict[int, str] = {
@ -92,6 +94,230 @@ VOTE_STATES: Dict[int, str] = {
2: "Reset vote ended",
3: "Timeout for reset",
}
GUAC_KEYS_SPECIAL_MAPPING: Dict[str, Dict[str, UnshiftedKeyCodes]] = {
"escape": {
"n": UnshiftedKeyCodes.ENTER,
"e": KeyIdentifiers.ESCAPE,
"c": UnshiftedKeyCodes.CTRL,
"a": UnshiftedKeyCodes.ALT,
"b": UnshiftedKeyCodes.BACKSPACE,
"w": UnshiftedKeyCodes.LEFT_WINDOW_KEY,
")": ord(")"),
},
"arrow": {
"l": UnshiftedKeyCodes.LEFT_ARROW,
"u": UnshiftedKeyCodes.UP_ARROW,
"r": UnshiftedKeyCodes.RIGHT_ARROW,
"d": UnshiftedKeyCodes.DOWN_ARROW,
},
}
def parse_guac_keys(keys: str) -> Union[List[Tuple[int, int]], str]:
_cache_name: str = "guac-keys"
if len(GUAC_CACHE[_cache_name]) > CONFIG["max-cache"]:
GUAC_CACHE[_cache_name].clear()
elif keys in GUAC_CACHE[_cache_name]:
return GUAC_CACHE[_cache_name][keys]
results: List[Tuple[int, int]] = []
max_ip: int = len(keys)
ip: int = 0
special_chars: Tuple[str, ...] = ("^", "\\", "~", "[", "(", "!", "{", "|")
while ip < max_ip:
char: str = keys[ip]
def check_inc_ip(msg: str, /, ip: int = ip) -> Optional[str]:
ip += 1
if ip > max_ip:
print(f"IP overflow: {msg}", file=sys.stderr)
return msg
return None
if char in special_chars:
match char:
case "^":
ip += 1
if (
ret := check_inc_ip("^ is missing a control character")
) is not None:
return ret
char = keys[ip]
results.extend(
(
(UnshiftedKeyCodes.CTRL, 1),
(ord(char), 1),
(ord(char), 0),
(UnshiftedKeyCodes.CTRL, 0),
)
)
case "\\":
ip += 1
if (
ret := check_inc_ip("\\ is missing a special key character")
) is not None:
return ret
char = keys[ip]
if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]:
return f"Invalid special key char: {char!r}"
results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 1))
case "~":
ip += 1
if (
ret := check_inc_ip("^ is missing a special key character")
) is not None:
return ret
char = keys[ip]
if char not in GUAC_KEYS_SPECIAL_MAPPING["arrow"]:
return f"Invalid arrow key char: {char!r}"
results.extend(
(
(GUAC_KEYS_SPECIAL_MAPPING["arrow"][char], 1),
(GUAC_KEYS_SPECIAL_MAPPING["arrow"][char], 0),
)
)
case "[":
_f_key: str = ""
while char != "]":
_f_key += char
ip += 1
if (ret := check_inc_ip("No F escape end")) is not None:
return ret
char = keys[ip]
_f_key = _f_key[1:]
if (
f_key := getattr(UnshiftedKeyCodes, f"F{_f_key}", None)
) is None:
return f"Invalid F key: {_f_key!r}"
results.extend(((f_key, 1), (f_key, 0)))
case "(":
_ascii_keys: str = ""
while char != ")":
_ascii_keys += char
ip += 1
if (ret := check_inc_ip("No ASCII keys end")) is not None:
return ret
char = keys[ip]
_ascii_keys = _ascii_keys[1:]
results.extend(
tuple((ord(c), state) for c in _ascii_keys for state in (1, 0))
)
case "\\":
ip += 1
if (
ret := check_inc_ip("! is missing a special key character")
) is not None:
return ret
char = keys[ip]
if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]:
return f"Invalid special key char: {char!r}"
results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 1))
case "!":
ip += 1
if (
ret := check_inc_ip("! is missing a special key character")
) is not None:
return ret
char = keys[ip]
if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]:
return f"Invalid special key char: {char!r}"
results.append((GUAC_KEYS_SPECIAL_MAPPING["escape"][char], 0))
case "{":
_repeat_ammount: str = ""
_repeat_ammount_ip: int = ip
_repeat_hit_group: bool = False
while char != "}":
if char == ":" and not _repeat_hit_group:
_repeat_hit_group = True
if (
not char.isnumeric()
and ip != _repeat_ammount_ip
and char != ":"
and not _repeat_hit_group
):
return f"Invalid character in repeat: {char!r}"
_repeat_ammount += char
ip += 1
if (ret := check_inc_ip("No repeat end")) is not None:
return ret
char = keys[ip]
_repeat_ammount = _repeat_ammount[1:]
repeat_groups: List = _repeat_ammount.split(":")
if len(repeat_groups) < 2:
repeat_groups.append(1)
repeat_groups = list(map(int, repeat_groups))
repeat_group: List[Tuple[int, int]] = results[-repeat_groups[0] :]
for _ in range(repeat_groups[1]):
results.extend(repeat_group)
case "|":
ip += 1
if (
ret := check_inc_ip("| is missing a special key character")
) is not None:
return ret
char = keys[ip]
if char not in GUAC_KEYS_SPECIAL_MAPPING["escape"]:
return f"Invalid special key char: {char!r}"
results.extend(
(GUAC_KEYS_SPECIAL_MAPPING["escape"][char], state)
for state in (1, 0)
)
else:
results.extend(
(
(ord(char), 1),
(ord(char), 0),
)
)
ip += 1
GUAC_CACHE["guac-keys"][keys] = results
return results
def paste(content: str, no_content_msg: str) -> Union[str, Tuple[None, str]]:
@ -794,6 +1020,80 @@ class CommandParser:
"chat", f"@{user} {args[0]!r} is now marked as not an impersonator"
)
@staticmethod
def cmd_turn(user: str, args: List[str]) -> str:
"""Auth command, takes turn
Syntax: turn"""
return guac_msg("turn")
@staticmethod
def cmd_keys(user: str, args: List[str]) -> Union[Tuple[str, ...], str]:
"""Auth command, types a supplied key combo
Syntax: keys <combo>"""
if not args:
return guac_msg(f"@{user} ??? What, what do I type, like? Huh?")
keys: Union[List[Tuple[int, int]], str] = parse_guac_keys(" ".join(args))
if type(keys) is str:
return guac_msg("chat", f"@{user} {keys}")
return tuple(guac_msg("key", str(code), str(state)) for code, state in keys) # type: ignore
@staticmethod
def cmd_endturn(user: str, args: List[str]) -> str:
"""Auth command, ends turn
Syntax: endturn"""
return guac_msg("turn", "0")
@staticmethod
def cmd_skeys(user: str, args: List[str]) -> str:
"""Auth command, lists the keys saved
Syntax: skeys"""
pid = paste(
"\n".join(
f"* {key_name} ({key})" for key_name, key in CONFIG["keys"].items()
),
f"@{user} No key combos saved so... what do we do?",
)
if pid[0] is None:
return pid[1]
return guac_msg("chat", f"@{user} Here's a list of keys: {pid}")
@staticmethod
def cmd_skey(user: str, args: List[str]) -> str:
"""Auth command, save a key combo
Syntax: skey <name> <combo>"""
if len(args) < 2:
return guac_msg(
"chat", f"@{user} I need both the name of the key combo and the content"
)
CONFIG["keys"][args[0]] = " ".join(args[1:])
save_config()
return guac_msg("chat", f"@{user} Key combo {args[0]!r} saved")
@classmethod
def cmd_ikey(cls, user: str, args: List[str]) -> Union[str, Tuple[str, ...]]:
"""Auth command, invoke a key combo
Syntax: ikey <combo_name>"""
if not args:
return guac_msg("chat", f"@{user} Gimme the key combo name")
if args[0] not in CONFIG["keys"]:
return guac_msg("chat", f"@{user} Coun't find {args[0]!r} :shrug:")
return cls.cmd_keys(user, [CONFIG["keys"][args[0]]])
class MessageParser:
@staticmethod
@ -801,7 +1101,7 @@ class MessageParser:
return guac_msg("nop")
@classmethod
def type_chat(cls, content: List[str]) -> str:
def type_chat(cls, content: List[str]) -> Union[str, Tuple[str, ...]]:
str_msg: str = " ".join(content[1:])
user: str = content[0].strip()
@ -1029,17 +1329,22 @@ async def main() -> int:
parsed_msg: Optional[List[str]] = unguac_msg(msg.data)
if parsed_msg is None:
ws.send_str(
ws.send_str( # type: ignore
guac_msg("chat", f"The guac parser failed on message: {msg!r}")
)
continue
await ws.send_str(
(
getattr(MessageParser, f"type_{parsed_msg[0]}", None)
or MessageParser.type_nop
)(parsed_msg[1:])
)
result: Union[str, Tuple[str]] = (
getattr(MessageParser, f"type_{parsed_msg[0]}", None)
or MessageParser.type_nop
)(parsed_msg[1:])
if type(result) is str:
result = (result,)
for send_msg in result:
await asyncio.sleep(0.08)
await ws.send_str(send_msg)
if not STATE["run"]:
log("Run state was set to false")