BroSpeedTester/speedtester.py

520 lines
16 KiB
Python

# Project helpers: read_config loads a named configuration, dash_data receives
# the result dictionary at the end of a run.
from configuration import read_config, dash_data
# region Logger
import logging
from debug import setup_logging, catch_errors
# `log` and `logger` are two names for the same "default" logger.
log = logger = logging.getLogger("default")
setup_logging()
# endregion
from statistics import median
from threading import Thread
from datetime import datetime
import speedtest
from dbo import Entry
import routeros_api
# Main configuration, read once at import time (router address, queue names,
# speedtest settings, ...).
config = read_config()
# Router credentials ("ros_login" / "ros_password") come from a separate
# 'secrets' configuration.
# NOTE(review): this name shadows the standard-library `secrets` module.
secrets = read_config('secrets')
import time
def mbits(bits):
    """Convert a bit count (or bits-per-second rate) to megabits, rounded to two decimals."""
    bits_per_megabit = 1000000
    megabits = bits / bits_per_megabit
    return round(megabits, 2)
@catch_errors
def gather_data():
    """
    Read every stored Entry and split it into three parallel lists.

    :return: tuple ``(dates, downloads, uploads)``; index ``i`` of each list
             belongs to the same Entry row.
    """
    log.debug("Gathering data...")
    # One (download, upload, date) triple per row, in the order the query yields them.
    records = [(row.download, row.upload, row.date_created) for row in Entry.select()]
    downloads = [record[0] for record in records]
    uploads = [record[1] for record in records]
    dates = [record[2] for record in records]
    return dates, downloads, uploads
import matplotlib
import matplotlib.pyplot as plt
@catch_errors
def generate_plot_image(dates, downloads, uploads, name="speed", description="Speed Graph"):
    """
    Plot download and upload series against time and save the graph as an image.

    Downloads are drawn as a solid red line, uploads as a thin dashed green line.
    The target path is the 'output_image_path' configuration value formatted
    with ``name``.

    :param dates: sequence of datetime objects (x axis)
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    :param name: value substituted into the configured output path
    :param description: y-axis label prefix (" (Mbps)" is appended)
    """
    log.debug("Genering image output for {}...".format(name))
    date_numbers = matplotlib.dates.date2num(dates)
    fig, ax = plt.subplots(figsize=(15, 3))
    try:
        # The x values are Matplotlib date numbers, so the axis must be told to
        # format them as dates. This is what the deprecated plot_date() did
        # internally before delegating to plot().
        ax.xaxis_date()
        ax.plot(date_numbers, downloads, linestyle='solid', color="red", linewidth=2, marker=None)
        ax.plot(date_numbers, uploads, linestyle='dashed', color="green", linewidth=0.5, marker=None)
        ax.set_ylabel(description + " (Mbps)")
        fig.tight_layout()
        fig.savefig(read_config()['output_image_path'].format(name))
    finally:
        # pyplot keeps every figure alive until it is closed explicitly; several
        # reports are generated per run, so release it even when saving fails.
        plt.close(fig)
@catch_errors
def generate_diff(dates, downloads, uploads):
    """
    Plot the change between consecutive measurements as the "diff" graph.

    Missing (falsy) values count as 0, and the first sample is compared
    against 0.

    :param dates: sequence of datetime objects
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    """

    def change_at(series, index):
        # Difference between this sample and the previous one.
        current = series[index] or 0
        previous = (series[index - 1] or 0) if index > 0 else 0
        return current - previous

    download_changes = []
    upload_changes = []
    for index in range(len(dates)):
        download_changes.append(change_at(downloads, index))
        upload_changes.append(change_at(uploads, index))
    generate_plot_image(dates, download_changes, upload_changes, "diff", "Difference Graph")
def generate_updown_plot_simple(downloads, uploads, name, description):
    """
    Plot download and upload series against their index and save the image.

    Downloads are drawn as a solid red line, uploads as a thin dashed green line.
    The target path is the 'output_image_path' configuration value formatted
    with ``name``.

    :param downloads: download values
    :param uploads: upload values
    :param name: value substituted into the configured output path
    :param description: y-axis label prefix (" (Mbps)" is appended)
    """
    fig, ax = plt.subplots(figsize=(10, 3))
    try:
        ax.plot(downloads, linestyle='solid', color="red", linewidth=2, marker=None)
        ax.plot(uploads, linestyle='dashed', color="green", linewidth=0.5, marker=None)
        ax.set_ylabel(description + " (Mbps)")
        fig.tight_layout()
        fig.savefig(read_config()['output_image_path'].format(name))
    finally:
        # pyplot keeps figures registered until closed; release it even on error.
        plt.close(fig)
@catch_errors
def gather_day_median_data(dates, downloads, uploads):
    """
    Compute the median download and upload value for each hour of the day.

    Samples are grouped by ``date.hour``. Falsy samples (e.g. ``None``) count
    as 0, and an hour with no samples gets a median of 0.

    :param dates: sequence of datetime objects
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    :return: tuple ``(dl, up)`` of two 24-element lists indexed by hour
    """
    log.debug("Gather day median data...")
    download_buckets = [[] for _ in range(24)]
    upload_buckets = [[] for _ in range(24)]
    for index, stamp in enumerate(dates):
        slot = stamp.hour
        download_buckets[slot].append(downloads[index] or 0)
        upload_buckets[slot].append(uploads[index] or 0)
    download_medians = [median(bucket) if bucket else 0 for bucket in download_buckets]
    upload_medians = [median(bucket) if bucket else 0 for bucket in upload_buckets]
    return download_medians, upload_medians
@catch_errors
def generate_day_median(dates, downloads, uploads):
    """
    Plot the per-hour median download/upload values as the "day_median" graph.

    :param dates: sequence of datetime objects
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    """
    hourly_download, hourly_upload = gather_day_median_data(dates, downloads, uploads)
    generate_updown_plot_simple(hourly_download, hourly_upload, "day_median", "Day average")
@catch_errors
def generate_day_median_diff(dates, downloads, uploads):
    """
    Plot the hour-to-hour change of the per-hour medians ("day_median_diff").

    The first hour is plotted as its own median; every later hour is plotted
    as the difference from the preceding hour.

    :param dates: sequence of datetime objects
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    """
    dl, up = gather_day_median_data(dates, downloads, uploads)

    def hourly_change(series, hour):
        # Hour 0 has no predecessor, so it keeps its raw value.
        if hour > 0:
            return series[hour] - series[hour - 1]
        return series[hour]

    download_changes = []
    upload_changes = []
    for hour in range(len(dl)):
        download_changes.append(hourly_change(dl, hour))
        upload_changes.append(hourly_change(up, hour))
    generate_updown_plot_simple(download_changes, upload_changes, "day_median_diff", "Day average diff")
@catch_errors
def generate_txt_output(dates, downloads, uploads):
    """
    Write all measurements as a plain-text table.

    The file path is the 'output_txt_path' configuration value. The file is
    overwritten with a header line followed by one line per measurement.

    :param dates: sequence of measurement dates
    :param downloads: download values, one per date
    :param uploads: upload values, one per date
    """
    log.debug("Genering txt output...")
    lines = ["Date: Down; Up;\n"]
    for index, date in enumerate(dates):
        down_value = downloads[index]
        up_value = uploads[index]
        lines.append(f"{date}: {down_value} Mbps; {up_value} Mbps\n")
    report = "".join(lines)
    with open(read_config()['output_txt_path'], "w+") as output:
        output.write(report)
@catch_errors
def ros_fastrack_enable(enable):
    '''
    Enable or disable fasttrack.

    Every rule in /ip/firewall/filter whose comment contains the configured
    'ros_fasttrack_comment' text gets its "disabled" flag updated.

    :param enable: True to enable, False to disable
    '''
    # RouterOS stores the flag as "disabled", so the value is the inverse of `enable`.
    disabled = 'false' if enable else 'true'
    log.info(f"Changing fasttrack disabled to {disabled}")
    connection = routeros_api.RouterOsApiPool(config['ros_ip'], username=secrets["ros_login"],
                                              password=secrets["ros_password"], plaintext_login=True)
    try:
        api = connection.get_api()
        firewall_rules = api.get_resource('/ip/firewall/filter')
        for rule in firewall_rules.get():
            # A rule may have no comment at all; treat that as an empty comment
            # instead of failing with KeyError and aborting the whole update.
            if config['ros_fasttrack_comment'] in rule.get('comment', ''):
                firewall_rules.set(id=rule['id'], disabled=disabled)
    finally:
        # Always release the router API session, even when a call above fails.
        connection.disconnect()
@catch_errors
def ros_unlimited_speed():
    '''
    Raise the max-limit of the managed simple queues to the largest 32-bit value.

    A queue is managed when its name is listed in config['ros_queues'], or when
    "all" is listed there and the name is not in config['ros_queues_except'].
    A failure to update one queue is logged and does not stop the remaining
    queues from being processed.
    '''
    connection = routeros_api.RouterOsApiPool(config['ros_ip'], username=secrets["ros_login"],
                                              password=secrets["ros_password"], plaintext_login=True)
    try:
        api = connection.get_api()
        list_queues = api.get_resource('/queue/simple')
        # 2**32 - 1 bits per second (about 4295 Mbps).
        up = down = 4294967295
        for queue in list_queues.get():
            if queue['name'] in config['ros_queues'] or (
                    "all" in config['ros_queues'] and queue["name"] not in config['ros_queues_except']):
                log.debug(f"Adjust Queue {queue['name']}: max_limit {int(up)}/{int(down)}")
                try:
                    if config["ros_du_invert"] == True:
                        # Inverting upload and download values, because master queue is most likely applied to the bridge
                        list_queues.set(id=queue['id'], max_limit=f"{int(down)}/{int(up)}")
                    else:
                        # Not inverting, use this in case master queue is applied to something like LTE interface
                        list_queues.set(id=queue['id'], max_limit=f"{int(up)}/{int(down)}")
                except Exception:
                    # Best effort per queue; KeyboardInterrupt/SystemExit still propagate.
                    log.error("Unable to change queue settings.", exc_info=True)
    finally:
        # Always release the router API session, even when a call above fails.
        connection.disconnect()
@catch_errors
def ros_dynamic_speed(upload, download):
    '''
    Adjust router os queue speed.

    A queue is managed when its name is listed in config['ros_queues'], or when
    "all" is listed there and the name is not in config['ros_queues_except'].
    When the monitoring results in ``data_dict`` contain an entry for the queue
    name, that individual limit is used; otherwise the total values passed in
    are applied. A failure to update one queue is logged and does not stop the
    remaining queues from being processed.

    :param upload: Upload speed from speedtest (bits per second)
    :param download: Download speed from speedtest (bits per second)
    '''
    log.debug(f"Set Speed to: DOWN {mbits(download)} mbps; UP {mbits(upload)} mbps")
    # Tolerate a missing monitoring result: fall back to the totals for every queue.
    measured_downloads = data_dict.get('wan_downloads', {})
    measured_uploads = data_dict.get('wan_uploads', {})
    connection = routeros_api.RouterOsApiPool(config['ros_ip'], username=secrets["ros_login"],
                                              password=secrets["ros_password"], plaintext_login=True)
    try:
        api = connection.get_api()
        list_queues = api.get_resource('/queue/simple')
        for queue in list_queues.get():
            if queue['name'] in config['ros_queues'] or (
                    "all" in config['ros_queues'] and queue["name"] not in config['ros_queues_except']):
                if queue['name'] in measured_downloads and queue['name'] in measured_uploads:
                    log.info(f"{queue['name']} found in data dict, setting individual limit.")
                    down = measured_downloads[queue['name']]
                    up = measured_uploads[queue['name']]
                else:
                    log.info(f"{queue['name']} was not found in data dict, setting total limit.")
                    log.debug(data_dict)
                    down = download
                    up = upload
                log.debug(
                    f"Adjust Queue {queue['name']}: max_limit {int(up)}/{int(down)}")
                try:
                    if config["ros_du_invert"] == True:
                        # Inverting upload and download values, because master queue is most likely applied to the bridge
                        list_queues.set(id=queue['id'], max_limit=f"{int(down)}/{int(up)}")
                    else:
                        # Not inverting, use this in case master queue is applied to something like LTE interface
                        list_queues.set(id=queue['id'], max_limit=f"{int(up)}/{int(down)}")
                except Exception:
                    # Best effort per queue; KeyboardInterrupt/SystemExit still propagate.
                    log.error("Unable to change queue settings.", exc_info=True)
    finally:
        # Always release the router API session, even when a call above fails.
        connection.disconnect()
# --- State shared between the speed-test threads, the monitoring thread and
# --- the main script. reset_globals() restores these defaults before a run.
# Filled by threaded_wan_speed() with 'wan_downloads' / 'wan_uploads' medians
# keyed by interface name (plus "total").
data_dict = {}
# Median of the summed tx / rx rates sampled from the router, in bits per second.
wan_upload = None
wan_download = None
# Result dictionary of a finished speedtest; the monitor samples while this is None.
results_dict = None
# Set to True by a speed-test thread right before it starts downloading.
test_started = False
# Set to True when a speed-test thread raised an exception.
speedtest_failed = False
downloading = True # True for download, False for upload
# Thread count handed to speedtest's download() and upload() calls.
threads = config['speedtest_threads']
def reset_globals():
    """Restore the shared measurement state to its defaults before a new test run."""
    global wan_upload, wan_download, results_dict
    global test_started, speedtest_failed, downloading
    global data_dict
    # No results yet.
    wan_upload = wan_download = results_dict = None
    # Nothing started, nothing failed; a run always begins with the download phase.
    test_started = speedtest_failed = False
    downloading = True
    data_dict = {}
def threaded_speedtest():
    """
    Run one download + upload speed test (thread target).

    Sets ``test_started`` before the download begins, toggles ``downloading``
    between the two phases and stores the result in ``results_dict``. Any
    exception is logged and recorded by setting ``speedtest_failed``.

    :return: the current ``results_dict`` (None when no test has finished)
    """
    global test_started, downloading, results_dict, speedtest_failed
    try:
        tester = speedtest.Speedtest()
        tester.get_servers()
        # tester.get_best_server()
        print("Running test...")
        test_started = True
        downloading = True
        tester.download(threads=threads)
        downloading = False
        tester.upload(threads=threads, pre_allocate=False)
        results_dict = tester.results.dict()
        down_mbps = mbits(results_dict['download'])
        up_mbps = mbits(results_dict['upload'])
        log.info(f"Speedtest.net result: DOWN {down_mbps} mbps; UP {up_mbps} mbps;")
    except Exception as error:
        speedtest_failed = True
        log.error(f"Speedtest.net failed: {error}", exc_info=True)
    return results_dict
def threaded_wan_speed():
    """
    Sample WAN interface traffic from the router while a speed test runs (thread target).

    Waits until a speed-test thread reports that it started, then polls
    'interface/monitor-traffic' about once per second for every interface in
    config['ros_wan_interface']. Receive rates are recorded during the download
    phase and transmit rates during the upload phase, each after a two second
    warm-up. Sampling stops once a speed test has published its results or a
    speed test has failed.

    On success the medians are stored in ``wan_download`` / ``wan_upload`` and,
    per interface plus "total", in ``data_dict['wan_downloads']`` and
    ``data_dict['wan_uploads']``. When the speed test failed before publishing
    results, nothing is stored.

    :raises statistics.StatisticsError: if a phase finished without any sample
    """
    global wan_download
    global wan_upload
    print("Waiting for test to start...")
    while not test_started:
        if speedtest_failed:
            return
        time.sleep(1)
    print("Download warm-up...")
    time.sleep(2)
    uploads = {"total": []}
    downloads = {"total": []}
    for interface in config['ros_wan_interface']:
        uploads[interface] = []
        downloads[interface] = []
    print("Monitoring...")
    upload_warmed_up = False
    # One API session for the whole monitoring phase; opening a new one per
    # sample without closing it leaked a router session every second.
    connection = routeros_api.RouterOsApiPool(config['ros_ip'], username=secrets["ros_login"],
                                              password=secrets["ros_password"], plaintext_login=True)
    try:
        api = connection.get_api()
        # Also stop when the speed test failed after it had started: in that case
        # results_dict is never set and the loop would otherwise run forever.
        while results_dict is None and not speedtest_failed:
            print("\n\n---\n")
            rx = 0
            tx = 0
            for interface in config['ros_wan_interface']:
                traffic = api.get_resource('/').call('interface/monitor-traffic',
                                                     {'interface': interface, 'once': ' '})[0]
                rx_rate = int(traffic['rx-bits-per-second'])
                tx_rate = int(traffic['tx-bits-per-second'])
                rx += rx_rate
                tx += tx_rate
                if downloading:
                    print(f"{interface} DL: {mbits(rx_rate)} mbps;")
                    downloads[interface].append(rx_rate)
                elif upload_warmed_up:
                    print(f"{interface} UP: {mbits(tx_rate)} mbps;")
                    uploads[interface].append(tx_rate)
            if downloading:
                downloads['total'].append(rx)
                print(f"TOTAL DL: {mbits(rx)} mbps;")
            else:
                if upload_warmed_up:
                    uploads['total'].append(tx)
                    print(f"TOTAL UP: {mbits(tx)} mbps;")
                else:
                    # Skip the first moments of the upload phase.
                    print("Upload warm-up...")
                    time.sleep(2)
                    upload_warmed_up = True
            time.sleep(1)
    finally:
        # Always release the router API session, even when sampling fails.
        connection.disconnect()
    if results_dict is None:
        # The loop ended because of a failure, so the samples are incomplete.
        log.warning("Speedtest failed during monitoring; no monitor result recorded.")
        return
    wan_download = median(downloads['total'])
    wan_upload = median(uploads['total'])
    data_dict['wan_downloads'] = {}
    data_dict['wan_uploads'] = {}
    for key, value in downloads.items():
        data_dict['wan_downloads'][key] = median(value)
    for key, value in uploads.items():
        data_dict['wan_uploads'][key] = median(value)
    log.info(f"Monitor result: {mbits(wan_download)} mbps; {mbits(wan_upload)} mbps;")
def test_speed(parallel=2):
    """
    Run the speed tests and the router traffic monitor concurrently.

    Resets the shared state, starts ``parallel`` speed-test threads followed by
    one monitoring thread, then waits for all of them to finish. Results are
    left in the module-level state.

    :param parallel: number of speed-test threads to run at the same time
    """
    reset_globals()
    monitor = Thread(target=threaded_wan_speed)
    workers = [Thread(target=threaded_speedtest) for _ in range(parallel)]
    for number, worker in enumerate(workers):
        log.info(f"Starting test thread {number}...")
        worker.start()
    log.info("Starting monitoring thread...")
    monitor.start()
    for number, worker in enumerate(workers):
        worker.join()
        log.info(f"Test thread {number} finished.")
    monitor.join()
    log.info("Monitoring thread finished.")
def generate_database_reports():
    """Regenerate the text report and all graphs from the stored measurements."""
    dates, downloads, uploads = gather_data()
    # Each generator takes the same three series; the order matches the original calls.
    report_generators = (
        generate_txt_output,
        generate_plot_image,
        generate_day_median,
        generate_day_median_diff,
        generate_diff,
    )
    for generate in report_generators:
        generate(dates, downloads, uploads)
import sys
from netutils import test_intertnet_connection
def on_fail_or_no_connection():
    """
    Record a failed measurement and terminate the script.

    Disables fasttrack, stores an Entry with no upload/download/data values,
    regenerates the reports, publishes zeroed dashboard data and exits.

    :raises SystemExit: always, via ``sys.exit()``
    """
    ros_fastrack_enable(False)
    failed_entry = Entry()
    for field in ("upload", "download", "data"):
        setattr(failed_entry, field, None)
    failed_entry.save()
    generate_database_reports()
    zeroed_dashboard = {
        "wan_download": 0,
        "wan_upload": 0,
        "quality": 0,
        "quality_percent": 0,
        "datetime": datetime.now()
    }
    dash_data(zeroed_dashboard)
    log.warning("No internet connection! Exiting.")
    sys.exit()
def range_convert(value, min_old, max_old, min_new, max_new, clamp=True, integer=True):
    """
    Linearly map ``value`` from the range [min_old, max_old] to [min_new, max_new].

    :param value: number to convert
    :param min_old: lower bound of the source range
    :param max_old: upper bound of the source range (must differ from min_old)
    :param min_new: lower bound of the target range
    :param max_new: upper bound of the target range
    :param clamp: limit the result to [min_new, max_new] when True
    :param integer: truncate the result with ``int()`` when True
    :return: the converted value
    """
    old_span = max_old - min_old
    new_span = max_new - min_new
    scaled = (value - min_old) * new_span / old_span + min_new
    if clamp:
        # The upper bound is checked first, then the lower bound.
        scaled = max_new if scaled > max_new else (min_new if scaled < min_new else scaled)
    return int(scaled) if integer else scaled
def bits_to_quality(mbps, interface='total', percent=False, integer=True):
    """
    Rate a measured speed against the configured quality range of an interface.

    The range comes from config['ros_quality'][interface] as (low, high) and is
    compared with the value after it has been passed through ``mbits()``.

    :param mbps: measured speed; converted with ``mbits()`` before rating
    :param interface: key into config['ros_quality']
    :param percent: rate on a 1-100 scale when True, otherwise on a 1-5 scale
    :param integer: truncate the rating to an int when True
    :return: the rating, or 0 when no range is configured for the interface
    """
    quality_ranges = config['ros_quality']
    if interface not in quality_ranges:
        return 0
    bounds = quality_ranges[interface]
    scale_top = 100 if percent else 5
    return range_convert(mbits(mbps), bounds[0], bounds[1], 1, scale_top, integer=integer)
def make_quality_dict(d):
    """
    Build the quality ratings for every measured interface.

    :param d: mapping of interface name to measured speed
    :return: mapping of interface name to a dict with 'quality' (1-5 scale)
             and 'quality_percent' (1-100 scale)
    """
    return {
        name: {
            'quality': bits_to_quality(rate, name),
            'quality_percent': bits_to_quality(rate, name, True),
        }
        for name, rate in d.items()
    }
if __name__ == "__main__":
    # This script will run a few speed tests, calculate average upload and download speeds and record them into database.
    # Once finished it will also generate an image with graph plotted.

    # generate_plot_image(*gather_data())
    # import sys
    # sys.exit()
    from random import uniform
    try:
        log.debug("Test internet connection...")
        if config["ros_dynamic_speed"]:
            # ros_fastrack_enable(True)
            # Temporary disable limits by adjusting queues
            ros_unlimited_speed()  # Thats 4096 Megabits adjust if you got more...
            # Pause before measuring, after the queue limits were changed.
            time.sleep(5)
        test_speed(config['speedtest_master_threads'])
        if speedtest_failed:
            # Records the failure and exits the script via sys.exit().
            on_fail_or_no_connection()
        entry = Entry()
        entry.upload = mbits(wan_upload)
        entry.download = mbits(wan_download)
        entry.data = data_dict
        entry.save()
        # Never configure the router below the configured minimum download speed.
        if wan_download < config['ros_minimum_speed']:
            wan_download = config['ros_minimum_speed']
        if config["ros_dynamic_speed"]:
            ros_dynamic_speed(wan_upload, wan_download)
            # ros_fastrack_enable(False)
        generate_database_reports()
        data_dict.update({
            "download_quality": make_quality_dict(data_dict['wan_downloads']),
            "upload_quality": make_quality_dict(data_dict['wan_uploads']),
            "datetime": datetime.now()
        })
        dash_data(data_dict)
        # dash_data({
        #     "wan_download": mbits(wan_download),
        #     "wan_upload": mbits(wan_upload),
        #     "quality": int(range_convert(mbits(wan_download), config['bad_speed_mbps'], config['good_speed_mbps'], 1, 5)),
        #     "quality_percent": int(range_convert(mbits(wan_download), config['bad_speed_mbps'], config['good_speed_mbps'], 1, 100)),
        #     "datetime": datetime.now()
        # })
    except Exception:
        # Only real errors are logged here. A bare `except:` would also catch the
        # SystemExit raised by on_fail_or_no_connection() and report the intended
        # exit as "Error!".
        log.error("Error!", exc_info=True)
        # if config["ros_dynamic_speed"]:
        #     ros_fastrack_enable(False)