Coverage for amazonorders/session.py: 89.89%
188 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-25 19:11 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-25 19:11 +0000
1import json
2import logging
3import os
4from io import BytesIO
5from typing import Optional, Any, Dict
6from urllib.parse import urlparse
8import requests
9from PIL import Image
10from amazoncaptcha import AmazonCaptcha
11from bs4 import BeautifulSoup, Tag
12from requests import Session, Response
13from requests.utils import dict_from_cookiejar
15from amazonorders.conf import DEFAULT_COOKIE_JAR_PATH, DEFAULT_OUTPUT_DIR
16from amazonorders.exception import AmazonOrdersAuthError
# Package metadata.
__author__ = "Alex Laird"
__copyright__ = "Copyright 2024, Alex Laird"
__version__ = "1.0.5"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root URL that the sign-in and sign-out requests are built from.
BASE_URL = "https://www.amazon.com"

# Browser-like headers merged into every request made through ``AmazonSession.request``.
BASE_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": BASE_URL,
    "Referer": BASE_URL + "/ap/signin",
    "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": "macOS",
    "Sec-Ch-Viewport-Width": "1393",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Viewport-Width": "1393",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}

# CSS selectors used to recognize which step of the auth flow a parsed page represents.
# Sign-in form.
SIGN_IN_FORM_SELECTOR = "form[name='signIn']"
# Multi-factor authentication: device selection, then passcode entry.
MFA_DEVICE_SELECT_FORM_SELECTOR = "form[id='auth-select-device-form']"
MFA_FORM_SELECTOR = "form[id='auth-mfa-form']"
# Captcha challenges (two page variants) and the verification-code form.
CAPTCHA_1_FORM_SELECTOR = "form[class*='cvf-widget-form-captcha']"
CAPTCHA_2_FORM_SELECTOR = "form:has(input[id^='captchacharacters'])"
CAPTCHA_OTP_FORM_SELECTOR = "form[id='verification-code-form']"
class IODefault:
    """
    Default input/output handler for the application, backed by the console.

    This class exists so that it can be overridden when constructing an :class:`AmazonSession`
    if input/output should be handled another way.
    """

    def echo(self, msg, **kwargs):
        """
        Print a message to the console.

        :param msg: The data to send to output.
        :param kwargs: Ignored by this default implementation.
        """
        print(msg)

    def prompt(self, msg, type=None, **kwargs):
        """
        Ask for user input on the console.

        :param msg: The text shown as the input prompt (a ``": "`` suffix is appended).
        :param type: Ignored by this default implementation.
        :param kwargs: Ignored by this default implementation.
        :return: The user input result.
        """
        prompt_text = "{}: ".format(msg)
        return input(prompt_text)
class AmazonSession:
    """
    An interface for interacting with Amazon and authenticating an underlying :class:`requests.Session`.

    Cookies are loaded from ``cookie_jar_path`` on construction and written back to it after every request, so
    session data is maintained between requests and between separate instantiations of the class or application.

    To get started, call the :func:`login` function.
    """

    def __init__(self,
                 username: str,
                 password: str,
                 debug: bool = False,
                 max_auth_attempts: int = 10,
                 cookie_jar_path: Optional[str] = None,
                 io: IODefault = IODefault(),
                 output_dir: Optional[str] = None) -> None:
        """
        Create the session and restore any cookies previously persisted at ``cookie_jar_path``.

        :param username: An Amazon username.
        :param password: An Amazon password.
        :param debug: Set the module logger to ``DEBUG`` and write an HTML file for each response.
        :param max_auth_attempts: Upper bound on the number of auth flow steps :func:`login` will perform.
        :param cookie_jar_path: Where to persist session cookies, defaults to ``conf.DEFAULT_COOKIE_JAR_PATH``.
        :param io: The I/O handler for echoes and prompts. The default instance is created once and shared by
            every session that does not pass its own; it holds no state.
        :param output_dir: Where output files are written, defaults to ``conf.DEFAULT_OUTPUT_DIR``.
        """
        if not cookie_jar_path:
            cookie_jar_path = DEFAULT_COOKIE_JAR_PATH
        if not output_dir:
            output_dir = DEFAULT_OUTPUT_DIR

        #: An Amazon username.
        self.username: str = username
        #: An Amazon password.
        self.password: str = password

        #: Set logger ``DEBUG`` and write an HTML file for each request made on the session.
        self.debug: bool = debug
        if self.debug:
            logger.setLevel(logging.DEBUG)
        #: Will continue in :func:`login()`'s auth flow this many times (successes and failures).
        self.max_auth_attempts: int = max_auth_attempts
        #: The path to persist session cookies, defaults to ``conf.DEFAULT_COOKIE_JAR_PATH``.
        self.cookie_jar_path: str = cookie_jar_path
        #: The I/O handler for echoes and prompts.
        self.io: IODefault = io
        #: The directory where any output files will be produced, defaults to ``conf.DEFAULT_OUTPUT_DIR``.
        self.output_dir: str = output_dir

        #: The shared session to be used across all requests.
        self.session: Session = Session()
        #: The last response executed on the Session.
        self.last_response: Optional[Response] = None
        #: A parsed representation of the last response executed on the Session.
        self.last_response_parsed: Optional[Tag] = None
        #: If :func:`login()` has been executed and successfully logged in the session.
        self.is_authenticated: bool = False

        # dirname() is "" when the cookie jar path is a bare filename, and makedirs("") raises.
        cookie_dir = os.path.dirname(self.cookie_jar_path)
        if cookie_dir:
            os.makedirs(cookie_dir, exist_ok=True)
        if os.path.exists(self.cookie_jar_path):
            with open(self.cookie_jar_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.session.cookies.update(requests.utils.cookiejar_from_dict(data))

    def request(self,
                method: str,
                url: str,
                **kwargs: Any) -> Response:
        """
        Execute the request against Amazon with base headers, parsing and storing the response
        and persisting response cookies.

        :param method: The request method to execute.
        :param url: The URL to execute ``method`` on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`requests.Session.request`. Entries in
            ``BASE_HEADERS`` take precedence over same-named entries in a passed ``headers`` dict.
        :return: The Response from the executed request.
        """
        # Work on a copy so the caller's dict is never mutated; ``headers=None`` is treated as "no headers".
        headers = dict(kwargs.get("headers") or {})
        headers.update(BASE_HEADERS)
        kwargs["headers"] = headers

        logger.debug("%s request to %s", method, url)

        self.last_response = self.session.request(method, url, **kwargs)
        self.last_response_parsed = BeautifulSoup(self.last_response.text,
                                                  "html.parser")

        # Persist the full cookie jar after every request; mode "w" truncates any previous file.
        cookies = dict_from_cookiejar(self.session.cookies)
        with open(self.cookie_jar_path, "w", encoding="utf-8") as f:
            json.dump(cookies, f)

        logger.debug("Response: %s - %s",
                     self.last_response.url,
                     self.last_response.status_code)

        if self.debug:
            # The output directory may not exist yet on the first debug run.
            os.makedirs(self.output_dir, exist_ok=True)
            page_name = self._get_page_from_url(self.last_response.url)
            with open(os.path.join(self.output_dir, page_name), "w",
                      encoding="utf-8") as html_file:
                logger.debug("Response written to file: %s", html_file.name)
                html_file.write(self.last_response.text)

        return self.last_response

    def get(self,
            url: str,
            **kwargs: Any) -> Response:
        """
        Perform a GET request.

        :param url: The URL to GET on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed GET request.
        """
        return self.request("GET", url, **kwargs)

    def post(self,
             url: str,
             **kwargs: Any) -> Response:
        """
        Perform a POST request.

        :param url: The URL to POST on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed POST request.
        """
        return self.request("POST", url, **kwargs)

    def auth_cookies_stored(self) -> bool:
        """
        Check whether the session holds non-empty ``session-token`` and ``x-main`` cookies.

        :return: ``True`` if both cookies are present with a non-empty value.
        """
        cookies = dict_from_cookiejar(self.session.cookies)
        return bool(cookies.get("session-token") and cookies.get("x-main"))

    def login(self) -> None:
        """
        Execute an Amazon login process. This will include the sign-in page, and may also include Captcha challenges
        and OTP pages (if 2FA authentication is enabled on your account).

        If successful, ``is_authenticated`` will be set to ``True``.

        Session cookies are persisted, and if existing session data is found during this auth flow, it will be
        skipped entirely and flagged as authenticated.

        :raises AmazonOrdersAuthError: If the page is not a recognized step of the auth flow, a sign-in error is
            reported by the page, or the session is still not signed in after ``max_auth_attempts`` steps.
        """
        self.get("{}/gp/sign-in.html".format(BASE_URL))

        attempts = 0
        while not self.is_authenticated and attempts < self.max_auth_attempts:
            if self._is_signed_in():
                self.is_authenticated = True
                break

            page = self.last_response_parsed
            if page.select_one(SIGN_IN_FORM_SELECTOR):
                self._sign_in()
            elif page.select_one(CAPTCHA_1_FORM_SELECTOR):
                self._captcha_submit(CAPTCHA_1_FORM_SELECTOR,
                                     "cvf_captcha_input",
                                     "cvf-widget-alert")
            elif page.select_one(CAPTCHA_2_FORM_SELECTOR):
                self._captcha_submit(CAPTCHA_2_FORM_SELECTOR,
                                     "field-keywords",
                                     "a-alert-info")
            elif page.select_one(MFA_DEVICE_SELECT_FORM_SELECTOR):
                self._mfa_device_select()
            elif page.select_one(MFA_FORM_SELECTOR):
                self._mfa_submit()
            elif page.select_one(CAPTCHA_OTP_FORM_SELECTOR):
                self._captcha_otp_submit()
            else:
                raise AmazonOrdersAuthError(
                    "An error occurred, this is an unknown page, or its parsed contents don't match a known auth flow: {}. To capture the page to a file, set the `debug` flag.".format(
                        self.last_response.url))

            attempts += 1

        if not self.is_authenticated:
            # The loop inspects a response only at the top of the next iteration, so the response to the final
            # attempt has not been checked yet; check it before declaring failure.
            if self._is_signed_in():
                self.is_authenticated = True
            else:
                raise AmazonOrdersAuthError(
                    "Max authentication flow attempts reached.")

    def logout(self) -> None:
        """
        Logout and close the existing Amazon session and clear cookies.
        """
        self.get("{}/gp/sign-out.html".format(BASE_URL))

        # The sign-out request above re-wrote the cookie jar, so remove it afterwards.
        if os.path.exists(self.cookie_jar_path):
            os.remove(self.cookie_jar_path)

        self.session.close()
        self.session = Session()

        self.is_authenticated = False

    def _is_signed_in(self) -> bool:
        """
        Decide whether the session is signed in, from the stored cookies or the last response's text.

        :return: ``True`` if the auth cookies are stored, or the last page contains ``nav-item-signout`` and does
            not contain ``Hello, sign in``.
        """
        if self.auth_cookies_stored():
            return True
        text = self.last_response.text
        return "Hello, sign in" not in text and "nav-item-signout" in text

    def _submit_form(self,
                     form: Tag,
                     additional_attrs: Optional[Dict[str, Any]] = None) -> None:
        """
        Submit ``form`` using its own method and action, with ``additional_attrs`` merged over its input values.

        :param form: The form to submit.
        :param additional_attrs: Values to add to, or override in, the data collected from the form's inputs.
        """
        data = self._build_from_form(form, additional_attrs=additional_attrs)

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

    def _sign_in(self) -> None:
        """
        Submit the sign-in form with the username and password; any error shown on the result raises.
        """
        form = self.last_response_parsed.select_one(SIGN_IN_FORM_SELECTOR)
        self._submit_form(form,
                          additional_attrs={"email": self.username,
                                            "password": self.password,
                                            "rememberMe": "true"})

        self._handle_errors(critical=True)

    def _mfa_device_select(self) -> None:
        """
        List the one-time passcode devices offered by the page, prompt for a choice, and submit it.

        :raises AmazonOrdersAuthError: If the entered number is not one of the listed choices.
        """
        form = self.last_response_parsed.select_one(
            MFA_DEVICE_SELECT_FORM_SELECTOR)
        contexts = form.select("input[name='otpDeviceContext']")

        for i, field in enumerate(contexts, start=1):
            self.io.echo("{}: {}".format(i, field["value"].strip()))
        otp_device = int(
            self.io.prompt(
                "--> Enter where you would like your one-time passcode sent",
                type=int))
        self.io.echo("")

        # Without this check, 0 or a negative number would silently select a device from the end of the list.
        if not 1 <= otp_device <= len(contexts):
            raise AmazonOrdersAuthError(
                "Invalid one-time passcode device selection: {}".format(otp_device))

        self._submit_form(form,
                          additional_attrs={"otpDeviceContext": contexts[otp_device - 1]["value"]})

        self._handle_errors()

    def _mfa_submit(self) -> None:
        """
        Prompt for the one-time passcode and submit the MFA form.
        """
        otp = self.io.prompt(
            "--> Enter the one-time passcode sent to your device")
        self.io.echo("")

        form = self.last_response_parsed.select_one(MFA_FORM_SELECTOR)
        self._submit_form(form,
                          additional_attrs={"otpCode": otp,
                                            "rememberDevice": ""})

        self._handle_errors()

    def _captcha_submit(self,
                        form_selector: str,
                        solution_attr_key: str,
                        error_div_class: str) -> None:
        """
        Solve the Captcha image found next to the form and submit the solution.

        :param form_selector: CSS selector of the Captcha form on the last parsed response.
        :param solution_attr_key: Name of the form field that carries the Captcha solution.
        :param error_div_class: ``class`` attribute value of the ``div`` the page uses to report an error.
        """
        form = self.last_response_parsed.select_one(form_selector)

        # The Captcha image is looked up from the form's parent element.
        solution = self._solve_captcha(
            form.find_parent().select_one("img")["src"])

        self._submit_form(form,
                          additional_attrs={solution_attr_key: solution})

        self._handle_errors(error_div_class, "class")

    def _captcha_otp_submit(self) -> None:
        """
        Prompt for the one-time passcode and submit the verification-code form.
        """
        otp = self.io.prompt(
            "--> Enter the one-time passcode sent to your device")
        self.io.echo("")

        form = self.last_response_parsed.select_one(CAPTCHA_OTP_FORM_SELECTOR)
        self._submit_form(form,
                          additional_attrs={"otpCode": otp})

        self._handle_errors()

    def _build_from_form(self,
                         form: Tag,
                         additional_attrs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Collect the ``name``/``value`` pairs of every ``input`` in the form.

        :param form: The form to read inputs from.
        :param additional_attrs: Values to add to, or override in, the collected data.
        :return: The form data to submit.
        """
        data = {}
        for field in form.select("input"):
            name = field.get("name")
            value = field.get("value")
            # Inputs missing either attribute are skipped.
            if name is None or value is None:
                continue
            data[name] = value
        if additional_attrs:
            data.update(additional_attrs)
        return data

    def _get_form_action(self,
                         form: Tag) -> str:
        """
        Resolve the form's ``action`` to an absolute URL, relative to the last response's URL.

        :param form: The form whose action should be resolved.
        :return: The absolute URL to submit to; the last response's URL if the form has no action.
        """
        from urllib.parse import urljoin

        action = form.get("action")
        if not action:
            return self.last_response.url
        # urljoin resolves against the base URL's path only, so a "/" in the query string cannot corrupt the
        # result; absolute actions are returned as-is.
        return urljoin(self.last_response.url, action)

    def _get_page_from_url(self,
                           url: str) -> str:
        """
        Build a filename, not yet present in ``output_dir``, for saving the page at ``url``.

        :param url: The URL of the page being saved.
        :return: A filename of the form ``<page>_<index>.html``.
        """
        page_name = os.path.basename(urlparse(url).path)
        # Remove the suffix only; str.strip(".html") would strip any of those characters from both ends.
        if page_name.endswith(".html"):
            page_name = page_name[:-len(".html")]
        if not page_name:
            page_name = "index"

        i = 0
        while os.path.isfile(os.path.join(self.output_dir,
                                          "{}_{}.html".format(page_name, i))):
            i += 1
        return "{}_{}.html".format(page_name, i)

    def _handle_errors(self,
                       error_div: str = "auth-error-message-box",
                       attr_name: str = "id",
                       critical: bool = False) -> None:
        """
        Look for an error ``div`` on the last parsed response and report it.

        :param error_div: Attribute value identifying the error ``div``.
        :param attr_name: Attribute of the ``div`` that ``error_div`` is matched against.
        :param critical: If ``True`` raise on error, otherwise echo the error through the I/O handler.
        :raises AmazonOrdersAuthError: If an error is found and ``critical`` is ``True``.
        """
        error_tag = self.last_response_parsed.select_one(
            "div[{}='{}']".format(attr_name, error_div))
        if not error_tag:
            return

        error_msg = "An error occurred: {}\n".format(error_tag.text.strip())
        if critical:
            raise AmazonOrdersAuthError(error_msg)
        self.io.echo(error_msg, fg="red")

    def _solve_captcha(self,
                       url: str) -> str:
        """
        Solve the Captcha at ``url``, falling back to showing the image and prompting the user.

        :param url: The URL of the Captcha image.
        :return: The Captcha solution.
        """
        captcha_response = AmazonCaptcha.fromlink(url).solve()
        if not captcha_response or captcha_response.lower() == "not solved":
            # Auto-solve failed: display the image and ask the user for the characters instead.
            img_response = self.session.get(url)
            img = Image.open(BytesIO(img_response.content))
            img.show()
            self.io.echo("Info: The Captcha couldn't be auto-solved.")
            captcha_response = self.io.prompt(
                "--> Enter the characters shown in the image")
            self.io.echo("")

        return captcha_response