add travel code notify

This commit is contained in:
YuanYu 2024-03-31 18:15:17 +08:00
parent 78215fca70
commit bfcbd72a10
4 changed files with 54 additions and 13 deletions

1
.env.example Normal file
View File

@ -0,0 +1 @@
LINE_TOKEN=

View File

@ -7,8 +7,7 @@ from functools import cache
import time import time
class Clicker: class AbstractChrome:
def __init__(self, headless: bool = True): def __init__(self, headless: bool = True):
self.chrome_options = webdriver.ChromeOptions() self.chrome_options = webdriver.ChromeOptions()
self.chrome_options.add_argument(f'user-agent={self._user_aget}') self.chrome_options.add_argument(f'user-agent={self._user_aget}')
@ -26,15 +25,6 @@ class Clicker:
options=self.chrome_options options=self.chrome_options
) )
def click(self, auth_url: str):
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="set-primary-location-action"]')
element.click()
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)
def __enter__(self): def __enter__(self):
return self return self
@ -46,3 +36,27 @@ class Clicker:
def _user_aget(self): def _user_aget(self):
ua = UserAgent() ua = UserAgent()
return ua.googlechrome return ua.googlechrome
class PrimaryLocationClicker(AbstractChrome):
def click(self, auth_url: str):
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="set-primary-location-action"]')
element.click()
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)
class TravelVerificationRetriever(AbstractChrome):
def get_code(self, auth_url: str) -> str:
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="travel-verification-otp"]')
return element.text
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)

16
line_notifier.py Normal file
View File

@ -0,0 +1,16 @@
import requests
import os
class LineNotifier:
def __init__(self) -> None:
self.url = 'https://notify-api.line.me/api/notify'
self.headers = {
'Authorization': 'Bearer ' + os.environ['LINE_TOKEN']
}
def notify(self, msg: str):
data = {
'message': msg,
}
requests.post(self.url, headers=self.headers, data=data)

View File

@ -2,10 +2,14 @@ from aiosmtpd.controller import Controller
import quopri import quopri
from loguru import logger from loguru import logger
import re import re
from clicker import Clicker from browser import PrimaryLocationClicker, TravelVerificationClicker
import asyncio import asyncio
from line_notifier import LineNotifier
URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/update-primary-location.*?)"', re.DOTALL) URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/update-primary-location.*?)"', re.DOTALL)
VERIFY_URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/travel/verify.*?)"', re.DOTALL)
notifier = LineNotifier()
class NetflixHandler: class NetflixHandler:
@ -23,13 +27,19 @@ class NetflixHandler:
envelope.content.decode('utf8', errors='replace') envelope.content.decode('utf8', errors='replace')
) )
if urls := URL_PATTERN.findall(data): if urls := URL_PATTERN.findall(data):
with Clicker() as browser: with PrimaryLocationClicker() as browser:
url = urls[0] url = urls[0]
logger.info(f'Found Update link: {url}') logger.info(f'Found Update link: {url}')
try: try:
browser.click(url.decode()) browser.click(url.decode())
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
elif urls := VERIFY_URL_PATTERN.findall(data):
with TravelVerificationClicker() as browser:
url = urls[0]
logger.info(f'Found Travel Verification link: {url}')
code = browser.get_code(url)
notifier.notify(f'代碼為 {code}')
elif 'google' in envelope.mail_from: elif 'google' in envelope.mail_from:
logger.info(data) logger.info(data)