import logging
import time
from statistics import median
from threading import Event, Thread

from configuration import read_config

# region Logger
from debug import setup_logging, catch_errors

log = logger = logging.getLogger("default")
setup_logging()
# endregion

# NOTE(review): these imports stay below setup_logging(), as in the original
# file, so that any logging done at import time is already configured --
# presumably intentional; confirm before hoisting them.
import speedtest
import routeros_api
from dbo import Entry

config = read_config()
secrets = read_config('secrets')


def mbits(bits):
    """Convert a bits-per-second value to megabits, rounded to 2 decimals."""
    return round(bits / 1000000, 2)


@catch_errors
def gather_data():
    """Read every stored ``Entry`` and return its columns as parallel lists.

    :return: tuple ``(dates, downloads, uploads)`` in the order rows are
        yielded by ``Entry.select()``
    """
    log.debug("Gathering data...")
    downloads = []
    uploads = []
    dates = []
    for entry in Entry.select():
        downloads.append(entry.download)
        uploads.append(entry.upload)
        dates.append(entry.date_created)
    return dates, downloads, uploads


@catch_errors
def generate_plot_image(dates, downloads, uploads):
    """Plot download speed over time and save it to ``output_image_path``.

    :param dates: creation dates of the measurements (x axis)
    :param downloads: download speeds matching ``dates`` (y axis)
    :param uploads: accepted for signature symmetry; not plotted
    """
    log.debug("Genering image output...")
    # Imported lazily so the rest of the script does not pay for matplotlib.
    # matplotlib.dates is imported explicitly instead of relying on pyplot
    # pulling it in as a side effect.
    import matplotlib.dates
    import matplotlib.pyplot as plt

    date_numbers = matplotlib.dates.date2num(dates)
    fig = plt.figure(figsize=(12, 3))
    try:
        plt.plot_date(date_numbers, downloads, fmt="b-")
        plt.ylabel('Download Speed Mbps')
        plt.tight_layout()
        plt.savefig(read_config()['output_image_path'])
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)


@catch_errors
def generate_txt_output(dates, downloads, uploads):
    """Write all measurements as text lines to ``output_txt_path``.

    :param dates: creation dates of the measurements
    :param downloads: download speeds matching ``dates``
    :param uploads: upload speeds matching ``dates``
    """
    log.debug("Genering txt output...")
    lines = ["Date: Down; Up;\n"]
    lines.extend(
        f"{date}: {download} Mbps; {upload} Mbps\n"
        for date, download, upload in zip(dates, downloads, uploads)
    )
    with open(read_config()['output_txt_path'], "w+") as f:
        f.write("".join(lines))


def _ros_connect():
    """Create a RouterOS API connection pool from the configured host and credentials."""
    return routeros_api.RouterOsApiPool(
        config['ros_ip'],
        username=secrets["ros_login"],
        password=secrets["ros_password"],
        plaintext_login=True,
    )


@catch_errors
def ros_fastrack_enable(enable):
    """
    Enable or disable fasttrack

    Every firewall filter rule whose comment contains
    ``config['ros_fasttrack_comment']`` gets its ``disabled`` flag updated.

    :param enable: True to enable, False to disable
    """
    disabled = 'false' if enable else 'true'
    log.info(f"Changing fasttrack disabled to {disabled}")
    connection = _ros_connect()
    api = connection.get_api()
    try:
        rules = api.get_resource('/ip/firewall/filter')
        for rule in rules.get():
            # Rules without a comment may lack the key entirely; treat them
            # as non-matching instead of raising KeyError.
            if config['ros_fasttrack_comment'] in rule.get('comment', ''):
                rules.set(id=rule['id'], disabled=disabled)
    finally:
        connection.disconnect()


@catch_errors
def ros_dynamic_speed(upload, download):
    """
    Adjust router os queue speed.

    Sets ``max_limit`` on every simple queue whose name is listed in
    ``config['ros_queues']``.

    :param upload: Upload speed from speedtest
    :param download: Download speed from speedtest
    """
    log.debug(f"Set Dynamic Speed to: DOWN {mbits(download)} mbps; UP {mbits(upload)} mbps")
    if config["ros_du_invert"] == True:
        # Inverting upload and download values, because master queue is
        # most likely applied to the bridge
        max_limit = f"{int(download)}/{int(upload)}"
    else:
        # Not inverting, use this in case master queue is applied to
        # something like LTE interface
        max_limit = f"{int(upload)}/{int(download)}"

    connection = _ros_connect()
    api = connection.get_api()
    try:
        list_queues = api.get_resource('/queue/simple')
        for queue in list_queues.get():
            if queue['name'] in config['ros_queues']:
                # Log the value actually applied (it may be inverted).
                log.debug(f"Adjust Queue {queue['name']}: max_limit {max_limit}")
                list_queues.set(id=queue['id'], max_limit=max_limit)
    finally:
        connection.disconnect()


# State shared between the speedtest thread and the WAN monitor thread.
wan_upload = None
wan_download = None
results_dict = None
test_started = False
downloading = True  # True for download, False for upload
threads = None
servers = []
# Set when the speedtest thread ends for any reason (success or failure), so
# the monitor thread can never wait forever on a test that died.
_speedtest_finished = Event()


def threaded_speedtest():
    """Run the speedtest.net download and upload test.

    Publishes progress through the module globals ``test_started`` and
    ``downloading`` and stores the final result in ``results_dict``.

    :return: the speedtest result dictionary
    """
    global test_started
    global downloading
    global results_dict
    try:
        s = speedtest.Speedtest()
        s.get_servers(servers)
        s.get_best_server()
        print("Running test...")
        test_started = True
        downloading = True
        s.download(threads=threads)
        downloading = False
        s.upload(threads=threads, pre_allocate=False)
        results_dict = s.results.dict()
    finally:
        # Always release the monitor thread, even if the test raised.
        _speedtest_finished.set()
    log.info(
        f"Speedtest.net result: DOWN {mbits(results_dict['download'])} mbps; UP {mbits(results_dict['upload'])} mbps;")
    return results_dict


def threaded_wan_speed():
    """Sample WAN interface traffic once per second while the speedtest runs.

    Stores the median sampled rx rate in ``wan_download`` and the median
    sampled tx rate in ``wan_upload``. Both are left untouched when the
    speedtest never started or when either phase produced no samples.
    """
    global wan_download
    global wan_upload
    print("Waiting for test to start...")
    while not test_started:
        if _speedtest_finished.is_set():
            log.error("Speedtest ended before it started; nothing to monitor.")
            return
        time.sleep(1)
    print("Allow warm-up...")
    time.sleep(2)
    uploads = []
    downloads = []
    print("Monitoring...")
    # One connection for the whole monitoring session; the original opened a
    # new one every second and never closed any of them.
    connection = _ros_connect()
    api = connection.get_api()
    try:
        root = api.get_resource('/')
        while not results_dict and not _speedtest_finished.is_set():
            traffic = root.call(
                'interface/monitor-traffic',
                {'interface': config['ros_wan_interface'], 'once': ' '},
            )[0]
            rx_bits = int(traffic['rx-bits-per-second'])
            tx_bits = int(traffic['tx-bits-per-second'])
            if downloading:
                downloads.append(rx_bits)
                print(f"DL: {mbits(rx_bits)} mbps;")
            else:
                uploads.append(tx_bits)
                print(f"UP: {mbits(tx_bits)} mbps;")
            time.sleep(1)
    finally:
        connection.disconnect()

    if not downloads or not uploads:
        # median() raises StatisticsError on empty data; report it clearly.
        log.error(
            f"Monitor collected too few samples (download: {len(downloads)}, upload: {len(uploads)}).")
        return
    wan_download = median(downloads)
    wan_upload = median(uploads)
    log.info(f"Monitor result: {mbits(wan_download)} mbps; {mbits(wan_upload)} mbps;")


def test_speed():
    """Run the speedtest and the WAN monitor in parallel and wait for both.

    Results are published in the module globals ``results_dict``,
    ``wan_download`` and ``wan_upload``.
    """
    global wan_upload, wan_download, results_dict, test_started, downloading
    # Reset shared state so a second call does not see stale results.
    wan_upload = None
    wan_download = None
    results_dict = None
    test_started = False
    downloading = True
    _speedtest_finished.clear()

    sws = Thread(target=threaded_wan_speed)
    st = Thread(target=threaded_speedtest)
    st.start()
    sws.start()
    st.join()
    sws.join()


def main():
    """
    This script will run a speed test while monitoring the WAN interface,
    record the measured upload and download speeds into database and,
    when enabled, adjust the router queues accordingly.
    Once finished it will also generate a text report and an image with
    graph plotted.
    """
    fasttrack_toggled = False
    try:
        if config["ros_dynamic_speed"]:
            # Flag first, so a partially applied change is still reverted.
            fasttrack_toggled = True
            ros_fastrack_enable(True)
            time.sleep(5)

        test_speed()
        if wan_upload is None or wan_download is None:
            raise RuntimeError("Speed test did not produce WAN measurements")

        entry = Entry()
        entry.upload = mbits(wan_upload)
        entry.download = mbits(wan_download)
        entry.save()

        # Never configure the queue below the configured minimum speed.
        download_limit = wan_download
        if download_limit < config['ros_minimum_speed']:
            download_limit = config['ros_minimum_speed']

        if config["ros_dynamic_speed"]:
            ros_dynamic_speed(wan_upload, download_limit)
            ros_fastrack_enable(False)
            fasttrack_toggled = False

        dates, downloads, uploads = gather_data()
        generate_txt_output(dates, downloads, uploads)
        generate_plot_image(dates, downloads, uploads)
    except Exception:
        log.error("Error!", exc_info=True)
    finally:
        # Restore the fasttrack rule on every exit path, including Ctrl+C.
        if fasttrack_toggled:
            ros_fastrack_enable(False)


if __name__ == "__main__":
    main()