import warnings
from datetime import datetime
from typing import Optional, Union
import httpx
from .models.prometheus import PrometheusResponseModel
[docs]
class PrometheusClientBase:
    """Base Prometheus client with common utilities.

    Holds the server base URL and turns decoded JSON payloads into
    ``PrometheusResponseModel`` instances.
    """

    def __init__(self, url: str):
        """
        Store the Prometheus server base URL.

        :param url: Base URL of the Prometheus server, with or without
            trailing slashes.
        :type url: str
        """
        # Trailing slashes are dropped so that appending an absolute API path
        # such as "/api/v1/query" never produces "//api/v1/query".
        self.base_url = url.rstrip("/")

    def _parse_response(self, response: dict) -> "PrometheusResponseModel":
        """
        Parse Prometheus JSON response into model.

        :param response: Decoded JSON body; its top-level keys are passed to
            the model constructor as keyword arguments.
        :type response: dict
        :return: The constructed response model.
        """
        return PrometheusResponseModel(**response)
[docs]
class PrometheusSync(PrometheusClientBase):
    """Synchronous Prometheus client using httpx.

    The client owns an ``httpx.Client`` session. Release it with
    :meth:`close`, or use the instance as a context manager::

        with PrometheusSync("http://localhost:9090") as prom:
            prom.query("up")
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying httpx session.

        :param url: Base URL of the Prometheus server.
        :type url: str
        :param timeout: Value passed to ``httpx.Timeout`` and applied to the
            session.
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.session = httpx.Client(timeout=httpx.Timeout(timeout))

    def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Issue a GET against an API path and return the decoded result."""
        # Strip trailing slashes here as well so a base URL such as
        # "http://host:9090/" does not yield "http://host:9090//api/...".
        endpoint = f"{self.base_url.rstrip('/')}{path}"
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return self._get("/api/v1/query", {"query": promql}, raw)

    def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        ``start`` and ``end`` are sent as POSIX timestamps obtained from
        ``datetime.timestamp()``; naive datetimes are therefore interpreted
        as local time.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return self._get("/api/v1/query_range", params, raw)

    def close(self):
        """Close the sync client session."""
        self.session.close()

    def __enter__(self) -> "PrometheusSync":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the session when the ``with`` block exits."""
        self.close()

    def __del__(self):
        # __init__ may have raised before ``session`` was assigned; in that
        # case there is nothing to close and touching it would raise
        # AttributeError during garbage collection.
        if getattr(self, "session", None) is not None:
            self.close()
[docs]
class PrometheusAsync(PrometheusClientBase):
    """Asynchronous Prometheus client using httpx.

    The client owns an ``httpx.AsyncClient``. Release it with
    ``await client.aclose()`` or use the instance with ``async with``.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying httpx async client.

        :param url: Base URL of the Prometheus server; also used as the
            ``base_url`` of the httpx client, so requests use relative paths.
        :type url: str
        :param timeout: Value passed to ``httpx.Timeout`` and applied to the
            client.
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.client = httpx.AsyncClient(base_url=url, timeout=httpx.Timeout(timeout))

    async def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Issue a GET against an API path and return the decoded result."""
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    async def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._get("/api/v1/query", {"query": promql}, raw)

    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        ``start`` and ``end`` are sent as POSIX timestamps obtained from
        ``datetime.timestamp()``; naive datetimes are therefore interpreted
        as local time.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return await self._get("/api/v1/query_range", params, raw)

    async def aclose(self):
        """Close the async client session."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the client itself for use in an ``async with`` block."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client when the ``async with`` block exits."""
        await self.aclose()

    def __del__(self):
        # Closing requires awaiting ``aclose()``, which a finalizer cannot do,
        # so an unclosed client is only reported. ``client`` may be missing
        # when __init__ raised before assigning it.
        client = getattr(self, "client", None)
        if client is not None and not client.is_closed:
            warnings.warn("PrometheusAsync was not closed. Use 'async with' or call 'await .aclose()'")