Add chat and event logging

Signed-off-by: Ari Archer <ari.web.xyz@gmail.com>
This commit is contained in:
Ari Archer 2022-07-17 11:26:10 +03:00
parent e54d2665a5
commit c930b03613
Signed by untrusted user who does not match committer: ari
GPG key ID: A50D5B4B599AF8A2
2 changed files with 60 additions and 21 deletions

View file

@ -3,3 +3,4 @@ uuid
aiohttp
requests
discord-webhook
DateTime

View file

@ -3,10 +3,10 @@
"""Abot for collabvm"""
import asyncio
import datetime
import json
import os
import sys
from datetime import datetime
from html import unescape as html_unescape
from secrets import SystemRandom
from time import sleep
@ -60,6 +60,7 @@ CONFIG: Dict[str, Any] = {
"aliases": {},
"report-webhook-url": "",
"authkey-webhook-url": "",
"chatlog-limit": 150,
}
RANDOM: SystemRandom = SystemRandom()
@ -74,7 +75,7 @@ def gen_key() -> str:
GUAC_CACHE: Dict[str, Dict[Any, Any]] = {"guac": {}, "unguac": {}}
AUTH: Dict[str, Any] = {"users": set(), "key": gen_key()}
STATE: Dict[str, Union[bool, str]] = {"run": True, "vm": ""}
STATE: Dict[str, Any] = {"run": True, "vm": "", "chatlog": []}
def paste(content: str, no_content_msg: str) -> Union[str, Tuple[None, str]]:
@ -109,22 +110,14 @@ def create_wh(url: str) -> dw.DiscordWebhook:
def random_embed(url: str, title: str, content: str) -> dw.DiscordWebhook:
wh = create_wh(url)
_utc_time: datetime.datetime = datetime.datetime.utcnow()
_time_format: str = "%Y-%m-%d-%H-%M"
_delta: datetime.timedelta = datetime.timedelta(hours=0, minutes=5)
def _format_time(time: datetime.datetime) -> str:
return datetime.datetime.strftime(time, _time_format)
_from: str = _format_time(_utc_time - _delta)
_to: str = _format_time(_utc_time)
_chatlog: str = "\n".join(STATE["chatlog"])
wh.add_embed(
dw.DiscordEmbed(
title=title,
description=f"""{content}
Chatlog: https://elijah-dev.tk/pullcvmlog.php?from={_from}&to={_to}&vm={str(STATE['vm']).upper()}""",
Chatlog: {paste(_chatlog, 'No chatlog')}""",
color="%06x" % RANDOM.randint(0, 0xFFFFFF),
)
)
@ -638,6 +631,34 @@ class CommandParser:
),
)
@staticmethod
def cmd_chatlog(user: str, args: List[str]) -> Tuple[str]:
"""Auth command, gets current chatlog
Syntax: chatlog"""
pid = paste(
"\n".join(STATE["chatlog"]),
f"@{user} Chatlog is empty, lmao, how is that even possible???",
)
if pid[0] is None:
return (pid[1],)
return (
guac_msg(
"chat",
f"@{user} Current chatlog (limit: {CONFIG['chatlog-limit']}): {pid}",
),
)
def chatlog_entry(message: str, user: str, header: Optional[str] = None) -> None:
STATE["chatlog"].append(
f"\n{(str(header) + ' ') if header is not None else ''}\
{user!r} @ {datetime.strftime(datetime.utcnow(), '%Y-%m-%d %H:%M:%S (%f microseconds)')} UTC: \
{message}"
)
class MessageParser:
@staticmethod
@ -646,18 +667,24 @@ class MessageParser:
@classmethod
def type_chat(cls, content: List[str]) -> Tuple[str]:
str_msg: str = " ".join(content[1:])
user: str = content[0].strip()
if len(STATE["chatlog"]) > CONFIG["chatlog-limit"]:
STATE["chatlog"].clear()
if user and user != CONFIG["bot-name"]:
chatlog_entry(str_msg, user)
if (
len(content) > 3
or len(" ".join(content[1:])) > (100 + len(CONFIG["bot-name"]))
or not content[0].strip()
or len(str_msg) > (100 + len(CONFIG["bot-name"]))
or not user
or user in CONFIG["ignored"]
or user == CONFIG["bot-name"]
):
return cls.type_nop(content)
user: str = content[0]
if user in CONFIG["ignored"] or user == CONFIG["bot-name"]:
return cls.type_nop(content)
if content[1].lower().strip() in (
f"@{CONFIG['user-name']}",
CONFIG["user-name"],
@ -668,7 +695,12 @@ class MessageParser:
log(f"{user!r} mentioned the owner without any conrext")
return (guac_msg("chat", f"@{user} smh whattttttttttttt"),)
command: List[str] = list(map(lambda s: html_unescape(s).replace("`", " "), " ".join(content[1:]).split()[1:])) # type: ignore
command: List[str] = list(
map(
lambda s: html_unescape(s).replace("`", " "),
str_msg.split()[1:],
)
)
_dad_joke_im: str = content[1].lower().split(" ", 1)[0]
def _check_command() -> Optional[Tuple[str]]:
@ -757,6 +789,8 @@ class MessageParser:
@classmethod
def type_adduser(cls, content: List[str]) -> Tuple[str]:
chatlog_entry("Joined", content[1], "JOIN")
if RANDOM.randint(0, 1000) == 420:
log(f"Welcoming {content[1]!r}")
return (guac_msg("chat", f"Welcome, {content[1]!r}. How are you?"),)
@ -765,6 +799,8 @@ class MessageParser:
@classmethod
def type_remuser(cls, content: List[str]) -> Tuple[str]:
chatlog_entry("Left", content[1], "LEAVE")
if content[1] in AUTH["users"]:
log(f"Logging {content[1]!r} out")
AUTH["users"].remove(content[1])
@ -777,6 +813,8 @@ class MessageParser:
@classmethod
def type_rename(cls, content: List[str]) -> Tuple[str]:
chatlog_entry(f"{content[1]!r} -> {content[2]!r}", content[1], "RENANE")
if content[2] in AUTH["users"]:
log(f"User has renamed themselves so logging {content[2]!r} out")
AUTH["users"].remove(content[2])