Coverage for amazonorders/session.py: 89.53%

172 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-18 00:17 +0000

1import json 

2import logging 

3import os 

4from io import BytesIO 

5from typing import Optional, Any, Dict 

6from urllib.parse import urlparse 

7 

8import requests 

9from PIL import Image 

10from amazoncaptcha import AmazonCaptcha 

11from bs4 import BeautifulSoup, Tag 

12from requests import Session, Response 

13from requests.utils import dict_from_cookiejar 

14 

15from amazonorders.exception import AmazonOrdersAuthError 

16 

# Package metadata.
__author__ = "Alex Laird"
__copyright__ = "Copyright 2024, Alex Laird"
__version__ = "1.0.1"

# Module-level logger; ``AmazonSession`` raises it to ``DEBUG`` when constructed with ``debug=True``.
logger = logging.getLogger(__name__)

#: Root URL every Amazon endpoint used by this module is built from.
BASE_URL = "https://www.amazon.com"

#: Headers merged into every request made through ``AmazonSession.request``. The values present the
#: client as a desktop Chrome 120 browser on macOS navigating within the same origin.
BASE_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "max-age=0",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": BASE_URL,
    "Referer": BASE_URL + "/ap/signin",
    "Sec-Ch-Ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": "macOS",
    "Sec-Ch-Viewport-Width": "1393",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Viewport-Width": "1393",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}

# Selectors used by ``AmazonSession.login`` to recognise which page of the auth flow was returned.
#: ``name`` attribute of the sign-in ``<form>``.
SIGN_IN_FORM_NAME = "signIn"
#: ``id`` of the ``<form>`` that lists the devices an OTP can be sent to.
MFA_DEVICE_SELECT_FORM_ID = "auth-select-device-form"
#: ``id`` of the ``<form>`` that accepts the OTP code.
MFA_FORM_ID = "auth-mfa-form"
#: ``id`` of the ``<div>`` wrapping the first Captcha variant's image.
CAPTCHA_1_DIV_ID = "cvf-page-content"
#: ``class`` of the first Captcha variant's ``<form>``.
CAPTCHA_1_FORM_CLASS = "cvf-widget-form"
#: Prefix of the ``id`` of the second Captcha variant's text ``<input>``.
CAPTCHA_2_INPUT_ID = "captchacharacters"

#: Where session cookies are persisted when no explicit ``cookie_jar_path`` is given.
DEFAULT_COOKIE_JAR_PATH = os.path.join(
    os.path.expanduser("~"),
    ".config",
    "amazon-orders",
    "cookies.json",
)

51 

52 

class AmazonSession:
    """
    An HTTP session with Amazon that can drive the interactive sign-in flow.

    Wraps a :class:`requests.Session`. Every request made through :func:`request` sends ``BASE_HEADERS``,
    keeps the last response (raw and parsed) on the instance, and writes the session's cookies to
    ``cookie_jar_path``. Cookies found at that path when the instance is created are loaded back in.
    """

    def __init__(self,
                 username: str,
                 password: str,
                 debug: bool = False,
                 max_auth_attempts: int = 10,
                 cookie_jar_path: Optional[str] = None) -> None:
        """
        :param username: An Amazon username.
        :param password: An Amazon password.
        :param debug: Set the logger to ``DEBUG`` and write an HTML file for each response.
        :param max_auth_attempts: Upper bound on the number of pages :func:`login` will work through.
        :param cookie_jar_path: Where to persist cookies; falls back to ``DEFAULT_COOKIE_JAR_PATH``.
        """
        if not cookie_jar_path:
            cookie_jar_path = DEFAULT_COOKIE_JAR_PATH

        #: An Amazon username.
        self.username: str = username
        #: An Amazon password.
        self.password: str = password

        #: Set logger ``DEBUG`` and write an HTML file for each request made on the session.
        self.debug: bool = debug
        if self.debug:
            logger.setLevel(logging.DEBUG)
        #: Will continue in :func:`login()`'s auth flow this many times.
        self.max_auth_attempts: int = max_auth_attempts
        #: The path to persist session cookies, defaults to ``DEFAULT_COOKIE_JAR_PATH``.
        self.cookie_jar_path: str = cookie_jar_path

        #: The underlying ``requests`` session all requests are executed on.
        self.session: Session = Session()
        #: The response of the most recent request, or ``None`` before the first request.
        self.last_response: Optional[Response] = None
        #: ``last_response``'s body parsed with BeautifulSoup, or ``None`` before the first request.
        self.last_response_parsed: Optional[Tag] = None
        #: If :func:`login()` has been executed and successfully logged in the session.
        self.is_authenticated: bool = False

        cookie_dir = os.path.dirname(self.cookie_jar_path)
        # A bare filename has no directory component, and ``os.makedirs("")`` would raise.
        if cookie_dir:
            os.makedirs(cookie_dir, exist_ok=True)
        if os.path.exists(self.cookie_jar_path):
            with open(self.cookie_jar_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.session.cookies.update(requests.utils.cookiejar_from_dict(data))

    def request(self,
                method: str,
                url: str,
                **kwargs: Any) -> Response:
        """
        Execute a request on the session, then record the response and persist the session's cookies.

        :param method: The request method to execute.
        :param url: The URL to execute ``method`` on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`requests.request`.
        :return: The Response from the executed request.
        """
        # Work on a copy so the caller's dict is never mutated. ``BASE_HEADERS`` entries win on
        # conflicts, and an explicit ``headers=None`` is treated as "no extra headers".
        headers = dict(kwargs.get("headers") or {})
        headers.update(BASE_HEADERS)
        kwargs["headers"] = headers

        logger.debug("%s request to %s", method, url)

        self.last_response = self.session.request(method, url, **kwargs)
        self.last_response_parsed = BeautifulSoup(self.last_response.text,
                                                  "html.parser")

        self._persist_cookies()

        logger.debug("Response: %s - %s",
                     self.last_response.url,
                     self.last_response.status_code)

        if self.debug:
            page_name = self._get_page_from_url(self.last_response.url)
            with open(page_name, "w", encoding="utf-8") as html_file:
                logger.debug("Response written to file: %s", html_file.name)
                html_file.write(self.last_response.text)

        return self.last_response

    def get(self,
            url: str,
            **kwargs: Any) -> Response:
        """
        Execute a GET request on the session.

        :param url: The URL to GET on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed GET request.
        """
        return self.request("GET", url, **kwargs)

    def post(self,
             url: str,
             **kwargs: Any) -> Response:
        """
        Execute a POST request on the session.

        :param url: The URL to POST on.
        :param kwargs: Remaining ``kwargs`` will be passed to :func:`AmazonSession.request`.
        :return: The Response from the executed POST request.
        """
        return self.request("POST", url, **kwargs)

    def auth_cookies_stored(self) -> bool:
        """
        Check whether the session holds the cookies that mark it as signed in.

        :return: ``True`` if both the ``session-token`` and ``x-main`` cookies are present and non-empty.
        """
        cookies = dict_from_cookiejar(self.session.cookies)
        return bool(cookies.get("session-token") and cookies.get("x-main"))

    def login(self) -> None:
        """
        Execute an Amazon login process. This will include the sign-in page, and may also include Captcha challenges
        and OTP pages (if 2FA authentication is enabled on your account).

        If successful, ``is_authenticated`` will be set to ``True``.

        Session cookies are persisted, and if existing session data is found during this auth flow, it will be
        skipped entirely and flagged as authenticated.

        :raises AmazonOrdersAuthError: If an unrecognised page is returned, or the flow did not end up
            authenticated within ``max_auth_attempts`` pages.
        """
        self.get("{}/gp/sign-in.html".format(BASE_URL))

        attempts = 0
        while not self.is_authenticated and attempts < self.max_auth_attempts:
            page_text = self.last_response.text
            signed_in_page = ("Hello, sign in" not in page_text
                              and "nav-item-signout" in page_text)
            if self.auth_cookies_stored() or signed_in_page:
                self.is_authenticated = True
                break

            # Dispatch on whichever auth-flow page Amazon returned.
            if self._is_field_found(SIGN_IN_FORM_NAME):
                self._sign_in()
            elif self._is_field_found(CAPTCHA_1_FORM_CLASS, field_key="class"):
                self._captcha_1_submit()
            elif self._find_captcha_2_input():
                self._captcha_2_submit()
            elif self._is_field_found(MFA_DEVICE_SELECT_FORM_ID,
                                      field_key="id"):
                self._mfa_device_select()
            elif self._is_field_found(MFA_FORM_ID, field_key="id"):
                self._mfa_submit()
            else:
                raise AmazonOrdersAuthError(
                    "An error occurred, this is an unknown page: {}. To capture the page to a file, set the `debug` flag.".format(
                        self.last_response.url))

            attempts += 1

        # The only successful exit from the loop sets ``is_authenticated``; anything else means the
        # attempt budget ran out (or was never positive).
        if not self.is_authenticated:
            raise AmazonOrdersAuthError(
                "Max authentication flow attempts reached.")

    def logout(self) -> None:
        """
        Logout of the existing Amazon session and clear cookies.
        """
        self.get("{}/gp/sign-out.html".format(BASE_URL))

        # The sign-out request above re-wrote the cookie file; drop it so the next instance starts clean.
        if os.path.exists(self.cookie_jar_path):
            os.remove(self.cookie_jar_path)

        self.session.close()
        self.session = Session()

        self.is_authenticated = False

    def _persist_cookies(self) -> None:
        """Write the session's current cookies to ``cookie_jar_path`` as JSON."""
        cookies = dict_from_cookiejar(self.session.cookies)
        # Mode "w" truncates an existing file, so no separate removal is needed.
        with open(self.cookie_jar_path, "w", encoding="utf-8") as f:
            json.dump(cookies, f)

    def _find_captcha_2_input(self) -> Optional[Tag]:
        """Return the second Captcha variant's text input on the last page, or ``None`` if absent."""
        return self.last_response_parsed.find(
            "input",
            id=lambda value: value and value.startswith(CAPTCHA_2_INPUT_ID))

    def _sign_in(self) -> None:
        """Submit the sign-in form with the stored credentials; a reported error is fatal."""
        form = self.last_response_parsed.find("form",
                                              {"name": SIGN_IN_FORM_NAME})
        data = self._build_from_form(form,
                                     additional_attrs={"email": self.username,
                                                       "password": self.password,
                                                       "rememberMe": "true"})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors(critical=True)

    def _mfa_device_select(self) -> None:
        """Prompt the user to choose where the one-time passcode is sent, then submit that choice."""
        form = self.last_response_parsed.find("form",
                                              {"id": MFA_DEVICE_SELECT_FORM_ID})
        contexts = form.find_all("input", {"name": "otpDeviceContext"})
        for i, field in enumerate(contexts, start=1):
            print("{}: {}".format(i, field.attrs["value"].strip()))
        # The menu printed above is 1-based.
        otp_device = int(
            input("Where would you like your one-time passcode sent? "))

        data = self._build_from_form(form,
                                     additional_attrs={
                                         "otpDeviceContext": contexts[otp_device - 1].attrs["value"]})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _mfa_submit(self) -> None:
        """Prompt the user for the one-time passcode and submit the MFA form."""
        otp = input("Enter the one-time passcode sent to your device: ")

        form = self.last_response_parsed.find("form", id=MFA_FORM_ID)
        data = self._build_from_form(form,
                                     additional_attrs={"otpCode": otp,
                                                       "rememberDevice": ""})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form),
                     data=data)

        self._handle_errors()

    def _captcha_1_submit(self) -> None:
        """Solve and submit the first Captcha variant (the ``cvf`` widget form)."""
        captcha_div = self.last_response_parsed.find("div",
                                                     {"id": CAPTCHA_1_DIV_ID})

        solution = self._solve_captcha(
            captcha_div.find("img", {"alt": "captcha"}).attrs["src"])

        form = self.last_response_parsed.find("form",
                                              {"class": CAPTCHA_1_FORM_CLASS})
        data = self._build_from_form(form,
                                     additional_attrs={
                                         "cvf_captcha_input": solution})

        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form,
                                           prefix="{}/ap/cvf/".format(BASE_URL)),
                     data=data)

        self._handle_errors("cvf-widget-alert", "class")

    def _captcha_2_submit(self) -> None:
        """Solve and submit the second Captcha variant (the form around the ``captchacharacters`` input)."""
        form = self._find_captcha_2_input().find_parent("form")

        solution = self._solve_captcha(form.find("img").attrs["src"])

        data = self._build_from_form(form,
                                     additional_attrs={
                                         "field-keywords": solution})

        # Unlike the other forms, this one's fields are sent as query parameters.
        self.request(form.attrs.get("method", "GET"),
                     self._get_form_action(form,
                                           prefix=BASE_URL),
                     params=data)

        self._handle_errors("a-alert-info", "class")

    def _build_from_form(self,
                         form: Tag,
                         additional_attrs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Collect the ``name``/``value`` pairs of a form's ``<input>`` fields.

        :param form: The form to read inputs from.
        :param additional_attrs: Extra entries that are added to, and override, the collected ones.
        :return: The data to submit for the form.
        """
        data = {}
        for field in form.find_all("input"):
            try:
                data[field["name"]] = field["value"]
            except KeyError:
                # Inputs without both a ``name`` and a ``value`` carry nothing to submit.
                continue
        if additional_attrs:
            data.update(additional_attrs)
        return data

    def _get_form_action(self,
                         form: Tag,
                         prefix: Optional[str] = None) -> str:
        """
        Resolve the URL a form should be submitted to.

        :param form: The form whose ``action`` attribute is read.
        :param prefix: Prepended to the action when it does not already start with ``http``.
        :return: The form's action, or the last response's URL when the form declares none.
        """
        action = form.attrs.get("action")
        if not action:
            action = self.last_response.url
        if prefix and not action.startswith("http"):
            action = prefix + action
        return action

    def _is_field_found(self,
                        field_value: str,
                        field_type: str = "form",
                        field_key: str = "name") -> bool:
        """
        Check whether the last parsed response contains a matching tag.

        :param field_value: The attribute value to look for.
        :param field_type: The tag name to search for.
        :param field_key: The attribute that must equal ``field_value``.
        :return: ``True`` if such a tag exists.
        """
        match = self.last_response_parsed.find(field_type,
                                               {field_key: field_value})
        return match is not None

    def _get_page_from_url(self,
                           url: str) -> str:
        """
        Build an unused filename, in the current working directory, for dumping a response.

        :param url: The URL of the response being dumped.
        :return: ``<page>_<i>.html``, where ``<page>`` is the last path segment of ``url`` without its
            ``.html`` extension and ``<i>`` is the lowest index for which no such file exists yet.
        """
        page_name = os.path.basename(urlparse(url).path)
        # ``str.strip(".html")`` would strip any of the characters ".", "h", "t", "m", "l" from both
        # ends (turning "help.html" into "elp"), so remove the suffix explicitly.
        suffix = ".html"
        if page_name.endswith(suffix):
            page_name = page_name[:-len(suffix)]

        i = 0
        while os.path.isfile("{}_{}.html".format(page_name, i)):
            i += 1
        return "{}_{}.html".format(page_name, i)

    def _handle_errors(self,
                       error_div: str = "auth-error-message-box",
                       attr_name: str = "id",
                       critical: bool = False) -> None:
        """
        Report an error box found on the last parsed response, if any.

        :param error_div: The attribute value identifying the error ``<div>``.
        :param attr_name: The attribute ``error_div`` is matched against.
        :param critical: Raise instead of printing when an error is found.
        :raises AmazonOrdersAuthError: If an error is found and ``critical`` is ``True``.
        """
        error_tag = self.last_response_parsed.find("div",
                                                   {attr_name: error_div})
        if not error_tag:
            return

        error_msg = "An error occurred: {}".format(error_tag.text.strip())
        if critical:
            raise AmazonOrdersAuthError(error_msg)
        print(error_msg)

    def _solve_captcha(self,
                       url: str) -> str:
        """
        Solve the Captcha image at ``url``, falling back to asking the user.

        :param url: The URL of the Captcha image.
        :return: The characters to submit as the Captcha solution.
        """
        captcha_response = AmazonCaptcha.fromlink(url).solve()
        if not captcha_response or captcha_response.lower() == "not solved":
            # Auto-solve failed: show the image and let the user type the answer.
            img_response = self.session.get(url)
            img = Image.open(BytesIO(img_response.content))
            img.show()
            captcha_response = input(
                "The Captcha couldn't be auto-solved, enter the characters shown in the image: ")

        return captcha_response