Coverage for aiopromql/client.py: 91%
61 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-05-30 13:59 +0300
1import warnings
2from datetime import datetime
3from typing import Optional, Union
5import httpx
7from .models.prometheus import PrometheusResponseModel
class PrometheusClientBase:
    """Base Prometheus client with common utilities."""

    def __init__(self, url: str):
        """
        Store the server base URL.

        :param url: Base URL of the Prometheus server, e.g. ``http://localhost:9090``.
        """
        # Drop trailing slashes so that appending "/api/v1/..." to base_url
        # never yields a double slash in the request path.
        self.base_url = url.rstrip("/")

    def _parse_response(self, response: dict) -> "PrometheusResponseModel":
        """
        Parse Prometheus JSON response into model.

        :param response: Decoded JSON body; its keys are passed to the model as keyword arguments.
        :return: The constructed ``PrometheusResponseModel``.
        """
        return PrometheusResponseModel(**response)
class PrometheusSync(PrometheusClientBase):
    """Synchronous Prometheus client using httpx.

    The instance owns an ``httpx.Client``. Release it by calling
    :meth:`close` or by using the instance in a ``with`` statement.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying HTTP session.

        :param url: Base URL of the Prometheus server.
        :param timeout: Value handed to ``httpx.Timeout`` for the session.
        """
        super().__init__(url)
        self.session = httpx.Client(timeout=httpx.Timeout(timeout))

    def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Send a GET to ``path`` under the base URL and return the raw or parsed JSON body."""
        response = self.session.get(f"{self.base_url}{path}", params=params)
        response.raise_for_status()
        data = response.json()
        return data if raw else self._parse_response(data)

    def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        return self._get("/api/v1/query", {"query": promql}, raw)

    def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        ``start`` and ``end`` are converted with ``datetime.timestamp()``, which
        interprets naive datetimes as local time; pass timezone-aware values to
        avoid ambiguity.

        :param promql: The PromQL query string to execute.
        :param start: Start datetime of the query range.
        :param end: End datetime of the query range.
        :param step: Query resolution step width (e.g., '30s', '1m').
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return self._get("/api/v1/query_range", params, raw)

    def close(self):
        """Close the sync client session."""
        self.session.close()

    def __enter__(self):
        """Return the client itself for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the session when the ``with`` block ends."""
        self.close()

    def __del__(self):
        """Warn about, and close, a session that was left open."""
        # __init__ may have raised before `session` was assigned; in that case
        # there is nothing to release and the finalizer must not fail.
        session = getattr(self, "session", None)
        if session is None or session.is_closed:
            return
        warnings.warn("PrometheusSync was not closed. Use 'with' statement or call .close()")
        self.close()
class PrometheusAsync(PrometheusClientBase):
    """Asynchronous Prometheus client using httpx.

    The instance owns an ``httpx.AsyncClient``. Release it by awaiting
    :meth:`aclose` or by using the instance in an ``async with`` statement.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying async HTTP client.

        :param url: Base URL of the Prometheus server; also set as the httpx ``base_url``.
        :param timeout: Value handed to ``httpx.Timeout`` for the client.
        """
        super().__init__(url)
        self.client = httpx.AsyncClient(base_url=url, timeout=httpx.Timeout(timeout))

    async def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Send a GET to ``path`` (relative to the client's base URL) and return the raw or parsed JSON body."""
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        data = response.json()
        return data if raw else self._parse_response(data)

    async def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query asynchronously.

        :param promql: The PromQL query string to execute.
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        return await self._get("/api/v1/query", {"query": promql}, raw)

    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window asynchronously.

        ``start`` and ``end`` are converted with ``datetime.timestamp()``, which
        interprets naive datetimes as local time; pass timezone-aware values to
        avoid ambiguity.

        :param promql: The PromQL query string to execute.
        :param start: Start datetime of the query range.
        :param end: End datetime of the query range.
        :param step: Query resolution step width (e.g., '30s', '1m').
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return await self._get("/api/v1/query_range", params, raw)

    async def aclose(self):
        """Close the async client session."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the client itself for use as an async context manager."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client when the ``async with`` block ends."""
        await self.aclose()

    def __del__(self):
        """Warn if the client was left open."""
        # __init__ may have raised before `client` was assigned; in that case
        # the finalizer must not fail on the missing attribute.
        client = getattr(self, "client", None)
        if client is None or client.is_closed:
            return
        # aclose() is a coroutine and cannot be awaited from a finalizer,
        # so the only thing done here is to warn.
        warnings.warn("PrometheusAsync was not closed. Use 'async with' or call 'await .aclose()'")