Coverage for src/pullapprove/pullrequests.py: 41%
347 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-12-15 22:19 -0600
« prev ^ index » next coverage.py v7.8.2, created at 2025-12-15 22:19 -0600
1from __future__ import annotations
3import re
4from collections.abc import Generator
5from enum import Enum
6from pathlib import Path
7from random import Random
8from typing import Any
10from pydantic import BaseModel, ConfigDict, Field
12from .config import (
13 ConfigModel,
14 ConfigModels,
15 LargeScaleChangeModel,
16 OwnershipChoices,
17 ReviewedForChoices,
18 ScopeModel,
19)
20from .matches import ChangeMatches, ScopeCodeMatch, ScopePathMatch, match_diff
class Status(str, Enum):
    """Evaluation outcome shared by the result models in this module.

    Members are plain strings so they serialize as their value; ``EMPTY`` is
    the empty string and is therefore falsy.
    """

    # NOTE: an earlier idea was to model this as a bool, but there are more
    # than two states, so an enum is required.
    PASS = "PASS"
    FAIL = "FAIL"
    ERROR = "ERROR"
    PENDING = "PENDING"
    EMPTY = ""
class User(BaseModel):
    """A user identified by a host-assigned id and a username."""

    model_config = ConfigDict(extra="forbid")

    host_id: str = Field(min_length=1)
    username: str = Field(min_length=1)
    avatar_url: str

    def __str__(self) -> str:
        """Render the user as their username."""
        return self.username

    def __eq__(self, value: Any) -> bool:
        """Compare against another ``User`` or a plain string.

        Two users are equal when their ``host_id`` matches. A string is equal
        to the user when it matches either the ``host_id`` or the ``username``.
        Anything else compares unequal.
        """
        if isinstance(value, str):
            return self.host_id == value or self.username == value
        if isinstance(value, User):
            return self.host_id == value.host_id
        return False
class ReviewStates(str, Enum):
    """State of a single review.

    ``EMPTY`` is the empty string (falsy); callers in this module use that
    truthiness to skip reviews that carry no known state.
    """

    APPROVED = "APPROVED"
    PENDING = "PENDING"
    CHANGES_REQUESTED = "CHANGES_REQUESTED"
    EMPTY = ""
class Review(BaseModel):
    """A single submitted review: its text, state, timestamp and author."""

    model_config = ConfigDict(extra="forbid")

    host_id: str = Field(min_length=1)
    host_url: str = Field(min_length=1)
    body: str
    state: ReviewStates
    submitted_at: str
    user: User

    def get_reviewed_for_scopes(self) -> list[str]:
        """Return the scope names declared via ``Reviewed-for:`` in the body.

        Only the first ``Reviewed-for:`` occurrence is used (case-insensitive).
        Its value is the run of non-whitespace characters that follows, split
        on commas. Empty names (e.g. from a trailing comma such as
        ``Reviewed-for: a,``) are dropped so callers never see ``""`` as a
        scope name.

        Returns:
            The declared scope names, or ``[]`` when the body is empty or has
            no usable ``Reviewed-for:`` declaration.
        """
        if not self.body:
            return []

        found = re.search(r"Reviewed-for:\s*(\S+)", self.body, re.IGNORECASE)
        if found is None:
            return []

        names = (name.strip() for name in found.group(1).split(","))
        # Skip blanks left behind by stray commas.
        return [name for name in names if name]
class Reviewer(BaseModel):
    """A user attached to a pull request together with the reviews they left."""

    model_config = ConfigDict(extra="forbid")

    reviews: list[Review]
    user: User

    def __str__(self) -> str:
        """Render the reviewer as their user string."""
        return str(self.user)

    def latest_review(self, scope: ScopeModel | None = None) -> Review | None:
        """Return the newest review that counts for ``scope``.

        Reviews are considered from newest to oldest by ``submitted_at``.
        When a scope is given and its ``reviewed_for`` setting is not
        ``IGNORED``, the review's ``Reviewed-for`` declaration is honored:

        * a review naming scopes only counts if it names this scope;
        * a review naming no scopes counts unless the scope is ``REQUIRED``.

        Reviews with an empty state (e.g. plain comments) are always skipped.

        Returns:
            The first qualifying review, or ``None`` if there is none.
        """
        if not self.reviews:
            return None

        # Only consult Reviewed-for when a scope asks for it.
        honor_reviewed_for = (
            bool(scope) and scope.reviewed_for != ReviewedForChoices.IGNORED
        )

        newest_first = sorted(
            self.reviews, key=lambda item: item.submitted_at, reverse=True
        )

        for candidate in newest_first:
            if honor_reviewed_for:
                declared = candidate.get_reviewed_for_scopes()
                if declared:
                    # Explicit declaration: it must mention this scope
                    if scope.name not in declared:
                        continue
                elif scope.reviewed_for == ReviewedForChoices.REQUIRED:
                    # No declaration, but this scope demands one
                    continue

            # A review with no known state is skipped (commented on GitHub)
            if candidate.state:
                return candidate

        return None

    def get_review_state(self) -> ReviewStates:
        """Return the state of the latest review, or ``PENDING`` if none."""
        latest = self.latest_review()
        if latest:
            return latest.state

        # A reviewer without any stateful review is treated as pending
        return ReviewStates.PENDING
class Branch(BaseModel):
    """A branch reference; currently only its (non-empty) name is tracked."""

    model_config = ConfigDict(extra="forbid")

    name: str = Field(min_length=1)
    # Possible future fields: fork / other repository information.
class PullRequest(BaseModel):
    """A pull request plus the reviewer data needed to evaluate it."""

    model_config = ConfigDict(extra="forbid")

    base_branch: Branch
    head_branch: Branch
    reviewers: list[Reviewer]  # Includes requested and previous reviewers
    author: User
    diff: str | Generator = Field(exclude=True, default="")
    number: int
    draft: bool

    # Configs actually come from outside the PR, so we don't attach it here

    def get_reviewer(self, identifier: str) -> Reviewer | None:
        """Return the first reviewer whose host id or username is ``identifier``."""
        return next(
            (
                reviewer
                for reviewer in self.reviewers
                if reviewer.user.host_id == identifier
                or reviewer.user.username == identifier
            ),
            None,
        )

    def process_configs(self, configs: ConfigModels) -> PullRequestResults | None:
        """Evaluate this pull request against ``configs``.

        Returns ``None`` when there are no configs or none remain after
        filtering for this pull request. If the diff matches a large-scale
        change, evaluation is delegated to ``process_large_scale_change``.
        Otherwise every matched scope is scored and the per-path, per-code
        and overall results are derived from those scope results.
        """
        if not configs:
            return None

        filtered_configs = configs.filter_for_pullrequest(self)

        # If there are no configs, or they are disabled, then we can return early
        if not filtered_configs:
            return None

        diff_results = match_diff(filtered_configs, self.diff)
        matches = diff_results.matches

        # A large scale change is the only thing considered (after branches)
        if matches.large_scale_change:
            return self.process_large_scale_change(
                matches, diff_results.additions, diff_results.deletions
            )

        results = PullRequestResults(
            pullrequest=self,
            status=Status.PENDING,
            description="",
            labels=[],
            large_scale_change_results=None,
            scope_results={},
            path_results={},
            code_results={},
            review_results={},
            config_results={
                config_path: ConfigResult.from_config_model(config_model)
                for config_path, config_model in matches.configs.items()
            },
            config_paths_modified=diff_results.config_paths_modified,
            additions=diff_results.additions,
            deletions=diff_results.deletions,
        )

        # Score each active scope
        for scope_name, scope_model in matches.scopes.items():
            review_ids = []
            approvals = 0
            pending = 0

            # The wildcard check does not depend on the reviewer
            has_wildcard = "*" in scope_model.reviewers

            for reviewer in self.reviewers:
                username = reviewer.user.username

                # Could maybe enable host id, or email too
                if not (
                    has_wildcard
                    or username in scope_model.reviewers
                    or username in scope_model.alternates
                ):
                    continue

                review = reviewer.latest_review(scope=scope_model)
                if not review:
                    # They exist on the PR but with no review yet
                    pending += 1
                    continue

                review_ids.append(review.host_id)
                results.review_results[review.host_id] = ReviewResult(
                    review=review,
                    scopes=review.get_reviewed_for_scopes(),
                )

                if review.state == ReviewStates.APPROVED:
                    approvals += 1
                elif review.state in (
                    ReviewStates.PENDING,
                    ReviewStates.CHANGES_REQUESTED,
                ):
                    pending += 1

            # Author points only count if explicitly listed
            # (wildcard is not converted to usernames)
            author_points = (
                scope_model.author_value
                if self.author.username in scope_model.reviewers
                else 0
            )
            points = approvals + author_points

            changes_requested = any(
                results.review_results[review_id].review.state
                == ReviewStates.CHANGES_REQUESTED
                for review_id in review_ids
            )
            if changes_requested:
                status = Status.FAIL
            elif points >= scope_model.require:
                status = Status.PASS
            else:
                status = Status.PENDING

            matched_paths = [
                path
                for path, path_match in matches.paths.items()
                if scope_name in path_match.scopes
            ]
            matched_code = [
                code
                for code, code_match in matches.code.items()
                if scope_name in code_match.scopes
            ]

            results.scope_results[scope_name] = ScopeResult(
                scope=scope_model,
                status=status,
                points=points,
                # separate review points and author points?
                points_pending=pending,  # Not used anywhere yet? would tell us how many to request...
                reviews=review_ids,
                matched_paths=matched_paths,
                matched_code=matched_code,
            )

        # The overall status has to be derived from the paths and code,
        # because scopes can combine based on their ownership model, so
        # looking at scopes alone isn't enough.
        for path, path_match in matches.paths.items():
            results.path_results[path] = PathResult(
                path=path_match,
                status=results.status_for_scope_names(path_match.scopes),
                reviews=results.reviews_for_scope_names(path_match.scopes),
            )

        for code_hash, code_match in matches.code.items():
            results.code_results[code_hash] = CodeResult(
                code=code_match,
                status=results.status_for_scope_names(code_match.scopes),
                reviews=results.reviews_for_scope_names(code_match.scopes),
            )

        # TODO what happens if no scopes match?
        # configurable in pullapprove.com?

        results.status = results.compute_status()
        results.description = results.compute_description()
        results.labels = results.compute_labels()

        return results

    def process_large_scale_change(
        self, change_matches: ChangeMatches, additions: int, deletions: int
    ) -> PullRequestResults:
        """Evaluate this pull request as a large-scale change.

        Only the large-scale-change reviewers are considered; scope, path and
        code results are left empty. A large-scale change with no reviewers
        configured always fails with a "configuration required" description.
        """
        lsc = change_matches.large_scale_change
        assert lsc is not None, "large_scale_change must be set"

        config_results = {
            config_path: ConfigResult.from_config_model(config_model)
            for config_path, config_model in change_matches.configs.items()
        }

        reviews = []
        review_results = {}
        review_points = 0
        pending_points = 0

        # The wildcard check does not depend on the reviewer
        has_wildcard = "*" in lsc.reviewers

        for reviewer in self.reviewers:
            # Could maybe enable host id, or email too
            if not (has_wildcard or reviewer.user.username in lsc.reviewers):
                continue

            # TODO what about Reviewed-for?
            review = reviewer.latest_review(scope=None)
            if not review:
                # They exist on the PR but with no review yet
                pending_points += 1
                continue

            reviews.append(review.host_id)
            review_results[review.host_id] = ReviewResult(
                review=review,
                scopes=review.get_reviewed_for_scopes(),
            )

            if review.state == ReviewStates.APPROVED:
                review_points += 1
            elif review.state in (
                ReviewStates.PENDING,
                ReviewStates.CHANGES_REQUESTED,
            ):
                pending_points += 1

        changes_requested = any(
            review_results[review_id].review.state == ReviewStates.CHANGES_REQUESTED
            for review_id in reviews
        )
        if changes_requested:
            status = Status.FAIL
            description = "Large-scale change: changes requested"
        elif review_points >= lsc.require:
            status = Status.PASS
            description = "Large-scale change: approved"
        else:
            status = Status.PENDING
            description = f"Large-scale change: {review_points} of {lsc.require} reviewers approved"

        # If reviewers were not defined (default LSC config),
        # then we show an error.
        if not lsc.reviewers:
            status = Status.FAIL
            description = (
                "Large-scale change: configuration required (no reviewers defined)"
            )

        lsc_results = LargeScaleChangeResults(
            large_scale_change=lsc,
            status=status,
            points=review_points,
            points_pending=pending_points,
            reviews=reviews,
        )

        return PullRequestResults(
            status=status,
            description=description,
            labels=lsc.labels,
            large_scale_change_results=lsc_results,
            scope_results={},
            path_results={},
            code_results={},
            review_results=review_results,
            pullrequest=self,
            config_results=config_results,
            config_paths_modified=[],
            additions=additions,
            deletions=deletions,
        )
class LargeScaleChangeResults(BaseModel):
    """Outcome of evaluating a pull request as a large-scale change."""

    model_config = ConfigDict(extra="forbid")

    large_scale_change: LargeScaleChangeModel
    status: Status
    points: int  # Approving reviews counted
    points_pending: int  # Reviewers still pending or requesting changes
    reviews: list[str]  # Review references (review host ids)
class PullRequestResults(BaseModel):
    """Complete evaluation of a pull request: overall status plus per-scope,
    per-path, per-code, per-review and per-config details."""

    model_config = ConfigDict(extra="forbid")

    # Most fields are required so they always appear in the export; the few
    # fields that do have defaults are omitted by as_dict() when unchanged.
    status: Status
    description: str
    labels: list[str]
    # comments?

    config_paths_modified: list[str] = Field(
        default_factory=list
    )  # Paths that were modified in the PR

    # Diff statistics calculated during processing
    additions: int | None = None
    deletions: int | None = None

    pullrequest: PullRequest

    large_scale_change_results: LargeScaleChangeResults | None
    scope_results: dict[str, ScopeResult]
    path_results: dict[str, PathResult]
    code_results: dict[str, CodeResult]
    review_results: dict[str, ReviewResult]  # Latest reviews and their scopes...
    config_results: dict[str, ConfigResult]

    def as_dict(self) -> dict[str, Any]:
        """Dump the results as a dictionary, omitting default-valued fields.

        Any value that is equal to its field default is left out (we always
        use "empty" defaults), which keeps the stored JSON minimal.

        In the UI the models are reloaded from the dict, so the omitted
        defaults are restored on load.
        """
        return self.model_dump(exclude_defaults=True)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PullRequestResults:
        """Build results from a dictionary such as one produced by ``as_dict``."""
        return cls(**data)

    def get_scope_results_by_name(self, names: list[str]) -> list[ScopeResult]:
        """Return the scope results whose scope name is in ``names``,
        in ``ordered_scope_results()`` order."""
        return [
            scope_result
            for scope_result in self.ordered_scope_results()
            if scope_result.scope.name in names
        ]

    def ordered_scope_results(self) -> list[ScopeResult]:
        """Return scope results sorted by their scope's ownership value.

        Intended order is primary, then appended, then global; this relies on
        how ``OwnershipChoices`` values sort (defined in ``.config``).
        """
        return sorted(
            self.scope_results.values(),
            key=lambda s: s.scope.ownership,
        )

    def scope_results_pending(self) -> list[ScopeResult]:
        """Return all pending scope results, excluding global scopes."""
        return [
            scope_result
            for scope_result in self.scope_results.values()
            if scope_result.status == Status.PENDING
            and scope_result.scope.ownership != OwnershipChoices.GLOBAL
        ]

    def path_results_pending(self) -> list[PathResult]:
        """Return all path results that are pending."""
        return [
            path_result
            for path_result in self.path_results.values()
            if path_result.status == Status.PENDING
        ]

    def code_results_pending(self) -> list[CodeResult]:
        """Return all code results that are pending."""
        return [
            code_result
            for code_result in self.code_results.values()
            if code_result.status == Status.PENDING
        ]

    def status_for_scope_names(self, scope_names: list[str]) -> Status:
        """Combine the statuses of the named scopes into a single status.

        Rules, in order:

        1. No scopes: ``PASS`` (nothing needs to be satisfied).
        2. Exactly one scope: that scope's status, whatever its ownership.
        3. Any scope failed: ``FAIL``.
        4. Any global scope passed: ``PASS``.
        5. There is at least one non-global scope and all of them passed:
           ``PASS``.
        6. Otherwise: ``PENDING``.

        Raises:
            KeyError: if a name has no entry in ``scope_results``.
        """
        scope_results = [self.scope_results[scope_name] for scope_name in scope_names]

        if not scope_results:
            return Status.PASS

        # A single scope decides on its own (whether it is global, or normal, etc)
        if len(scope_results) == 1:
            return scope_results[0].status

        if any(scope.status == Status.FAIL for scope in scope_results):
            return Status.FAIL

        global_scopes = [
            scope
            for scope in scope_results
            if scope.scope.ownership == OwnershipChoices.GLOBAL
        ]
        nonglobal_scopes = [
            scope
            for scope in scope_results
            if scope.scope.ownership != OwnershipChoices.GLOBAL
        ]

        # An approving global scope is enough on its own
        if any(scope.status == Status.PASS for scope in global_scopes):
            return Status.PASS

        # all() is vacuously true for an empty list, so require at least one
        # regular scope; otherwise several unapproved global scopes would pass.
        if nonglobal_scopes and all(
            scope.status == Status.PASS for scope in nonglobal_scopes
        ):
            return Status.PASS

        return Status.PENDING

    def reviews_for_scope_names(self, scope_names: list[str]) -> list[str]:
        """Return the review references of the named scopes, concatenated in
        the order given (duplicates are kept).

        Raises:
            KeyError: if a name has no entry in ``scope_results``.
        """
        reviews = []
        for scope_name in scope_names:
            reviews.extend(self.scope_results[scope_name].reviews)
        return reviews

    def get_scopes_for_review(self, review_id: str) -> list[ScopeResult]:
        """Return every scope result whose reviews list contains ``review_id``."""
        return [
            scope_result
            for scope_result in self.scope_results.values()
            if review_id in scope_result.reviews
        ]

    def compute_status(self) -> Status:
        """Derive the overall status from the path and code results.

        Drafts are always ``PENDING``. Otherwise any failing path or code
        result yields ``FAIL``, any pending one yields ``PENDING``, and with
        neither the status is ``PASS``.
        """
        if self.pullrequest.draft:
            return Status.PENDING

        # Assume passing status to start
        # TODO is this the unmatched status? what if there are no enabled scopes
        status = Status.PASS

        for result in [*self.path_results.values(), *self.code_results.values()]:
            if result.status == Status.FAIL:
                return Status.FAIL  # Immediately fail if any fail
            if result.status == Status.PENDING:
                status = Status.PENDING  # Move to pending (could fail later)

        return status

    def compute_description(self) -> str:
        """Build the one-line summary matching the current ``status``."""
        if self.pullrequest.draft:
            return "Draft is not ready for review"

        def with_status(wanted: Status) -> list[ScopeResult]:
            return [
                scope for scope in self.scope_results.values() if scope.status == wanted
            ]

        def noun(items: list[ScopeResult]) -> str:
            return "scope" if len(items) == 1 else "scopes"

        if self.status == Status.PASS:
            # In success, want to know how many scopes passed
            scopes_passed = with_status(Status.PASS)
            if not scopes_passed:
                # Passing with no passed scopes means none were assigned
                return "No review scopes are required"
            return f"{len(scopes_passed)} review {noun(scopes_passed)} passed"

        if self.status == Status.FAIL:
            scopes_failed = with_status(Status.FAIL)
            return f"{len(scopes_failed)} review {noun(scopes_failed)} failed"

        if self.status == Status.PENDING:
            # In pending, want to know how many scopes are pending
            scopes_passed = with_status(Status.PASS)
            scopes_pending = [
                scope
                for scope in with_status(Status.PENDING)
                if scope.scope.ownership != OwnershipChoices.GLOBAL
            ]
            summary = f"{len(scopes_pending)} review {noun(scopes_pending)} pending"
            if scopes_passed:
                summary += (
                    f", {len(scopes_passed)} review {noun(scopes_passed)} passed"
                )
            return summary

        return ""

    def compute_labels(self) -> list[str]:
        """Return the unique labels of all scope results, sorted so the
        output is deterministic across runs."""
        labels = set()

        for scope_result in self.scope_results.values():
            labels.update(scope_result.scope.labels)

        return sorted(labels)

    def compute_overview(self) -> str:
        """Build a concise markdown overview for GitHub pull request comments."""
        parts = [f"**{self.status.value}**: {self.description}\n\n"]

        lsc = self.large_scale_change_results
        if lsc:
            parts.append(
                "### Large Scale Change\n\n"
                f"- Status: {lsc.status.value}\n"
                f"- Points: {lsc.points} (Pending: {lsc.points_pending})\n"
            )

        parts.append("## Matched Scopes\n\n")
        matched_scopes = [
            sr
            for sr in self.ordered_scope_results()
            if sr.matched_paths or sr.matched_code
        ]

        if not matched_scopes:
            parts.append("- None\n")

        for scope_result in matched_scopes:
            line = (
                f"- **{scope_result.scope.printed_name()}**: {scope_result.status.value}"
                f" ({scope_result.points}/{scope_result.scope.require})"
            )
            if scope_result.scope.cc:
                line += " cc: " + " ".join(f"@{u}" for u in scope_result.scope.cc)
            parts.append(line + "\n")

            if scope_result.scope.instructions:
                parts.append(
                    " <details>\n"
                    f" {scope_result.scope.instructions}\n"
                    " </details>\n"
                )

        return "".join(parts)

    def rebuild_config_models(self) -> ConfigModels:
        """Rebuild a ``ConfigModels`` collection from ``config_results``.

        Useful when the configs need to be recovered from stored results.
        """
        configs = ConfigModels(root={})
        for path, config_result in self.config_results.items():
            configs.add_config(config_result.config, Path(path))
        return configs
class ConfigResult(BaseModel):
    """Wrapper that stores a config model inside the results."""

    model_config = ConfigDict(extra="forbid")

    config: ConfigModel

    @classmethod
    def from_config_model(cls, config_model: ConfigModel) -> ConfigResult:
        """Wrap ``config_model`` in a ``ConfigResult``."""
        return cls(config=config_model)
class ReviewResult(BaseModel):
    """A review together with the scopes it explicitly declared."""

    model_config = ConfigDict(extra="forbid")

    review: Review
    scopes: list[str]  # Explicit Reviewed-for scopes (empty if not specified)
class ScopeResult(BaseModel):
    """Evaluation of a single scope: its status, points and what it matched."""

    model_config = ConfigDict(extra="forbid")

    scope: ScopeModel
    status: Status  # and/or review_status?
    points: int
    points_pending: int

    reviews: list[str]  # Review references
    matched_paths: list[str]  # Path result references
    matched_code: list[str]  # Code result references

    def is_notable(self) -> bool:
        """Return ``False`` only for a global scope that is still pending.

        In some cases we don't care much about scopes that are global and
        not in use.
        """
        return not (
            self.scope.ownership == OwnershipChoices.GLOBAL
            and self.status == Status.PENDING
        )

    def reviewers_to_request(
        self, pullrequest_results: PullRequestResults
    ) -> list[str]:
        """Return the usernames that should be asked to review this scope.

        Candidates are the scope's listed reviewers, minus the ``*`` wildcard,
        the pull request author, and anyone whose review is already recorded
        in ``pullrequest_results.review_results``.

        * ``scope.request == 0`` or no reviewers listed: nobody is requested.
        * ``scope.request < 0``: every candidate is requested.
        * ``scope.request > 0``: only as many as are still missing after the
          current ``points`` and ``points_pending`` are subtracted, picked in
          an order that is random but reproducible per pull request number.
        """
        if self.scope.request == 0 or not self.scope.reviewers:
            return []

        request_everyone = self.scope.request < 0

        additional_reviewers_needed = (
            self.scope.request - self.points - self.points_pending
        )
        # A negative request is always "<= 0" here, so it must bypass this
        # guard or the request-everyone case could never be reached.
        if not request_everyone and additional_reviewers_needed <= 0:
            return []

        # Compare by username: review_results holds ReviewResult objects,
        # which never compare equal to a plain login string.
        already_reviewed = {
            review_result.review.user.username
            for review_result in pullrequest_results.review_results.values()
        }
        author = pullrequest_results.pullrequest.author.username

        eligible_logins = [
            login
            for login in self.scope.reviewers
            if login != "*" and login != author and login not in already_reviewed
        ]

        if request_everyone:
            return eligible_logins

        # Put the reviewers in a predictable random order for this PR
        Random(pullrequest_results.pullrequest.number).shuffle(eligible_logins)

        return eligible_logins[:additional_reviewers_needed]
class PathResult(BaseModel):
    """Combined status and reviews for one matched path."""

    model_config = ConfigDict(extra="forbid")

    path: ScopePathMatch
    status: Status
    reviews: list[str]  # Review references
class CodeResult(BaseModel):
    """Combined status and reviews for one matched piece of code."""

    model_config = ConfigDict(extra="forbid")

    code: ScopeCodeMatch
    status: Status
    reviews: list[str]  # Review references