netflix-clicker/mail_server.py

47 lines
1.5 KiB
Python

from aiosmtpd.controller import Controller
import quopri
from loguru import logger
import re
from clicker import Clicker
import asyncio
URL_PATTERN = re.compile(b'"(https://www.netflix.com/account/update-primary-location.*?)"', re.DOTALL)
class NetflixHandler:
async def handle_RCPT(self, session, envelope, address, rcpt_options):
if 'netflix_clicker' not in address:
logger.info(f'Rejected mail from {envelope.mail_from}')
return '550 not relaying to that domain'
envelope.rcpt_tos.append(address)
logger.info(f'Accepted mail from {envelope.mail_from}')
return '250 OK'
async def handle_DATA(self, session, envelope, *args):
data = quopri.decodestring(
envelope.content.decode('utf8', errors='replace')
)
if urls := URL_PATTERN.findall(data):
with Clicker() as browser:
url = urls[0]
logger.info(f'Found Update link: {url}')
try:
browser.click(url.decode())
except Exception as e:
logger.error(e)
elif 'google' in envelope.mail_from:
logger.info(data)
return '250 Message accepted for delivery'
server = Controller(handler=NetflixHandler, hostname='0.0.0.0', port=1025)
server.start()
logger.info('Netflix-clicker start.')
while True:
try:
asyncio.run(asyncio.sleep(3))
except KeyboardInterrupt:
server.stop()