Coverage for src/pullapprove/matches.py: 80%
172 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-03-16 10:09 -0500
« prev ^ index » next coverage.py v7.8.2, created at 2026-03-16 10:09 -0500
1from __future__ import annotations
3import hashlib
4import json
5from collections.abc import Generator, Iterator
6from pathlib import Path
7from typing import Any
9from pydantic import BaseModel, ConfigDict, Field, model_validator
11from .config import (
12 ConfigModel,
13 ConfigModels,
14 LargeScaleChangeModel,
15 ScopeModel,
16)
17from .diff import DiffCode, DiffFile, iterate_diff_parts
18from .exceptions import LargeScaleChangeException
def match_path(
    *, path: Path, config: ConfigModel
) -> tuple[ScopePathMatch, list[ScopeModel]]:
    """Sort the scopes of ``config`` that match ``path`` into two groups.

    Scopes with a falsy ``code`` attribute are attached to the returned
    ``ScopePathMatch`` through ``add_scope``. Scopes with a truthy ``code``
    are handed back separately so the caller can test them against content.

    Returns:
        A ``(path_match, code_scopes)`` tuple.
    """
    result = ScopePathMatch(path=str(path), scopes=[])

    # Resolve every path-level match first, then partition in one pass.
    matching = [
        candidate for candidate in config.scopes if candidate.matches_path(path)
    ]

    needs_code_check: list[ScopeModel] = []
    for candidate in matching:
        if candidate.code:
            needs_code_check.append(candidate)
        else:
            # Path-only scope: record it on the path itself
            result.add_scope(candidate)

    return result, needs_code_check
def match_code(
    *, path: str, code: str, scopes: list[ScopeModel], line_offset: int = 0
) -> Generator[ScopeCodeMatch]:
    """Yield one ``ScopeCodeMatch`` per distinct location matched in ``code``.

    Each scope's ``matches_code(code)`` results are turned into
    ``ScopeCodeMatch`` objects. ``line_offset`` is added to the reported
    start/end lines. When several results share a ``location_id`` the first
    match object is kept and later scopes are merged into it via
    ``add_scope``.
    """
    by_location: dict[str, ScopeCodeMatch] = {}

    for scope in scopes:
        for hit in scope.matches_code(code):
            candidate = ScopeCodeMatch(
                path=path,
                start_line=line_offset + hit["start_line"],
                end_line=line_offset + hit["end_line"],
                start_column=hit["start_col"],
                end_column=hit["end_col"],
                scopes=[scope.name],
                # Left empty so the model computes it from the location fields
                location_id="",
            )
            candidate._scopes = [scope]

            existing = by_location.get(candidate.location_id)
            if existing is None:
                by_location[candidate.location_id] = candidate
            else:
                # Same location seen before: only the scope is new
                existing.add_scope(scope)

    yield from by_location.values()
def match_files(configs: ConfigModels, files: Iterator[str]) -> ChangeMatches:
    """Match a set of file paths (and their contents) against ``configs``.

    For every entry in ``files`` the closest config is compiled, the path
    match is produced, and, if any matching scope has code patterns, the
    file is read and matched against them. Files whose contents cannot be
    decoded as text are skipped for code matching only; their path match is
    still reported.

    Returns:
        A ``ChangeMatches`` built from all path and code matches.
    """

    def _iterate() -> Generator[ScopePathMatch | ScopeCodeMatch]:
        for f in files:
            file_path = Path(f)

            config = configs.compile_closest_config(file_path)

            path_match, code_scopes = match_path(
                path=file_path,
                config=config,
            )

            # Yield the paths first
            yield path_match

            if not code_scopes:
                continue

            # Only the read is guarded: a decode error here means "binary
            # file", whereas the same error raised by the matching logic
            # would be a real bug and must not be silently dropped.
            try:
                code = file_path.read_text()
            except UnicodeDecodeError:
                # Skip binary files that can't be decoded as text
                continue

            # Then look for scopes that match the file contents
            yield from match_code(
                path=str(file_path),
                code=code,
                scopes=code_scopes,
            )

    return ChangeMatches.from_config_matches(configs, _iterate())
def iterate_diff(
    configs: ConfigModels, diff: Iterator[str] | str
) -> Generator[tuple[DiffFile | DiffCode, list[ScopePathMatch | ScopeCodeMatch]]]:
    """Walk a diff and pair every diff object with the matches that apply to it.

    Yields ``(diff_obj, matches)`` tuples. A ``DiffFile`` is paired with its
    single path match. ``DiffCode`` lines are paired with the code matches
    covering that line, or with an empty list when the current file has no
    code scopes. Code lines of a file that has code scopes are buffered and
    emitted together once the next file header (or the end of the diff) is
    reached.
    """
    # Without configs there is nothing to match; just pass the diff objects through
    if not configs:
        for part in iterate_diff_parts(diff):
            yield part, []

        return

    # TODO get root config here, check diff size as we go and raise exception?
    # or we need to keep track per LSC? should be a compiled value...

    def _flush(
        code_path: str | None,
        pending: list[DiffCode],
        scopes: list[ScopeModel],
    ) -> Generator[tuple[DiffCode, list[ScopeCodeMatch]]]:
        # The whole buffered chunk is matched at once; a match will usually
        # cover only some of its lines, so each line is filtered separately.
        assert code_path is not None, "current_code_path must be set"
        chunk = "\n".join(entry.raw() for entry in pending)
        offset = pending[0].line_number - 1

        found = list(
            match_code(
                path=code_path,
                code=chunk,
                scopes=scopes,
                line_offset=offset,
            )
        )

        for position, entry in enumerate(pending, 1):
            line_no = offset + position
            covering = [
                hit for hit in found if hit.start_line <= line_no <= hit.end_line
            ]
            yield entry, covering

    # State carried between a file header and the code lines that follow it
    active_scopes: list[ScopeModel] = []
    active_path = None
    buffered = []

    for part in iterate_diff_parts(diff):
        if isinstance(part, DiffFile):
            # A new file starts: emit whatever was buffered for the previous one
            if buffered:
                yield from _flush(active_path, buffered, active_scopes)

            active_path = None
            buffered = []

            file_path = Path(part.new_path)
            config = configs.compile_closest_config(file_path)
            path_match, code_scopes = match_path(
                path=file_path,
                config=config,
            )

            active_path = str(file_path)
            active_scopes = code_scopes

            yield part, [path_match]
            continue

        if not isinstance(part, DiffCode):
            continue

        if active_scopes:
            # Held back until the chunk is complete
            buffered.append(part)
        else:
            # No code scopes for this file, so code lines carry no matches
            yield part, []

    # Emit the final buffered chunk, if any
    if buffered:
        yield from _flush(active_path, buffered, active_scopes)
def match_diff(configs: ConfigModels, diff: Iterator[str] | str) -> DiffResults:
    """Match a diff against ``configs`` and collect summary statistics.

    While the diff is consumed, added/deleted code lines are counted and any
    file whose old or new path is contained in ``configs`` is recorded as a
    modified config path. If ``LargeScaleChangeException`` is raised while
    building the matches, the result carries the default large-scale-change
    model instead of scope matches, along with whatever counts had been
    gathered up to that point.
    """
    touched_configs: set[str] = set()
    additions = 0
    deletions = 0

    def _collect() -> Generator[ScopePathMatch | ScopeCodeMatch]:
        nonlocal additions, deletions
        for diff_obj, found in iterate_diff(configs, diff):
            # Count changed lines as a side effect of the single pass
            if isinstance(diff_obj, DiffCode):
                if diff_obj.is_addition():
                    additions += 1
                elif diff_obj.is_deletion():
                    deletions += 1

            if isinstance(diff_obj, DiffFile):
                for candidate in (diff_obj.new_path, diff_obj.old_path):
                    if candidate in configs:
                        touched_configs.add(candidate)

            yield from found

    # The generator must be fully consumed before the counters are read,
    # so the matches are built in their own statement.
    try:
        matches = ChangeMatches.from_config_matches(configs, _collect())
    except LargeScaleChangeException:
        # Get the large scale change config from CODEREVIEW.toml
        matches = ChangeMatches.from_large_scale_change(
            configs=configs,
            large_scale_change=configs.get_default_large_scale_change(),
        )

    return DiffResults(
        matches=matches,
        config_paths_modified=list(touched_configs),
        additions=additions,
        deletions=deletions,
    )
class DiffResults(BaseModel):
    """Results from analyzing a diff against configs."""

    # Unknown fields are rejected rather than ignored
    model_config = ConfigDict(extra="forbid")

    # Scope/path/code matches (or the large-scale-change result) for the diff
    matches: ChangeMatches
    # Paths from the diff that are themselves config files
    config_paths_modified: list[str] = Field(default_factory=list)
    # Number of added / deleted code lines counted in the diff
    additions: int = 0
    deletions: int = 0
class ChangeMatches(BaseModel):
    """
    The matches for a given diff or set of files.

    This knows nothing about a pull request (branches, commits, etc.)
    """

    model_config = ConfigDict(extra="forbid")

    # Design note: an alternative layout would be scopes / config / paths /
    # code, possibly with points, reviewers, etc. added, but that mixes
    # concerns (raw files would just carry empty values).
    #
    # The three modes of use are:
    # - raw files
    # - raw diff
    # - pull request (has reviews)

    configs: dict[str, ConfigModel] = {}

    # The matching LSC, if there is one.
    large_scale_change: LargeScaleChangeModel | None = None

    # All scopes found in the results, keyed by scope name
    scopes: dict[str, ScopeModel] = {}

    # All evaluated paths that matched at least one scope, keyed by path
    paths: dict[str, ScopePathMatch] = {}

    # All code matches, keyed by location id
    code: dict[str, ScopeCodeMatch] = {}

    def as_dict(self) -> dict[str, Any]:
        """Return the model as a plain dictionary (``model_dump``)."""
        return self.model_dump()

    def __bool__(self) -> bool:
        """Truthy only when at least one scope was matched."""
        return bool(self.scopes)

    @classmethod
    def from_config_matches(
        cls, configs: ConfigModels, matches: Iterator[ScopePathMatch | ScopeCodeMatch]
    ) -> ChangeMatches:
        """Build a result by consuming a stream of path and code matches.

        Raises:
            ValueError: if an item is neither a ``ScopePathMatch`` nor a
                ``ScopeCodeMatch``.
        """
        seen_scopes: dict[str, ScopeModel] = {}
        path_hits: dict[str, ScopePathMatch] = {}
        code_hits: dict[str, ScopeCodeMatch] = {}

        for match in matches:
            # Every scope referenced by any match is remembered by name
            seen_scopes.update({scope.name: scope for scope in match._scopes})

            if isinstance(match, ScopePathMatch):
                # Paths without scopes are dropped on purpose: storing them
                # would bloat the dump on big repos or PRs.
                if match._scopes:
                    path_hits[match.path] = match
            elif isinstance(match, ScopeCodeMatch):
                # Code matches are stored by location; they are not (yet)
                # cross-referenced from their path entry.
                code_hits[match.location_id] = match
            else:
                raise ValueError(f"Unknown match type: {match}")

        # Should this be compiled configs? At this point they may be modified (branches, author, etc.)
        return cls(
            configs=configs.get_config_models(),
            large_scale_change=None,
            scopes=seen_scopes,
            paths=path_hits,
            code=code_hits,
        )

    @classmethod
    def from_large_scale_change(
        cls,
        configs: ConfigModels,
        large_scale_change: LargeScaleChangeModel,
    ) -> ChangeMatches:
        """Build a result that carries only a large-scale change and no matches."""
        return cls(
            configs=configs.get_config_models(),
            large_scale_change=large_scale_change,
            scopes={},
            paths={},
            code={},
        )
class ScopePathMatch(BaseModel):
    # A path together with the names of the scopes that apply to it.
    model_config = ConfigDict(extra="forbid")

    path: str = Field(min_length=1)
    scopes: list[str]  # Field(min_length=1)
    # code: list[str] = []

    # Full scope models, kept privately while matches are being processed
    _scopes: list[ScopeModel] = []

    def add_scope(self, scope: ScopeModel) -> None:
        """Attach ``scope`` and refresh the public list of scope names.

        A scope with falsy ``ownership`` replaces every previously attached
        scope that also has falsy ``ownership``, so at most one such
        "primary" scope remains.
        """
        replaces_primary = not scope.ownership
        if replaces_primary:
            self._scopes = [kept for kept in self._scopes if kept.ownership]

        self._scopes.append(scope)
        self.scopes = [attached.name for attached in self._scopes]
class ScopeCodeMatch(BaseModel):
    # A matched span of code in a file, plus the names of the scopes that
    # matched it. `location_id` identifies the span (path + line/column
    # range) and is derived automatically when passed in empty.
    model_config = ConfigDict(extra="forbid")

    # In a diff match, we could see both sides of the diff, i.e. repeated lines if the before and after both match...
    path: str = Field(min_length=1)
    start_line: int
    end_line: int
    start_column: int
    end_column: int
    scopes: list[str]  # Field(min_length=1)
    location_id: str

    # Store this internally during processing (full reference of scope models)
    _scopes: list[ScopeModel] = []

    def printed_location(self) -> str:
        """Return a short human-readable location.

        Single-line matches include the column range; multi-line matches
        show only the line range.
        """
        if self.start_line == self.end_line:
            return f"Ln {self.start_line}, Col {self.start_column}-{self.end_column}"
        else:
            return f"Ln {self.start_line}-{self.end_line}"

    def add_scope(self, scope: ScopeModel) -> None:
        """Attach ``scope`` and refresh the public list of scope names.

        A scope with falsy ``ownership`` replaces every previously attached
        scope that also has falsy ``ownership``, so at most one such
        "primary" scope remains.
        """
        if not scope.ownership:
            # Remove any other scopes that don't have special ownership rules
            # (i.e. we only want one primary scope in the end)
            self._scopes = [s for s in self._scopes if s.ownership]

        self._scopes.append(scope)

        self.scopes = [s.name for s in self._scopes]

    @model_validator(mode="after")
    def compute_location_id(self) -> ScopeCodeMatch:
        """Fill in ``location_id`` from the location fields when it is empty.

        The id is the MD5 hex digest of a canonical JSON encoding of the
        path and line/column range, so equal locations get equal ids.
        """
        # only compute if the caller didn't provide one
        if not self.location_id:
            loc = {
                "path": self.path,
                "start_line": self.start_line,
                "end_line": self.end_line,
                "start_column": self.start_column,
                "end_column": self.end_column,
            }
            raw = json.dumps(loc, sort_keys=True, separators=(",", ":")).encode()
            # The digest is only an identifier, not a security measure.
            # Declaring that keeps MD5 usable on FIPS-restricted builds and
            # does not change the resulting value.
            self.location_id = hashlib.md5(raw, usedforsecurity=False).hexdigest()
        return self
400# how to store what was reviewed? ideally we could be fine-grained, at some point
401# so we need to know who, which scopes, which paths, which codes (location hash) then we can cross reference everything?