add travel notifier

This commit is contained in:
YuanYu 2024-03-31 19:48:36 +08:00
parent 78215fca70
commit 4d1dfab292
3 changed files with 53 additions and 13 deletions

View File

@ -7,8 +7,7 @@ from functools import cache
import time
class Clicker:
class AbstractChrome:
def __init__(self, headless: bool = True):
self.chrome_options = webdriver.ChromeOptions()
self.chrome_options.add_argument(f'user-agent={self._user_aget}')
@ -26,15 +25,6 @@ class Clicker:
options=self.chrome_options
)
def click(self, auth_url: str):
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="set-primary-location-action"]')
element.click()
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)
def __enter__(self):
return self
@ -46,3 +36,27 @@ class Clicker:
def _user_aget(self):
ua = UserAgent()
return ua.googlechrome
class PrimaryLocationClicker(AbstractChrome):
def click(self, auth_url: str):
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="set-primary-location-action"]')
element.click()
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)
class TravelVerificationRetriever(AbstractChrome):
def get_code(self, auth_url: str) -> str:
self.driver.get(auth_url)
try:
element = self.driver.find_element(By.CSS_SELECTOR, '[data-uia="travel-verification-otp"]')
return element.text
except NoSuchElementException as e:
self.driver.save_screenshot(f'{time.time()}.png')
logger.error(e)

16
line_notifier.py Normal file
View File

@ -0,0 +1,16 @@
import requests
import os
class LineNotifier:
def __init__(self) -> None:
self.url = 'https://notify-api.line.me/api/notify'
self.headers = {
'Authorization': 'Bearer ' + os.environ['LINE_TOKEN']
}
def notify(self, msg: str):
data = {
'message': msg,
}
requests.post(self.url, headers=self.headers, data=data)

View File

@ -2,10 +2,14 @@ from aiosmtpd.controller import Controller
import quopri
from loguru import logger
import re
from clicker import Clicker
from browser import PrimaryLocationClicker, TravelVerificationClicker
import asyncio
from line_notifier import LineNotifier
URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/update-primary-location.*?)"', re.DOTALL)
VERIFY_URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/travel/verify.*?)"', re.DOTALL)
notifier = LineNotifier()
class NetflixHandler:
@ -23,13 +27,19 @@ class NetflixHandler:
envelope.content.decode('utf8', errors='replace')
)
if urls := URL_PATTERN.findall(data):
with Clicker() as browser:
with PrimaryLocationClicker() as browser:
url = urls[0]
logger.info(f'Found Update link: {url}')
try:
browser.click(url.decode())
except Exception as e:
logger.error(e)
elif urls := VERIFY_URL_PATTERN.findall(data):
with TravelVerificationClicker() as browser:
url = urls[0]
logger.info(f'Found Travel Verification link: {url}')
code = browser.get_code(url)
notifier.notify(f'代碼為 {code}')
elif 'google' in envelope.mail_from:
logger.info(data)