TODO: Add content length or chunking headers in requests. (kms) Signed-off-by: Arija A. <ari@ari.lt>
456 lines
13 KiB
Python
Executable file
456 lines
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Session parser"""
|
|
|
|
import os
|
|
import re
|
|
import secrets
|
|
import socket
|
|
import ssl
|
|
import string
|
|
import sys
|
|
import uuid
|
|
from typing import Any
|
|
|
|
# Random source shared by every random macro (randstr, randnum, randhead).
rand: secrets.SystemRandom = secrets.SystemRandom()
# Matches one `%...%` macro; the lazy group captures the text between a
# percent sign and the nearest following one (at least one character).
mcp: re.Pattern[str] = re.compile(r"%(.+?)%")
|
|
|
|
|
|
def generate_random_http_header_key(length: int = 3) -> str:
    """Generate a random HTTP header key such as ``Abc-Defgh-Xyz``.

    The key consists of ``length`` words joined by ``-``.  Every word is
    3 to 8 random lowercase ASCII letters with the first letter upper-cased.
    A ``length`` of zero or less yields an empty string.
    """

    words: list[str] = []

    for _ in range(length):
        word_length: int = rand.randint(3, 8)
        letters: list[str] = rand.choices(string.ascii_lowercase, k=word_length)
        words.append("".join(letters).capitalize())

    # join() puts the separator only between words, so there is no trailing
    # dash to slice off afterwards.
    return "-".join(words)
|
|
|
|
|
|
def split_cmd(s: str) -> list[str]:
    """Split a command string into tokens.

    Rules:
      * Unquoted whitespace separates tokens.
      * Double quotes group text, whitespace included, into one token; the
        quote characters themselves are dropped.
      * A backslash makes the next character literal, inside or outside
        quotes.
      * An explicitly quoted empty string (``""``) yields an empty token.

    Raises:
        ValueError: If the string ends inside a quoted section or right
            after a backslash.
    """

    tokens: list[str] = []
    chars: list[str] = []

    # True once the current token has been started, even if it holds no
    # characters yet (the `""` case); `chars` alone cannot express that.
    token_open: bool = False
    in_quotes: bool = False
    escape_next: bool = False

    for char in s:
        if escape_next:
            chars.append(char)
            token_open = True
            escape_next = False
        elif char == "\\":
            escape_next = True
        elif char == '"':
            in_quotes = not in_quotes
            token_open = True
        elif char.isspace() and not in_quotes:
            if token_open:
                tokens.append("".join(chars))
                chars.clear()
                token_open = False
        else:
            chars.append(char)
            token_open = True

    if in_quotes or escape_next:
        raise ValueError(
            f"Command: {s!r}. Invalid syntax ({in_quotes = }, {escape_next = })"
        )

    if token_open:
        tokens.append("".join(chars))

    return tokens
|
|
|
|
|
|
def _line_process_token(match: re.Match[str]) -> str:
    """Expand one ``%...%`` macro match into its replacement text.

    The captured text is tokenised with ``split_cmd``; the first token names
    the macro and the following tokens are its arguments:

      * ``randstr [length]``: random letters and digits (default 64).
      * ``randnum [max]``: a random float in ``[0, max)`` rendered with
        ``str`` (default max 10000000).
      * ``randhead [words]``: random header key (default 3 words).
      * ``uuid [version]``: UUID version 1 or 4 (default 4); any other
        version expands to an empty string.
      * ``env <name> [default]``: environment variable, or the default
        (empty string when no default is given).
      * ``lit [arg]``: the argument itself.

    Unknown macros, empty macros and macros missing a required argument
    expand to an empty string.  Numeric arguments are converted with
    ``int``, so a non-numeric argument raises ``ValueError``.
    """

    # Re-run macro substitution on the captured text before tokenising it;
    # this is a no-op unless that text itself contains a `%...%` pair.
    macro: str = mcp.sub(_line_process_token, match.group(1))
    command: list[str] = split_cmd(macro)

    if not command:
        return ""

    name: str = command[0]
    args: list[str] = command[1:]

    if name == "randstr":
        length: int = int(args[0]) if args else 64
        return "".join(rand.choices(string.ascii_letters + string.digits, k=length))

    if name == "randnum":
        upper: int = int(args[0]) if args else 10000000
        return str(rand.random() * upper)

    if name == "randhead":
        words: int = int(args[0]) if args else 3
        return generate_random_http_header_key(words)

    if name == "uuid":
        version: int = int(args[0]) if args else 4

        if version == 1:
            return str(uuid.uuid1())
        if version == 4:
            return str(uuid.uuid4())

        return ""

    if name == "env":
        if not args:
            return ""

        default: str = args[1] if len(args) > 1 else ""
        return os.environ.get(args[0], default)

    if name == "lit":
        return args[0] if args else ""

    return ""
|
|
|
|
|
|
def preprocess_line(line: str) -> str:
    """Preprocess one line of a session script.

    Steps, in order:
      1. Everything from the first ``--`` onwards is a comment: it is
         echoed to stdout (grey, prefixed with ``Comment:``) and removed.
      2. ``-=-=`` is replaced with a literal ``--``.
      3. The line is stripped and its ``%...%`` macros are expanded.
      4. Backslash escape sequences are interpreted by round-tripping the
         text through the ``ascii`` and ``unicode_escape`` codecs.

    Raises:
        UnicodeEncodeError: If the expanded line contains non-ASCII
            characters.
    """

    comment_at: int = line.find("--")

    if comment_at != -1:
        print(f"\033[38;5;248mComment: {line[comment_at + 2:].strip()}\033[0m")
        line = line[:comment_at]

    # Must run for every line, not only commented ones: `-=-=` contains no
    # `--`, so a line using the shorthand never takes the branch above.
    line = line.replace("-=-=", "--")

    expanded: str = mcp.sub(_line_process_token, line.strip())

    return expanded.encode("ascii").decode("unicode_escape")
|
|
|
|
|
|
def _stream_to_stdout(sock: Any, chunksize: int = 1024, once: bool = False) -> None:
    """Copy raw bytes from ``sock`` to stdout.

    Reads until the peer closes the connection or the socket times out; with
    ``once`` set, stops after the first non-empty chunk.  Errors other than
    a timeout propagate to the caller.
    """

    # print() goes through the text layer of sys.stdout while the payload is
    # written straight to sys.stdout.buffer.  Flush the text layer first so
    # banners printed earlier cannot end up after the payload when stdout is
    # block-buffered (pipe or file).
    sys.stdout.flush()

    try:
        while True:
            chunk: bytes = sock.recv(chunksize)

            if not chunk:
                break

            sys.stdout.buffer.write(chunk)

            if once:
                break
    except socket.timeout:
        # A timeout is treated as "no more data for now", not as an error.
        pass
    finally:
        sys.stdout.buffer.flush()


def run_command(
    command: list[str], sessions: dict[str, tuple[str, Any]], stack: list[str]
) -> bool:
    """Run one tokenised ``$`` command of a session script.

    Args:
        command: Command tokens; ``command[0]`` is the command name
            (``begin``, ``end``, ``read``, ``push``, ``pop``, ``ssl`` or
            ``print``).
        sessions: Open sessions, mapping a session name to a
            ``(host, socket)`` pair.  Mutated in place by ``begin``,
            ``end`` and ``ssl``.
        stack: Session stack, last element on top.  Mutated in place by
            ``begin``, ``end``, ``push`` and ``pop``.

    Returns:
        ``False`` when the command is malformed or fails in a way that
        should stop the script, ``True`` otherwise.  Read/close problems
        during ``end`` and unknown commands are reported on stderr but
        still return ``True``.
    """

    if not command:
        print("Empty commands are not allowed.", file=sys.stderr)
        return False

    keyword: str = command[0]

    if keyword == "begin":
        if len(command) < 4:
            print(
                "Begin: begin requires 3 arguments: name, host, port (+ timeout (optional)).",
                file=sys.stderr,
            )
            return False

        name: str = command[1]
        host: str = command[2]

        if name in sessions:
            print(f"Begin: session {name!r} already exists.", file=sys.stderr)
            return False

        sock: Any = None

        try:
            # A colon in the host is taken to mean an IPv6 literal.
            sock = socket.socket(
                socket.AF_INET6 if ":" in host else socket.AF_INET,
                socket.SOCK_STREAM,
            )

            if len(command) > 4:
                timeout: float = float(command[4])
                # Timeouts are clamped to at least 5 ms.
                sock.settimeout(timeout if timeout > 0.005 else 0.005)
            else:
                sock.settimeout(0.5)

            sock.connect((host, int(command[3])))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except Exception as e:
            # Do not leak the descriptor of a socket that never became a
            # session.
            if sock is not None:
                sock.close()

            kind: str = "socket" if isinstance(e, socket.error) else "general"
            print(f"Begin: {kind} error: {e}", file=sys.stderr)
            return False

        sessions[name] = (host, sock)
        stack.append(name)
    elif keyword == "end":
        if len(command) < 2:
            print("End: end requires 1 argument: name.", file=sys.stderr)
            return False

        name = command[1]

        if name not in sessions:
            print(f"End: session {name!r} does not exist.", file=sys.stderr)
            return False

        host, sock = sessions[name]

        print(f"--- ENDING SESSION {name!r} ({host!r}) ---")

        # Unless told to be quiet, drain whatever the peer still sends.
        # Failures here are reported but do not prevent closing the session.
        if not (len(command) > 2 and command[2] == "quiet"):
            try:
                _stream_to_stdout(sock)
            except socket.error as e:
                print(f"End: Socket error: {e}", file=sys.stderr)
            except Exception as e:
                print(f"End: General error: {e}", file=sys.stderr)

        try:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                # shutdown() can fail (e.g. the peer is already gone); the
                # descriptor must be released regardless.
                sock.close()
        except Exception as e:
            print(f"End: Failed to end session {name!r}: {e}", file=sys.stderr)

        print(f"\n--- ENDED SESSION {name!r} ({host!r}) ---")

        del sessions[name]

        # Drop every occurrence of the session, keeping the others in order.
        stack[:] = [entry for entry in stack if entry != name]
    elif keyword == "read":
        if len(command) < 2:
            print(
                "Read: read requires 1 argument: name (+ optional size).",
                file=sys.stderr,
            )
            return False

        name = command[1]

        if name not in sessions:
            print(f"Read: session {name!r} does not exist.", file=sys.stderr)
            return False

        # With an explicit size a single chunk of at most that many bytes is
        # read; without one the socket is read until EOF or timeout.
        once: bool = len(command) > 2
        chunksize: int = 1024

        if once:
            try:
                chunksize = int(command[2])
            except ValueError:
                print(f"Read: invalid size {command[2]!r}.", file=sys.stderr)
                return False

        host, sock = sessions[name]

        print(f"--- BEGIN READ FROM {name!r} ({host!r}) ---")

        try:
            _stream_to_stdout(sock, chunksize, once)
        except socket.error as e:
            print(f"Read: socket error: {e}", file=sys.stderr)
            return False
        except Exception as e:
            print(f"Read: general error: {e}", file=sys.stderr)
            return False

        print(f"\n--- END READ FROM {name!r} ---")
    elif keyword == "push":
        if len(command) < 2:
            print("Push: push requires 1 argument: name.", file=sys.stderr)
            return False

        if command[1] not in sessions:
            print(f"Push: session {command[1]!r} does not exist.", file=sys.stderr)
            return False

        stack.append(command[1])
    elif keyword == "pop":
        if not stack:
            print("Pop: Empty stack.", file=sys.stderr)
            return False

        stack.pop()
    elif keyword == "ssl":
        if len(command) < 2:
            print("SSL: ssl requires 1 argument: name.", file=sys.stderr)
            return False

        name = command[1]

        if name not in sessions:
            print(f"SSL: session {name!r} does not exist.", file=sys.stderr)
            return False

        host, sock = sessions[name]

        try:
            secure: Any = ssl.create_default_context().wrap_socket(
                sock, server_hostname=host
            )
        except Exception as e:
            # A failed handshake must stop the script: carrying on would
            # write to a session that was meant to be encrypted.
            print(f"SSL: Error: {e}", file=sys.stderr)
            return False

        sessions[name] = (host, secure)
    elif keyword == "print":
        if not stack:
            print("Print: Empty stack.", file=sys.stderr)
            return False

        print("Current session stack:")

        for session in stack:
            print(f" * {session!r} ({sessions[session][0]!r})")
    else:
        # Report the command name itself (command[0]); command[1] is an
        # argument and may not exist.
        # NOTE(review): unknown commands are reported but not treated as
        # fatal - confirm whether this should return False instead.
        print(f"Command {keyword!r} does not exist.", file=sys.stderr)

    return True
|
|
|
|
|
|
def interpret_session(session_program: tuple[str, ...]) -> bool:
    """Interpret a session script, one line at a time.

    Every line is first passed through ``preprocess_line``.  Empty lines and
    lines starting with ``#!`` are skipped.  A line starting with ``$`` is a
    command: ``exit`` stops the script, anything else is handed to
    ``run_command``, and a failing command stops the script.  Any other
    line is ASCII-encoded and sent to the session on top of the stack.

    When the script stops, every session still open is drained to stdout,
    shut down and closed.

    Args:
        session_program: The lines of the script.

    Returns:
        ``True`` on success; ``False`` if a command failed, a line was
        written with no session on the stack, or a session could not be
        drained or closed cleanly.
    """

    sessions: dict[str, tuple[str, Any]] = {}
    stack: list[str] = []

    ret: bool = True

    for raw_line in session_program:
        line: str = preprocess_line(raw_line)

        if not line or line.startswith("#!"):
            continue

        if line[0] == "$":
            cmd: list[str] = split_cmd(line[1:].strip())

            # A bare `$` tokenises to an empty list; let run_command report
            # it rather than indexing into nothing here.
            if cmd and cmd[0] == "exit":
                break

            if not run_command(cmd, sessions, stack):
                ret = False
                break

            continue

        if not stack:
            print(
                f"Error: No session on the stack, but tried to write '{line!r}' line. Stopping.",
                file=sys.stderr,
            )
            # This aborts the script, so it must not be reported as success.
            ret = False
            break

        # Send failures are reported but do not stop the script.
        try:
            sessions[stack[-1]][1].sendall(line.encode("ascii"))
        except socket.timeout:
            pass
        except socket.error as e:
            print(f"Socket error: {e}", file=sys.stderr)
        except Exception as e:
            print(f"General error: {e}", file=sys.stderr)

    for name, (host, sock) in sessions.items():
        print(f"--- ENDING SESSION {name!r} ({host!r}) ---")

        # The payload bypasses the text layer of sys.stdout; flush the
        # banner first so the two cannot be reordered when stdout is
        # block-buffered (pipe or file).
        sys.stdout.flush()

        try:
            while chunk := sock.recv(1024):
                sys.stdout.buffer.write(chunk)
        except socket.timeout:
            # A timeout is treated as "no more data", not as an error.
            pass
        except socket.error as e:
            print(f"Socket error: {e}", file=sys.stderr)
            ret = False
        except Exception as e:
            print(f"General error: {e}", file=sys.stderr)
            ret = False

        sys.stdout.buffer.flush()

        try:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                # Release the descriptor even when shutdown() fails.
                sock.close()
        except Exception as e:
            print(f"Failed to end session {name!r}: {e}", file=sys.stderr)
            ret = False

        print(f"\n--- SESSION {name!r} ENDED ---")

    return ret
|
|
|
|
|
|
def main() -> int:
    """Entry point: run the session script named on the command line.

    Without a script argument the usage text is printed to stderr.

    Returns:
        The process exit status: ``0`` when the script ran successfully,
        ``1`` when no script was given, the script could not be read, or
        its interpretation failed.
    """

    if len(sys.argv) < 2:
        print(
            f"""Session interpreter

Usage: {sys.argv[0]} <example.session>

Keywords:

* begin <name> <host> <port> [timeout (optional)]: Begins a new session. Automatically pushes it to the stack.
* end <name> [optional 'quiet']: Ends a session. Removes all occurances of the session off the stack in order.
* read <name> [bytes (optional)]: Reads response data from a session.
* push <name>: Push session onto the session stack.
* ssl <name>: Enables SSL on a session.
* pop: Pop the current session off the stack. (does not end it)
* print: Print the session stack.
* exit: Exit from the session script.

Request macros:

* randstr [optional length =64]: Replaced with a random string.
* randnum [optional max, default 10000000]: Replaced with a random number.
* randhead [optional component count =3]: Random header key.
* uuid [optional version =4]: Random UUID.
* env <name> [optional default ...] (like env HOST 127.0.0.1): Get an environment variable.
* lit [arg]: Literally expand to argument.

Syntax:

* Commands: $ <command> <args>
* Comments: Start with `--`
* Macros: %<name>%
* Strings: "..."
* Literal -- (shorthand): -=-=

Notes:

1. Extra arguments are ignored.
2. Sessions are parsed line-by-line.
3. Escape sequences are interpreted.""",
            file=sys.stderr,
        )

        return 1

    # Read the whole script up front so the file is closed before any
    # network activity starts; an unreadable file is a usage error rather
    # than a traceback.
    try:
        with open(sys.argv[1], "r") as fp:
            program: tuple[str, ...] = tuple(map(str.strip, fp))
    except OSError as e:
        print(f"Failed to read {sys.argv[1]!r}: {e}", file=sys.stderr)
        return 1

    return 0 if interpret_session(program) else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # main()'s result becomes the process exit status, so insist that it is
    # still annotated as returning an int.
    assert main.__annotations__.get("return") is int, "main() should return an integer"

    # NOTE: a warnings filter was left disabled here:
    #   filter_warnings("error", category=Warning)
    raise SystemExit(main())
|