2025-12-30 16:33:51 -03:00
import glob
import logging
import os
import subprocess
import time
import requests
# Configure logging
logging . basicConfig (
level = logging . INFO ,
format = ' %(asctime)s - %(levelname)s - %(message)s '
)
logger = logging . getLogger ( __name__ )
# Constants
MASTER_SERVER_ADDRESS = os . environ . get ( ' MASTER_SERVER_ADDRESS ' )
TOKEN = os . environ . get ( ' TOKEN ' )
WIREGUARD_DIR = ' /etc/wireguard '
2025-12-31 20:01:00 -03:00
DNS_DIR = ' /etc/dnsmasq '
2025-12-30 16:33:51 -03:00
WORKER_VERSION = 10
REQUEST_TIMEOUT = 10
class ClusterWorker :
def __init__ ( self ) :
self . config_version = 0
2025-12-31 20:01:00 -03:00
self . dns_version = self . get_local_dns_version ( )
2025-12-30 16:33:51 -03:00
self . should_run = True
self . session = requests . Session ( )
if not MASTER_SERVER_ADDRESS or not TOKEN :
logger . error ( " MASTER_SERVER_ADDRESS or TOKEN not set " )
self . should_run = False
self . base_url = f " https:// { MASTER_SERVER_ADDRESS } /api/cluster "
# Validate URL scheme
if not MASTER_SERVER_ADDRESS . startswith ( ( ' http:// ' , ' https:// ' ) ) :
self . base_url = f " https:// { MASTER_SERVER_ADDRESS } /api/cluster "
else :
self . base_url = f " { MASTER_SERVER_ADDRESS } /api/cluster "
def cleanup_wireguard ( self ) :
logger . info ( " Cleaning up WireGuard configuration... " )
# Stop all wireguard interfaces
try :
files = glob . glob ( f " { WIREGUARD_DIR } /*.conf " )
for f in files :
interface = os . path . basename ( f ) . replace ( ' .conf ' , ' ' )
subprocess . run ( [ ' wg-quick ' , ' down ' , interface ] , capture_output = True )
except Exception as e :
logger . error ( f " Error stopping wireguard interfaces: { e } " )
# Remove files
try :
for f in glob . glob ( f " { WIREGUARD_DIR } /* " ) :
os . remove ( f )
except Exception as e :
logger . error ( f " Error cleaning directory: { e } " )
try :
subprocess . run ( [ ' iptables ' , ' -t ' , ' nat ' , ' -F ' , ' WGWADM_POSTROUTING ' ] , capture_output = True )
subprocess . run ( [ ' iptables ' , ' -t ' , ' nat ' , ' -F ' , ' WGWADM_PREROUTING ' ] , capture_output = True )
subprocess . run ( [ ' iptables ' , ' -t ' , ' filter ' , ' -F ' , ' WGWADM_FORWARD ' ] , capture_output = True )
except Exception as e :
logger . error ( f " Error flushing firewall: { e } " )
2025-12-31 20:01:00 -03:00
def get_local_dns_version ( self ) :
try :
version_file = os . path . join ( DNS_DIR , ' config_version.conf ' )
if os . path . exists ( version_file ) :
with open ( version_file , ' r ' ) as f :
for line in f :
if line . startswith ( ' DNS_VERSION= ' ) :
return int ( line . strip ( ) . split ( ' = ' ) [ 1 ] )
except Exception as e :
logger . error ( f " Error reading DNS version: { e } " )
return 0
2025-12-30 16:33:51 -03:00
def get_status ( self ) :
params = {
' token ' : TOKEN ,
' worker_config_version ' : self . config_version ,
2025-12-31 20:01:00 -03:00
' worker_dns_version ' : self . dns_version ,
2025-12-30 16:33:51 -03:00
' worker_version ' : WORKER_VERSION
}
2025-12-31 20:01:00 -03:00
logger . info ( f " Requesting status from Master... (Config Version: { self . config_version } , DNS Version: { self . dns_version } , Worker Version: { WORKER_VERSION } ) " )
2025-12-30 16:33:51 -03:00
try :
response = self . session . get ( f " { self . base_url } /status/ " , params = params , timeout = REQUEST_TIMEOUT )
2025-12-31 16:13:43 -03:00
logger . info ( f " Status response received. HTTP Code: { response . status_code } " )
2025-12-30 16:33:51 -03:00
return response
except requests . RequestException as e :
2025-12-31 16:13:43 -03:00
logger . error ( f " Connection error while getting status: { e } " )
2025-12-30 16:33:51 -03:00
return None
def download_configs ( self ) :
params = {
' token ' : TOKEN ,
' worker_config_version ' : self . config_version ,
2025-12-31 20:01:00 -03:00
' worker_dns_version ' : self . dns_version ,
2025-12-30 16:33:51 -03:00
' worker_version ' : WORKER_VERSION
}
try :
response = self . session . get ( f " { self . base_url } /worker/get_config_files/ " , params = params , timeout = REQUEST_TIMEOUT )
if response . status_code == 200 :
data = response . json ( )
if data . get ( ' status ' ) == ' success ' :
return data
return None
except requests . RequestException as e :
logger . error ( f " Error downloading configs: { e } " )
return None
2025-12-31 20:01:00 -03:00
def download_dns_config ( self ) :
params = {
' token ' : TOKEN ,
' worker_config_version ' : self . config_version ,
' worker_dns_version ' : self . dns_version ,
' worker_version ' : WORKER_VERSION
}
try :
logger . info ( f " Downloading DNS config... (Current Version: { self . dns_version } ) " )
response = self . session . get ( f " { self . base_url } /worker/get_dnsmasq_config/ " , params = params , timeout = REQUEST_TIMEOUT )
if response . status_code == 200 :
tar_path = ' dnsmasq_config.tar.gz '
with open ( tar_path , ' wb ' ) as f :
f . write ( response . content )
logger . info ( " Extracting DNS config... " )
if not os . path . exists ( DNS_DIR ) :
os . makedirs ( DNS_DIR , exist_ok = True )
subprocess . run ( [ ' tar ' , ' xvfz ' , tar_path , ' -C ' , DNS_DIR ] , check = True , capture_output = True )
if os . path . exists ( tar_path ) :
os . remove ( tar_path )
# Update version
self . dns_version = self . get_local_dns_version ( )
logger . info ( f " DNS config updated (New Version: { self . dns_version } ) " )
return True
else :
logger . error ( f " Failed to download DNS config. Status: { response . status_code } " )
return False
except Exception as e :
logger . error ( f " Error updating DNS config: { e } " )
return False
2025-12-31 20:34:16 -03:00
def send_ping ( self ) :
params = {
' token ' : TOKEN ,
' worker_config_version ' : self . config_version ,
' worker_dns_version ' : self . dns_version ,
' worker_version ' : WORKER_VERSION
}
try :
logger . info ( " Sending ping to Master... " )
self . session . get ( f " { self . base_url } /worker/ping/ " , params = params , timeout = REQUEST_TIMEOUT )
except Exception as e :
logger . error ( f " Error sending ping: { e } " )
2025-12-30 16:33:51 -03:00
def apply_configs ( self , data ) :
logger . info ( " Applying new configurations... " )
files = data . get ( ' files ' , { } )
cluster_settings = data . get ( ' cluster_settings ' , { } )
new_config_version = cluster_settings . get ( ' config_version ' , 0 )
# 1. Stop existing interfaces
self . cleanup_wireguard ( )
# 2. Write new files
for filename , content in files . items ( ) :
2025-12-31 17:08:36 -03:00
if filename == ' wg-firewall.sh ' and isinstance ( content , str ) :
content = content . replace ( ' wireguard-webadmin-dns ' , ' cluster-node-dns ' )
2025-12-30 16:33:51 -03:00
filepath = os . path . join ( WIREGUARD_DIR , filename )
with open ( filepath , ' w ' ) as f :
f . write ( content )
if filename == ' wg-firewall.sh ' :
os . chmod ( filepath , 0o755 )
# Start interfaces
conf_files = glob . glob ( f " { WIREGUARD_DIR } /*.conf " )
for conf in conf_files :
interface = os . path . basename ( conf ) . replace ( ' .conf ' , ' ' )
logger . info ( f " Starting WireGuard interface: { interface } " )
try :
subprocess . run ( [ ' wg-quick ' , ' up ' , interface ] , check = True , capture_output = True )
except subprocess . CalledProcessError as e :
logger . error ( f " Failed to start { interface } : { e . stderr . decode ( ) } " )
# 4. Update config version
self . config_version = new_config_version
logger . info ( f " Configuration updated to version { self . config_version } " )
def run ( self ) :
if not self . should_run :
return
logger . info ( " Cluster Worker starting... " )
# Initial cleanup
self . cleanup_wireguard ( )
while True :
try :
response = self . get_status ( )
2025-12-31 16:13:43 -03:00
if response is not None :
2025-12-30 16:33:51 -03:00
if response . status_code == 403 :
2025-12-31 16:13:43 -03:00
logger . error ( " Received 403 Forbidden (Token invalid/deleted). Deactivating WireGuard and stopping requests permanently. " )
2025-12-30 16:33:51 -03:00
self . cleanup_wireguard ( )
2025-12-31 16:13:43 -03:00
self . config_version = 0
2025-12-30 16:33:51 -03:00
self . should_run = False
break
2025-12-31 16:13:43 -03:00
if response . status_code == 400 :
logger . warning ( " Received 400 Bad Request (Worker suspended or error). Deactivating WireGuard and Firewall, but will keep retrying... " )
self . cleanup_wireguard ( )
self . config_version = 0
2025-12-30 16:33:51 -03:00
if response . status_code == 200 :
data = response . json ( )
remote_config_version = data . get ( ' cluster_settings ' , { } ) . get ( ' config_version ' , 0 )
2025-12-31 20:01:00 -03:00
# Check WireGuard Config
2025-12-31 20:34:16 -03:00
if int ( remote_config_version ) != self . config_version :
2025-12-30 16:33:51 -03:00
logger . info ( f " Config version mismatch (Local: { self . config_version } , Remote: { remote_config_version } ). Updating... " )
config_data = self . download_configs ( )
if config_data :
self . apply_configs ( config_data )
2025-12-31 20:34:16 -03:00
self . send_ping ( )
continue
2025-12-30 16:33:51 -03:00
else :
logger . error ( " Failed to download config files. " )
2025-12-31 20:01:00 -03:00
# Check DNS Config
2025-12-31 20:34:16 -03:00
remote_dns_version = int ( data . get ( ' cluster_settings ' , { } ) . get ( ' dns_version ' , 0 ) )
2025-12-31 20:01:00 -03:00
if remote_dns_version != self . dns_version :
logger . info ( f " DNS version mismatch (Local: { self . dns_version } , Remote: { remote_dns_version } ). Updating... " )
if self . download_dns_config ( ) :
2025-12-31 20:34:16 -03:00
self . send_ping ( )
continue
2025-12-31 20:01:00 -03:00
2025-12-31 20:34:16 -03:00
if int ( remote_config_version ) == self . config_version and remote_dns_version == self . dns_version :
2025-12-31 20:01:00 -03:00
logger . info ( f " No changes detected. Configuration is up to date (WG: { self . config_version } , DNS: { self . dns_version } ). " )
2025-12-30 16:33:51 -03:00
except Exception as e :
logger . error ( f " Unexpected error in main loop: { e } " )
2025-12-31 16:13:43 -03:00
interval = 60
logger . info ( f " Waiting { interval } seconds for next check... " )
time . sleep ( interval )
2025-12-30 16:33:51 -03:00
# Final loop state if 403 was received
while not self . should_run :
logger . info ( " Worker disabled due to 403 Forbidden. WireGuard is off. " )
time . sleep ( 60 )
if __name__ == " __main__ " :
worker = ClusterWorker ( )
worker . run ( )