add DNS version tracking and download functionality to ClusterWorker

This commit is contained in:
Eduardo Silva
2025-12-31 20:01:00 -03:00
parent a93acf3fec
commit 170e153f2b

View File

@@ -17,12 +17,14 @@ logger = logging.getLogger(__name__)
MASTER_SERVER_ADDRESS = os.environ.get('MASTER_SERVER_ADDRESS') MASTER_SERVER_ADDRESS = os.environ.get('MASTER_SERVER_ADDRESS')
TOKEN = os.environ.get('TOKEN') TOKEN = os.environ.get('TOKEN')
WIREGUARD_DIR = '/etc/wireguard' WIREGUARD_DIR = '/etc/wireguard'
DNS_DIR = '/etc/dnsmasq'
WORKER_VERSION = 10 WORKER_VERSION = 10
REQUEST_TIMEOUT = 10 REQUEST_TIMEOUT = 10
class ClusterWorker: class ClusterWorker:
def __init__(self): def __init__(self):
self.config_version = 0 self.config_version = 0
self.dns_version = self.get_local_dns_version()
self.should_run = True self.should_run = True
self.session = requests.Session() self.session = requests.Session()
@@ -62,13 +64,26 @@ class ClusterWorker:
except Exception as e: except Exception as e:
logger.error(f"Error flushing firewall: {e}") logger.error(f"Error flushing firewall: {e}")
def get_local_dns_version(self):
try:
version_file = os.path.join(DNS_DIR, 'config_version.conf')
if os.path.exists(version_file):
with open(version_file, 'r') as f:
for line in f:
if line.startswith('DNS_VERSION='):
return int(line.strip().split('=')[1])
except Exception as e:
logger.error(f"Error reading DNS version: {e}")
return 0
def get_status(self): def get_status(self):
params = { params = {
'token': TOKEN, 'token': TOKEN,
'worker_config_version': self.config_version, 'worker_config_version': self.config_version,
'worker_dns_version': self.dns_version,
'worker_version': WORKER_VERSION 'worker_version': WORKER_VERSION
} }
logger.info(f"Requesting status from Master... (Config Version: {self.config_version}, Worker Version: {WORKER_VERSION})") logger.info(f"Requesting status from Master... (Config Version: {self.config_version}, DNS Version: {self.dns_version}, Worker Version: {WORKER_VERSION})")
try: try:
response = self.session.get(f"{self.base_url}/status/", params=params, timeout=REQUEST_TIMEOUT) response = self.session.get(f"{self.base_url}/status/", params=params, timeout=REQUEST_TIMEOUT)
logger.info(f"Status response received. HTTP Code: {response.status_code}") logger.info(f"Status response received. HTTP Code: {response.status_code}")
@@ -81,6 +96,7 @@ class ClusterWorker:
params = { params = {
'token': TOKEN, 'token': TOKEN,
'worker_config_version': self.config_version, 'worker_config_version': self.config_version,
'worker_dns_version': self.dns_version,
'worker_version': WORKER_VERSION 'worker_version': WORKER_VERSION
} }
try: try:
@@ -94,6 +110,40 @@ class ClusterWorker:
logger.error(f"Error downloading configs: {e}") logger.error(f"Error downloading configs: {e}")
return None return None
def download_dns_config(self):
params = {
'token': TOKEN,
'worker_config_version': self.config_version,
'worker_dns_version': self.dns_version,
'worker_version': WORKER_VERSION
}
try:
logger.info(f"Downloading DNS config... (Current Version: {self.dns_version})")
response = self.session.get(f"{self.base_url}/worker/get_dnsmasq_config/", params=params, timeout=REQUEST_TIMEOUT)
if response.status_code == 200:
tar_path = 'dnsmasq_config.tar.gz'
with open(tar_path, 'wb') as f:
f.write(response.content)
logger.info("Extracting DNS config...")
if not os.path.exists(DNS_DIR):
os.makedirs(DNS_DIR, exist_ok=True)
subprocess.run(['tar', 'xvfz', tar_path, '-C', DNS_DIR], check=True, capture_output=True)
if os.path.exists(tar_path):
os.remove(tar_path)
# Update version
self.dns_version = self.get_local_dns_version()
logger.info(f"DNS config updated (New Version: {self.dns_version})")
return True
else:
logger.error(f"Failed to download DNS config. Status: {response.status_code}")
return False
except Exception as e:
logger.error(f"Error updating DNS config: {e}")
return False
def apply_configs(self, data): def apply_configs(self, data):
logger.info("Applying new configurations...") logger.info("Applying new configurations...")
files = data.get('files', {}) files = data.get('files', {})
@@ -158,15 +208,27 @@ class ClusterWorker:
data = response.json() data = response.json()
remote_config_version = data.get('cluster_settings', {}).get('config_version', 0) remote_config_version = data.get('cluster_settings', {}).get('config_version', 0)
# Check WireGuard Config
if remote_config_version != self.config_version: if remote_config_version != self.config_version:
logger.info(f"Config version mismatch (Local: {self.config_version}, Remote: {remote_config_version}). Updating...") logger.info(f"Config version mismatch (Local: {self.config_version}, Remote: {remote_config_version}). Updating...")
config_data = self.download_configs() config_data = self.download_configs()
if config_data: if config_data:
self.apply_configs(config_data) self.apply_configs(config_data)
logger.info("Sending post-update status to Master...")
self.get_status()
else: else:
logger.error("Failed to download config files.") logger.error("Failed to download config files.")
else:
logger.info(f"No changes detected. Configuration is up to date (Version: {self.config_version}).") # Check DNS Config
remote_dns_version = data.get('cluster_settings', {}).get('dns_version', 0)
if remote_dns_version != self.dns_version:
logger.info(f"DNS version mismatch (Local: {self.dns_version}, Remote: {remote_dns_version}). Updating...")
if self.download_dns_config():
logger.info("Sending post-update status to Master...")
self.get_status()
if remote_config_version == self.config_version and remote_dns_version == self.dns_version:
logger.info(f"No changes detected. Configuration is up to date (WG: {self.config_version}, DNS: {self.dns_version}).")
except Exception as e: except Exception as e:
logger.error(f"Unexpected error in main loop: {e}") logger.error(f"Unexpected error in main loop: {e}")