Coverage for amazonorders/session.py: 89.89%

188 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-25 19:11 +0000

1import json 

2import logging 

3import os 

4from io import BytesIO 

5from typing import Optional, Any, Dict 

6from urllib.parse import urlparse 

7 

8import requests 

9from PIL import Image 

10from amazoncaptcha import AmazonCaptcha 

11from bs4 import BeautifulSoup, Tag 

12from requests import Session, Response 

13from requests.utils import dict_from_cookiejar 

14 

15from amazonorders.conf import DEFAULT_COOKIE_JAR_PATH, DEFAULT_OUTPUT_DIR 

16from amazonorders.exception import AmazonOrdersAuthError 

17 

__author__ = "Alex Laird"
__copyright__ = "Copyright 2024, Alex Laird"
__version__ = "1.0.5"

logger = logging.getLogger(__name__)

# Root of every Amazon URL built in this module.
BASE_URL = "https://www.amazon.com"

# Headers that AmazonSession.request() merges into every outgoing request.
BASE_HEADERS = {
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/avif,image/webp,image/apng,*/*;q=0.8,"
        "application/signed-exchange;v=b3;q=0.7"
    ),
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": BASE_URL,
    "Referer": f"{BASE_URL}/ap/signin",
    "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": "macOS",
    "Sec-Ch-Viewport-Width": "1393",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Viewport-Width": "1393",
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
}

# CSS selectors for the forms that AmazonSession.login() looks for on each
# page in order to decide which step of the auth flow to run next.
SIGN_IN_FORM_SELECTOR = "form[name='signIn']"
MFA_DEVICE_SELECT_FORM_SELECTOR = "form[id='auth-select-device-form']"
MFA_FORM_SELECTOR = "form[id='auth-mfa-form']"
CAPTCHA_1_FORM_SELECTOR = "form[class*='cvf-widget-form-captcha']"
CAPTCHA_2_FORM_SELECTOR = "form:has(input[id^='captchacharacters'])"
CAPTCHA_OTP_FORM_SELECTOR = "form[id='verification-code-form']"

50 

51 

class IODefault:
    """
    Console-backed input/output handler for the application.

    Pass a subclass (or any object exposing the same ``echo``/``prompt`` methods) to
    :class:`AmazonSession` when messages and prompts should be routed somewhere other
    than the console.
    """

    def echo(self,
             msg,
             **kwargs):
        """
        Write a message to standard output.

        :param msg: The data to send to output.
        :param kwargs: Accepted for interface compatibility, ignored here.
        """
        print(msg)

    def prompt(self,
               msg,
               type=None,
               **kwargs):
        """
        Ask the user for input on the console.

        :param msg: The text shown as the prompt; ``": "`` is appended to it.
        :param type: Accepted for interface compatibility, ignored here.
        :param kwargs: Accepted for interface compatibility, ignored here.
        :return: The raw string entered by the user.
        """
        prompt_text = f"{msg}: "
        return input(prompt_text)

83 

84 

class AmazonSession:
    """
    An interface for interacting with Amazon and authenticating an underlying :class:`requests.Session`. Utilizing
    this class means session data is maintained between requests. Session data is also persisted after each request,
    meaning it will also be maintained between separate instantiations of the class or application.

    To get started, call the :func:`login` function.
    """

    def __init__(self,
                 username: str,
                 password: str,
                 debug: bool = False,
                 max_auth_attempts: int = 10,
                 cookie_jar_path: Optional[str] = None,
                 io: Optional[IODefault] = None,
                 output_dir: Optional[str] = None) -> None:
        """
        Build the session and load any cookies persisted at ``cookie_jar_path``.

        :param username: An Amazon username.
        :param password: An Amazon password.
        :param debug: Enable ``DEBUG`` logging and write each response to an HTML file in ``output_dir``.
        :param max_auth_attempts: The maximum number of auth flow steps :func:`login` will execute.
        :param cookie_jar_path: Where to persist cookies, defaults to ``conf.DEFAULT_COOKIE_JAR_PATH``.
        :param io: The I/O handler for echoes and prompts, defaults to a new :class:`IODefault`.
        :param output_dir: Where output files are written, defaults to ``conf.DEFAULT_OUTPUT_DIR``.
        """
        if not cookie_jar_path:
            cookie_jar_path = DEFAULT_COOKIE_JAR_PATH
        if not output_dir:
            output_dir = DEFAULT_OUTPUT_DIR
        if io is None:
            # Built per instance; a default evaluated in the signature would be shared by every session.
            io = IODefault()

        #: An Amazon username.
        self.username: str = username
        #: An Amazon password.
        self.password: str = password

        #: Set logger ``DEBUG``, send output to ``stderr``, and write an HTML file for each request made on the session.
        self.debug: bool = debug
        if self.debug:
            logger.setLevel(logging.DEBUG)
        #: Will continue in :func:`login()`'s auth flow this many times (successes and failures).
        self.max_auth_attempts: int = max_auth_attempts
        #: The path to persist session cookies, defaults to ``conf.DEFAULT_COOKIE_JAR_PATH``.
        self.cookie_jar_path: str = cookie_jar_path
        #: The I/O handler for echoes and prompts.
        self.io: IODefault = io
        #: The directory where any output files will be produced, defaults to ``conf.DEFAULT_OUTPUT_DIR``.
        self.output_dir: str = output_dir

        #: The shared session to be used across all requests.
        self.session: Session = Session()
        #: The last response executed on the Session.
        self.last_response: Optional[Response] = None
        #: A parsed representation of the last response executed on the Session.
        self.last_response_parsed: Optional[Tag] = None
        #: If :func:`login()` has been executed and successfully logged in the session.
        self.is_authenticated: bool = False

        cookie_dir = os.path.dirname(self.cookie_jar_path)
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        if cookie_dir:
            os.makedirs(cookie_dir, exist_ok=True)
        if os.path.exists(self.cookie_jar_path):
            with open(self.cookie_jar_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            cookies = requests.utils.cookiejar_from_dict(data)
            self.session.cookies.update(cookies)

    def request(self,
                method: str,
                url: str,
                **kwargs: Any) -> Response:
        """
        Execute the request against Amazon with base headers, parsing and storing the response
        and persisting response cookies.

        :param method: The request method to execute.
        :param url: The URL to execute ``method`` on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`requests.request`.
        :return: The Response from the executed request.
        """
        # Work on a copy so the caller's dict is never mutated. BASE_HEADERS are applied
        # last and therefore win over caller-supplied values for the same key.
        headers = dict(kwargs.get("headers") or {})
        headers.update(BASE_HEADERS)
        kwargs["headers"] = headers

        logger.debug("%s request to %s", method, url)

        self.last_response = self.session.request(method, url, **kwargs)
        self.last_response_parsed = BeautifulSoup(self.last_response.text,
                                                  "html.parser")

        # Persist the full cookie jar after every request; "w" truncates any previous file.
        cookies = dict_from_cookiejar(self.session.cookies)
        with open(self.cookie_jar_path, "w", encoding="utf-8") as f:
            json.dump(cookies, f)

        logger.debug("Response: %s - %s",
                     self.last_response.url,
                     self.last_response.status_code)

        if self.debug:
            page_name = self._get_page_from_url(self.last_response.url)
            with open(os.path.join(self.output_dir, page_name), "w",
                      encoding="utf-8") as html_file:
                logger.debug("Response written to file: %s", html_file.name)
                html_file.write(self.last_response.text)

        return self.last_response

    def get(self,
            url: str,
            **kwargs: Any) -> Response:
        """
        Perform a GET request.

        :param url: The URL to GET on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed GET request.
        """
        return self.request("GET", url, **kwargs)

    def post(self,
             url,
             **kwargs: Any) -> Response:
        """
        Perform a POST request.

        :param url: The URL to POST on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed POST request.
        """
        return self.request("POST", url, **kwargs)

    def auth_cookies_stored(self) -> bool:
        """
        Check whether the session holds both the ``session-token`` and ``x-main`` cookies.

        :return: ``True`` if both cookies are present with non-empty values.
        """
        cookies = dict_from_cookiejar(self.session.cookies)
        return bool(cookies.get("session-token") and cookies.get("x-main"))

    def login(self) -> None:
        """
        Execute an Amazon login process. This will include the sign-in page, and may also include Captcha challenges
        and OTP pages (if 2FA authentication is enabled on your account).

        If successful, ``is_authenticated`` will be set to ``True``.

        Session cookies are persisted, and if existing session data is found during this auth flow, it will be
        skipped entirely and flagged as authenticated.

        :raises AmazonOrdersAuthError: If the page is not part of a known auth flow, if sign-in reports an error,
            or if ``max_auth_attempts`` steps were executed without becoming authenticated.
        """
        self.get("{}/gp/sign-in.html".format(BASE_URL))

        attempts = 0
        # The signed-in check runs before every step AND after the last permitted one, so a
        # login completed by the final step is recognized instead of being reported as a failure.
        while not (self.is_authenticated or self._is_signed_in()):
            if attempts >= self.max_auth_attempts:
                raise AmazonOrdersAuthError(
                    "Max authentication flow attempts reached.")

            if self.last_response_parsed.select_one(SIGN_IN_FORM_SELECTOR):
                self._sign_in()
            elif self.last_response_parsed.select_one(CAPTCHA_1_FORM_SELECTOR):
                self._captcha_submit(CAPTCHA_1_FORM_SELECTOR,
                                     "cvf_captcha_input",
                                     "cvf-widget-alert")
            elif self.last_response_parsed.select_one(CAPTCHA_2_FORM_SELECTOR):
                self._captcha_submit(CAPTCHA_2_FORM_SELECTOR,
                                     "field-keywords",
                                     "a-alert-info")
            elif self.last_response_parsed.select_one(
                    MFA_DEVICE_SELECT_FORM_SELECTOR):
                self._mfa_device_select()
            elif self.last_response_parsed.select_one(MFA_FORM_SELECTOR):
                self._mfa_submit()
            elif self.last_response_parsed.select_one(
                    CAPTCHA_OTP_FORM_SELECTOR):
                self._captcha_otp_submit()
            else:
                raise AmazonOrdersAuthError(
                    "An error occurred, this is an unknown page, or its parsed contents don't match a known auth flow: {}. To capture the page to a file, set the `debug` flag.".format(
                        self.last_response.url))

            attempts += 1

        self.is_authenticated = True

    def logout(self) -> None:
        """
        Logout and close the existing Amazon session and clear cookies.
        """
        self.get("{}/gp/sign-out.html".format(BASE_URL))

        if os.path.exists(self.cookie_jar_path):
            os.remove(self.cookie_jar_path)

        self.session.close()
        self.session = Session()

        self.is_authenticated = False

    def _is_signed_in(self) -> bool:
        # True when auth cookies are stored or the last page shows signed-in navigation.
        if self.auth_cookies_stored():
            return True
        page_text = self.last_response.text
        return "Hello, sign in" not in page_text and "nav-item-signout" in page_text

    def _sign_in(self) -> None:
        # Submit the sign-in form with the stored credentials; any error shown is fatal.
        form = self.last_response_parsed.select_one(SIGN_IN_FORM_SELECTOR)
        data = self._build_from_form(form,
                                     additional_attrs={"email": self.username,
                                                       "password": self.password,
                                                       "rememberMe": "true"})

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors(critical=True)

    def _mfa_device_select(self) -> None:
        # List the offered OTP devices, ask the user to pick one, and submit the choice.
        form = self.last_response_parsed.select_one(
            MFA_DEVICE_SELECT_FORM_SELECTOR)
        contexts = form.select("input[name='otpDeviceContext']")

        for i, field in enumerate(contexts, start=1):
            self.io.echo("{}: {}".format(i, field["value"].strip()))
        raw_choice = self.io.prompt(
            "--> Enter where you would like your one-time passcode sent",
            type=int)
        self.io.echo("")

        try:
            otp_device = int(raw_choice)
        except (TypeError, ValueError):
            otp_device = 0
        # Reject 0 and negatives explicitly: as list indexes they would silently select a
        # device from the end of the list. Returning without submitting leaves the same page
        # in place, so login() prompts again on its next step.
        if not 1 <= otp_device <= len(contexts):
            self.io.echo(
                "An error occurred: Invalid selection, enter a number between 1 and {}\n".format(
                    len(contexts)),
                fg="red")
            return

        data = self._build_from_form(form,
                                     additional_attrs={"otpDeviceContext":
                                                       contexts[otp_device - 1]["value"]})

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _mfa_submit(self) -> None:
        # Prompt for the one-time passcode and submit the MFA form.
        otp = self.io.prompt(
            "--> Enter the one-time passcode sent to your device")
        self.io.echo("")

        form = self.last_response_parsed.select_one(MFA_FORM_SELECTOR)
        data = self._build_from_form(form,
                                     additional_attrs={"otpCode": otp,
                                                       "rememberDevice": ""})

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _captcha_submit(self, form_selector, solution_attr_key,
                        error_div_class) -> None:
        # Solve the Captcha image found next to the form and submit the solution under
        # ``solution_attr_key``; errors are looked up by the ``class`` of ``error_div_class``.
        form = self.last_response_parsed.select_one(form_selector)

        solution = self._solve_captcha(
            form.find_parent().select_one("img")["src"])

        data = self._build_from_form(form,
                                     additional_attrs={
                                         solution_attr_key: solution})

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors(error_div_class, "class")

    def _captcha_otp_submit(self) -> None:
        # Prompt for the one-time passcode and submit the verification-code form.
        otp = self.io.prompt(
            "--> Enter the one-time passcode sent to your device")
        self.io.echo("")

        form = self.last_response_parsed.select_one(CAPTCHA_OTP_FORM_SELECTOR)
        data = self._build_from_form(form,
                                     additional_attrs={"otpCode": otp})

        self.request(form.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _build_from_form(self,
                         form: Tag,
                         additional_attrs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # Collect name/value pairs from the form's <input> elements, then overlay
        # ``additional_attrs`` (which take precedence on duplicate keys).
        data = {}
        for field in form.select("input"):
            try:
                data[field["name"]] = field["value"]
            except KeyError:
                # Inputs missing a ``name`` or ``value`` attribute carry nothing to submit.
                continue
        if additional_attrs:
            data.update(additional_attrs)
        return data

    def _get_form_action(self,
                         form: Tag) -> str:
        # Resolve the form's ``action`` to an absolute URL relative to the last response.
        from urllib.parse import urljoin

        action = form.get("action")
        if not action:
            # A form without an action posts back to the page it was served from.
            return self.last_response.url
        # urljoin handles absolute, root-relative, protocol-relative ("//host/x") and
        # document-relative actions, and ignores any "/" inside the base URL's query string.
        return urljoin(self.last_response.url, action)

    def _get_page_from_url(self,
                           url: str) -> str:
        # Derive a filename of the form "<page>_<n>.html" that does not yet exist in ``output_dir``.
        page_name = os.path.basename(urlparse(url).path)
        # Remove the suffix only; str.strip(".html") would strip any of those characters
        # from both ends (e.g. "list.html" -> "is").
        if page_name.endswith(".html"):
            page_name = page_name[:-len(".html")]

        i = 0
        while os.path.isfile(os.path.join(self.output_dir,
                                          "{}_{}.html".format(page_name, i))):
            i += 1
        return "{}_{}.html".format(page_name, i)

    def _handle_errors(self,
                       error_div: str = "auth-error-message-box",
                       attr_name: str = "id",
                       critical: bool = False) -> None:
        # Look for ``div[attr_name='error_div']`` in the last response. If present, raise
        # when ``critical`` is set, otherwise echo the message through the I/O handler.
        error_tag = self.last_response_parsed.select_one(
            "div[{}='{}']".format(attr_name, error_div))
        if not error_tag:
            return

        error_msg = "An error occurred: {}\n".format(error_tag.text.strip())
        if critical:
            raise AmazonOrdersAuthError(error_msg)
        self.io.echo(error_msg, fg="red")

    def _solve_captcha(self,
                       url: str) -> str:
        # Try to auto-solve the Captcha at ``url``; if that fails, show the image and ask the user.
        captcha_response = AmazonCaptcha.fromlink(url).solve()
        if not captcha_response or captcha_response.lower() == "not solved":
            img_response = self.session.get(url)
            img = Image.open(BytesIO(img_response.content))
            img.show()
            self.io.echo("Info: The Captcha couldn't be auto-solved.")
            captcha_response = self.io.prompt(
                "--> Enter the characters shown in the image")
            self.io.echo("")

        return captcha_response