Coverage for src/pullapprove/matches.py: 80%

172 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-03-16 10:09 -0500

1from __future__ import annotations 

2 

3import hashlib 

4import json 

5from collections.abc import Generator, Iterator 

6from pathlib import Path 

7from typing import Any 

8 

9from pydantic import BaseModel, ConfigDict, Field, model_validator 

10 

11from .config import ( 

12 ConfigModel, 

13 ConfigModels, 

14 LargeScaleChangeModel, 

15 ScopeModel, 

16) 

17from .diff import DiffCode, DiffFile, iterate_diff_parts 

18from .exceptions import LargeScaleChangeException 

19 

20 

def match_path(
    *, path: Path, config: ConfigModel
) -> tuple[ScopePathMatch, list[ScopeModel]]:
    """Work out which scopes of ``config`` apply to ``path``.

    Every scope whose ``matches_path(path)`` is truthy is considered. Scopes
    without ``code`` are attached to the returned path match immediately;
    scopes with ``code`` are handed back untouched so the caller can test
    them against the file's contents.

    Returns:
        A ``(path_match, code_scopes)`` tuple.
    """
    result = ScopePathMatch(path=str(path), scopes=[])

    # Evaluate every path pattern before touching the result.
    applicable = [entry for entry in config.scopes if entry.matches_path(path)]

    content_scopes: list[ScopeModel] = []
    for entry in applicable:
        if entry.code:
            # Needs a look at the code itself; defer to the caller.
            content_scopes.append(entry)
        else:
            # Pure path scope: record it on the path match right away.
            result.add_scope(entry)

    return result, content_scopes

37 

38 

def match_code(
    *, path: str, code: str, scopes: list[ScopeModel], line_offset: int = 0
) -> Generator[ScopeCodeMatch]:
    """Yield one code match per distinct location found by ``scopes`` in ``code``.

    Each scope's ``matches_code(code)`` results are turned into
    ``ScopeCodeMatch`` objects, with ``line_offset`` added to the reported
    start and end lines. When several scopes hit the same location (same
    ``location_id``), the scopes are merged onto the first match seen
    instead of producing duplicates.
    """
    by_location: dict[str, ScopeCodeMatch] = {}

    for scope in scopes:
        for hit in scope.matches_code(code):
            candidate = ScopeCodeMatch(
                path=path,
                start_line=line_offset + hit["start_line"],
                end_line=line_offset + hit["end_line"],
                start_column=hit["start_col"],
                end_column=hit["end_col"],
                scopes=[scope.name],
                # Left empty so the model computes it from the location.
                location_id="",
            )
            candidate._scopes = [scope]

            already_seen = by_location.get(candidate.location_id)
            if already_seen is None:
                by_location[candidate.location_id] = candidate
            else:
                # Same location matched again: just add the scope to it.
                already_seen.add_scope(scope)

    yield from by_location.values()

64 

65 

def match_files(configs: ConfigModels, files: Iterator[str]) -> ChangeMatches:
    """Match a collection of file paths (and their contents) against ``configs``.

    For every file the closest config is compiled and the path is matched
    first. If any of the matching scopes look at code, the file is read and
    its text is matched as well. The combined stream of matches is folded
    into a ``ChangeMatches``.
    """

    def _walk() -> Generator[ScopePathMatch | ScopeCodeMatch]:
        for name in files:
            target = Path(name)

            path_match, content_scopes = match_path(
                path=target,
                config=configs.compile_closest_config(target),
            )

            # The path-level result always comes first.
            yield path_match

            if not content_scopes:
                continue

            # Then look inside the file for scopes that match on code.
            try:
                text = target.read_text()
                yield from match_code(
                    path=str(target),
                    code=text,
                    scopes=content_scopes,
                )
            except UnicodeDecodeError:
                # Binary files that can't be decoded as text are skipped.
                continue

    return ChangeMatches.from_config_matches(configs, _walk())

95 

96 

def iterate_diff(
    configs: ConfigModels, diff: Iterator[str] | str
) -> Generator[tuple[DiffFile | DiffCode, list[ScopePathMatch | ScopeCodeMatch]]]:
    """Walk a diff and pair every diff part with the scope matches for it.

    Args:
        configs: Configs used to look up the closest config for each file.
            When falsy, every diff part is yielded with an empty match list.
        diff: The diff, passed straight through to ``iterate_diff_parts``.

    Yields:
        ``(diff_obj, matches)`` tuples. A ``DiffFile`` is paired with a
        one-element list holding its path match. A ``DiffCode`` is paired
        with the code matches whose line range covers it, or with an empty
        list when no code scopes apply to its file.

    Note:
        When a file has code scopes, its ``DiffCode`` parts are buffered and
        only yielded once the next ``DiffFile`` (or the end of the diff) is
        reached, so they can be matched together as a single chunk.
    """
    # We can still iterate a diff without configs, just by yielding the diff objs
    if not configs:
        for diff_obj in iterate_diff_parts(diff):
            yield diff_obj, []

        return

    # Keep track of these as we go and jump between file header
    # and raw code during iteration
    check_code_scopes: list[ScopeModel] = []
    current_code_path = None

    # DiffCode parts buffered for the file currently being processed
    current_code_diffs = []

    # TODO get root config here, check diff size as we go and raise exception?
    # or we need to keep track per LSC? should be a compiled value...

    def yield_code_diffs() -> Generator[tuple[DiffCode, list[ScopeCodeMatch]]]:
        # Reads current_code_path / current_code_diffs / check_code_scopes from
        # the enclosing scope at call time, so it must be called BEFORE those
        # are reset for the next file.
        #
        # We're passing the entire diff chunk to see if there's a match inside,
        # but if there is, it probably won't match EVERY line in the chunk
        assert current_code_path is not None, "current_code_path must be set"
        current_code_chunk = "\n".join([code.raw() for code in current_code_diffs])
        # Offset such that (offset + 1) equals the first buffered line's number
        current_code_line_number = current_code_diffs[0].line_number - 1

        code_matches = match_code(
            path=current_code_path,
            code=current_code_chunk,
            scopes=check_code_scopes,
            line_offset=current_code_line_number,
        )
        # Materialize: the matches are scanned once per buffered line below
        code_matches = list(code_matches)

        for diff_line_index, diff_code in enumerate(current_code_diffs):
            # The line number used for a buffered line is the first buffered
            # line's number plus its index in the buffer.
            subcode_matches = [
                code_match
                for code_match in code_matches
                if code_match.start_line
                <= (current_code_line_number + diff_line_index + 1)
                <= code_match.end_line
            ]
            yield diff_code, subcode_matches

    for diff_obj in iterate_diff_parts(diff):
        if isinstance(diff_obj, DiffFile):
            # Yield a code chunk if we finished one
            if current_code_diffs:
                yield from yield_code_diffs()

            current_code_path = None
            current_code_diffs = []

            diff_file = diff_obj
            # Matching is done against the file's new path
            file_path = Path(diff_file.new_path)
            config = configs.compile_closest_config(file_path)

            path_match, code_scopes = match_path(
                path=file_path,
                config=config,
            )

            # Remember what to check for the code lines that follow this header
            current_code_path = str(file_path)
            check_code_scopes = code_scopes

            yield diff_obj, [path_match]
        elif isinstance(diff_obj, DiffCode):
            if check_code_scopes:
                # It will be yielded later
                current_code_diffs.append(diff_obj)
            else:
                # Skip all code lines if we don't care about code
                yield diff_obj, []

    # Yield the last code chunk we saw
    if current_code_diffs:
        yield from yield_code_diffs()

175 

176 

def match_diff(configs: ConfigModels, diff: Iterator[str] | str) -> DiffResults:
    """Analyze ``diff`` against ``configs`` and summarize the outcome.

    Besides the scope matches, the result records how many diff code entries
    were additions or deletions and which config paths the diff touched. If
    a ``LargeScaleChangeException`` is raised while the matches are being
    collected, the matches are replaced by a large-scale-change result built
    from the default large-scale-change config; the counters and modified
    config paths keep whatever had been gathered up to that point.
    """
    touched_configs: set[str] = set()
    additions = 0
    deletions = 0

    def _collect() -> Generator[ScopePathMatch | ScopeCodeMatch]:
        nonlocal additions, deletions
        for part, part_matches in iterate_diff(configs, diff):
            # Piggyback the line counters on the single pass over the diff.
            if isinstance(part, DiffCode):
                if part.is_addition():
                    additions += 1
                elif part.is_deletion():
                    deletions += 1

            if isinstance(part, DiffFile):
                for candidate in (part.new_path, part.old_path):
                    if candidate in configs:
                        touched_configs.add(candidate)

            yield from part_matches

    # The generator must be fully drained before the counters and the
    # modified-config set are read, so the matches are built first.
    try:
        matches = ChangeMatches.from_config_matches(configs, _collect())
    except LargeScaleChangeException:
        # Get the large scale change config from CODEREVIEW.toml
        fallback = configs.get_default_large_scale_change()
        matches = ChangeMatches.from_large_scale_change(
            configs=configs,
            large_scale_change=fallback,
        )

    return DiffResults(
        matches=matches,
        config_paths_modified=list(touched_configs),
        additions=additions,
        deletions=deletions,
    )

219 

220 

class DiffResults(BaseModel):
    """Results from analyzing a diff against configs."""

    # Unknown fields are rejected rather than silently accepted.
    model_config = ConfigDict(extra="forbid")

    # Scope matches for the diff. match_diff fills this either from the
    # collected matches or, after a LargeScaleChangeException, from the
    # large-scale-change fallback.
    matches: ChangeMatches
    # new_path / old_path values of diff file entries that match_diff found
    # to be `in configs`.
    config_paths_modified: list[str] = Field(default_factory=list)
    # Number of diff code entries for which is_addition() was true, as
    # counted by match_diff.
    additions: int = 0
    # Number of diff code entries for which is_deletion() was true, as
    # counted by match_diff.
    deletions: int = 0

230 

231 

class ChangeMatches(BaseModel):
    """
    The matches for a given diff or set of files.

    This knows nothing about a pull request (branches, commits, etc.)
    """

    model_config = ConfigDict(extra="forbid")

    # Design notes
    # ------------
    # An alternative layout would be:
    #   - scopes
    #   - config
    #   - paths
    #   - code
    # and points, reviewers, etc. could be added to it -- but that mixes
    # concerns, and looking at raw files would just produce empty values?
    #
    # The three modes this has to serve are:
    #   - raw files
    #   - raw diff
    #   - pull request (has reviews)

    # Config models, as returned by ConfigModels.get_config_models().
    configs: dict[str, ConfigModel] = {}

    # The matching LSC, if there is one.
    large_scale_change: LargeScaleChangeModel | None = None

    # Every scope seen in the results, keyed by scope name.
    scopes: dict[str, ScopeModel] = {}

    # Evaluated paths that have at least one scope, keyed by path.
    paths: dict[str, ScopePathMatch] = {}

    # Code matches, keyed by location id.
    code: dict[str, ScopeCodeMatch] = {}

    def as_dict(self) -> dict[str, Any]:
        """Return the model as a plain dictionary (``model_dump()``)."""
        return self.model_dump()

    def __bool__(self) -> bool:
        """Truthy only when at least one scope was matched."""
        return bool(self.scopes)

    @classmethod
    def from_config_matches(
        cls, configs: ConfigModels, matches: Iterator[ScopePathMatch | ScopeCodeMatch]
    ) -> ChangeMatches:
        """Fold a stream of path/code matches into a ``ChangeMatches``.

        Raises:
            ValueError: If an item is neither a ``ScopePathMatch`` nor a
                ``ScopeCodeMatch``.
        """
        seen_scopes: dict[str, ScopeModel] = {}
        path_results: dict[str, ScopePathMatch] = {}
        code_results: dict[str, ScopeCodeMatch] = {}

        for item in matches:
            # Collect scopes from every match as we go.
            seen_scopes.update((found.name, found) for found in item._scopes)

            if isinstance(item, ScopePathMatch):
                # Paths without scopes are not stored; this prevents an
                # unnecessarily huge dump on big repos or PRs.
                if item._scopes:
                    path_results[item.path] = item
            elif isinstance(item, ScopeCodeMatch):
                # NOTE: code matches are not (currently) linked back to the
                # path result for the same file.
                code_results[item.location_id] = item
            else:
                raise ValueError(f"Unknown match type: {item}")

        return cls(
            large_scale_change=None,
            scopes=seen_scopes,
            paths=path_results,
            code=code_results,
            # Should this be compiled configs? At this point they may be modified (branches, author, etc.)
            configs=configs.get_config_models(),
        )

    @classmethod
    def from_large_scale_change(
        cls,
        configs: ConfigModels,
        large_scale_change: LargeScaleChangeModel,
    ) -> ChangeMatches:
        """Build a result that carries only the large-scale change and configs."""
        return cls(
            configs=configs.get_config_models(),
            large_scale_change=large_scale_change,
            scopes={},
            paths={},
            code={},
        )

330 

331 

class ScopePathMatch(BaseModel):
    # A file path together with the names of the scopes that apply to it.

    model_config = ConfigDict(extra="forbid")

    path: str = Field(min_length=1)
    scopes: list[str]  # Field(min_length=1)
    # code: list[str] = []

    # Full scope models backing `scopes`; kept internally during processing.
    _scopes: list[ScopeModel] = []

    def add_scope(self, scope: ScopeModel) -> None:
        """Attach ``scope`` to this path and refresh the ``scopes`` name list.

        A scope without special ownership rules replaces any previously
        attached scope that also lacks them, so that only one primary scope
        remains in the end. Scopes with ownership rules are always kept.
        """
        incoming_is_primary = not scope.ownership
        if incoming_is_primary:
            # Drop earlier primary scopes; keep only those with ownership rules.
            self._scopes = [held for held in self._scopes if held.ownership]

        self._scopes.append(scope)

        # Keep the serialized name list in step with the internal models.
        self.scopes = [held.name for held in self._scopes]

351 

352 

class ScopeCodeMatch(BaseModel):
    # A located region of code in one file, together with the names of the
    # scopes that matched it.

    model_config = ConfigDict(extra="forbid")

    # In a diff match, we could see both sides of the diff, i.e. repeated lines if the before and after both match...
    path: str = Field(min_length=1)
    start_line: int
    end_line: int
    start_column: int
    end_column: int
    scopes: list[str]  # Field(min_length=1)
    # Identifier derived from path + line/column range when passed in empty.
    location_id: str

    # Store this internally during processing (full reference of scope models)
    _scopes: list[ScopeModel] = []

    def printed_location(self) -> str:
        """Return a short human-readable description of the matched range.

        Single-line matches include the column range; multi-line matches
        show only the line range.
        """
        if self.start_line == self.end_line:
            return f"Ln {self.start_line}, Col {self.start_column}-{self.end_column}"
        else:
            return f"Ln {self.start_line}-{self.end_line}"

    def add_scope(self, scope: ScopeModel) -> None:
        """Attach ``scope`` to this match and refresh the ``scopes`` name list.

        A scope without special ownership rules replaces any previously
        attached scope that also lacks them, so that only one primary scope
        remains in the end. Scopes with ownership rules are always kept.
        """
        if not scope.ownership:
            # Remove any other scopes that don't have special ownership rules
            # (i.e. we only want one primary scope in the end)
            self._scopes = [s for s in self._scopes if s.ownership]

        self._scopes.append(scope)

        self.scopes = [s.name for s in self._scopes]

    @model_validator(mode="after")
    def compute_location_id(self) -> ScopeCodeMatch:
        """Fill in ``location_id`` from the location fields when it is empty.

        The id is the MD5 hex digest of a canonical JSON encoding (sorted
        keys, compact separators) of the path and the line/column range. A
        caller-supplied, non-empty ``location_id`` is left untouched.
        """
        # only compute if the caller didn't provide one
        if not self.location_id:
            loc = {
                "path": self.path,
                "start_line": self.start_line,
                "end_line": self.end_line,
                "start_column": self.start_column,
                "end_column": self.end_column,
            }
            raw = json.dumps(loc, sort_keys=True, separators=(",", ":")).encode()
            # MD5 is only a fingerprint here, not a security measure. Saying
            # so keeps this working on FIPS-restricted OpenSSL builds, where a
            # plain hashlib.md5() call raises ValueError. The digest itself is
            # unchanged, so existing ids stay valid.
            self.location_id = hashlib.md5(raw, usedforsecurity=False).hexdigest()
        return self

398 

399 

# How should we store what was reviewed? Ideally this could be fine-grained at some point,
# so we need to know who reviewed, which scopes, which paths, and which code matches (location hash) -- then we can cross-reference everything.