Coverage for src/pullapprove/pullrequests.py: 41%

347 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-12-15 22:19 -0600

1from __future__ import annotations 

2 

3import re 

4from collections.abc import Generator 

5from enum import Enum 

6from pathlib import Path 

7from random import Random 

8from typing import Any 

9 

10from pydantic import BaseModel, ConfigDict, Field 

11 

12from .config import ( 

13 ConfigModel, 

14 ConfigModels, 

15 LargeScaleChangeModel, 

16 OwnershipChoices, 

17 ReviewedForChoices, 

18 ScopeModel, 

19) 

20from .matches import ChangeMatches, ScopeCodeMatch, ScopePathMatch, match_diff 

21 

22 

# Kept as an enum rather than a bool: there are more than two states.
class Status(str, Enum):
    """Status value used by the result models in this module.

    Subclasses ``str``, so each member compares equal to its string value.
    """

    PASS = "PASS"
    FAIL = "FAIL"
    ERROR = "ERROR"
    PENDING = "PENDING"
    EMPTY = ""  # Empty string, so this member is falsy

30 

31 

class User(BaseModel):
    """A user as known to the code host."""

    model_config = ConfigDict(extra="forbid")

    host_id: str = Field(min_length=1)
    username: str = Field(min_length=1)
    avatar_url: str

    def __str__(self) -> str:
        """Render the user as their username."""
        return self.username

    def __eq__(self, value: Any) -> bool:
        """Compare with another ``User`` or with a plain string.

        Two users are equal when their host ids match. A string is equal
        when it matches either the host id or the username. Every other
        type compares unequal.
        """
        if isinstance(value, User):
            return self.host_id == value.host_id

        if not isinstance(value, str):
            return False

        return self.host_id == value or self.username == value

48 

49 

class ReviewStates(str, Enum):
    """State of a single review.

    Subclasses ``str``, so each member compares equal to its string value.
    """

    APPROVED = "APPROVED"
    PENDING = "PENDING"
    CHANGES_REQUESTED = "CHANGES_REQUESTED"
    EMPTY = ""  # Empty string, so this member is falsy

55 

56 

class Review(BaseModel):
    """A single review submitted on a pull request."""

    model_config = ConfigDict(extra="forbid")

    host_id: str = Field(min_length=1)
    host_url: str = Field(min_length=1)
    body: str
    state: ReviewStates
    submitted_at: str
    user: User

    def get_reviewed_for_scopes(self) -> list[str]:
        """Return the scope names listed after ``Reviewed-for:`` in the body.

        Only the first ``Reviewed-for:`` marker (case-insensitive) is used.
        Scope names are comma separated; spaces or tabs around the commas are
        allowed (``Reviewed-for: a, b``). Empty entries, such as the one
        produced by a trailing comma, are dropped.

        Returns:
            The scope names in the order written, or ``[]`` when the body is
            empty or has no marker.
        """
        if not self.body:
            return []

        # First token, then any number of ", token" continuations on the same
        # line. Only spaces/tabs are allowed around the commas so the list
        # never runs on into the next line of the review body.
        found = re.search(
            r"Reviewed-for:\s*(\S+(?:[ \t]*,[ \t]*\S+)*)",
            self.body,
            re.IGNORECASE,
        )
        if found is None:
            return []

        names = (part.strip() for part in found.group(1).split(","))
        return [name for name in names if name]

76 

77 

class Reviewer(BaseModel):
    """A user on the pull request together with every review they left."""

    model_config = ConfigDict(extra="forbid")

    reviews: list[Review]
    user: User

    def __str__(self) -> str:
        """Render the reviewer as their user."""
        return str(self.user)

    def latest_review(self, scope: ScopeModel | None = None) -> Review | None:
        """Return the newest review that counts, optionally for ``scope``.

        Reviews are walked from newest to oldest by ``submitted_at``. When a
        scope is given and its ``reviewed_for`` setting is not ``IGNORED``,
        a review is skipped if it names scopes that do not include this one,
        or if it names no scopes while the scope requires them. Reviews
        without a state are skipped as well.
        """
        if not self.reviews:
            return None

        # Most recent valid review is the one we want
        newest_first = sorted(
            self.reviews, key=lambda item: item.submitted_at, reverse=True
        )

        for candidate in newest_first:
            if scope and scope.reviewed_for != ReviewedForChoices.IGNORED:
                named_scopes = candidate.get_reviewed_for_scopes()

                if named_scopes:
                    # The review names scopes explicitly; this one must be listed
                    if scope.name not in named_scopes:
                        continue
                elif scope.reviewed_for == ReviewedForChoices.REQUIRED:
                    # Some scopes are required, so the list can't be empty
                    continue

                # Otherwise no scopes were named and that is ok for everything

            # If a review has no known state, we skip it (commented on GitHub)
            if candidate.state:
                return candidate

        return None

    def get_review_state(self) -> ReviewStates:
        """Return the state of the latest review, or ``PENDING`` if there is none."""
        review = self.latest_review()

        # They are pending if they are a reviewer with no specific state
        return review.state if review else ReviewStates.PENDING

124 

125 

class Branch(BaseModel):
    """A branch, identified here only by its name."""

    model_config = ConfigDict(extra="forbid")

    name: str = Field(min_length=1)  # Must be a non-empty string
    # could be fork, other repo...

131 

132 

class PullRequest(BaseModel):
    """A pull request together with its reviewers and diff.

    Configs come from outside the pull request, so they are passed to
    ``process_configs`` instead of being attached to the model.
    """

    model_config = ConfigDict(extra="forbid")

    base_branch: Branch
    head_branch: Branch
    reviewers: list[Reviewer]  # Includes requested and previous reviewers
    author: User
    diff: str | Generator = Field(exclude=True, default="")
    number: int
    draft: bool

    def get_reviewer(self, identifier: str) -> Reviewer | None:
        """Return the first reviewer whose host id or username is ``identifier``."""
        for reviewer in self.reviewers:
            if reviewer.user.host_id == identifier:
                return reviewer

            if reviewer.user.username == identifier:
                return reviewer

        return None

    def _tally_reviews(
        self,
        reviewers: list[str],
        alternates: list[str],
        scope: ScopeModel | None,
    ) -> tuple[list[str], dict[str, ReviewResult], int, int]:
        """Collect the latest review of every reviewer that is allowed to count.

        A reviewer on the pull request counts when ``reviewers`` contains the
        ``"*"`` wildcard, or when their username is listed in ``reviewers`` or
        ``alternates``.

        Args:
            reviewers: Usernames that may review (may include ``"*"``).
            alternates: Additional usernames that may review.
            scope: Passed through to ``Reviewer.latest_review``.

        Returns:
            ``(review_ids, review_results, approved, pending)`` where
            ``review_ids`` lists the host ids of the counted reviews in
            reviewer order, ``review_results`` maps those ids to their
            ``ReviewResult``, ``approved`` counts approving reviews and
            ``pending`` counts reviewers who have not approved.
        """
        # NOTE(review): annotated as list[str]; confirm against the config models.
        review_ids: list[str] = []
        review_results: dict[str, ReviewResult] = {}
        approved = 0
        pending = 0

        for reviewer in self.reviewers:
            username = reviewer.user.username

            # Could maybe enable host id, or email too
            if (
                "*" not in reviewers
                and username not in reviewers
                and username not in alternates
            ):
                continue

            if review := reviewer.latest_review(scope=scope):
                review_ids.append(review.host_id)
                review_results[review.host_id] = ReviewResult(
                    review=review,
                    scopes=review.get_reviewed_for_scopes(),
                )

                if review.state == ReviewStates.APPROVED:
                    approved += 1
                elif review.state in (
                    ReviewStates.PENDING,
                    ReviewStates.CHANGES_REQUESTED,
                ):
                    pending += 1
            else:
                # They exist on the PR but with no review yet
                pending += 1

        return review_ids, review_results, approved, pending

    def process_configs(self, configs: ConfigModels) -> PullRequestResults | None:
        """Evaluate this pull request against ``configs``.

        Returns:
            ``None`` when ``configs`` is empty or nothing remains after
            filtering for this pull request; otherwise the computed results.
            A large-scale change is handled on its own by
            ``process_large_scale_change``.
        """
        if not configs:
            return None

        filtered_configs = configs.filter_for_pullrequest(self)

        # If there are no configs, or they are disabled, then we can return early
        if not filtered_configs:
            return None

        diff_results = match_diff(filtered_configs, self.diff)
        matches = diff_results.matches

        # If it's a large scale change, that's the only thing we need to consider (after branches)
        if matches.large_scale_change:
            return self.process_large_scale_change(
                matches, diff_results.additions, diff_results.deletions
            )

        results = PullRequestResults(
            pullrequest=self,
            status=Status.PENDING,
            description="",
            labels=[],
            large_scale_change_results=None,
            scope_results={},
            path_results={},
            code_results={},
            review_results={},
            config_results={
                path: ConfigResult.from_config_model(config)
                for path, config in matches.configs.items()
            },
            config_paths_modified=diff_results.config_paths_modified,
            additions=diff_results.additions,
            deletions=diff_results.deletions,
        )

        # Iterate the active scopes and get their results
        for scope_name, scope_model in matches.scopes.items():
            reviews, scope_reviews, review_points, pending_points = (
                self._tally_reviews(
                    scope_model.reviewers, scope_model.alternates, scope_model
                )
            )
            results.review_results.update(scope_reviews)

            # Author points only count if explicitly listed (wildcard is not converted to usernames)
            if self.author.username in scope_model.reviewers:
                author_points = scope_model.author_value
            else:
                author_points = 0

            points = review_points + author_points

            if any(
                results.review_results[review_id].review.state
                == ReviewStates.CHANGES_REQUESTED
                for review_id in reviews
            ):
                status = Status.FAIL
            elif points >= scope_model.require:
                status = Status.PASS
            else:
                status = Status.PENDING

            matched_paths = [
                path
                for path, path_match in matches.paths.items()
                if scope_name in path_match.scopes
            ]
            matched_code = [
                code
                for code, code_match in matches.code.items()
                if scope_name in code_match.scopes
            ]

            results.scope_results[scope_name] = ScopeResult(
                scope=scope_model,
                status=status,
                points=points,
                # separate review points and author points?
                points_pending=pending_points,  # Read by ScopeResult.reviewers_to_request
                reviews=reviews,
                matched_paths=matched_paths,
                matched_code=matched_code,
            )

        # Now we have to get the status of the results overall by looking
        # at the paths and code, because scopes can combine based on their ownership model,
        # so looking at scopes alone isn't enough.

        for path, path_match in matches.paths.items():
            results.path_results[path] = PathResult(
                path=path_match,
                status=results.status_for_scope_names(path_match.scopes),
                reviews=results.reviews_for_scope_names(path_match.scopes),
            )

        for code_hash, code_match in matches.code.items():
            results.code_results[code_hash] = CodeResult(
                code=code_match,
                status=results.status_for_scope_names(code_match.scopes),
                reviews=results.reviews_for_scope_names(code_match.scopes),
            )

        # TODO what happens if no scopes match?
        # configurable in pullapprove.com?

        results.status = results.compute_status()
        results.description = results.compute_description()
        results.labels = results.compute_labels()

        return results

    def process_large_scale_change(
        self, change_matches: ChangeMatches, additions: int, deletions: int
    ) -> PullRequestResults:
        """Build results for a pull request matched as a large-scale change.

        Only the large-scale change's own reviewers and ``require`` count;
        scope, path and code results are left empty.

        Args:
            change_matches: Matches whose ``large_scale_change`` is set.
            additions: Added-line count to store on the results.
            deletions: Deleted-line count to store on the results.
        """
        lsc = change_matches.large_scale_change
        assert lsc is not None, "large_scale_change must be set"

        config_results = {
            path: ConfigResult.from_config_model(config)
            for path, config in change_matches.configs.items()
        }

        # TODO what about Reviewed-for?
        # No alternates are consulted for a large-scale change, hence the [].
        reviews, review_results, review_points, pending_points = (
            self._tally_reviews(lsc.reviewers, [], None)
        )

        if any(
            review_results[review_id].review.state == ReviewStates.CHANGES_REQUESTED
            for review_id in reviews
        ):
            status = Status.FAIL
            description = "Large-scale change: changes requested"
        elif review_points >= lsc.require:
            status = Status.PASS
            description = "Large-scale change: approved"
        else:
            status = Status.PENDING
            description = f"Large-scale change: {review_points} of {lsc.require} reviewers approved"

        # If reviewers were not defined (default LSC config),
        # then we show an error.
        if not lsc.reviewers:
            status = Status.FAIL
            description = (
                "Large-scale change: configuration required (no reviewers defined)"
            )

        return PullRequestResults(
            status=status,
            description=description,
            labels=lsc.labels,
            large_scale_change_results=LargeScaleChangeResults(
                large_scale_change=lsc,
                status=status,
                points=review_points,
                points_pending=pending_points,
                reviews=reviews,
            ),
            scope_results={},
            path_results={},
            code_results={},
            review_results=review_results,
            pullrequest=self,
            config_results=config_results,
            config_paths_modified=[],
            additions=additions,
            deletions=deletions,
        )

380 

381 

class LargeScaleChangeResults(BaseModel):
    """Review tally for a pull request handled as a large-scale change."""

    model_config = ConfigDict(extra="forbid")

    large_scale_change: LargeScaleChangeModel
    status: Status
    points: int  # Number of approving reviews
    points_pending: int  # Number of reviewers who have not approved
    reviews: list[str]  # Host ids of the counted reviews

390 

391 

class PullRequestResults(BaseModel):
    """The outcome of evaluating a pull request against its configs."""

    model_config = ConfigDict(extra="forbid")

    # Most fields are required (no default), so they are always present in
    # the export. The few fields that do have defaults are dropped by
    # as_dict() while they still hold that default.
    status: Status
    description: str
    labels: list[str]
    # comments?

    config_paths_modified: list[str] = Field(
        default_factory=list
    )  # Paths that were modified in the PR

    # Diff statistics calculated during processing
    additions: int | None = None
    deletions: int | None = None

    pullrequest: PullRequest

    large_scale_change_results: LargeScaleChangeResults | None
    scope_results: dict[str, ScopeResult]
    path_results: dict[str, PathResult]
    code_results: dict[str, CodeResult]
    review_results: dict[str, ReviewResult]  # Latest reviews and their scopes...
    config_results: dict[str, ConfigResult]

    def as_dict(self) -> dict[str, Any]:
        """Dump the results as a dictionary, omitting values equal to their defaults.

        We always use "empty" defaults, so leaving them out keeps the stored
        JSON more minimal. In the UI the actual models are reloaded from the
        dict, so it is ok that the stored dict does not spell out every value.
        """
        return self.model_dump(exclude_defaults=True)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PullRequestResults:
        """Rebuild the results from a dictionary such as ``as_dict`` produces."""
        return cls(**data)

    def get_scope_results_by_name(self, names: list[str]) -> list[ScopeResult]:
        """Return the scope results named in ``names``, in ``ordered_scope_results()`` order."""
        return [
            scope_result
            for scope_result in self.ordered_scope_results()
            if scope_result.scope.name in names
        ]

    def ordered_scope_results(self) -> list[ScopeResult]:
        """Order by ownership (primary will naturally come first, then appended, then global)"""
        return sorted(
            self.scope_results.values(),
            key=lambda s: s.scope.ownership,
        )

    def scope_results_pending(self) -> list[ScopeResult]:
        """Return the pending scope results, leaving out global scopes."""
        return [
            scope_result
            for scope_result in self.scope_results.values()
            if scope_result.status == Status.PENDING
            and scope_result.scope.ownership != OwnershipChoices.GLOBAL
        ]

    def path_results_pending(self) -> list[PathResult]:
        """Return all path results that are pending."""
        return [
            path_result
            for path_result in self.path_results.values()
            if path_result.status == Status.PENDING
        ]

    def code_results_pending(self) -> list[CodeResult]:
        """Return all code results that are pending."""
        return [
            code_result
            for code_result in self.code_results.values()
            if code_result.status == Status.PENDING
        ]

    def status_for_scope_names(self, scope_names: list[str]) -> Status:
        """Combine the statuses of the named scopes into one status.

        A single scope decides on its own. Otherwise any failed scope fails,
        any passed global scope passes, and all non-global scopes passing
        also passes; everything else is pending.

        Raises:
            KeyError: If a name is not present in ``scope_results``.
        """
        scope_results = [self.scope_results[scope_name] for scope_name in scope_names]

        # If there's a single scope, use that result (whether it is global, or normal, etc)
        if len(scope_results) == 1:
            return scope_results[0].status

        # If any scope failed, then we fail
        if any(scope.status == Status.FAIL for scope in scope_results):
            return Status.FAIL

        global_scopes = [
            scope
            for scope in scope_results
            if scope.scope.ownership == OwnershipChoices.GLOBAL
        ]
        nonglobal_scopes = [
            scope
            for scope in scope_results
            if scope.scope.ownership != OwnershipChoices.GLOBAL
        ]

        # If any global scopes approved, then we pass
        if any(scope.status == Status.PASS for scope in global_scopes):
            return Status.PASS

        # If all regular scopes approved, then we pass
        # NOTE(review): all() is vacuously true when there are no non-global
        # scopes, so several pending global scopes (or no scopes at all)
        # yield PASS here -- confirm that this is intended.
        if all(scope.status == Status.PASS for scope in nonglobal_scopes):
            return Status.PASS

        return Status.PENDING

    def reviews_for_scope_names(self, scope_names: list[str]) -> list[str]:
        """Return the review references of the named scopes, concatenated in order.

        Raises:
            KeyError: If a name is not present in ``scope_results``.
        """
        reviews: list[str] = []
        for scope_name in scope_names:
            reviews.extend(self.scope_results[scope_name].reviews)
        return reviews

    def get_scopes_for_review(self, review_id: str) -> list[ScopeResult]:
        """Return every scope result whose ``reviews`` list contains ``review_id``."""
        return [
            scope_result
            for scope_result in self.scope_results.values()
            if review_id in scope_result.reviews
        ]

    def compute_status(self) -> Status:
        """Derive the overall status from the path and code results.

        A draft is always pending. Otherwise any failing result fails, any
        pending result makes the whole pending, and with neither it passes.
        """
        if self.pullrequest.draft:
            return Status.PENDING

        # Assume passing status to start
        # TODO is this the unmatched status? what if there are no enabled scopes
        status = Status.PASS

        for result in (*self.path_results.values(), *self.code_results.values()):
            if result.status == Status.FAIL:
                return Status.FAIL  # Immediately fail if any fail
            if result.status == Status.PENDING:
                status = Status.PENDING  # Move to pending (could fail later)

        return status

    def compute_description(self) -> str:
        """Build the one-line summary that goes with ``self.status``."""
        if self.pullrequest.draft:
            return "Draft is not ready for review"

        def scope_word(items: list[ScopeResult]) -> str:
            # Singular for exactly one entry, plural otherwise (including zero)
            return "scope" if len(items) == 1 else "scopes"

        scope_results = list(self.scope_results.values())
        scopes_passed = [s for s in scope_results if s.status == Status.PASS]

        if self.status == Status.PASS:
            if not scopes_passed:
                # If the status was pass, but there are no scopes, then there were none assigned
                return "No review scopes are required"
            return f"{len(scopes_passed)} review {scope_word(scopes_passed)} passed"

        if self.status == Status.FAIL:
            scopes_failed = [s for s in scope_results if s.status == Status.FAIL]
            return f"{len(scopes_failed)} review {scope_word(scopes_failed)} failed"

        if self.status == Status.PENDING:
            # Pending global scopes are not counted as pending
            scopes_pending = [
                s
                for s in scope_results
                if s.status == Status.PENDING
                and s.scope.ownership != OwnershipChoices.GLOBAL
            ]
            pending_text = scope_word(scopes_pending)
            if scopes_passed:
                passed_text = scope_word(scopes_passed)
                return f"{len(scopes_pending)} review {pending_text} pending, {len(scopes_passed)} review {passed_text} passed"
            return f"{len(scopes_pending)} review {pending_text} pending"

        return ""

    def compute_labels(self) -> list[str]:
        """Return the distinct labels of all scope results, sorted.

        Sorting keeps the output stable between runs; iterating a set of
        strings has no guaranteed order.
        """
        labels: set[str] = set()

        for scope_result in self.scope_results.values():
            labels.update(scope_result.scope.labels)

        return sorted(labels)

    def compute_overview(self) -> str:
        """Build a concise markdown overview for GitHub pull request comments."""
        # NOTE(review): whitespace inside the string literals below is copied
        # as it appears in the reviewed listing; confirm the leading spaces
        # before <details> match the intended markdown nesting.
        overview = ""
        overview += f"**{self.status.value}**: {self.description}\n\n"

        if self.large_scale_change_results:
            lsc = self.large_scale_change_results
            overview += (
                "### Large Scale Change\n\n"
                f"- Status: {lsc.status.value}\n"
                f"- Points: {lsc.points} (Pending: {lsc.points_pending})\n"
            )

        overview += "## Matched Scopes\n\n"
        matched_scopes = [
            sr
            for sr in self.ordered_scope_results()
            if sr.matched_paths or sr.matched_code
        ]

        if matched_scopes:
            for scope_result in matched_scopes:
                line = (
                    f"- **{scope_result.scope.printed_name()}**: {scope_result.status.value}"
                    f" ({scope_result.points}/{scope_result.scope.require})"
                )
                if scope_result.scope.cc:
                    line += " cc: " + " ".join(f"@{u}" for u in scope_result.scope.cc)
                overview += line + "\n"

                if scope_result.scope.instructions:
                    overview += (
                        " <details>\n"
                        f" {scope_result.scope.instructions}\n"
                        " </details>\n"
                    )
        else:
            overview += "- None\n"

        return overview

    def rebuild_config_models(self) -> ConfigModels:
        """Rebuild a ``ConfigModels`` from the stored ``config_results``.

        This is useful for when we want to get the configs back from the results.
        """
        configs = ConfigModels(root={})
        for path, config_result in self.config_results.items():
            configs.add_config(config_result.config, Path(path))
        return configs

661 

662 

class ConfigResult(BaseModel):
    """Wraps a ``ConfigModel`` so it can be stored on the results."""

    model_config = ConfigDict(extra="forbid")

    config: ConfigModel

    @classmethod
    def from_config_model(cls, config_model: ConfigModel) -> ConfigResult:
        """Build a ``ConfigResult`` that holds ``config_model``."""
        return cls(
            config=config_model,
        )

673 

674 

class ReviewResult(BaseModel):
    """A counted review plus the scopes it explicitly named."""

    model_config = ConfigDict(extra="forbid")

    review: Review
    scopes: list[str]  # Explicit Reviewed-for scopes (empty if not specified)

680 

681 

class ScopeResult(BaseModel):
    """The evaluation of one scope for a pull request."""

    model_config = ConfigDict(extra="forbid")

    scope: ScopeModel
    status: Status  # and/or review_status?
    points: int
    points_pending: int

    reviews: list[str]  # Review references
    matched_paths: list[str]  # Path result references
    matched_code: list[str]  # Code result references

    def is_notable(self) -> bool:
        """Return ``False`` only for a global scope that is still pending."""
        # In some cases, we don't care much about scopes that are global and not in use, for example
        if (
            self.scope.ownership == OwnershipChoices.GLOBAL
            and self.status == Status.PENDING
        ):
            return False
        return True

    def reviewers_to_request(
        self, pullrequest_results: PullRequestResults
    ) -> list[str]:
        """Pick the usernames that should still be asked to review this scope.

        ``scope.request`` decides how many reviewers are wanted: ``0`` asks
        nobody, a negative value asks every eligible reviewer, and a positive
        value asks only as many as are still missing after subtracting
        ``points`` and ``points_pending``.

        The wildcard entry, the pull request author, and users who already
        have a review in ``pullrequest_results.review_results`` are never
        returned.

        Args:
            pullrequest_results: Results holding the counted reviews and the
                pull request (author and number).

        Returns:
            Usernames to request. For a positive ``request`` they are picked
            in an order seeded by the pull request number, so repeated calls
            give the same answer.
        """
        if self.scope.request == 0 or not self.scope.reviewers:
            return []

        request_everyone = self.scope.request < 0

        # Only a positive request has a "still needed" count. Applying this
        # check to a negative request would always return early and nobody
        # would ever be requested.
        additional_reviewers_needed = (
            self.scope.request - self.points - self.points_pending
        )
        if not request_everyone and additional_reviewers_needed <= 0:
            return []

        # Compare usernames with usernames: a login never equals a
        # ReviewResult object, so testing against the raw values filters nothing.
        already_reviewed = {
            review_result.review.user.username
            for review_result in pullrequest_results.review_results.values()
        }
        author = pullrequest_results.pullrequest.author.username

        # Filter out wildcard, already reviewed users, and the author
        eligible_logins = [
            login
            for login in self.scope.reviewers
            if login != "*" and login != author and login not in already_reviewed
        ]

        if request_everyone:
            return eligible_logins

        # Put the reviewers in a predictable random order for this PR
        Random(pullrequest_results.pullrequest.number).shuffle(eligible_logins)

        return eligible_logins[:additional_reviewers_needed]

735 

736 

class PathResult(BaseModel):
    """The combined status and reviews for one matched path."""

    model_config = ConfigDict(extra="forbid")

    path: ScopePathMatch
    status: Status
    reviews: list[str]  # Review references

743 

744 

class CodeResult(BaseModel):
    """The combined status and reviews for one matched piece of code."""

    model_config = ConfigDict(extra="forbid")

    code: ScopeCodeMatch
    status: Status
    reviews: list[str]  # Review references