mirror of
				https://github.com/Akkudoktor-EOS/EOS.git
				synced 2025-10-24 19:36:21 +00:00 
			
		
		
		
	
		
			
	
	
		
			54 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			54 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import os | ||
|  | import subprocess | ||
|  | import sys | ||
|  | 
 | ||
|  | import pytest | ||
|  | from xprocess import ProcessStarter | ||
|  | 
 | ||
|  | 
 | ||
|  | @pytest.fixture | ||
|  | def server(xprocess): | ||
|  |     class Starter(ProcessStarter): | ||
|  |         # assure server to be installed | ||
|  |         try: | ||
|  |             subprocess.run( | ||
|  |                 [sys.executable, "-c", "import akkudoktoreosserver"], | ||
|  |                 check=True, | ||
|  |                 stdout=subprocess.PIPE, | ||
|  |                 stderr=subprocess.PIPE, | ||
|  |             ) | ||
|  |         except subprocess.CalledProcessError: | ||
|  |             test_dir = os.path.dirname(os.path.realpath(__file__)) | ||
|  |             project_dir = os.path.abspath(os.path.join(test_dir, "..")) | ||
|  |             subprocess.run( | ||
|  |                 [sys.executable, "-m", "pip", "install", "-e", project_dir], | ||
|  |                 check=True, | ||
|  |                 stdout=subprocess.PIPE, | ||
|  |                 stderr=subprocess.PIPE, | ||
|  |             ) | ||
|  | 
 | ||
|  |         # command to start server process | ||
|  |         args = [sys.executable, "-m", "akkudoktoreosserver.flask_server"] | ||
|  | 
 | ||
|  |         # startup pattern | ||
|  |         pattern = "Serving Flask app 'flask_server'" | ||
|  |         # search the first 12 lines for the startup pattern, if not found | ||
|  |         # a RuntimeError will be raised informing the user | ||
|  |         max_read_lines = 12 | ||
|  | 
 | ||
|  |         # will wait for 10 seconds before timing out | ||
|  |         timeout = 10 | ||
|  | 
 | ||
|  |         # xprocess will now attempt to clean up upon interruptions | ||
|  |         terminate_on_interrupt = True | ||
|  | 
 | ||
|  |     # ensure process is running and return its logfile | ||
|  |     logfile = xprocess.ensure("akkudoktoreosserver", Starter) | ||
|  | 
 | ||
|  |     # create url/port info to the server | ||
|  |     url = "http://127.0.0.1:8503" | ||
|  |     yield url | ||
|  | 
 | ||
|  |     # clean up whole process tree afterwards | ||
|  |     xprocess.getinfo("akkudoktoreosserver").terminate() |