Coverage for src/chat_limiter/batch.py: 82%
331 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 13:37 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 13:37 +0100
1"""
2Batch processing functionality for handling multiple requests efficiently.
3"""
5import asyncio
6import logging
7import traceback
8from abc import ABC, abstractmethod
9from collections.abc import Callable
10from concurrent.futures import ThreadPoolExecutor, as_completed
11from dataclasses import dataclass, field
12from typing import (
13 TYPE_CHECKING,
14 Any,
15 Generic,
16 TypeVar,
17)
19import httpx
21from tqdm.asyncio import tqdm as atqdm
22from tqdm import tqdm
24if TYPE_CHECKING:
25 pass
27from .limiter import ChatLimiter
28from .types import ChatCompletionRequest, ChatCompletionResponse
# Module-level logger; it takes this module's name so applications can
# configure it through the standard logging hierarchy.
logger = logging.getLogger(__name__)

# Type variables for generic batch processing:
#   BatchItemT   - type of the payload carried by each BatchItem
#   BatchResultT - type of the value produced by processing one item
BatchItemT = TypeVar("BatchItemT")
BatchResultT = TypeVar("BatchResultT")
@dataclass
class BatchConfig:
    """Configuration for batch processing.

    Attributes:
        max_concurrent_requests: Size of the ``asyncio.Semaphore`` that bounds
            in-flight items per group in the async path. Must be >= 1; a value
            of 0 would make every task wait on the semaphore forever.
        max_workers: Thread-pool size used by the synchronous path. Must be >= 1.
        max_retries_per_item: Number of retries after the first attempt, so an
            item is tried at most ``max_retries_per_item + 1`` times. Must be >= 0.
        retry_delay: Base delay in seconds between attempts; the retry helpers
            multiply it by ``2**attempt`` (``3**attempt`` for timeouts in the
            async path). Must be >= 0.
        show_progress: Whether to display a tqdm progress bar.
        progress_desc: Description text shown on the progress bar.
        stop_on_first_error: When true, an item is not retried after its first
            failure.
        collect_errors: Not read anywhere in this module.
        verbose: Print prompts, responses and tracebacks; also forwarded to the
            limiter's ``set_verbose_mode`` when the limiter provides it.
        adaptive_batch_size: Not read anywhere in this module.
        min_batch_size: Not read anywhere in this module.
        max_batch_size: Not read anywhere in this module.
        group_by_model: Group items by the ``"model"`` key of their JSON body.
        group_by_provider: Group remaining items by the limiter's provider.

    Raises:
        ValueError: If a concurrency, retry or delay setting is out of range.
    """

    # Concurrency settings
    max_concurrent_requests: int = 10
    max_workers: int = 4  # For sync processing

    # Retry settings
    max_retries_per_item: int = 3
    retry_delay: float = 1.0

    # Progress tracking
    show_progress: bool = True
    progress_desc: str = "Processing batch"

    # Error handling
    stop_on_first_error: bool = False
    collect_errors: bool = True
    verbose: bool = False

    # Batch size optimization
    adaptive_batch_size: bool = True
    min_batch_size: int = 1
    max_batch_size: int = 100

    # Request grouping
    group_by_model: bool = True
    group_by_provider: bool = True

    def __post_init__(self) -> None:
        """Reject settings that would hang or fail later, deep inside a batch."""
        if self.max_concurrent_requests < 1:
            raise ValueError(
                f"max_concurrent_requests must be >= 1, got {self.max_concurrent_requests}"
            )
        if self.max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {self.max_workers}")
        if self.max_retries_per_item < 0:
            raise ValueError(
                f"max_retries_per_item must be >= 0, got {self.max_retries_per_item}"
            )
        if self.retry_delay < 0:
            raise ValueError(f"retry_delay must be >= 0, got {self.retry_delay}")
@dataclass
class BatchItem(Generic[BatchItemT]):
    """One unit of work in a batch, plus the request settings used to send it."""

    # Payload supplied by the caller.
    data: BatchItemT

    # HTTP request configuration.
    method: str = "POST"
    url: str = "/chat/completions"
    json_data: dict[str, Any] | None = None

    # Identification and free-form metadata.
    id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    # Processing state, updated by the retry helpers on each attempt.
    attempt_count: int = 0
    last_error: Exception | None = None
@dataclass
class BatchResult(Generic[BatchResultT]):
    """Outcome of processing a single batch item (success or failure)."""

    # The item this result belongs to.
    item: "BatchItem[Any]"

    # Exactly what came back: a value on success, an exception on failure.
    result: BatchResultT | None = None
    error: Exception | None = None

    # How processing went.
    success: bool = False
    duration: float = 0.0
    attempt_count: int = 0

    # HTTP response details.
    response_headers: dict[str, str] = field(default_factory=dict)
    status_code: int | None = None
class BatchProcessor(ABC, Generic[BatchItemT, BatchResultT]):
    """Abstract base class for batch processing.

    Subclasses implement :meth:`process_item` (async) and
    :meth:`process_item_sync`. This class supplies item wrapping, grouping,
    bounded concurrency, per-item retries with exponential backoff, optional
    tqdm progress reporting, and summary statistics over the last batch.
    """

    def __init__(
        self,
        limiter: ChatLimiter,
        config: BatchConfig | None = None,
    ):
        """Store the limiter and configuration.

        Args:
            limiter: Limiter used by subclasses to issue requests.
            config: Batch settings; a default ``BatchConfig`` is used when omitted.
        """
        self.limiter = limiter
        self.config = config or BatchConfig()
        self._results: list[BatchResult[BatchResultT]] = []
        self._errors: list[Exception] = []

        # Enable verbose mode on limiter if config specifies it
        if hasattr(self.limiter, "set_verbose_mode"):
            self.limiter.set_verbose_mode(self.config.verbose)

    @abstractmethod
    async def process_item(self, item: BatchItem[BatchItemT]) -> BatchResultT:
        """Process a single batch item."""
        pass

    @abstractmethod
    def process_item_sync(self, item: BatchItem[BatchItemT]) -> BatchResultT:
        """Process a single batch item synchronously."""
        pass

    def create_batch_items(
        self,
        items: list[BatchItemT],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchItem[BatchItemT]]:
        """Create batch items from raw data.

        Args:
            items: Raw payloads; each becomes a ``BatchItem`` with id ``item_<index>``.
            request_fn: Optional callable returning ``(method, url, json_data)``
                for a payload; when given, it overrides the item's request defaults.

        Returns:
            One ``BatchItem`` per payload, in input order.
        """
        batch_items = []

        for i, item in enumerate(items):
            batch_item = BatchItem(
                data=item,
                id=f"item_{i}",
            )

            # Configure request if function provided
            if request_fn:
                method, url, json_data = request_fn(item)
                batch_item.method = method
                batch_item.url = url
                batch_item.json_data = json_data

            batch_items.append(batch_item)

        return batch_items

    def _as_batch_items(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None,
    ) -> list[BatchItem[BatchItemT]]:
        """Wrap raw payloads in ``BatchItem`` objects; pass existing ones through."""
        # Only the first element is inspected, so the list is assumed homogeneous.
        if items and not isinstance(items[0], BatchItem):
            return self.create_batch_items(items, request_fn)  # type: ignore
        return items  # type: ignore

    def _split_into_groups(
        self, batch_items: list[BatchItem[BatchItemT]]
    ) -> dict[str, list[BatchItem[BatchItemT]]]:
        """Group items when grouping is enabled, else use a single ``"default"`` group."""
        if self.config.group_by_model or self.config.group_by_provider:
            return self._group_items(batch_items)
        return {"default": batch_items}

    def _open_progress_bar(self, total: int) -> tqdm | None:
        """Create the progress bar when enabled in the config, otherwise return None."""
        if not self.config.show_progress:
            return None
        return tqdm(
            total=total,
            desc=self.config.progress_desc,
            unit="item",
        )

    async def process_batch(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchResult[BatchResultT]]:
        """Process a batch of items asynchronously.

        Args:
            items: Raw payloads or ready-made ``BatchItem`` objects.
            request_fn: Optional request builder used when wrapping raw payloads.

        Returns:
            One ``BatchResult`` per item, ordered group by group and, within a
            group, in the order the items were supplied.
        """
        batch_items = self._as_batch_items(items, request_fn)
        grouped_items = self._split_into_groups(batch_items)

        all_results: list[BatchResult[BatchResultT]] = []

        # Calculate total items for progress tracking
        total_items = sum(len(group_items) for group_items in grouped_items.values())
        progress_bar = self._open_progress_bar(total_items)

        try:
            for group_name, group_items in grouped_items.items():
                logger.info(
                    f"Processing group '{group_name}' with {len(group_items)} items"
                )

                # Create semaphore for concurrency control
                semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)

                # Process items with concurrency control and progress tracking
                tasks = [
                    self._process_item_with_retry(item, semaphore, progress_bar)
                    for item in group_items
                ]

                # Wait for all tasks to complete
                group_results = await asyncio.gather(*tasks, return_exceptions=True)

                # The retry helper already catches Exception, so anything gather
                # hands back as an exception object is a BaseException such as
                # CancelledError; convert it so callers only ever see BatchResult.
                for item, outcome in zip(group_items, group_results):
                    if isinstance(outcome, BaseException):
                        error_result: BatchResult[BatchResultT] = BatchResult(
                            item=item,
                            error=outcome,  # type: ignore[arg-type]
                            success=False,
                            attempt_count=item.attempt_count,
                        )
                        all_results.append(error_result)
                    else:
                        all_results.append(outcome)
        finally:
            # tqdm's truthiness is False when total == 0, so compare with None
            # explicitly to make sure the bar is always closed.
            if progress_bar is not None:
                progress_bar.close()

        self._results = all_results
        return all_results

    def process_batch_sync(
        self,
        items: list[BatchItemT] | list[BatchItem[BatchItemT]],
        request_fn: Callable[[BatchItemT], tuple[str, str, dict[str, Any]]] | None = None,
    ) -> list[BatchResult[BatchResultT]]:
        """Process a batch of items synchronously using a thread pool.

        Args:
            items: Raw payloads or ready-made ``BatchItem`` objects.
            request_fn: Optional request builder used when wrapping raw payloads.

        Returns:
            One ``BatchResult`` per item, ordered group by group and, within a
            group, in submission order (matching :meth:`process_batch`).
        """
        batch_items = self._as_batch_items(items, request_fn)
        grouped_items = self._split_into_groups(batch_items)

        # Calculate total items for progress tracking
        total_items = sum(len(group_items) for group_items in grouped_items.values())
        progress_bar = self._open_progress_bar(total_items)

        all_results: list[BatchResult[BatchResultT]] = []
        try:
            for group_name, group_items in grouped_items.items():
                logger.info(
                    f"Processing group '{group_name}' with {len(group_items)} items"
                )

                # Use ThreadPoolExecutor for concurrent processing
                with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                    # Submit everything first so the work still runs concurrently.
                    submitted = [
                        (
                            item,
                            executor.submit(
                                self._process_item_sync_with_retry, item, progress_bar
                            ),
                        )
                        for item in group_items
                    ]

                    # Collect in submission order so results line up with inputs.
                    for item, future in submitted:
                        try:
                            all_results.append(future.result())
                        except Exception as e:
                            error_result: BatchResult[BatchResultT] = BatchResult(
                                item=item,
                                error=e,
                                success=False,
                                attempt_count=item.attempt_count,
                            )
                            all_results.append(error_result)
        finally:
            # See process_batch: an explicit None check, not tqdm truthiness.
            if progress_bar is not None:
                progress_bar.close()

        self._results = all_results
        return all_results

    def _group_items(
        self, items: list[BatchItem[BatchItemT]]
    ) -> dict[str, list[BatchItem[BatchItemT]]]:
        """Group items by model or provider.

        An item goes to the group named after ``json_data["model"]`` when model
        grouping is on and that key exists; otherwise to the limiter's provider
        value when provider grouping is on; otherwise to ``"default"``.
        """
        groups: dict[str, list[BatchItem[BatchItemT]]] = {}

        for item in items:
            group_key = "default"

            if (
                self.config.group_by_model
                and item.json_data
                and "model" in item.json_data
            ):
                group_key = item.json_data["model"]
            elif self.config.group_by_provider:
                group_key = self.limiter.provider.value

            groups.setdefault(group_key, []).append(item)

        return groups

    @staticmethod
    def _is_timeout_error(error: Exception) -> bool:
        """Heuristically decide whether ``error`` represents a request timeout."""
        timeout_types = (httpx.ReadTimeout, httpx.ConnectTimeout)
        return (
            isinstance(error, timeout_types)
            or isinstance(error.__cause__, timeout_types)
            or "ReadTimeout" in str(type(error))
            or "timeout" in str(error).lower()
        )

    def _print_timeout_help(self, item: BatchItem[BatchItemT], attempt: int) -> None:
        """Print a user-facing explanation of a timeout and how to mitigate it."""
        # Get current timeout from the limiter
        current_timeout = getattr(self.limiter, "_user_timeout", 120.0)
        print(f"⏱️ TIMEOUT ERROR in batch item {item.id} (attempt {attempt + 1}):")
        print(f" Current timeout setting: {current_timeout} seconds")
        print(f" The request took longer than {current_timeout}s to complete.")
        print()
        print("💡 How to fix this:")
        print(f" 1. Increase timeout: ChatLimiter.for_model('{getattr(self.limiter, 'provider', 'your-model')}', timeout={current_timeout + 60})")
        print(f" 2. Reduce concurrency: BatchConfig(max_concurrent_requests={max(1, self.config.max_concurrent_requests // 2)})")
        print(f" 3. Current concurrency: {self.config.max_concurrent_requests} requests")
        print()

    async def _process_item_with_retry(
        self,
        item: BatchItem[BatchItemT],
        semaphore: asyncio.Semaphore,
        progress_bar: tqdm | None = None,
    ) -> BatchResult[BatchResultT]:
        """Process a single item with retry logic.

        Holds ``semaphore`` for the whole retry sequence, including backoff
        sleeps. The progress bar advances exactly once per item, on success or
        on final failure.

        Returns:
            A ``BatchResult``; exceptions from :meth:`process_item` are captured
            in ``error`` rather than raised.
        """
        import time

        async with semaphore:
            # Timing starts once the semaphore is held, so queueing is excluded.
            start_time = time.time()

            for attempt in range(self.config.max_retries_per_item + 1):
                item.attempt_count = attempt + 1

                try:
                    result = await self.process_item(item)
                except Exception as e:
                    item.last_error = e
                    is_timeout_error = self._is_timeout_error(e)

                    # Print user-friendly error messages
                    if self.config.verbose:
                        if is_timeout_error:
                            self._print_timeout_help(item, attempt)
                        else:
                            print(f"❌ Exception in batch item {item.id} (attempt {attempt + 1}):")
                        traceback.print_exc()

                    # If this is the last attempt or we should stop on error
                    if (
                        attempt == self.config.max_retries_per_item
                        or self.config.stop_on_first_error
                    ):
                        if progress_bar is not None:
                            progress_bar.update(1)

                        return BatchResult(
                            item=item,
                            error=e,
                            success=False,
                            duration=time.time() - start_time,
                            attempt_count=item.attempt_count,
                        )

                    # Timeouts back off more aggressively (base 3 instead of 2).
                    backoff_base = 3 if is_timeout_error else 2
                    await asyncio.sleep(self.config.retry_delay * (backoff_base**attempt))
                else:
                    if progress_bar is not None:
                        progress_bar.update(1)

                    return BatchResult(
                        item=item,
                        result=result,
                        success=True,
                        duration=time.time() - start_time,
                        attempt_count=item.attempt_count,
                    )

            # Only reachable when the retry loop does not run at all.
            return BatchResult(
                item=item,
                error=Exception("Unexpected error in retry logic"),
                success=False,
                duration=time.time() - start_time,
                attempt_count=item.attempt_count,
            )

    def _process_item_sync_with_retry(
        self,
        item: BatchItem[BatchItemT],
        progress_bar: tqdm | None = None,
    ) -> BatchResult[BatchResultT]:
        """Process a single item with retry logic (sync).

        The progress bar advances exactly once per item, on success or on final
        failure. Exceptions from :meth:`process_item_sync` are captured in the
        returned ``BatchResult`` rather than raised.
        """
        import time

        start_time = time.time()

        for attempt in range(self.config.max_retries_per_item + 1):
            item.attempt_count = attempt + 1

            try:
                result = self.process_item_sync(item)
            except Exception as e:
                item.last_error = e

                # Print traceback if verbose mode is enabled
                if self.config.verbose:
                    print(f"Exception in batch item {item.id} (attempt {attempt + 1}):")
                    traceback.print_exc()

                # If this is the last attempt or we should stop on error
                if (
                    attempt == self.config.max_retries_per_item
                    or self.config.stop_on_first_error
                ):
                    if progress_bar is not None:
                        progress_bar.update(1)

                    return BatchResult(
                        item=item,
                        error=e,
                        success=False,
                        duration=time.time() - start_time,
                        attempt_count=item.attempt_count,
                    )

                # Wait before retry (exponential backoff, base 2)
                time.sleep(self.config.retry_delay * (2**attempt))
            else:
                if progress_bar is not None:
                    progress_bar.update(1)

                return BatchResult(
                    item=item,
                    result=result,
                    success=True,
                    duration=time.time() - start_time,
                    attempt_count=item.attempt_count,
                )

        # Only reachable when the retry loop does not run at all.
        return BatchResult(
            item=item,
            error=Exception("Unexpected error in retry logic"),
            success=False,
            duration=time.time() - start_time,
            attempt_count=item.attempt_count,
        )

    def get_success_rate(self) -> float:
        """Get the success rate of the last batch (0.0 when there are no results)."""
        if not self._results:
            return 0.0

        successful = sum(1 for r in self._results if r.success)
        return successful / len(self._results)

    def get_successful_results(self) -> list[BatchResult[BatchResultT]]:
        """Get only successful results."""
        return [r for r in self._results if r.success]

    def get_failed_results(self) -> list[BatchResult[BatchResultT]]:
        """Get only failed results."""
        return [r for r in self._results if not r.success]

    def get_stats(self) -> dict[str, Any]:
        """Get comprehensive processing statistics for the last batch.

        Returns:
            A dict with ``total``, ``successful``, ``failed`` and
            ``success_rate``; when results exist it also contains
            ``avg_duration``, ``total_duration`` and ``avg_attempts``.
        """
        if not self._results:
            return {"total": 0, "successful": 0, "failed": 0, "success_rate": 0.0}

        successful = self.get_successful_results()
        failed = self.get_failed_results()

        # Calculate timing statistics
        durations = [r.duration for r in self._results]
        avg_duration = sum(durations) / len(durations) if durations else 0

        return {
            "total": len(self._results),
            "successful": len(successful),
            "failed": len(failed),
            "success_rate": self.get_success_rate(),
            "avg_duration": avg_duration,
            "total_duration": sum(durations),
            "avg_attempts": sum(r.attempt_count for r in self._results)
            / len(self._results),
        }
class ChatBatchProcessor(BatchProcessor[dict[str, Any], dict[str, Any]]):
    """Batch processor for chat completion requests.

    Each item's request body is ``item.json_data`` when set, otherwise
    ``item.data``; it is sent through the limiter and the decoded JSON response
    is returned. The response's status code and headers are recorded in
    ``item.metadata`` for every response received, including error responses.
    """

    async def process_item(self, item: BatchItem[dict[str, Any]]) -> dict[str, Any]:
        """Process a single chat completion request.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error status.
        """
        request_data = item.json_data or item.data

        if self.config.verbose:
            self._log_prompt(item, request_data)

        # Make the request using the limiter
        response = await self.limiter.request(
            method=item.method,
            url=item.url,
            json=request_data,
        )

        return self._handle_response(item, response)

    def process_item_sync(self, item: BatchItem[dict[str, Any]]) -> dict[str, Any]:
        """Process a single chat completion request synchronously.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error status.
        """
        request_data = item.json_data or item.data

        if self.config.verbose:
            self._log_prompt(item, request_data)

        # Make the request using the limiter
        response = self.limiter.request_sync(
            method=item.method,
            url=item.url,
            json=request_data,
        )

        return self._handle_response(item, response)

    def _handle_response(
        self, item: BatchItem[dict[str, Any]], response: Any
    ) -> dict[str, Any]:
        """Record response metadata, raise on error status, and decode the JSON body."""
        # Store metadata before raise_for_status() so failed requests keep
        # their status code and headers for later diagnosis.
        item.metadata["response_headers"] = dict(response.headers)
        item.metadata["status_code"] = response.status_code

        response.raise_for_status()
        result: dict[str, Any] = response.json()

        if self.config.verbose:
            self._log_response(item, result)

        return result

    @staticmethod
    def _log_prompt(item: BatchItem[dict[str, Any]], request_data: dict[str, Any]) -> None:
        """Print the outgoing request's messages, or the raw body if there are none."""
        print(f"\n--- PROMPT (Item {item.id}) ---")
        if "messages" in request_data:
            for msg in request_data["messages"]:
                role = msg.get("role", "unknown")
                content = msg.get("content", "")
                print(f"{role.upper()}: {content}")
        else:
            print(f"REQUEST DATA: {request_data}")
        print("--- END PROMPT ---\n")

    @staticmethod
    def _log_response(item: BatchItem[dict[str, Any]], result: dict[str, Any]) -> None:
        """Print each choice of the response, or the whole body if it has no choices."""
        print(f"\n--- RESPONSE (Item {item.id}) ---")
        if "choices" in result and result["choices"]:
            for i, choice in enumerate(result["choices"]):
                if "message" in choice:
                    content = choice["message"].get("content", "")
                    print(f"CHOICE {i}: {content}")
                elif "text" in choice:
                    print(f"CHOICE {i}: {choice['text']}")
        else:
            print(f"FULL RESPONSE: {result}")
        print("--- END RESPONSE ---\n")
626# Convenience functions for common use cases
async def process_chat_batch(
    limiter: ChatLimiter,
    requests: list[dict[str, Any]],
    config: BatchConfig | None = None,
) -> list[BatchResult[dict[str, Any]]]:
    """
    Run raw chat completion requests through a fresh ``ChatBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: Request bodies; each one is sent as JSON
        config: Batch processing configuration, or None for the defaults

    Returns:
        One batch result per request
    """
    return await ChatBatchProcessor(limiter, config).process_batch(requests)
def process_chat_batch_sync(
    limiter: ChatLimiter,
    requests: list[dict[str, Any]],
    config: BatchConfig | None = None,
) -> list[BatchResult[dict[str, Any]]]:
    """
    Synchronous counterpart of ``process_chat_batch``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: Request bodies; each one is sent as JSON
        config: Batch processing configuration, or None for the defaults

    Returns:
        One batch result per request
    """
    return ChatBatchProcessor(limiter, config).process_batch_sync(requests)
667# High-level chat completion batch processing
class ChatCompletionBatchProcessor(BatchProcessor[ChatCompletionRequest, ChatCompletionResponse]):
    """Batch processor built on the limiter's high-level chat completion calls."""

    async def process_item(self, item: BatchItem[ChatCompletionRequest]) -> ChatCompletionResponse:
        """Send one ``ChatCompletionRequest`` via ``limiter.chat_completion``."""
        request = item.data

        if self.config.verbose:
            self._log_prompt(item, request)

        response = await self.limiter.chat_completion(**self._completion_kwargs(request))

        if self.config.verbose:
            self._log_response(item, response)

        return response

    def process_item_sync(self, item: BatchItem[ChatCompletionRequest]) -> ChatCompletionResponse:
        """Send one ``ChatCompletionRequest`` via ``limiter.chat_completion_sync``."""
        request = item.data

        if self.config.verbose:
            self._log_prompt(item, request)

        response = self.limiter.chat_completion_sync(**self._completion_kwargs(request))

        if self.config.verbose:
            self._log_response(item, response)

        return response

    @staticmethod
    def _completion_kwargs(request: ChatCompletionRequest) -> dict[str, Any]:
        """Collect the keyword arguments forwarded to the limiter for ``request``."""
        return {
            "model": request.model,
            "messages": request.messages,
            "max_tokens": request.max_tokens,
            "temperature": request.temperature,
            "top_p": request.top_p,
            "stop": request.stop,
            "stream": request.stream,
            # Provider-specific parameters
            "frequency_penalty": request.frequency_penalty,
            "presence_penalty": request.presence_penalty,
            "top_k": request.top_k,
        }

    @staticmethod
    def _log_prompt(item: BatchItem[ChatCompletionRequest], request: ChatCompletionRequest) -> None:
        """Print the model and every message of an outgoing request."""
        print(f"\n--- PROMPT (Item {item.id}) ---")
        print(f"MODEL: {request.model}")
        for message in request.messages:
            print(f"{message.role.value.upper()}: {message.content}")
        print("--- END PROMPT ---\n")

    @staticmethod
    def _log_response(item: BatchItem[ChatCompletionRequest], response: ChatCompletionResponse) -> None:
        """Print the model and every choice of a received response."""
        print(f"\n--- RESPONSE (Item {item.id}) ---")
        print(f"MODEL: {response.model}")
        if response.choices:
            for index, choice in enumerate(response.choices):
                print(f"CHOICE {index}: {choice.message.content}")
        print("--- END RESPONSE ---\n")
748# Convenience functions for high-level chat completion batches
async def process_chat_completion_batch(
    limiter: ChatLimiter,
    requests: list[ChatCompletionRequest],
    config: BatchConfig | None = None,
) -> list[BatchResult[ChatCompletionResponse]]:
    """
    Run high-level chat completion requests through a fresh
    ``ChatCompletionBatchProcessor``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: ChatCompletionRequest objects to process
        config: Batch processing configuration, or None for the defaults

    Returns:
        One batch result per request, each holding a ChatCompletionResponse
        on success

    Example:
        from chat_limiter import ChatLimiter, Message, MessageRole, ChatCompletionRequest

        requests = [
            ChatCompletionRequest(
                model="gpt-4o",
                messages=[Message(role=MessageRole.USER, content="Hello!")],
                max_tokens=50
            ),
            ChatCompletionRequest(
                model="gpt-4o",
                messages=[Message(role=MessageRole.USER, content="How are you?")],
                max_tokens=50
            )
        ]

        async with ChatLimiter.for_model("gpt-4o", api_key) as limiter:
            results = await process_chat_completion_batch(limiter, requests)
    """
    return await ChatCompletionBatchProcessor(limiter, config).process_batch(requests)
def process_chat_completion_batch_sync(
    limiter: ChatLimiter,
    requests: list[ChatCompletionRequest],
    config: BatchConfig | None = None,
) -> list[BatchResult[ChatCompletionResponse]]:
    """
    Synchronous counterpart of ``process_chat_completion_batch``.

    Args:
        limiter: Configured ChatLimiter instance
        requests: ChatCompletionRequest objects to process
        config: Batch processing configuration, or None for the defaults

    Returns:
        One batch result per request, each holding a ChatCompletionResponse
        on success
    """
    return ChatCompletionBatchProcessor(limiter, config).process_batch_sync(requests)
808# Helper function for creating chat completion requests from simple data
def create_chat_completion_requests(
    model: str,
    prompts: list[str],
    max_tokens: int | None = None,
    temperature: float | None = None,
    **kwargs: Any,
) -> list[ChatCompletionRequest]:
    """
    Build one ChatCompletionRequest per prompt, each holding a single user message.

    Args:
        model: Model name applied to every request
        prompts: User prompts, one request is created for each
        max_tokens: Maximum tokens per completion
        temperature: Sampling temperature
        **kwargs: Extra keyword arguments passed to every ChatCompletionRequest

    Returns:
        The requests, in the same order as ``prompts``

    Example:
        requests = create_chat_completion_requests(
            model="gpt-4o",
            prompts=["Hello!", "How are you?", "What is Python?"],
            max_tokens=50,
            temperature=0.7
        )
    """
    from .types import Message, MessageRole

    return [
        ChatCompletionRequest(
            model=model,
            messages=[Message(role=MessageRole.USER, content=text)],
            max_tokens=max_tokens,
            temperature=temperature,
            **kwargs
        )
        for text in prompts
    ]