Coverage for aiopromql/client.py: 91%

61 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-05-30 13:59 +0300

1import warnings 

2from datetime import datetime 

3from typing import Optional, Union 

4 

5import httpx 

6 

7from .models.prometheus import PrometheusResponseModel 

8 

9 

class PrometheusClientBase:
    """Shared foundation for the Prometheus clients.

    Stores the server URL and turns decoded JSON payloads into
    ``PrometheusResponseModel`` instances.
    """

    def __init__(self, url: str):
        """
        :param url: Base URL of the Prometheus server; stored unchanged as ``base_url``.
        """
        self.base_url = url

    def _parse_response(self, response: dict) -> PrometheusResponseModel:
        """
        Build a response model from a decoded Prometheus JSON payload.

        :param response: Decoded JSON body; its keys are passed to the model as keyword arguments.
        :return: The constructed ``PrometheusResponseModel``.
        """
        model = PrometheusResponseModel(**response)
        return model

19 

20 

class PrometheusSync(PrometheusClientBase):
    """Synchronous Prometheus client using httpx.

    Usable as a context manager; leaving the ``with`` block closes the
    underlying ``httpx.Client``.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        :param url: Base URL of the Prometheus server.
        :param timeout: Value handed to ``httpx.Timeout`` for the underlying client.
        """
        super().__init__(url)
        self.session = httpx.Client(timeout=httpx.Timeout(timeout))

    def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """
        Issue a GET against ``path`` and return the raw or parsed JSON body.

        :param path: API path starting with '/', appended to the base URL.
        :param params: Query-string parameters for the request.
        :param raw: If True, return the decoded JSON dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
        :raises httpx.RequestError: If a network error occurs.
        """
        # Strip trailing slashes so a base URL such as "http://host:9090/"
        # does not produce a "//api/v1/..." request path.
        endpoint = f"{self.base_url.rstrip('/')}{path}"
        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        data = response.json()
        return data if raw else self._parse_response(data)

    def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        return self._get("/api/v1/query", {"query": promql}, raw)

    def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        :param promql: The PromQL query string to execute.
        :param start: Start datetime of the query range.
        :param end: End datetime of the query range.
        :param step: Query resolution step width (e.g., '30s', '1m').
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        # The range bounds are sent as POSIX timestamps (float seconds).
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return self._get("/api/v1/query_range", params, raw)

    def close(self):
        """Close the sync client session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        # __init__ may have raised before ``session`` was assigned; the
        # finalizer still runs in that case, so look the attribute up defensively.
        session = getattr(self, "session", None)
        if session is not None and not session.is_closed:
            warnings.warn("PrometheusSync was not closed. Use 'with' statement or call .close()")
            self.close()

87 

88 

class PrometheusAsync(PrometheusClientBase):
    """Asynchronous Prometheus client using httpx.

    Usable as an async context manager; leaving the ``async with`` block
    closes the underlying ``httpx.AsyncClient``.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        :param url: Base URL of the Prometheus server; also set as the httpx client's ``base_url``.
        :param timeout: Value handed to ``httpx.Timeout`` for the underlying client.
        """
        super().__init__(url)
        self.client = httpx.AsyncClient(base_url=url, timeout=httpx.Timeout(timeout))

    async def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """
        Issue a GET against ``path`` and return the raw or parsed JSON body.

        :param path: API path, resolved by the httpx client against its ``base_url``.
        :param params: Query-string parameters for the request.
        :param raw: If True, return the decoded JSON dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If ``raise_for_status`` rejects the response.
        :raises httpx.RequestError: If a network error occurs.
        """
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        data = response.json()
        return data if raw else self._parse_response(data)

    async def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query asynchronously.

        :param promql: The PromQL query string to execute.
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        return await self._get("/api/v1/query", {"query": promql}, raw)

    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window asynchronously.

        :param promql: The PromQL query string to execute.
        :param start: Start datetime of the query range.
        :param end: End datetime of the query range.
        :param step: Query resolution step width (e.g., '30s', '1m').
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :return: Parsed Prometheus response model or raw JSON dict.
        :raises httpx.HTTPStatusError: If HTTP response status is 4xx or 5xx.
        :raises httpx.RequestError: If a network error occurs.
        """
        # The range bounds are sent as POSIX timestamps (float seconds).
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return await self._get("/api/v1/query_range", params, raw)

    async def aclose(self):
        """Close the async client session."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

    def __del__(self):
        # __init__ may have raised before ``client`` was assigned; the
        # finalizer still runs in that case, so look the attribute up defensively.
        client = getattr(self, "client", None)
        # Only a warning is possible here: closing requires awaiting
        # ``aclose()``, which a synchronous finalizer cannot do.
        if client is not None and not client.is_closed:
            warnings.warn("PrometheusAsync was not closed. Use 'async with' or call 'await .aclose()'")