Coverage for src/chat_limiter/batch.py: 82%

331 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-11 13:37 +0100

1""" 

2Batch processing functionality for handling multiple requests efficiently. 

3""" 

4 

5import asyncio 

6import logging 

7import traceback 

8from abc import ABC, abstractmethod 

9from collections.abc import Callable 

10from concurrent.futures import ThreadPoolExecutor, as_completed 

11from dataclasses import dataclass, field 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Generic, 

16 TypeVar, 

17) 

18 

19import httpx 

20 

21from tqdm.asyncio import tqdm as atqdm 

22from tqdm import tqdm 

23 

24if TYPE_CHECKING: 

25 pass 

26 

27from .limiter import ChatLimiter 

28from .types import ChatCompletionRequest, ChatCompletionResponse 

29 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Type variables for the generic batch classes: the payload carried by a
# batch item, and the value produced by processing one.
BatchItemT = TypeVar("BatchItemT")
BatchResultT = TypeVar("BatchResultT")

35 

36 

@dataclass
class BatchConfig:
    """Configuration for batch processing.

    Values that would make processing hang or fail later are rejected when
    the config is constructed (see ``__post_init__``).

    Raises:
        ValueError: If a concurrency, retry-count or delay setting is out of
            range.
    """

    # Concurrency settings
    # Size of the asyncio semaphore created for each group in async processing.
    max_concurrent_requests: int = 10
    # Thread-pool size used by synchronous processing.
    max_workers: int = 4

    # Retry settings
    # Retries *in addition to* the first attempt (3 -> up to 4 attempts).
    max_retries_per_item: int = 3
    # Base delay in seconds; the wait grows exponentially with the attempt index.
    retry_delay: float = 1.0

    # Progress tracking
    show_progress: bool = True
    progress_desc: str = "Processing batch"

    # Error handling
    # In this module the flag stops further retries of a failing item; it
    # does not abort the remaining items of the batch.
    stop_on_first_error: bool = False
    # NOTE(review): not read anywhere in this file - confirm intended use.
    collect_errors: bool = True
    # Prints prompts, responses and tracebacks, and is forwarded to the
    # limiter via set_verbose_mode() when the limiter provides it.
    verbose: bool = False

    # Batch size optimization
    # NOTE(review): these three options are not read anywhere in this file.
    adaptive_batch_size: bool = True
    min_batch_size: int = 1
    max_batch_size: int = 100

    # Request grouping
    # Group by json_data["model"] when present; otherwise by limiter provider.
    group_by_model: bool = True
    group_by_provider: bool = True

    def __post_init__(self) -> None:
        """Fail fast on settings that cannot work.

        A zero-sized semaphore would block every task forever, a thread pool
        needs at least one worker, a negative retry count would skip
        processing entirely, and a negative delay makes ``time.sleep`` raise.
        ``None`` is tolerated for ``max_workers`` because ThreadPoolExecutor
        accepts it as "use the default".
        """
        if self.max_concurrent_requests < 1:
            raise ValueError(
                f"max_concurrent_requests must be >= 1, got {self.max_concurrent_requests}"
            )
        if self.max_workers is not None and self.max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {self.max_workers}")
        if self.max_retries_per_item < 0:
            raise ValueError(
                f"max_retries_per_item must be >= 0, got {self.max_retries_per_item}"
            )
        if self.retry_delay < 0:
            raise ValueError(f"retry_delay must be >= 0, got {self.retry_delay}")

66 

67 

@dataclass
class BatchItem(Generic[BatchItemT]):
    """One unit of work in a batch: a payload plus the request used to send it."""

    # Payload supplied by the caller.
    data: BatchItemT

    # Request description. json_data stays None unless it is set explicitly
    # (for example from a request_fn when batch items are created).
    method: str = "POST"
    url: str = "/chat/completions"
    json_data: dict[str, Any] | None = None

    # Identification and free-form bookkeeping; each instance gets its own dict.
    id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    # Mutable state updated by the retry loop while the item is processed.
    attempt_count: int = 0
    last_error: Exception | None = None

87 

88 

@dataclass
class BatchResult(Generic[BatchResultT]):
    """Outcome of processing one batch item, successful or not."""

    # The item this outcome belongs to.
    item: "BatchItem[Any]"

    # Exactly one of these is expected to be set: the produced value on
    # success, the raised exception on failure.
    result: BatchResultT | None = None
    error: Exception | None = None

    # Bookkeeping about the processing run (duration is in seconds).
    success: bool = False
    duration: float = 0.0
    attempt_count: int = 0

    # Details of the HTTP response, when known.
    response_headers: dict[str, str] = field(default_factory=dict)
    status_code: int | None = None

108 

109 

class BatchProcessor(ABC, Generic[BatchItemT, BatchResultT]):
    """Abstract base class for batch processing.

    Subclasses implement :meth:`process_item` and :meth:`process_item_sync`.
    This class wraps raw payloads into ``BatchItem`` objects, groups them,
    runs them with bounded concurrency, retries failures with exponential
    backoff, drives an optional tqdm progress bar and keeps the results of
    the most recent batch for the statistics helpers.
    """

    def __init__(
        self,
        limiter: ChatLimiter,
        config: BatchConfig | None = None,
    ):
        """Store the limiter and configuration.

        Args:
            limiter: Limiter used by subclasses to issue requests.
            config: Batch settings; a default ``BatchConfig`` is used when
                omitted.
        """
        self.limiter = limiter
        self.config = config or BatchConfig()
        self._results: list[BatchResult[BatchResultT]] = []
        self._errors: list[Exception] = []

        # Forward the configured verbosity (on or off) to the limiter when it
        # exposes the hook.
        if hasattr(self.limiter, "set_verbose_mode"):
            self.limiter.set_verbose_mode(self.config.verbose)

    @abstractmethod
    async def process_item(self, item: BatchItem[BatchItemT]) -> BatchResultT:
        """Process a single batch item."""
        pass

    @abstractmethod
    def process_item_sync(self, item: BatchItem[BatchItemT]) -> BatchResultT:
        """Process a single batch item synchronously."""
        pass

    def create_batch_items(
        self,
        items: list[BatchItemT],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchItem[BatchItemT]]:
        """Wrap raw payloads in ``BatchItem`` objects with ids ``item_<index>``.

        Args:
            items: Raw payloads.
            request_fn: Optional callable returning ``(method, url, json_data)``
                for a payload; when given, it overrides the item's request
                defaults.

        Returns:
            One ``BatchItem`` per payload, in input order.
        """
        batch_items: list[BatchItem[BatchItemT]] = []

        for index, payload in enumerate(items):
            batch_item: BatchItem[BatchItemT] = BatchItem(
                data=payload,
                id=f"item_{index}",
            )

            if request_fn is not None:
                method, url, json_data = request_fn(payload)
                batch_item.method = method
                batch_item.url = url
                batch_item.json_data = json_data

            batch_items.append(batch_item)

        return batch_items

    async def process_batch(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchResult[BatchResultT]]:
        """Process a batch of items asynchronously.

        Groups are processed one after another; within a group up to
        ``config.max_concurrent_requests`` items run concurrently.

        Args:
            items: Raw payloads or ready-made ``BatchItem`` objects (decided
                by inspecting the first element).
            request_fn: Passed to :meth:`create_batch_items` for raw payloads.

        Returns:
            Results group by group, in input order within each group. The
            list is also kept for the statistics helpers.
        """
        grouped_items = self._prepare_groups(items, request_fn)
        total_items = sum(len(group_items) for group_items in grouped_items.values())
        progress_bar = self._create_progress_bar(total_items)

        all_results: list[BatchResult[BatchResultT]] = []
        try:
            for group_name, group_items in grouped_items.items():
                logger.info(
                    "Processing group '%s' with %d items", group_name, len(group_items)
                )

                # A fresh semaphore per group bounds in-flight requests.
                semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)

                outcomes = await asyncio.gather(
                    *(
                        self._process_item_with_retry(item, semaphore, progress_bar)
                        for item in group_items
                    ),
                    return_exceptions=True,
                )

                # gather() preserves submission order, so outcomes line up
                # with group_items.
                for item, outcome in zip(group_items, outcomes):
                    # BaseException (not Exception): a cancelled task comes
                    # back as CancelledError and must not pass as a result.
                    if isinstance(outcome, BaseException):
                        all_results.append(
                            BatchResult(
                                item=item,
                                error=outcome,  # type: ignore[arg-type]
                                success=False,
                                attempt_count=item.attempt_count,
                            )
                        )
                    else:
                        all_results.append(outcome)
        finally:
            # `is not None`: a tqdm bar with total == 0 is falsy, so a plain
            # truthiness test would skip closing it for an empty batch.
            if progress_bar is not None:
                progress_bar.close()

        self._results = all_results
        return all_results

    def process_batch_sync(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchResult[BatchResultT]]:
        """Process a batch of items synchronously using a thread pool.

        Groups are processed one after another; each group gets its own
        ``ThreadPoolExecutor`` with ``config.max_workers`` threads.

        Args:
            items: Raw payloads or ready-made ``BatchItem`` objects (decided
                by inspecting the first element).
            request_fn: Passed to :meth:`create_batch_items` for raw payloads.

        Returns:
            Results group by group, in submission order within each group
            (the same ordering as :meth:`process_batch`). The list is also
            kept for the statistics helpers.
        """
        grouped_items = self._prepare_groups(items, request_fn)
        total_items = sum(len(group_items) for group_items in grouped_items.values())
        progress_bar = self._create_progress_bar(total_items)

        all_results: list[BatchResult[BatchResultT]] = []
        try:
            for group_name, group_items in grouped_items.items():
                logger.info(
                    "Processing group '%s' with %d items", group_name, len(group_items)
                )

                with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                    futures = [
                        executor.submit(
                            self._process_item_sync_with_retry, item, progress_bar
                        )
                        for item in group_items
                    ]

                    # Collect in submission order so the result order is
                    # deterministic; the work still runs concurrently.
                    for item, future in zip(group_items, futures):
                        try:
                            all_results.append(future.result())
                        except Exception as exc:
                            all_results.append(
                                BatchResult(
                                    item=item,
                                    error=exc,
                                    success=False,
                                    attempt_count=item.attempt_count,
                                )
                            )
        finally:
            if progress_bar is not None:
                progress_bar.close()

        self._results = all_results
        return all_results

    def _prepare_groups(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None,
    ) -> dict[str, list[BatchItem[BatchItemT]]]:
        """Convert raw payloads to batch items if needed and group them."""
        if items and not isinstance(items[0], BatchItem):
            batch_items = self.create_batch_items(items, request_fn)  # type: ignore
        else:
            batch_items = items  # type: ignore

        if self.config.group_by_model or self.config.group_by_provider:
            return self._group_items(batch_items)
        return {"default": batch_items}

    def _create_progress_bar(self, total: int) -> tqdm | None:
        """Return a tqdm bar sized for ``total`` items, or None when disabled."""
        if not self.config.show_progress:
            return None
        return tqdm(
            total=total,
            desc=self.config.progress_desc,
            unit="item",
        )

    def _group_items(
        self, items: list[BatchItem[BatchItemT]]
    ) -> dict[str, list[BatchItem[BatchItemT]]]:
        """Group items by model or provider.

        Key precedence per item: ``json_data["model"]`` when model grouping
        is enabled and the key exists; otherwise the limiter's provider value
        when provider grouping is enabled; otherwise ``"default"``. Groups
        keep first-seen order and items keep input order within a group.
        """
        groups: dict[str, list[BatchItem[BatchItemT]]] = {}

        for item in items:
            if (
                self.config.group_by_model
                and item.json_data
                and "model" in item.json_data
            ):
                group_key = item.json_data["model"]
            elif self.config.group_by_provider:
                group_key = self.limiter.provider.value
            else:
                group_key = "default"

            groups.setdefault(group_key, []).append(item)

        return groups

    @staticmethod
    def _is_timeout_error(exc: BaseException) -> bool:
        """Tell whether ``exc`` (or its direct cause) looks like a timeout.

        ``httpx.TimeoutException`` is the common base of the connect, read,
        write and pool timeouts; the builtin and asyncio ``TimeoutError`` are
        accepted too. The string checks are kept for wrapped errors that only
        mention the timeout in their type name or message.
        """
        timeout_types = (httpx.TimeoutException, TimeoutError, asyncio.TimeoutError)
        if isinstance(exc, timeout_types) or isinstance(exc.__cause__, timeout_types):
            return True
        return "ReadTimeout" in str(type(exc)) or "timeout" in str(exc).lower()

    def _retry_delay_for(self, attempt: int, is_timeout: bool) -> float:
        """Exponential backoff: base 3 for timeouts, base 2 otherwise."""
        base = 3 if is_timeout else 2
        return self.config.retry_delay * (base**attempt)

    def _attempt_budget(self) -> int:
        """Total attempts per item; always at least one."""
        return max(self.config.max_retries_per_item, 0) + 1

    def _print_timeout_help(self, item: BatchItem[BatchItemT], attempt: int) -> None:
        """Print the verbose-mode explanation shown for timeout failures."""
        current_timeout = getattr(self.limiter, "_user_timeout", 120.0)
        print(f"⏱️ TIMEOUT ERROR in batch item {item.id} (attempt {attempt + 1}):")
        print(f" Current timeout setting: {current_timeout} seconds")
        print(f" The request took longer than {current_timeout}s to complete.")
        print("")
        print("💡 How to fix this:")
        print(f" 1. Increase timeout: ChatLimiter.for_model('{getattr(self.limiter, 'provider', 'your-model')}', timeout={current_timeout + 60})")
        print(f" 2. Reduce concurrency: BatchConfig(max_concurrent_requests={max(1, self.config.max_concurrent_requests // 2)})")
        print(f" 3. Current concurrency: {self.config.max_concurrent_requests} requests")
        print("")

    @staticmethod
    def _success_result(
        item: BatchItem[BatchItemT], result: BatchResultT, duration: float
    ) -> BatchResult[BatchResultT]:
        """Build the result for a successfully processed item.

        Response headers and status code are copied from ``item.metadata``
        when a processor recorded them there under ``"response_headers"`` and
        ``"status_code"``; otherwise the ``BatchResult`` defaults apply.
        """
        headers = item.metadata.get("response_headers")
        status = item.metadata.get("status_code")
        return BatchResult(
            item=item,
            result=result,
            success=True,
            duration=duration,
            attempt_count=item.attempt_count,
            response_headers=dict(headers) if isinstance(headers, dict) else {},
            status_code=status if isinstance(status, int) else None,
        )

    @staticmethod
    def _failure_result(
        item: BatchItem[BatchItemT], error: Exception, duration: float
    ) -> BatchResult[BatchResultT]:
        """Build the result for an item that failed for good."""
        return BatchResult(
            item=item,
            error=error,
            success=False,
            duration=duration,
            attempt_count=item.attempt_count,
        )

    async def _process_item_with_retry(
        self,
        item: BatchItem[BatchItemT],
        semaphore: asyncio.Semaphore,
        progress_bar: tqdm | None = None,
    ) -> BatchResult[BatchResultT]:
        """Process a single item with retry logic.

        Holds ``semaphore`` for the whole run, including backoff sleeps.
        Updates ``item.attempt_count`` and ``item.last_error`` as it goes and
        advances ``progress_bar`` once, when the item reaches its final
        outcome.

        Returns:
            A successful result, or a failed one carrying the last exception.
            Exceptions derived from ``Exception`` are never raised to the
            caller.
        """
        import time

        async with semaphore:
            # perf_counter is monotonic, so durations are unaffected by
            # wall-clock adjustments.
            started = time.perf_counter()
            attempts = self._attempt_budget()

            for attempt in range(attempts):
                item.attempt_count = attempt + 1

                try:
                    result = await self.process_item(item)
                except Exception as exc:
                    item.last_error = exc
                    is_timeout = self._is_timeout_error(exc)

                    if self.config.verbose:
                        if is_timeout:
                            self._print_timeout_help(item, attempt)
                        else:
                            print(f"❌ Exception in batch item {item.id} (attempt {attempt + 1}):")
                        traceback.print_exc()

                    # Give up on the last attempt, or immediately when the
                    # config asks not to retry after an error.
                    if attempt == attempts - 1 or self.config.stop_on_first_error:
                        if progress_bar is not None:
                            progress_bar.update(1)
                        return self._failure_result(
                            item, exc, time.perf_counter() - started
                        )

                    await asyncio.sleep(self._retry_delay_for(attempt, is_timeout))
                else:
                    if progress_bar is not None:
                        progress_bar.update(1)
                    return self._success_result(
                        item, result, time.perf_counter() - started
                    )

            # Unreachable (the loop always returns); kept for type checkers.
            return self._failure_result(
                item,
                Exception("Unexpected error in retry logic"),
                time.perf_counter() - started,
            )

    def _process_item_sync_with_retry(
        self,
        item: BatchItem[BatchItemT],
        progress_bar: tqdm | None = None,
    ) -> BatchResult[BatchResultT]:
        """Process a single item with retry logic (sync).

        Mirrors :meth:`_process_item_with_retry`, including the longer
        backoff for timeout errors, but blocks the calling thread while
        waiting between attempts.
        """
        import time

        started = time.perf_counter()
        attempts = self._attempt_budget()

        for attempt in range(attempts):
            item.attempt_count = attempt + 1

            try:
                result = self.process_item_sync(item)
            except Exception as exc:
                item.last_error = exc

                if self.config.verbose:
                    print(f"Exception in batch item {item.id} (attempt {attempt + 1}):")
                    traceback.print_exc()

                if attempt == attempts - 1 or self.config.stop_on_first_error:
                    if progress_bar is not None:
                        progress_bar.update(1)
                    return self._failure_result(
                        item, exc, time.perf_counter() - started
                    )

                time.sleep(
                    self._retry_delay_for(attempt, self._is_timeout_error(exc))
                )
            else:
                if progress_bar is not None:
                    progress_bar.update(1)
                return self._success_result(
                    item, result, time.perf_counter() - started
                )

        # Unreachable (the loop always returns); kept for type checkers.
        return self._failure_result(
            item,
            Exception("Unexpected error in retry logic"),
            time.perf_counter() - started,
        )

    def get_success_rate(self) -> float:
        """Return the fraction of successful results in the last batch.

        Returns 0.0 when no batch has been processed or the batch was empty.
        """
        if not self._results:
            return 0.0

        successful = sum(1 for r in self._results if r.success)
        return successful / len(self._results)

    def get_successful_results(self) -> list[BatchResult[BatchResultT]]:
        """Return only the successful results of the last batch."""
        return [r for r in self._results if r.success]

    def get_failed_results(self) -> list[BatchResult[BatchResultT]]:
        """Return only the failed results of the last batch."""
        return [r for r in self._results if not r.success]

    def get_stats(self) -> dict[str, Any]:
        """Return summary statistics for the last batch.

        The same keys are present whether or not any results exist, so
        callers can index the dict without checking for an empty batch.
        """
        if not self._results:
            return {
                "total": 0,
                "successful": 0,
                "failed": 0,
                "success_rate": 0.0,
                "avg_duration": 0.0,
                "total_duration": 0.0,
                "avg_attempts": 0.0,
            }

        total = len(self._results)
        total_duration = sum(r.duration for r in self._results)

        return {
            "total": total,
            "successful": len(self.get_successful_results()),
            "failed": len(self.get_failed_results()),
            "success_rate": self.get_success_rate(),
            "avg_duration": total_duration / total,
            "total_duration": total_duration,
            "avg_attempts": sum(r.attempt_count for r in self._results) / total,
        }

526 

527 

class ChatBatchProcessor(BatchProcessor[dict[str, Any], dict[str, Any]]):
    """Batch processor that sends raw chat completion payloads via the limiter."""

    def _echo_prompt(self, item: BatchItem[dict[str, Any]], payload: dict[str, Any]) -> None:
        """Print the outgoing request for verbose mode."""
        print(f"\n--- PROMPT (Item {item.id}) ---")
        if "messages" not in payload:
            print(f"REQUEST DATA: {payload}")
        else:
            for message in payload["messages"]:
                role = message.get("role", "unknown")
                content = message.get("content", "")
                print(f"{role.upper()}: {content}")
        print("--- END PROMPT ---\n")

    def _echo_response(self, item: BatchItem[dict[str, Any]], body: dict[str, Any]) -> None:
        """Print the decoded response for verbose mode."""
        print(f"\n--- RESPONSE (Item {item.id}) ---")
        if "choices" in body and body["choices"]:
            for index, choice in enumerate(body["choices"]):
                if "message" in choice:
                    content = choice["message"].get("content", "")
                    print(f"CHOICE {index}: {content}")
                elif "text" in choice:
                    print(f"CHOICE {index}: {choice['text']}")
        else:
            print(f"FULL RESPONSE: {body}")
        print("--- END RESPONSE ---\n")

    @staticmethod
    def _remember_response(item: BatchItem[dict[str, Any]], response: Any) -> None:
        """Record response headers and status code in the item's metadata."""
        item.metadata["response_headers"] = dict(response.headers)
        item.metadata["status_code"] = response.status_code

    async def process_item(self, item: BatchItem[dict[str, Any]]) -> dict[str, Any]:
        """Send one chat completion request and return the decoded JSON body.

        The request body is ``item.json_data`` when it is truthy, otherwise
        ``item.data``. HTTP error statuses surface through
        ``response.raise_for_status()``.
        """
        payload = item.json_data or item.data

        if self.config.verbose:
            self._echo_prompt(item, payload)

        response = await self.limiter.request(
            method=item.method,
            url=item.url,
            json=payload,
        )
        response.raise_for_status()
        body: dict[str, Any] = response.json()

        if self.config.verbose:
            self._echo_response(item, body)

        self._remember_response(item, response)
        return body

    def process_item_sync(self, item: BatchItem[dict[str, Any]]) -> dict[str, Any]:
        """Blocking counterpart of :meth:`process_item`, using ``request_sync``."""
        payload = item.json_data or item.data

        if self.config.verbose:
            self._echo_prompt(item, payload)

        response = self.limiter.request_sync(
            method=item.method,
            url=item.url,
            json=payload,
        )
        response.raise_for_status()
        body: dict[str, Any] = response.json()

        if self.config.verbose:
            self._echo_response(item, body)

        self._remember_response(item, response)
        return body

624 

625 

626# Convenience functions for common use cases 

async def process_chat_batch(
    limiter: ChatLimiter,
    requests: list[dict[str, Any]],
    config: BatchConfig | None = None,
) -> list[BatchResult[dict[str, Any]]]:
    """
    Run raw chat completion payloads through a ``ChatBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: Request payloads, each sent as the JSON body of a request
        config: Optional batch processing configuration

    Returns:
        The batch results produced by the processor
    """
    return await ChatBatchProcessor(limiter, config).process_batch(requests)

645 

646 

def process_chat_batch_sync(
    limiter: ChatLimiter,
    requests: list[dict[str, Any]],
    config: BatchConfig | None = None,
) -> list[BatchResult[dict[str, Any]]]:
    """
    Blocking variant: run raw chat completion payloads through a
    ``ChatBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: Request payloads, each sent as the JSON body of a request
        config: Optional batch processing configuration

    Returns:
        The batch results produced by the processor
    """
    return ChatBatchProcessor(limiter, config).process_batch_sync(requests)

665 

666 

667# High-level chat completion batch processing 

class ChatCompletionBatchProcessor(BatchProcessor[ChatCompletionRequest, ChatCompletionResponse]):
    """Batch processor built on the limiter's high-level chat completion calls."""

    @staticmethod
    def _call_arguments(request: ChatCompletionRequest) -> dict[str, Any]:
        """Collect the keyword arguments forwarded to the limiter."""
        return {
            "model": request.model,
            "messages": request.messages,
            "max_tokens": request.max_tokens,
            "temperature": request.temperature,
            "top_p": request.top_p,
            "stop": request.stop,
            "stream": request.stream,
            # Provider-specific parameters
            "frequency_penalty": request.frequency_penalty,
            "presence_penalty": request.presence_penalty,
            "top_k": request.top_k,
        }

    @staticmethod
    def _echo_request(
        item: BatchItem[ChatCompletionRequest], request: ChatCompletionRequest
    ) -> None:
        """Print the model and messages of an outgoing request (verbose mode)."""
        print(f"\n--- PROMPT (Item {item.id}) ---")
        print(f"MODEL: {request.model}")
        for message in request.messages:
            print(f"{message.role.value.upper()}: {message.content}")
        print("--- END PROMPT ---\n")

    @staticmethod
    def _echo_completion(
        item: BatchItem[ChatCompletionRequest], response: ChatCompletionResponse
    ) -> None:
        """Print the model and choices of a response (verbose mode)."""
        print(f"\n--- RESPONSE (Item {item.id}) ---")
        print(f"MODEL: {response.model}")
        if response.choices:
            for index, choice in enumerate(response.choices):
                print(f"CHOICE {index}: {choice.message.content}")
        print("--- END RESPONSE ---\n")

    async def process_item(self, item: BatchItem[ChatCompletionRequest]) -> ChatCompletionResponse:
        """Run one request through ``limiter.chat_completion`` and return its response."""
        request = item.data

        if self.config.verbose:
            self._echo_request(item, request)

        response = await self.limiter.chat_completion(**self._call_arguments(request))

        if self.config.verbose:
            self._echo_completion(item, response)

        return response

    def process_item_sync(self, item: BatchItem[ChatCompletionRequest]) -> ChatCompletionResponse:
        """Run one request through ``limiter.chat_completion_sync`` and return its response."""
        request = item.data

        if self.config.verbose:
            self._echo_request(item, request)

        response = self.limiter.chat_completion_sync(**self._call_arguments(request))

        if self.config.verbose:
            self._echo_completion(item, response)

        return response

746 

747 

748# Convenience functions for high-level chat completion batches 

async def process_chat_completion_batch(
    limiter: ChatLimiter,
    requests: list[ChatCompletionRequest],
    config: BatchConfig | None = None,
) -> list[BatchResult[ChatCompletionResponse]]:
    """
    Run ChatCompletionRequest objects through a ``ChatCompletionBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: ChatCompletionRequest objects to process
        config: Optional batch processing configuration

    Returns:
        Batch results whose ``result`` holds a ChatCompletionResponse

    Example:
        from chat_limiter import ChatLimiter, Message, MessageRole, ChatCompletionRequest

        requests = [
            ChatCompletionRequest(
                model="gpt-4o",
                messages=[Message(role=MessageRole.USER, content="Hello!")],
                max_tokens=50
            ),
            ChatCompletionRequest(
                model="gpt-4o",
                messages=[Message(role=MessageRole.USER, content="How are you?")],
                max_tokens=50
            )
        ]

        async with ChatLimiter.for_model("gpt-4o", api_key) as limiter:
            results = await process_chat_completion_batch(limiter, requests)
    """
    return await ChatCompletionBatchProcessor(limiter, config).process_batch(requests)

786 

787 

def process_chat_completion_batch_sync(
    limiter: ChatLimiter,
    requests: list[ChatCompletionRequest],
    config: BatchConfig | None = None,
) -> list[BatchResult[ChatCompletionResponse]]:
    """
    Blocking variant: run ChatCompletionRequest objects through a
    ``ChatCompletionBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: ChatCompletionRequest objects to process
        config: Optional batch processing configuration

    Returns:
        Batch results whose ``result`` holds a ChatCompletionResponse
    """
    return ChatCompletionBatchProcessor(limiter, config).process_batch_sync(requests)

806 

807 

808# Helper function for creating chat completion requests from simple data 

def create_chat_completion_requests(
    model: str,
    prompts: list[str],
    max_tokens: int | None = None,
    temperature: float | None = None,
    **kwargs: Any,
) -> list[ChatCompletionRequest]:
    """
    Build one ChatCompletionRequest per prompt, each with a single user message.

    Args:
        model: The model used for every request
        prompts: User prompts, one request is created for each
        max_tokens: Maximum tokens per completion
        temperature: Sampling temperature
        **kwargs: Extra keyword arguments passed to every ChatCompletionRequest

    Returns:
        ChatCompletionRequest objects in the same order as ``prompts``

    Example:
        requests = create_chat_completion_requests(
            model="gpt-4o",
            prompts=["Hello!", "How are you?", "What is Python?"],
            max_tokens=50,
            temperature=0.7
        )
    """
    from .types import Message, MessageRole

    return [
        ChatCompletionRequest(
            model=model,
            messages=[Message(role=MessageRole.USER, content=text)],
            max_tokens=max_tokens,
            temperature=temperature,
            **kwargs,
        )
        for text in prompts
    ]