Coverage for aiopromql/client.py: 96%
51 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-05-29 17:49 +0300
1import warnings
2from datetime import datetime
3from typing import Optional, Union
5import httpx
7from .models.prometheus import PrometheusResponseModel
class PrometheusClientBase:
    """Shared foundation for the Prometheus clients: server URL storage and response parsing."""

    def __init__(self, url: str):
        # Stored exactly as given; no normalization is applied here.
        self.base_url = url

    def _parse_response(self, response: dict) -> PrometheusResponseModel:
        """
        Build a ``PrometheusResponseModel`` from a decoded JSON payload.

        :param response: Decoded JSON body; its top-level keys are passed to the model as keyword arguments.
        :type response: dict
        :return: The constructed model instance.
        """
        parsed = PrometheusResponseModel(**response)
        return parsed
class PrometheusSync(PrometheusClientBase):
    """
    Synchronous Prometheus client using httpx.

    The instance owns an ``httpx.Client``. Release it with :meth:`close`, or use the
    instance as a context manager (``with PrometheusSync(url) as prom: ...``).
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying HTTP session.

        :param url: Base URL of the Prometheus server.
        :type url: str
        :param timeout: Value forwarded to ``httpx.Timeout`` (seconds; httpx treats ``None`` as "no timeout").
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.session = httpx.Client(timeout=httpx.Timeout(timeout))

    def _request(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """
        Issue a GET against an API path and decode the result.

        :param path: API path beginning with ``/`` (e.g. ``/api/v1/query``).
        :param params: Query-string parameters for the request.
        :param raw: If True, return the decoded JSON as-is; otherwise parse it into the model.
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: Propagated from ``raise_for_status()`` on an error status.
        """
        # Strip a trailing slash so "http://host/" does not yield "http://host//api/...".
        endpoint = f"{self.base_url.rstrip('/')}{path}"
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return self._request("/api/v1/query", {"query": promql}, raw)

    def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        ``start`` and ``end`` are sent as Unix timestamps obtained from
        ``datetime.timestamp()``, which interprets naive datetimes as local time.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return self._request("/api/v1/query_range", params, raw)

    def close(self):
        """Close the sync client session."""
        # The attribute is missing when __init__ raised before assigning it;
        # closing is then a no-op instead of an AttributeError.
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def __enter__(self) -> "PrometheusSync":
        """Return the client itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the session on leaving the ``with`` block; exceptions are not suppressed."""
        self.close()

    def __del__(self):
        # Best-effort cleanup; close() tolerates a partially constructed instance.
        self.close()
class PrometheusAsync(PrometheusClientBase):
    """
    Asynchronous Prometheus client using httpx.

    The instance owns an ``httpx.AsyncClient`` configured with the server URL as its
    ``base_url``. Release it with ``await client.aclose()`` or use ``async with``.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying async HTTP client.

        :param url: Base URL of the Prometheus server.
        :type url: str
        :param timeout: Value forwarded to ``httpx.Timeout`` (seconds; httpx treats ``None`` as "no timeout").
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.client = httpx.AsyncClient(base_url=url, timeout=httpx.Timeout(timeout))

    async def _request(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """
        Issue a GET against an API path (relative to ``base_url``) and decode the result.

        :param path: API path such as ``/api/v1/query``.
        :param params: Query-string parameters for the request.
        :param raw: If True, return the decoded JSON as-is; otherwise parse it into the model.
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: Propagated from ``raise_for_status()`` on an error status.
        """
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    async def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._request("/api/v1/query", {"query": promql}, raw)

    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        ``start`` and ``end`` are sent as Unix timestamps obtained from
        ``datetime.timestamp()``, which interprets naive datetimes as local time.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return await self._request("/api/v1/query_range", params, raw)

    async def aclose(self):
        """Close the async client session."""
        await self.client.aclose()

    async def __aenter__(self):
        """Return the client itself so it can be used in an ``async with`` statement."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client on leaving the ``async with`` block; exceptions are not suppressed."""
        await self.aclose()

    def __del__(self):
        # A finalizer cannot await aclose(), so it only warns about the leak.
        # The attribute is missing when __init__ raised before assigning it.
        client = getattr(self, "client", None)
        if client is not None and not client.is_closed:
            warnings.warn("PrometheusAsync was not closed. Use 'async with' or call 'await .aclose()'")