#!/usr/bin/env python3 """Google Workspace OAuth2 setup for Hermes Agent. Fully non-interactive — designed to be driven by the agent via terminal commands. The agent mediates between this script and the user (works on CLI, Telegram, Discord, etc.) Commands: setup.py --check # Is auth valid? Exit 0 = yes, 1 = no setup.py --client-secret /path/to.json # Store OAuth client credentials setup.py --auth-url # Print the OAuth URL for user to visit setup.py --auth-code CODE # Exchange auth code for token setup.py --revoke # Revoke and delete stored token setup.py --install-deps # Install Python dependencies only Agent workflow: 1. Run --check. If exit 0, auth is good — skip setup. 2. Ask user for client_secret.json path. Run --client-secret PATH. 3. Run --auth-url. Send the printed URL to the user. 4. User opens URL, authorizes, gets redirected to a page with a code. 5. User pastes the code. Agent runs --auth-code CODE. 6. Run --check to verify. Done. """ import argparse import json import os import subprocess import sys from pathlib import Path try: from hermes_constants import display_hermes_home, get_hermes_home except ModuleNotFoundError: HERMES_AGENT_ROOT = Path(__file__).resolve().parents[4] if HERMES_AGENT_ROOT.exists(): sys.path.insert(0, str(HERMES_AGENT_ROOT)) from hermes_constants import display_hermes_home, get_hermes_home HERMES_HOME = get_hermes_home() TOKEN_PATH = HERMES_HOME / "google_token.json" CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json" PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json" SCOPES = [ "https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/gmail.send", "https://www.googleapis.com/auth/gmail.modify", "https://www.googleapis.com/auth/calendar", "https://www.googleapis.com/auth/drive.readonly", "https://www.googleapis.com/auth/contacts.readonly", "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/documents.readonly", ] REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google-auth-httplib2"] # OAuth redirect for "out of band" manual code copy flow. # Google deprecated OOB, so we use a localhost redirect and tell the user to # copy the code from the browser's URL bar (or the page body). REDIRECT_URI = "http://localhost:1" def _load_token_payload(path: Path = TOKEN_PATH) -> dict: try: return json.loads(path.read_text()) except Exception: return {} def _missing_scopes_from_payload(payload: dict) -> list[str]: raw = payload.get("scopes") or payload.get("scope") if not raw: return [] granted = {s.strip() for s in (raw.split() if isinstance(raw, str) else raw) if s.strip()} return sorted(scope for scope in SCOPES if scope not in granted) def _format_missing_scopes(missing_scopes: list[str]) -> str: bullets = "\n".join(f" - {scope}" for scope in missing_scopes) return ( "Token is valid but missing required Google Workspace scopes:\n" f"{bullets}\n" "Run the Google Workspace setup again from this same Hermes profile to refresh consent." ) def install_deps(): """Install Google API packages if missing. Returns True on success.""" try: import googleapiclient # noqa: F401 import google_auth_oauthlib # noqa: F401 print("Dependencies already installed.") return True except ImportError: pass print("Installing Google API dependencies...") try: subprocess.check_call( [sys.executable, "-m", "pip", "install", "--quiet"] + REQUIRED_PACKAGES, stdout=subprocess.DEVNULL, ) print("Dependencies installed.") return True except subprocess.CalledProcessError as e: print(f"ERROR: Failed to install dependencies: {e}") print(f"Try manually: {sys.executable} -m pip install {' '.join(REQUIRED_PACKAGES)}") return False def _ensure_deps(): """Check deps are available, install if not, exit on failure.""" try: import googleapiclient # noqa: F401 import google_auth_oauthlib # noqa: F401 except ImportError: if not install_deps(): sys.exit(1) def check_auth(): """Check if stored credentials are valid. Prints status, exits 0 or 1.""" if not TOKEN_PATH.exists(): print(f"NOT_AUTHENTICATED: No token at {TOKEN_PATH}") return False _ensure_deps() from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request try: # Don't pass scopes — user may have authorized only a subset. # Passing scopes forces google-auth to validate them on refresh, # which fails with invalid_scope if the token has fewer scopes # than requested. creds = Credentials.from_authorized_user_file(str(TOKEN_PATH)) except Exception as e: print(f"TOKEN_CORRUPT: {e}") return False payload = _load_token_payload(TOKEN_PATH) if creds.valid: missing_scopes = _missing_scopes_from_payload(payload) if missing_scopes: print(f"AUTHENTICATED (partial): Token valid but missing {len(missing_scopes)} scopes:") for s in missing_scopes: print(f" - {s}") print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}") return True if creds.expired and creds.refresh_token: try: creds.refresh(Request()) TOKEN_PATH.write_text(creds.to_json()) missing_scopes = _missing_scopes_from_payload(_load_token_payload(TOKEN_PATH)) if missing_scopes: print(f"AUTHENTICATED (partial): Token refreshed but missing {len(missing_scopes)} scopes:") for s in missing_scopes: print(f" - {s}") print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}") return True except Exception as e: print(f"REFRESH_FAILED: {e}") return False print("TOKEN_INVALID: Re-run setup.") return False def store_client_secret(path: str): """Copy and validate client_secret.json to Hermes home.""" src = Path(path).expanduser().resolve() if not src.exists(): print(f"ERROR: File not found: {src}") sys.exit(1) try: data = json.loads(src.read_text()) except json.JSONDecodeError: print("ERROR: File is not valid JSON.") sys.exit(1) if "installed" not in data and "web" not in data: print("ERROR: Not a Google OAuth client secret file (missing 'installed' key).") print("Download the correct file from: https://console.cloud.google.com/apis/credentials") sys.exit(1) CLIENT_SECRET_PATH.write_text(json.dumps(data, indent=2)) print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}") def _save_pending_auth(*, state: str, code_verifier: str): """Persist the OAuth session bits needed for a later token exchange.""" PENDING_AUTH_PATH.write_text( json.dumps( { "state": state, "code_verifier": code_verifier, "redirect_uri": REDIRECT_URI, }, indent=2, ) ) def _load_pending_auth() -> dict: """Load the pending OAuth session created by get_auth_url().""" if not PENDING_AUTH_PATH.exists(): print("ERROR: No pending OAuth session found. Run --auth-url first.") sys.exit(1) try: data = json.loads(PENDING_AUTH_PATH.read_text()) except Exception as e: print(f"ERROR: Could not read pending OAuth session: {e}") print("Run --auth-url again to start a fresh OAuth session.") sys.exit(1) if not data.get("state") or not data.get("code_verifier"): print("ERROR: Pending OAuth session is missing PKCE data.") print("Run --auth-url again to start a fresh OAuth session.") sys.exit(1) return data def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]: """Accept either a raw auth code or the full redirect URL pasted by the user.""" if not code_or_url.startswith("http"): return code_or_url, None from urllib.parse import parse_qs, urlparse parsed = urlparse(code_or_url) params = parse_qs(parsed.query) if "code" not in params: print("ERROR: No 'code' parameter found in URL.") sys.exit(1) state = params.get("state", [None])[0] return params["code"][0], state def get_auth_url(): """Print the OAuth authorization URL. User visits this in a browser.""" if not CLIENT_SECRET_PATH.exists(): print("ERROR: No client secret stored. Run --client-secret first.") sys.exit(1) _ensure_deps() from google_auth_oauthlib.flow import Flow flow = Flow.from_client_secrets_file( str(CLIENT_SECRET_PATH), scopes=SCOPES, redirect_uri=REDIRECT_URI, autogenerate_code_verifier=True, ) auth_url, state = flow.authorization_url( access_type="offline", prompt="consent", ) _save_pending_auth(state=state, code_verifier=flow.code_verifier) # Print just the URL so the agent can extract it cleanly print(auth_url) def exchange_auth_code(code: str): """Exchange the authorization code for a token and save it.""" if not CLIENT_SECRET_PATH.exists(): print("ERROR: No client secret stored. Run --client-secret first.") sys.exit(1) pending_auth = _load_pending_auth() code, returned_state = _extract_code_and_state(code) if returned_state and returned_state != pending_auth["state"]: print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.") sys.exit(1) _ensure_deps() from google_auth_oauthlib.flow import Flow from urllib.parse import parse_qs, urlparse # Extract granted scopes from the callback URL if present if returned_state and "scope" in parse_qs(urlparse(code).query if isinstance(code, str) and code.startswith("http") else {}): granted_scopes = parse_qs(urlparse(code).query)["scope"][0].split() else: # Try to extract from code_or_url parameter if isinstance(code, str) and code.startswith("http"): params = parse_qs(urlparse(code).query) if "scope" in params: granted_scopes = params["scope"][0].split() else: granted_scopes = SCOPES else: granted_scopes = SCOPES flow = Flow.from_client_secrets_file( str(CLIENT_SECRET_PATH), scopes=granted_scopes, redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI), state=pending_auth["state"], code_verifier=pending_auth["code_verifier"], ) try: # Accept partial scopes — user may deselect some permissions in the consent screen os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1" flow.fetch_token(code=code) except Exception as e: print(f"ERROR: Token exchange failed: {e}") print("The code may have expired. Run --auth-url to get a fresh URL.") sys.exit(1) creds = flow.credentials token_payload = json.loads(creds.to_json()) # Store only the scopes actually granted by the user, not what was requested. # creds.to_json() writes the requested scopes, which causes refresh to fail # with invalid_scope if the user only authorized a subset. actually_granted = list(creds.granted_scopes or []) if hasattr(creds, "granted_scopes") and creds.granted_scopes else [] if actually_granted: token_payload["scopes"] = actually_granted elif granted_scopes != SCOPES: # granted_scopes was extracted from the callback URL token_payload["scopes"] = granted_scopes missing_scopes = _missing_scopes_from_payload(token_payload) if missing_scopes: print(f"WARNING: Token missing some Google Workspace scopes: {', '.join(missing_scopes)}") print("Some services may not be available.") TOKEN_PATH.write_text(json.dumps(token_payload, indent=2)) PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"OK: Authenticated. Token saved to {TOKEN_PATH}") print(f"Profile-scoped token location: {display_hermes_home()}/google_token.json") def revoke(): """Revoke stored token and delete it.""" if not TOKEN_PATH.exists(): print("No token to revoke.") return _ensure_deps() from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request try: creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES) if creds.expired and creds.refresh_token: creds.refresh(Request()) import urllib.request urllib.request.urlopen( urllib.request.Request( f"https://oauth2.googleapis.com/revoke?token={creds.token}", method="POST", headers={"Content-Type": "application/x-www-form-urlencoded"}, ) ) print("Token revoked with Google.") except Exception as e: print(f"Remote revocation failed (token may already be invalid): {e}") TOKEN_PATH.unlink(missing_ok=True) PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"Deleted {TOKEN_PATH}") def main(): parser = argparse.ArgumentParser(description="Google Workspace OAuth setup for Hermes") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--check", action="store_true", help="Check if auth is valid (exit 0=yes, 1=no)") group.add_argument("--client-secret", metavar="PATH", help="Store OAuth client_secret.json") group.add_argument("--auth-url", action="store_true", help="Print OAuth URL for user to visit") group.add_argument("--auth-code", metavar="CODE", help="Exchange auth code for token") group.add_argument("--revoke", action="store_true", help="Revoke and delete stored token") group.add_argument("--install-deps", action="store_true", help="Install Python dependencies") args = parser.parse_args() if args.check: sys.exit(0 if check_auth() else 1) elif args.client_secret: store_client_secret(args.client_secret) elif args.auth_url: get_auth_url() elif args.auth_code: exchange_auth_code(args.auth_code) elif args.revoke: revoke() elif args.install_deps: sys.exit(0 if install_deps() else 1) if __name__ == "__main__": main()