vessel/scripts/session.py
Arija A. 7cf6d80fac
Fix Keep-Alive requests.
TODO: Add content length or chunking headers in requests.
(kms)

Signed-off-by: Arija A. <ari@ari.lt>
2025-03-24 19:22:36 +02:00

456 lines
13 KiB
Python
Executable file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Session parser"""
import os
import re
import secrets
import socket
import ssl
import string
import sys
import uuid
from typing import Any
# Shared random source used by the macro helpers; an instance of secrets.SystemRandom.
rand: secrets.SystemRandom = secrets.SystemRandom()
# Matches one %...% macro. Group 1 is the shortest non-empty text between two percent signs.
mcp: re.Pattern[str] = re.compile(r"%(.+?)%")
def generate_random_http_header_key(length: int = 3) -> str:
    """Return a random header-style key made of dash-separated words.

    Each word is 3 to 8 random lowercase ASCII letters with the first letter
    upper-cased, e.g. ``Abc-Defgh-Ijk``.

    Args:
        length: Number of words in the key. Zero or a negative value yields
            an empty string.

    Returns:
        The words joined with ``-``.
    """
    words: list[str] = []
    for _ in range(length):
        size: int = rand.randint(3, 8)
        letters = rand.choices(string.ascii_lowercase, k=size)
        words.append("".join(letters).capitalize())
    return "-".join(words)
def split_cmd(s: str) -> list[str]:
    """Split a command string into tokens.

    Rules:
      * Unquoted whitespace separates tokens.
      * Double quotes group text (including whitespace) into one token; the
        quote characters themselves are dropped. An explicitly quoted empty
        string (``""``) yields an empty token instead of being discarded.
      * A backslash makes the next character literal (inside or outside
        quotes) and is itself dropped.

    Args:
        s: The raw command text.

    Returns:
        The list of tokens, possibly empty.

    Raises:
        ValueError: If the string ends inside an open quote or right after a
            backslash.
    """
    tokens: list[str] = []
    current: list[str] = []
    # True once the current token has started, even if it holds no characters
    # yet (e.g. right after an opening quote).
    token_open: bool = False
    in_quotes: bool = False
    escape_next: bool = False

    for char in s:
        if escape_next:
            current.append(char)
            token_open = True
            escape_next = False
        elif char == "\\":
            escape_next = True
        elif char == '"':
            in_quotes = not in_quotes
            token_open = True
        elif char.isspace() and not in_quotes:
            if token_open:
                tokens.append("".join(current))
                current.clear()
                token_open = False
        else:
            current.append(char)
            token_open = True

    if in_quotes or escape_next:
        raise ValueError(
            f"Command: {s!r}. Invalid syntax ({in_quotes = }, {escape_next = })"
        )

    if token_open:
        tokens.append("".join(current))
    return tokens
def _line_process_token(match: re.Match) -> str:
    """Expand a single ``%macro%`` match into its replacement text.

    The text between the percent signs is first run through the macro pattern
    again, then tokenised with ``split_cmd``; the first token selects the
    macro and the second (where used) overrides its default.

    Supported macros:
      * ``randstr [length=64]``: random ASCII letters and digits.
      * ``randnum [k=10000000]``: ``rand.random() * k`` rendered as a string.
      * ``randhead [words=3]``: random header-style key.
      * ``uuid [version=4]``: UUID version 1 or 4; other versions give ``""``.
      * ``env <name> [default]``: environment variable, or the default
        (``""`` when none is given).
      * ``lit <arg>``: the argument itself.

    Returns:
        The expansion, or ``""`` for an empty, unknown or incomplete macro.
    """
    inner: str = mcp.sub(_line_process_token, match.group(1))
    argv: list[str] = split_cmd(inner)
    if not argv:
        return ""

    name, args = argv[0], argv[1:]

    if name == "randstr":
        size: int = int(args[0]) if args else 64
        return "".join(rand.choices(string.ascii_letters + string.digits, k=size))

    if name == "randnum":
        scale: int = int(args[0]) if args else 10000000
        return str(rand.random() * scale)

    if name == "randhead":
        words: int = int(args[0]) if args else 3
        return generate_random_http_header_key(words)

    if name == "uuid":
        version: int = int(args[0]) if args else 4
        if version == 1:
            return str(uuid.uuid1())
        if version == 4:
            return str(uuid.uuid4())
        return ""

    if name == "env":
        if not args:
            return ""
        fallback: str = args[1] if len(args) > 1 else ""
        return os.environ.get(args[0], fallback)

    if name == "lit":
        return args[0] if args else ""

    return ""
def preprocess_line(line: str) -> str:
    """Turn one raw script line into the text that will be interpreted or sent.

    Steps, in order:
      1. Everything from the first ``--`` onward is a comment: it is echoed
         to stdout (greyed out with ANSI colours) and removed from the line.
      2. ``-=-=`` is replaced with a literal ``--``.
      3. The line is stripped and ``%macro%`` occurrences are expanded.
      4. Backslash escape sequences are decoded (``unicode_escape``); the
         text must be ASCII-encodable at this point.

    Returns:
        The processed line.
    """
    code, marker, comment = line.partition("--")
    if marker:
        print(f"\033[38;5;248mComment: {comment.strip()}\033[0m")

    expanded: str = mcp.sub(_line_process_token, code.replace("-=-=", "--").strip())
    return expanded.encode("ascii").decode("unicode_escape")
def _relay_to_stdout(sock: Any, chunksize: int = 1024, once: bool = False) -> None:
    """Copy bytes from *sock* to stdout until EOF, or after one chunk if *once*."""
    # print() goes through the text layer of sys.stdout while the payload is
    # written to the underlying binary buffer; flush first so the raw bytes
    # cannot overtake banners that were printed earlier.
    sys.stdout.flush()
    while chunk := sock.recv(chunksize):
        sys.stdout.buffer.write(chunk)
        if once:
            break


def _shutdown_and_close(sock: Any) -> None:
    """Shut *sock* down in both directions, closing it even if shutdown fails."""
    try:
        sock.shutdown(socket.SHUT_RDWR)
    finally:
        sock.close()


def run_command(
    command: list[str], sessions: dict[str, tuple[str, Any]], stack: list[str]
) -> bool:
    """Execute one tokenised ``$`` command.

    Commands (first token):
      * ``begin <name> <host> <port> [timeout]``: open a TCP connection,
        register it in *sessions* and push it onto *stack*. The timeout
        defaults to 0.5 s and is never set below 0.005 s.
      * ``end <name> [quiet]``: print any remaining response data (unless
        ``quiet``), close the session and remove every occurrence of it from
        the stack.
      * ``read <name> [bytes]``: print response data; with a size, a single
        ``recv`` of at most that many bytes, otherwise until EOF or timeout.
      * ``push <name>`` / ``pop``: manipulate the session stack.
      * ``ssl <name>``: wrap the session's socket with the default TLS context.
      * ``print``: list the session stack.

    Args:
        command: Tokens of the command; extra tokens are ignored.
        sessions: Maps a session name to ``(host, socket)``; mutated in place.
        stack: Session-name stack; mutated in place.

    Returns:
        ``False`` when the command was empty, malformed or failed in a way
        reported as an error below; ``True`` otherwise. Unknown commands and
        TLS failures are reported on stderr but still return ``True``.
    """
    if not command:
        print("Empty commands are not allowed.", file=sys.stderr)
        return False

    if command[0] == "begin":
        if len(command) < 4:
            print(
                "Begin: begin requires 3 arguments: name, host, port (+ timeout (optional)).",
                file=sys.stderr,
            )
            return False

        if command[1] in sessions:
            print(f"Begin: session {command[1]!r} already exists.", file=sys.stderr)
            return False

        sock = None
        try:
            sock = socket.socket(
                socket.AF_INET6 if ":" in command[2] else socket.AF_INET,
                socket.SOCK_STREAM,
            )
            timeout: float = float(command[4]) if len(command) > 4 else 0.5
            sock.settimeout(timeout if timeout > 0.005 else 0.005)
            sock.connect((command[2], int(command[3])))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except Exception as e:
            # Do not leak the half-initialised socket when setup fails.
            if sock is not None:
                sock.close()
            kind: str = "socket" if isinstance(e, socket.error) else "general"
            print(f"Begin: {kind} error: {e}", file=sys.stderr)
            return False

        sessions[command[1]] = (command[2], sock)
        stack.append(command[1])
    elif command[0] == "end":
        if len(command) < 2:
            print("End: end requires 1 argument: name.", file=sys.stderr)
            return False

        if command[1] not in sessions:
            print(f"End: session {command[1]!r} does not exist.", file=sys.stderr)
            return False

        name: str = command[1]
        host, conn = sessions[name]

        print(f"--- ENDING SESSION {name!r} ({host!r}) ---")

        quiet: bool = len(command) > 2 and command[2] == "quiet"
        if not quiet:
            # Best effort: read errors are reported but never prevent the close.
            try:
                _relay_to_stdout(conn)
            except socket.timeout:
                pass
            except socket.error as e:
                print(f"End: Socket error: {e}", file=sys.stderr)
            except Exception as e:
                print(f"End: General error: {e}", file=sys.stderr)

        try:
            _shutdown_and_close(conn)
        except Exception as e:
            print(f"End: Failed to end session {name!r}: {e}", file=sys.stderr)

        print(f"\n--- ENDED SESSION {name!r} ({host!r}) ---")

        del sessions[name]
        # Drop every occurrence while keeping the caller's list object.
        stack[:] = [entry for entry in stack if entry != name]
    elif command[0] == "read":
        if len(command) < 2:
            print(
                "Read: read requires 1 argument: name (+ optional size).",
                file=sys.stderr,
            )
            return False

        if command[1] not in sessions:
            print(f"Read: session {command[1]!r} does not exist.", file=sys.stderr)
            return False

        # An explicit size means "one recv of at most that many bytes".
        once: bool = len(command) > 2
        try:
            chunksize: int = int(command[2]) if once else 1024
        except ValueError:
            print(f"Read: invalid size {command[2]!r}.", file=sys.stderr)
            return False

        print(f"--- BEGIN READ FROM {command[1]!r} ({sessions[command[1]][0]!r}) ---")

        try:
            _relay_to_stdout(sessions[command[1]][1], chunksize, once)
        except socket.timeout:
            pass
        except socket.error as e:
            print(f"Read: socket error: {e}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Read: general error: {e}", file=sys.stderr)
            return False

        print(f"\n--- END READ FROM {command[1]!r} ---")
    elif command[0] == "push":
        if len(command) < 2:
            print("Push: push requires 1 argument: name.", file=sys.stderr)
            return False

        if command[1] not in sessions:
            print(f"Push: session {command[1]!r} does not exist.", file=sys.stderr)
            return False

        stack.append(command[1])
    elif command[0] == "pop":
        if not stack:
            print("Pop: Empty stack.", file=sys.stderr)
            return False

        stack.pop()
    elif command[0] == "ssl":
        if len(command) < 2:
            print("SSL: ssl requires 1 argument: name.", file=sys.stderr)
            return False

        if command[1] not in sessions:
            print(f"SSL: session {command[1]!r} does not exist.", file=sys.stderr)
            return False

        tls_host, plain_conn = sessions[command[1]]

        try:
            tls_conn: Any = ssl.create_default_context().wrap_socket(
                plain_conn, server_hostname=tls_host
            )
            sessions[command[1]] = (tls_host, tls_conn)
        except Exception as e:
            # NOTE(review): a failed handshake is only reported; the script
            # keeps running with the previous socket object, as before.
            print(f"SSL: Error: {e}", file=sys.stderr)
    elif command[0] == "print":
        if not stack:
            print("Print: Empty stack.", file=sys.stderr)
            return False

        print("Current session stack:")

        for session in stack:
            print(f" * {session!r} ({sessions[session][0]!r})")
    else:
        # Report the command word itself (index 0); index 1 may not exist.
        print(f"Command {command[0]!r} does not exist.", file=sys.stderr)

    return True
def interpret_session(session_program: tuple[str, ...]) -> bool:
    """Interpret a session script line by line.

    Every line is preprocessed first. Empty lines and ``#!`` lines are
    skipped, lines starting with ``$`` are executed as commands (``exit``
    stops the script), and any other line is sent, ASCII-encoded, to the
    session on top of the stack. Once the script stops, every session that
    is still open has its remaining response printed and is then closed.

    Args:
        session_program: The script's lines.

    Returns:
        ``True`` if the script ran to the end (or to ``exit``) and cleanup
        succeeded; ``False`` if a command failed, a line was written with no
        session on the stack, or closing a leftover session raised an error.
        Errors while sending a line are reported but do not affect the result.
    """
    sessions: dict[str, tuple[str, Any]] = {}
    stack: list[str] = []
    ret: bool = True

    for line in session_program:
        line = preprocess_line(line)

        if not line or line.startswith("#!"):
            continue

        if line[0] == "$":
            cmd: list[str] = split_cmd(line[1:].strip())

            # A bare "$" tokenises to []; let run_command report the empty
            # command instead of indexing into an empty list here.
            if cmd and cmd[0] == "exit":
                break

            if not run_command(cmd, sessions, stack):
                ret = False
                break

            continue

        if not stack:
            print(
                f"Error: No session on the stack, but tried to write '{line!r}' line. Stopping.",
                file=sys.stderr,
            )
            # The script is aborted because of an error, so report failure.
            ret = False
            break

        try:
            sessions[stack[-1]][1].sendall(line.encode("ascii"))
        except socket.timeout:
            pass
        except socket.error as e:
            print(f"Socket error: {e}", file=sys.stderr)
        except Exception as e:
            print(f"General error: {e}", file=sys.stderr)

    for name, session in sessions.items():
        print(f"--- ENDING SESSION {name!r} ({session[0]!r}) ---")

        try:
            # Flush the text layer so the raw bytes written below cannot
            # appear before the banner printed above.
            sys.stdout.flush()
            while chunk := session[1].recv(1024):
                sys.stdout.buffer.write(chunk)
        except socket.timeout:
            pass
        except socket.error as e:
            print(f"Socket error: {e}", file=sys.stderr)
            ret = False
        except Exception as e:
            print(f"General error: {e}", file=sys.stderr)
            ret = False

        try:
            try:
                session[1].shutdown(socket.SHUT_RDWR)
            finally:
                # Always release the descriptor, even when shutdown() fails.
                session[1].close()
        except Exception as e:
            print(f"Failed to end session {name!r}: {e}", file=sys.stderr)
            ret = False

        print(f"\n--- SESSION {name!r} ENDED ---")

    return ret
def main() -> int:
    """Command-line entry point.

    With no script argument, prints the usage/help text to stderr. Otherwise
    reads the script named by ``sys.argv[1]``, strips every line and hands
    the lines to ``interpret_session``.

    Returns:
        Process exit status: ``0`` when the script was interpreted
        successfully, ``1`` when no script was given, the script could not be
        read, or interpretation failed.
    """
    if len(sys.argv) < 2:
        print(
            f"""Session interpreter
Usage: {sys.argv[0]} <example.session>
Keywords:
* begin <name> <host> <port> [timeout (optional)]: Begins a new session. Automatically pushes it to the stack.
* end <name> [optional 'quiet']: Ends a session. Removes all occurances of the session off the stack in order.
* read <name> [bytes (optional)]: Reads response data from a session.
* push <name>: Push session onto the session stack.
* ssl <name>: Enables SSL on a session.
* pop: Pop the current session off the stack. (does not end it)
* print: Print the session stack.
* exit: Exit from the session script.
Request macros:
* randstr [optional length =64]: Replaced with a random string.
* randnum [optional max, default 10000000]: Replaced with a random number.
* randhead [optional component count =3]: Random header key.
* uuid [optional version =4]: Random UUID.
* env <name> [optional default ...] (like env HOST 127.0.0.1): Get an environment variable.
* lit [arg]: Literally expand to argument.
Syntax:
* Commands: $ <command> <args>
* Comments: Start with `--`
* Macros: %<name>%
* Strings: "..."
* Literal -- (shorthand): -=-=
Notes:
1. Extra arguments are ignored.
2. Sessions are parsed line-by-line.
3. Escape sequences are interpreted.""",
            file=sys.stderr,
        )
        return 1

    # Read the whole script up front so an unreadable file produces a clean
    # error message and exit status instead of a traceback.
    try:
        with open(sys.argv[1], "r") as fp:
            program: tuple[str, ...] = tuple(line.strip() for line in fp)
    except (OSError, UnicodeDecodeError) as e:
        print(
            f"Error: cannot read session script {sys.argv[1]!r}: {e}",
            file=sys.stderr,
        )
        return 1

    return 0 if interpret_session(program) else 1
if __name__ == "__main__":
    # main()'s result becomes the process exit status, so check that it is
    # declared to return an int before running it.
    assert main.__annotations__.get("return") is int, "main() should return an integer"
    # filter_warnings("error", category=Warning)
    sys.exit(main())