Coverage for aiopromql/client.py: 96%

51 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-05-29 17:49 +0300

1import warnings 

2from datetime import datetime 

3from typing import Optional, Union 

4 

5import httpx 

6 

7from .models.prometheus import PrometheusResponseModel 

8 

9 

class PrometheusClientBase:
    """Shared foundation for the Prometheus clients defined in this module."""

    def __init__(self, url: str):
        # Stored verbatim; subclasses use it to build request URLs.
        self.base_url = url

    def _parse_response(self, response: dict) -> PrometheusResponseModel:
        """
        Build a ``PrometheusResponseModel`` from a decoded JSON payload.

        :param response: Decoded JSON body; its keys are passed to the model as keyword arguments.
        :type response: dict
        :return: The constructed response model.
        """
        model = PrometheusResponseModel(**response)
        return model

19 

20 

class PrometheusSync(PrometheusClientBase):
    """
    Synchronous Prometheus client using httpx.

    The client owns an ``httpx.Client``; release it with :meth:`close` or by
    using the instance as a context manager::

        with PrometheusSync("http://localhost:9090") as prom:
            prom.query("up")
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying ``httpx.Client``.

        :param url: Base URL of the Prometheus server.
        :type url: str
        :param timeout: Value forwarded to ``httpx.Timeout`` for every request.
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.session = httpx.Client(timeout=httpx.Timeout(timeout))

    def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Send a GET request to ``path`` and return the raw or parsed JSON body."""
        # Strip a trailing slash so "http://host/" does not yield "http://host//api/...".
        root = str(self.base_url).rstrip("/")
        response = self.session.get(f"{root}{path}", params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return self._get("/api/v1/query", {"query": promql}, raw)

    def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Both bounds are sent as POSIX timestamps obtained from datetime.timestamp().
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return self._get("/api/v1/query_range", params, raw)

    def close(self):
        """Close the sync client session."""
        self.session.close()

    def __enter__(self) -> "PrometheusSync":
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        # __init__ may have raised before `session` was assigned; in that case
        # there is nothing to close and calling close() would raise AttributeError.
        if hasattr(self, "session"):
            self.close()

80 

81 

class PrometheusAsync(PrometheusClientBase):
    """
    Asynchronous Prometheus client using httpx.

    The client owns an ``httpx.AsyncClient``; release it with ``await aclose()``
    or by using the instance in an ``async with`` block.
    """

    def __init__(self, url: str, timeout: Optional[float] = 2.0):
        """
        Create the client and its underlying ``httpx.AsyncClient``.

        :param url: Base URL of the Prometheus server; passed to httpx as ``base_url``.
        :type url: str
        :param timeout: Value forwarded to ``httpx.Timeout`` for every request.
        :type timeout: Optional[float]
        """
        super().__init__(url)
        self.client = httpx.AsyncClient(base_url=url, timeout=httpx.Timeout(timeout))

    async def _get(self, path: str, params: dict, raw: bool) -> Union[PrometheusResponseModel, dict]:
        """Send a GET request to ``path`` and return the raw or parsed JSON body."""
        # `path` is relative; httpx joins it with the client's base_url.
        response = await self.client.get(path, params=params)
        response.raise_for_status()
        payload = response.json()
        return payload if raw else self._parse_response(payload)

    async def query(self, promql: str, raw: bool = False) -> Union[PrometheusResponseModel, dict]:
        """
        Run an instant PromQL query.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._get("/api/v1/query", {"query": promql}, raw)

    async def query_range(
        self,
        promql: str,
        start: datetime,
        end: datetime,
        step: str = "30s",
        raw: bool = False,
    ) -> Union[PrometheusResponseModel, dict]:
        """
        Run a ranged PromQL query over a time window.

        :param promql: The PromQL query string to execute.
        :type promql: str
        :param start: Start datetime of the query range.
        :type start: datetime.datetime
        :param end: End datetime of the query range.
        :type end: datetime.datetime
        :param step: Query resolution step width (e.g., '30s', '1m').
        :type step: str
        :param raw: If True, return raw JSON response as dict; otherwise parse into model.
        :type raw: bool
        :return: Parsed Prometheus response or raw JSON dict.
        :raises httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Both bounds are sent as POSIX timestamps obtained from datetime.timestamp().
        params = {
            "query": promql,
            "start": start.timestamp(),
            "end": end.timestamp(),
            "step": step,
        }
        return await self._get("/api/v1/query_range", params, raw)

    async def aclose(self):
        """Close the async client session."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

    def __del__(self):
        # The async client cannot be awaited from a finalizer, so only warn.
        # `client` may be missing if __init__ raised before assigning it.
        client = getattr(self, "client", None)
        if client is not None and not client.is_closed:
            warnings.warn("PrometheusAsync was not closed. Use 'async with' or call 'await .aclose()'")