""" OpenIPApi -- official Python client (self-hosted). Distribution: https://www.openipapi.com/sdk/python/openipapi.py Docs: https://www.openipapi.com/docs No third-party dependencies -- uses the standard library only (urllib). Compatible with Python 3.8+. Usage: from openipapi import OpenIPApi client = OpenIPApi("YOUR_KEY") data = client.lookup("8.8.8.8") print(data.get("threat", {}).get("is_vpn")) """ from __future__ import annotations import json from typing import Any, Iterable from urllib import request, parse, error class OpenIPApiError(Exception): """Raised when the API returns a non-2xx status or the network call fails.""" def __init__(self, message: str, status: int = 0, code: str = "") -> None: super().__init__(message) self.status = status self.code = code class OpenIPApi: def __init__(self, api_key: str, base_url: str = "https://api.openipapi.com") -> None: if not api_key: raise OpenIPApiError("API key is required", code="missing_api_key") self.api_key = api_key self.base_url = base_url.rstrip("/") # ------------------------------------------------------------------ private def _request(self, method: str, path: str, body: Any = None) -> Any: url = self.base_url + path data = None headers = { "X-API-Key": self.api_key, "Accept": "application/json", "User-Agent": "openipapi-python-sdk/1.0", } if body is not None: data = json.dumps(body).encode("utf-8") headers["Content-Type"] = "application/json" req = request.Request(url, data=data, headers=headers, method=method) try: with request.urlopen(req, timeout=30) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw else None except error.HTTPError as e: raw = e.read().decode("utf-8") if e.fp else "" try: payload = json.loads(raw) if raw else {} except json.JSONDecodeError: payload = {} raise OpenIPApiError( payload.get("message") or e.reason or f"HTTP {e.code}", status=e.code, code=payload.get("error") or f"http_{e.code}", ) from None except error.URLError as e: raise OpenIPApiError(f"Network error: {e.reason}", code="network_error") from None # ------------------------------------------------------------------- public def lookup(self, ip: str) -> dict: """GET /v1/lookup/{ip} -- full geo + network + threat data for a single IP.""" return self._request("GET", "/v1/lookup/" + parse.quote(ip, safe=":")) def me(self) -> dict: """GET /v1/me -- geo + threat data for the caller's own IP.""" return self._request("GET", "/v1/me") def batch(self, ips: Iterable[str]) -> dict: """POST /v1/lookup/batch -- look up multiple IPs in one request (plan-dependent batch size).""" ip_list = list(ips) if not ip_list: raise OpenIPApiError("batch() expects a non-empty iterable of IPs", code="invalid_input") return self._request("POST", "/v1/lookup/batch", {"ips": ip_list}) def asn(self, asn: int | str) -> dict: """GET /v1/asn/{asn} -- ASN detail (Starter plan or above).""" s = str(asn).lstrip("AS").lstrip("as") return self._request("GET", "/v1/asn/" + parse.quote(s)) def validate(self, ip: str) -> dict: """GET /v1/validate/{ip} -- validate and classify an IP (does not consume quota).""" return self._request("GET", "/v1/validate/" + parse.quote(ip, safe=":")) __all__ = ["OpenIPApi", "OpenIPApiError"]