refac: Email.py module with bugfix (#1058)

* refac: refactor Email.py module for clarity

* chore: rework ready method for dynamic config

* chore: implement resolvconf from v4.3.2-dev
This commit is contained in:
DaanSelen
2026-01-03 13:00:23 +01:00
committed by GitHub
parent 9a7b23748d
commit 17e23685ec
2 changed files with 79 additions and 62 deletions

View File

@@ -1463,7 +1463,7 @@ def API_Locale_Update():
@app.get(f'{APP_PREFIX}/api/email/ready') @app.get(f'{APP_PREFIX}/api/email/ready')
def API_Email_Ready(): def API_Email_Ready():
return ResponseObject(EmailSender.ready()) return ResponseObject(EmailSender.is_ready())
@app.post(f'{APP_PREFIX}/api/email/send') @app.post(f'{APP_PREFIX}/api/email/send')
def API_Email_Send(): def API_Email_Send():

View File

@@ -1,76 +1,93 @@
import os.path import os.path
import smtplib import smtplib
# Email libaries
from email import encoders from email import encoders
from email.header import Header
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr from email.utils import formatdate
class EmailSender: class EmailSender:
def __init__(self, DashboardConfig): def __init__(self, DashboardConfig):
self.smtp = None
self.DashboardConfig = DashboardConfig self.DashboardConfig = DashboardConfig
if not os.path.exists('./attachments'): if not os.path.exists('./attachments'):
os.mkdir('./attachments') os.mkdir('./attachments')
def Server(self): self.refresh_vals()
return self.DashboardConfig.GetConfig("Email", "server")[1]
def Port(self): def refresh_vals(self):
return self.DashboardConfig.GetConfig("Email", "port")[1] self.Server = self.DashboardConfig.GetConfig("Email", "server")[1]
self.Port = self.DashboardConfig.GetConfig("Email", "port")[1]
def Encryption(self): self.Encryption = self.DashboardConfig.GetConfig("Email", "encryption")[1]
return self.DashboardConfig.GetConfig("Email", "encryption")[1] self.AuthRequired = self.DashboardConfig.GetConfig("Email", "authentication_required")[1]
self.Username = self.DashboardConfig.GetConfig("Email", "username")[1]
self.Password = self.DashboardConfig.GetConfig("Email", "email_password")[1]
def Username(self): self.SendFrom = self.DashboardConfig.GetConfig("Email", "send_from")[1]
return self.DashboardConfig.GetConfig("Email", "username")[1]
def Password(self): def is_ready(self) -> bool:
return self.DashboardConfig.GetConfig("Email", "email_password")[1] self.refresh_vals()
def SendFrom(self): if self.AuthRequired:
return self.DashboardConfig.GetConfig("Email", "send_from")[1] ready = all([
self.Server, self.Port, self.Encryption,
self.Username, self.Password, self.SendFrom
])
else:
ready = all([
self.Server, self.Port, self.Encryption, self.SendFrom
])
return ready
# Thank you, @gdeeble from GitHub def send(self, receiver, subject, body, includeAttachment: bool = False, attachmentName: str = "") -> tuple[bool, str | None]:
def AuthenticationRequired(self): if not self.is_ready():
return self.DashboardConfig.GetConfig("Email", "authentication_required")[1] return False, "SMTP not configured"
def ready(self): message = MIMEMultipart()
if self.AuthenticationRequired(): message['Subject'] = subject
return all([self.Server(), self.Port(), self.Encryption(), self.Username(), self.Password(), self.SendFrom()]) message['From'] = self.SendFrom
return all([self.Server(), self.Port(), self.Encryption(), self.SendFrom()]) message["To"] = receiver
message["Date"] = formatdate(localtime=True)
message.attach(MIMEText(body, "plain"))
def send(self, receiver, subject, body, includeAttachment = False, attachmentName = "") -> tuple[bool, str] | tuple[bool, None]: if includeAttachment and len(attachmentName) > 0:
if self.ready(): attachmentPath = os.path.join('./attachments', attachmentName)
try:
self.smtp = smtplib.SMTP(self.Server(), port=int(self.Port()))
self.smtp.ehlo()
if self.Encryption() == "STARTTLS":
self.smtp.starttls()
if self.AuthenticationRequired():
self.smtp.login(self.Username(), self.Password())
message = MIMEMultipart()
message['Subject'] = subject
message['From'] = self.SendFrom()
message["To"] = receiver
message.attach(MIMEText(body, "plain"))
if includeAttachment and len(attachmentName) > 0: if not os.path.exists(attachmentPath):
attachmentPath = os.path.join('./attachments', attachmentName) return False, "Attachment does not exist"
if os.path.exists(attachmentPath):
attachment = MIMEBase("application", "octet-stream") attachment = MIMEBase("application", "octet-stream")
with open(os.path.join('./attachments', attachmentName), 'rb') as f: with open(os.path.join('./attachments', attachmentName), 'rb') as f:
attachment.set_payload(f.read()) attachment.set_payload(f.read())
encoders.encode_base64(attachment)
attachment.add_header("Content-Disposition", f"attachment; filename= {attachmentName}",) encoders.encode_base64(attachment)
message.attach(attachment) attachment.add_header("Content-Disposition", f"attachment; filename= {attachmentName}",)
else: message.attach(attachment)
self.smtp.close()
return False, "Attachment does not exist" smtp = None
self.smtp.sendmail(self.SendFrom(), receiver, message.as_string()) try:
self.smtp.close() smtp = smtplib.SMTP(self.Server, port=int(self.Port))
return True, None smtp.ehlo()
except Exception as e:
return False, f"Send failed | Reason: {e}" # Configure SMTP encryption type
return False, "SMTP not configured" if self.Encryption == "STARTTLS":
smtp.starttls()
smtp.ehlo()
# Log into the SMTP server if required
if self.AuthRequired:
smtp.login(self.Username, self.Password)
# Send the actual email from the SMTP object
smtp.sendmail(self.SendFrom, receiver, message.as_string())
return True, None
except Exception as e:
return False, f"Send failed | Reason: {e}"
finally:
if smtp:
smtp.quit()