feat: vibed out some slop over here also

This commit is contained in:
h
2026-05-19 11:20:14 +02:00
commit bf6116dc8b
34 changed files with 6531 additions and 0 deletions
+178
View File
@@ -0,0 +1,178 @@
"""Minimal stdio MCP server with one tool: `echo`.
Used by the tool-calling smoke test to exercise the `--mcp-config` path
end-to-end with a real `claude` interactive process. We hand-roll the
JSON-RPC framing rather than pulling in the `mcp` Python SDK so the test
has the same zero-dep posture as the rest of the package.
Protocol surface (Model Context Protocol, stdio transport):
- `initialize` -> capabilities + serverInfo + protocolVersion
- `notifications/initialized` -> no response
- `tools/list` -> list of {name, description, inputSchema}
- `tools/call` -> content blocks for the named tool
- `ping` -> empty result
The `echo` tool returns its `text` argument verbatim as a single
`{"type":"text","text":...}` content block. That is enough for claude to
surface a `tool_use` -> `tool_result` pair in the JSONL session file.
Frame format on the wire is the stdio MCP convention: one JSON object per
line on stdin/stdout. We deliberately do NOT speak the alternative
`Content-Length`-framed flavor — claude's `--mcp-config` stdio transport
uses line framing for spawn-and-pipe servers.
Run standalone (for protocol sanity-checking) with:
printf '%s\\n%s\\n%s\\n' \\
'{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"x","version":"1"}}}' \\
'{"jsonrpc":"2.0","method":"notifications/initialized"}' \\
'{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"echo","arguments":{"text":"hi"}}}' \\
| python scripts/echo_mcp_server.py
"""
from __future__ import annotations
import json
import os
import sys
import traceback
from pathlib import Path
from typing import Any
_LOG_PATH = os.environ.get("ECHO_MCP_LOG")
def _log(message: str) -> None:
if not _LOG_PATH:
return
try:
with Path(_LOG_PATH).open("a", encoding="utf-8") as f:
f.write(message.rstrip("\n") + "\n")
except OSError:
pass
DEFAULT_PROTOCOL_VERSION = "2025-06-18"
SERVER_NAME = "echo-server"
SERVER_VERSION = "0.0.1"
ECHO_TOOL = {
"name": "echo",
"description": "Return the supplied `text` argument verbatim.",
"inputSchema": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "Text to echo back."},
},
"required": ["text"],
"additionalProperties": False,
},
}
def _write(message: dict[str, Any]) -> None:
"""Emit one JSON-RPC message as a single newline-terminated stdout line."""
sys.stdout.write(json.dumps(message, ensure_ascii=False) + "\n")
sys.stdout.flush()
def _ok(req_id: Any, result: dict[str, Any]) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": req_id, "result": result}
def _err(req_id: Any, code: int, message: str) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}
def _handle_initialize(params: dict[str, Any]) -> dict[str, Any]:
requested = params.get("protocolVersion") if isinstance(params, dict) else None
version = requested if isinstance(requested, str) else DEFAULT_PROTOCOL_VERSION
return {
"protocolVersion": version,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
}
def _handle_tools_list() -> dict[str, Any]:
return {"tools": [ECHO_TOOL]}
def _handle_tools_call(params: dict[str, Any]) -> dict[str, Any]:
name = params.get("name") if isinstance(params, dict) else None
arguments = params.get("arguments") if isinstance(params, dict) else None
if name != "echo":
return {
"content": [{"type": "text", "text": f"unknown tool: {name!r}"}],
"isError": True,
}
text = ""
if isinstance(arguments, dict):
raw = arguments.get("text", "")
text = raw if isinstance(raw, str) else json.dumps(raw)
return {"content": [{"type": "text", "text": text}], "isError": False}
def _dispatch(message: dict[str, Any]) -> dict[str, Any] | None:
method = message.get("method")
req_id = message.get("id")
params = message.get("params") or {}
is_notification = "id" not in message
if method == "initialize":
return _ok(req_id, _handle_initialize(params))
if method == "notifications/initialized":
return None
if method == "tools/list":
return _ok(req_id, _handle_tools_list())
if method == "tools/call":
return _ok(req_id, _handle_tools_call(params))
if method == "ping":
return _ok(req_id, {})
if method == "prompts/list":
return _ok(req_id, {"prompts": []})
if method == "resources/list":
return _ok(req_id, {"resources": []})
if method == "resources/templates/list":
return _ok(req_id, {"resourceTemplates": []})
if is_notification:
return None
return _err(req_id, -32601, f"method not found: {method!r}")
def main() -> int:
_log(f"START pid={os.getpid()} python={sys.executable} cwd={Path.cwd()}")
try:
while True:
raw_line = sys.stdin.readline()
if not raw_line:
_log("EOF on stdin")
break
line = raw_line.strip()
if not line:
continue
_log(f"RX {line[:2000]}")
try:
message = json.loads(line)
except json.JSONDecodeError as exc:
_log(f"parse error: {exc}")
_write(_err(None, -32700, f"parse error: {exc}"))
continue
if not isinstance(message, dict):
_write(_err(None, -32600, "invalid request: not a JSON object"))
continue
response = _dispatch(message)
if response is not None:
_log(f"TX {json.dumps(response)[:2000]}")
_write(response)
except Exception as exc:
_log("CRASH: " + "".join(traceback.format_exception(exc)))
raise
_log("EXIT 0")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+93
View File
@@ -0,0 +1,93 @@
"""Re-derive `claude_code_api.models` constants from a `claude` binary.
The `claude` CLI is a Mach-O (or ELF) executable that bundles its JS
runtime as plain strings. Model ids and the `/model` alias list show up
verbatim — no decryption needed. This script `strings`-greps a given
binary and prints a refreshed `src/claude_code_api/models.py`.
Usage:
uv run python scripts/extract_models.py # auto-locate
uv run python scripts/extract_models.py /path/to/claude # explicit
Auto-location order (macOS):
~/.local/share/claude/versions/<latest>
$(which claude) (follows symlinks)
"""
from __future__ import annotations
import argparse
import re
import shutil
import subprocess
import sys
from pathlib import Path
_MODEL_ID_RE = re.compile(
rb'"(claude-(?:opus|sonnet|haiku|[0-9])[a-z0-9-]*-(?:opus|sonnet|haiku|[0-9])[a-z0-9-]*)"'
)
_ALIAS_LIST_RE = re.compile(rb'\["sonnet","opus","haiku".*?"opusplan"\]')
def find_default_binary() -> Path | None:
home = Path.home()
versions_dir = home / ".local" / "share" / "claude" / "versions"
if versions_dir.is_dir():
versions = sorted(
(p for p in versions_dir.iterdir() if p.is_file()),
key=lambda p: p.name,
)
if versions:
return versions[-1]
on_path = shutil.which("claude")
if on_path is not None:
return Path(on_path).resolve()
return None
def extract_strings(binary: Path) -> bytes:
result = subprocess.run(
["strings", str(binary)],
check=True,
capture_output=True,
)
return result.stdout
def extract_model_ids(blob: bytes) -> set[str]:
return {m.group(1).decode("ascii") for m in _MODEL_ID_RE.finditer(blob)}
def extract_aliases(blob: bytes) -> list[str]:
match = _ALIAS_LIST_RE.search(blob)
if match is None:
return []
raw = match.group(0).decode("ascii")
return re.findall(r'"([^"]+)"', raw)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
parser.add_argument("binary", nargs="?", help="Path to claude executable")
args = parser.parse_args()
binary = Path(args.binary) if args.binary else find_default_binary()
if binary is None or not binary.is_file():
print(f"could not locate claude binary: {binary}", file=sys.stderr)
return 2
blob = extract_strings(binary)
ids = sorted(extract_model_ids(blob))
aliases = extract_aliases(blob)
print(f"# binary: {binary}", file=sys.stderr)
print(f"# {len(ids)} model ids, {len(aliases)} aliases", file=sys.stderr)
print("aliases:", aliases)
print("models:")
for model_id in ids:
print(f" {model_id}")
return 0
if __name__ == "__main__":
raise SystemExit(main())