Coverage for amazonorders/session.py: 89.53%
172 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-18 00:17 +0000
1import json
2import logging
3import os
4from io import BytesIO
5from typing import Optional, Any, Dict
6from urllib.parse import urlparse
8import requests
9from PIL import Image
10from amazoncaptcha import AmazonCaptcha
11from bs4 import BeautifulSoup, Tag
12from requests import Session, Response
13from requests.utils import dict_from_cookiejar
15from amazonorders.exception import AmazonOrdersAuthError
# Package metadata.
__author__ = "Alex Laird"
__copyright__ = "Copyright 2024, Alex Laird"
__version__ = "1.0.1"

# Module-level logger; ``AmazonSession`` raises it to ``DEBUG`` when constructed with ``debug=True``.
logger = logging.getLogger(__name__)

# Root of every Amazon URL this module builds.
BASE_URL = "https://www.amazon.com"

# Headers merged into every request made through ``AmazonSession.request``.
BASE_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": BASE_URL,
    "Referer": BASE_URL + "/ap/signin",
    "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": "macOS",
    "Sec-Ch-Viewport-Width": "1393",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Viewport-Width": "1393",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}

# Markers used to recognise which page of the auth flow was returned.
SIGN_IN_FORM_NAME = "signIn"
MFA_DEVICE_SELECT_FORM_ID = "auth-select-device-form"
MFA_FORM_ID = "auth-mfa-form"
CAPTCHA_1_DIV_ID = "cvf-page-content"
CAPTCHA_1_FORM_CLASS = "cvf-widget-form"
CAPTCHA_2_INPUT_ID = "captchacharacters"

# Where session cookies are persisted when no ``cookie_jar_path`` is given.
DEFAULT_COOKIE_JAR_PATH = os.path.join(
    os.path.expanduser("~"),
    ".config",
    "amazon-orders",
    "cookies.json",
)
class AmazonSession:
    """
    A session against Amazon built on :class:`requests.Session`.

    Every request is sent with ``BASE_HEADERS``, its response is kept on ``last_response`` (and parsed into
    ``last_response_parsed``), and the session's cookies are written as JSON to ``cookie_jar_path`` so they
    can be reloaded by the next instance. :func:`login()` drives the sign-in flow, including Captcha and
    OTP pages, prompting on ``stdin`` when user input is required.
    """

    def __init__(self,
                 username: str,
                 password: str,
                 debug: bool = False,
                 max_auth_attempts: int = 10,
                 cookie_jar_path: Optional[str] = None) -> None:
        """
        :param username: An Amazon username.
        :param password: An Amazon password.
        :param debug: Set the logger to ``DEBUG`` and write each response to an HTML file.
        :param max_auth_attempts: Maximum number of auth flow steps :func:`login()` will take.
        :param cookie_jar_path: Where to persist cookies, defaults to ``DEFAULT_COOKIE_JAR_PATH``.
        """
        if not cookie_jar_path:
            cookie_jar_path = DEFAULT_COOKIE_JAR_PATH

        #: An Amazon username.
        self.username: str = username
        #: An Amazon password.
        self.password: str = password

        #: Set logger ``DEBUG`` and write an HTML file for each request made on the session.
        self.debug: bool = debug
        if self.debug:
            logger.setLevel(logging.DEBUG)
        #: Will continue in :func:`login()`'s auth flow this many times.
        self.max_auth_attempts: int = max_auth_attempts
        #: The path to persist session cookies, defaults to ``DEFAULT_COOKIE_JAR_PATH``.
        self.cookie_jar_path: str = cookie_jar_path

        #: The underlying ``requests`` session all requests are made on.
        self.session: Session = Session()
        #: The most recent response received by :func:`request()`, if any.
        self.last_response: Optional[Response] = None
        #: ``last_response``'s body parsed with ``html.parser``, if any.
        self.last_response_parsed: Optional[Tag] = None
        #: If :func:`login()` has been executed and successfully logged in the session.
        self.is_authenticated: bool = False

        cookie_dir = os.path.dirname(self.cookie_jar_path)
        # ``dirname`` is empty for a bare filename, and ``os.makedirs("")`` raises.
        if cookie_dir:
            os.makedirs(cookie_dir, exist_ok=True)
        if os.path.exists(self.cookie_jar_path):
            with open(self.cookie_jar_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            cookies = requests.utils.cookiejar_from_dict(data)
            self.session.cookies.update(cookies)

    def request(self,
                method: str,
                url: str,
                **kwargs: Any) -> Response:
        """
        Execute a request on the session, then persist the session's cookies.

        :param method: The request method to execute.
        :param url: The URL to execute ``method`` on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`requests.request`.
        :return: The Response from the executed request.
        """
        # Merge into a copy so the caller's dict is never mutated. ``BASE_HEADERS`` are applied last and
        # therefore win on conflicting keys.
        headers = dict(kwargs.get("headers") or {})
        headers.update(BASE_HEADERS)
        kwargs["headers"] = headers

        logger.debug("%s request to %s", method, url)

        self.last_response = self.session.request(method, url, **kwargs)
        self.last_response_parsed = BeautifulSoup(self.last_response.text,
                                                  "html.parser")

        # Rewrite the cookie jar from scratch after every request.
        cookies = dict_from_cookiejar(self.session.cookies)
        if os.path.exists(self.cookie_jar_path):
            os.remove(self.cookie_jar_path)
        with open(self.cookie_jar_path, "w", encoding="utf-8") as f:
            f.write(json.dumps(cookies))

        logger.debug("Response: %s - %s",
                     self.last_response.url,
                     self.last_response.status_code)

        if self.debug:
            page_name = self._get_page_from_url(self.last_response.url)
            with open(page_name, "w", encoding="utf-8") as html_file:
                logger.debug("Response written to file: %s", html_file.name)
                html_file.write(self.last_response.text)

        return self.last_response

    def get(self,
            url: str,
            **kwargs: Any) -> Response:
        """
        Execute a GET request on the session.

        :param url: The URL to GET on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed GET request.
        """
        return self.request("GET", url, **kwargs)

    def post(self,
             url: str,
             **kwargs: Any) -> Response:
        """
        Execute a POST request on the session.

        :param url: The URL to POST on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed POST request.
        """
        return self.request("POST", url, **kwargs)

    def auth_cookies_stored(self) -> bool:
        """
        Check whether the session holds both the ``session-token`` and ``x-main`` cookies.

        :return: ``True`` if both cookies are present with non-empty values.
        """
        cookies = dict_from_cookiejar(self.session.cookies)
        return bool(cookies.get("session-token") and cookies.get("x-main"))

    def login(self) -> None:
        """
        Execute an Amazon login process. This will include the sign-in page, and may also include Captcha
        challenges and OTP pages (if 2FA authentication is enabled on your account).

        If successful, ``is_authenticated`` will be set to ``True``.

        Session cookies are persisted, and if existing session data is found during this auth flow, it will be
        skipped entirely and flagged as authenticated.

        :raises AmazonOrdersAuthError: If an unknown page is returned, if sign-in reports an error, or if
            the session is still not authenticated after ``max_auth_attempts`` steps.
        """
        self.get("{}/gp/sign-in.html".format(BASE_URL))

        attempts = 0
        while not self.is_authenticated and attempts < self.max_auth_attempts:
            if self._auth_detected():
                self.is_authenticated = True
                break

            if self._is_field_found(SIGN_IN_FORM_NAME):
                self._sign_in()
            elif self._is_field_found(CAPTCHA_1_FORM_CLASS, field_key="class"):
                self._captcha_1_submit()
            elif self._find_captcha_2_input():
                self._captcha_2_submit()
            elif self._is_field_found(MFA_DEVICE_SELECT_FORM_ID,
                                      field_key="id"):
                self._mfa_device_select()
            elif self._is_field_found(MFA_FORM_ID, field_key="id"):
                self._mfa_submit()
            else:
                raise AmazonOrdersAuthError(
                    "An error occurred, this is an unknown page: {}. To capture the page to a file, set the `debug` flag.".format(
                        self.last_response.url))

            attempts += 1

        # The loop only inspects a response at the top of the next iteration, so the step taken on the
        # final attempt may have completed the sign-in without being noticed. Check once more.
        if not self.is_authenticated and self._auth_detected():
            self.is_authenticated = True

        if not self.is_authenticated:
            raise AmazonOrdersAuthError(
                "Max authentication flow attempts reached.")

    def logout(self) -> None:
        """
        Logout of the existing Amazon session and clear cookies.
        """
        self.get("{}/gp/sign-out.html".format(BASE_URL))

        # ``request()`` has just rewritten the cookie jar, so remove it after the sign-out call.
        if os.path.exists(self.cookie_jar_path):
            os.remove(self.cookie_jar_path)

        self.session.close()
        self.session = Session()

        self.is_authenticated = False

    def _auth_detected(self) -> bool:
        # True if auth cookies are stored or the last page looks like a signed-in page.
        if self.auth_cookies_stored():
            return True
        page = self.last_response.text
        return "Hello, sign in" not in page and "nav-item-signout" in page

    def _find_captcha_2_input(self) -> Optional[Tag]:
        # Find the ``input`` whose id starts with ``CAPTCHA_2_INPUT_ID``, if the page has one.
        return self.last_response_parsed.find(
            "input",
            id=lambda value: value and value.startswith(CAPTCHA_2_INPUT_ID))

    def _sign_in(self) -> None:
        # Submit the sign-in form with the stored credentials; a reported error is fatal.
        form = self.last_response_parsed.find("form",
                                              {"name": SIGN_IN_FORM_NAME})
        data = self._build_from_form(form,
                                     additional_attrs={"email": self.username,
                                                       "password": self.password,
                                                       "rememberMe": "true"})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors(critical=True)

    def _mfa_device_select(self) -> None:
        # List the available OTP devices, prompt for a 1-based choice, and submit it.
        form = self.last_response_parsed.find("form",
                                              {"id": MFA_DEVICE_SELECT_FORM_ID})
        contexts = form.find_all("input", {"name": "otpDeviceContext"})
        for i, field in enumerate(contexts, start=1):
            print("{}: {}".format(i, field.attrs["value"].strip()))
        otp_device = int(
            input("Where would you like your one-time passcode sent? "))

        data = self._build_from_form(
            form,
            additional_attrs={
                "otpDeviceContext": contexts[otp_device - 1].attrs["value"]})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _mfa_submit(self) -> None:
        # Prompt for the one-time passcode and submit the MFA form.
        otp = input("Enter the one-time passcode sent to your device: ")

        form = self.last_response_parsed.find("form", id=MFA_FORM_ID)
        data = self._build_from_form(form,
                                     additional_attrs={"otpCode": otp,
                                                       "rememberDevice": ""})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _captcha_1_submit(self) -> None:
        # Solve and submit the Captcha shown inside the ``CAPTCHA_1_DIV_ID`` div.
        captcha_div = self.last_response_parsed.find("div",
                                                     {"id": CAPTCHA_1_DIV_ID})

        solution = self._solve_captcha(
            captcha_div.find("img", {"alt": "captcha"}).attrs["src"])

        form = self.last_response_parsed.find("form",
                                              {"class": CAPTCHA_1_FORM_CLASS})
        data = self._build_from_form(form,
                                     additional_attrs={
                                         "cvf_captcha_input": solution})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form,
                                           prefix="{}/ap/cvf/".format(
                                               BASE_URL)),
                     data=data)

        self._handle_errors("cvf-widget-alert", "class")

    def _captcha_2_submit(self) -> None:
        # Solve and submit the Captcha form that contains the ``CAPTCHA_2_INPUT_ID`` input.
        form = self._find_captcha_2_input().find_parent("form")

        solution = self._solve_captcha(form.find("img").attrs["src"])

        data = self._build_from_form(form,
                                     additional_attrs={
                                         "field-keywords": solution})

        # Unlike the other forms, this one is submitted as query parameters.
        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form,
                                           prefix=BASE_URL),
                     params=data)

        self._handle_errors("a-alert-info", "class")

    def _build_from_form(self,
                         form: Tag,
                         additional_attrs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # Collect name/value pairs from the form's inputs, then overlay ``additional_attrs``.
        data = {}
        for field in form.find_all("input"):
            # Inputs missing either attribute cannot be submitted, so skip them.
            if "name" in field.attrs and "value" in field.attrs:
                data[field.attrs["name"]] = field.attrs["value"]
        if additional_attrs:
            data.update(additional_attrs)
        return data

    def _get_form_action(self,
                         form: Tag,
                         prefix: Optional[str] = None) -> str:
        # Resolve the URL to submit ``form`` to, falling back to the URL of the current page.
        action = form.attrs.get("action")
        if not action:
            action = self.last_response.url
        if prefix and not action.startswith("http"):
            action = prefix + action
        return action

    def _is_field_found(self,
                        field_value: str,
                        field_type: str = "form",
                        field_key: str = "name") -> bool:
        # True if the last page has a ``field_type`` tag whose ``field_key`` matches ``field_value``.
        return self.last_response_parsed.find(field_type, {
            field_key: field_value}) is not None

    def _get_page_from_url(self,
                           url: str) -> str:
        # Build an unused ``<page>_<n>.html`` filename (relative to the working directory) for ``url``.
        page_name = os.path.basename(urlparse(url).path)
        # Drop only a trailing ".html"; ``str.strip`` would remove any of those characters from both ends.
        if page_name.endswith(".html"):
            page_name = page_name[:-len(".html")]
        i = 0
        # Probe the same name that is returned, so earlier debug files are not overwritten.
        while os.path.isfile("{}_{}.html".format(page_name, i)):
            i += 1
        return "{}_{}.html".format(page_name, i)

    def _handle_errors(self,
                       error_div: str = "auth-error-message-box",
                       attr_name: str = "id",
                       critical: bool = False) -> None:
        # Report an error div on the last page: raise if ``critical``, otherwise print it.
        error_tag = self.last_response_parsed.find("div",
                                                   {attr_name: error_div})
        if error_tag:
            error_msg = "An error occurred: {}".format(error_tag.text.strip())

            if critical:
                raise AmazonOrdersAuthError(error_msg)
            else:
                print(error_msg)

    def _solve_captcha(self,
                       url: str) -> str:
        # Try to auto-solve the Captcha at ``url``; otherwise show the image and prompt the user.
        captcha_response = AmazonCaptcha.fromlink(url).solve()
        if not captcha_response or captcha_response.lower() == "not solved":
            img_response = self.session.get(url)
            img = Image.open(BytesIO(img_response.content))
            img.show()
            captcha_response = input(
                "The Captcha couldn't be auto-solved, enter the characters shown in the image: ")

        return captcha_response