I have a script that show some info , like version, date/time, CPU ussage and temp, memory, disk.....
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
opn-lcd.py — LCDproc client for OPNsense
Rotation:
Version → Date/Time → Uptime → CPU → CPU Temp → Memory → Disk Used → LAN IP → WAN IP → Gateway → Live Users
Highlights
- Even timing via time.monotonic()
- CPU% via non-blocking 1s sampler (matches `top -d 1`)
- CPU temps via dev.cpu.*.temperature (coretemp/amdtemp); fallback to ACPI tz
- Disk% via ZFS pools (zpool -Hp size,alloc), fallback to root (/)
- OpenVPN SSL users: parses /var/etc/openvpn/instance-*.conf to find **server** instances,
then counts via instance .sock (mgmt) or .stat (CSV/text)
- WireGuard active peers by recent handshake (default 180s)
"""
import os, re, glob, socket, socket as _sock, subprocess, time
from datetime import datetime
# ----------------------------- CONFIG ---------------------------------
LCDD_HOST = os.environ.get("OPN_LCD_HOST", "127.0.0.1")
LCDD_PORT = int(os.environ.get("OPN_LCD_PORT", "13666"))
LCD_W = int(os.environ.get("OPN_LCD_W", "16"))
LCD_H = int(os.environ.get("OPN_LCD_H", "2"))
SCREEN_SECONDS = float(os.environ.get("OPN_LCD_SCREEN_SEC", "4.0"))
CPU_SAMPLE = float(os.environ.get("OPN_LCD_CPU_SAMPLE", "1.0")) # 1s to match top
WAN_IF_OVERRIDE = os.environ.get("OPN_LCD_WAN_IF", "").strip()
LAN_IF_OVERRIDE = os.environ.get("OPN_LCD_LAN_IF", "").strip()
WG_ACTIVE_WINDOW = int(os.environ.get("OPN_LCD_WG_WINDOW", "180"))
NET_SAMPLE = float(os.environ.get("OPN_LCD_NET_SAMPLE", "1.0")) # 1s WAN rate sampler
DEBUG = os.environ.get("OPN_LCD_DEBUG", "0") not in ("0","false","False","")
def logd(msg):
if DEBUG:
try: print(f"[opn-lcd] {msg}")
except Exception: pass
# --------------------------- HELPERS ----------------------------------
def run_cmd(args, timeout=2.5) -> str:
try:
out = subprocess.check_output(args, timeout=timeout, stderr=subprocess.DEVNULL)
return out.decode(errors="ignore").strip()
except Exception:
return ""
def lcd_escape(s: str) -> str: return s.replace("\\","\\\\").replace('"','\\"')
def trunc(s: str, w: int) -> str: return s if len(s)<=w else s[:w]
def center_x(s: str, w: int) -> int: return 1 if len(s)>=w else 1+(w-len(s))//2
# --------------------------- METRICS ----------------------------------
def sysctl_values(*names):
if not names: return []
out = run_cmd(["/sbin/sysctl","-n",*names]).splitlines()
while len(out) < len(names): out.append("")
return out
# ---- CPU non-blocking sampler ----
def cpu_times():
raw = run_cmd(["/sbin/sysctl","-n","kern.cp_time"])
try:
u,n,s,inte,idle = map(int, raw.split());
return {"user":u,"nice":n,"sys":s,"intr":inte,"idle":idle}
except Exception:
return {"user":0,"nice":0,"sys":0,"intr":0,"idle":0}
_cpu_prev = None; _cpu_prev_t = 0.0; _cpu_pct = 0.0
def cpu_sampler_tick():
global _cpu_prev,_cpu_prev_t,_cpu_pct
now = time.monotonic()
if _cpu_prev is None:
_cpu_prev = cpu_times(); _cpu_prev_t = now; return
if now - _cpu_prev_t >= CPU_SAMPLE:
a = _cpu_prev; b = cpu_times()
d = {k:max(0,b[k]-a[k]) for k in b}
total = sum(d.values()); idle = d.get("idle",0)
_cpu_pct = 0.0 if total<=0 else 100.0*(total-idle)/total
_cpu_prev = b; _cpu_prev_t = now
def cpu_latest_percent(): return _cpu_pct
# ---- CPU temperatures ----
def _parse_celsius(text: str):
m = re.search(r"(-?\d+(?:\.[0-9]+)?)\s*C", text or "")
return float(m.group(1)) if m else None
def cpu_temps_c():
"""Return a list of CPU core/package temperatures in Celsius.
Prefer explicit per-core reads via kern.smp.cpus → dev.cpu.N.temperature.
Fallback to scanning dev.cpu and then ACPI thermal zones."""
temps = []
# 0) Fast path: explicit per-core sysctls
n_str = run_cmd(["/sbin/sysctl", "-n", "kern.smp.cpus"]) or ""
try:
n = max(1, int(n_str))
except Exception:
n = 16 # safe upper bound; we'll just skip missing ones
for i in range(n):
val = run_cmd(["/sbin/sysctl", "-n", f"dev.cpu.{i}.temperature"]) or ""
c = _parse_celsius(val)
if c is not None:
temps.append(c)
if temps:
return temps
# 1) Broader scan under dev.cpu.* (handles sparse indexes)
txt = run_cmd(["/sbin/sysctl", "-a", "dev.cpu"]) or ""
for line in txt.splitlines():
m = re.search(r"dev\.cpu\.(\d+)\.temperature:\s*([-0-9.]+)C", line)
if m:
try:
temps.append(float(m.group(2)))
except Exception:
pass
if temps:
return temps
# 2) ACPI thermal zones fallback
for tz in range(0, 8):
val = run_cmd(["/sbin/sysctl", "-n", f"hw.acpi.thermal.tz{tz}.temperature"]) or ""
c = _parse_celsius(val)
if c is not None:
temps.append(c)
return temps
# ---- Load average ----
def loadavg_values():
s = run_cmd(["/sbin/sysctl","-n","vm.loadavg"]) or ""
vals = re.findall(r"(-?\d+(?:\.\d+)?)", s)
if len(vals) >= 3:
try:
return [float(vals[0]), float(vals[1]), float(vals[2])]
except Exception:
pass
up = run_cmd(["/usr/bin/uptime"]) or ""
vals = re.findall(r"(\d+\.\d+)", up)
if len(vals) >= 3:
try:
return [float(vals[-3]), float(vals[-2]), float(vals[-1])]
except Exception:
pass
return []
def loadavg_str():
v = loadavg_values()
if not v:
return "N/A"
return f"{v[0]:.2f} {v[1]:.2f} {v[2]:.2f}"
# ---- Memory ----
def mem_usage_percent():
total_s, active_s, wired_s = sysctl_values(
"vm.stats.vm.v_page_count","vm.stats.vm.v_active_count","vm.stats.vm.v_wire_count")
try:
total = int(total_s) or 1; used = int(active_s)+int(wired_s)
return 100.0*used/total
except Exception: return 0.0
# ---- Uptime / Date ----
def uptime_tuple():
s = run_cmd(["/sbin/sysctl","-n","kern.boottime"])
m = re.search(r"sec\s*=\s*(\d+)", s)
boot = int(m.group(1)) if m else int(s or 0) if s.isdigit() else int(time.time())
delta = max(0, int(time.time())-boot)
return delta//86400, f"{(delta%86400)//3600:02d}:{(delta%3600)//60:02d}"
def date_time_lines():
now = datetime.now().astimezone()
return now.strftime("%a %d %b %Y"), now.strftime("%H:%M:%S %Z")
def opnsense_version():
for cmd in (["/usr/local/sbin/opnsense-version","-n","-v"],
["/usr/local/sbin/opnsense-version","-v"],
["/usr/local/sbin/opnsense-version"]):
out = run_cmd(cmd)
if out: return out.splitlines()[0].strip()
return run_cmd(["/usr/bin/uname","-sr"]) or "OPNsense"
# ---- Networking ----
def default_route_info():
txt = run_cmd(["/sbin/route","-n","get","-inet","default"])
iface = gw = ""
for line in txt.splitlines():
if "interface:" in line: iface = line.split(":",1)[1].strip()
elif "gateway:" in line: gw = line.split(":",1)[1].strip()
return iface, gw
def iface_ipv4(iface: str) -> str:
if not iface: return ""
out = run_cmd(["/sbin/ifconfig", iface])
m = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)\b", out)
return m.group(1) if m else ""
def list_ifaces():
return [n for n in run_cmd(["/sbin/ifconfig","-l"]).split() if n]
def is_private_ipv4(ip: str) -> bool:
parts = ip.split(".")
if len(parts)!=4: return False
try: p1=int(parts[0]); p2=int(parts[1])
except: return False
return (p1==10) or (p1==192 and p2==168) or (p1==172 and 16<=p2<=31)
def pick_lan_iface(exclude:set) -> str:
if LAN_IF_OVERRIDE: return LAN_IF_OVERRIDE
for iface in list_ifaces():
if iface in exclude or iface.startswith(("lo","enc","pflog")): continue
ip = iface_ipv4(iface)
if ip and is_private_ipv4(ip): return iface
return ""
# ---- WAN throughput sampler (bytes → bits/sec) ----
_net_prev_if = None
_net_prev_ibytes = 0
_net_prev_obytes = 0
_net_prev_t = 0.0
_net_rx_bps = 0.0
_net_tx_bps = 0.0
def _netstat_bytes(iface: str):
"""Return (ibytes, obytes) for iface via netstat. Robust to column order."""
if not iface:
return None
txt = run_cmd(["/usr/bin/netstat", "-I", iface, "-b", "-n", "-W"]) or ""
lines = [l for l in txt.splitlines() if l.strip()]
if len(lines) < 2:
return None
header = lines[0].split()
# find indices for Ibytes/Obytes regardless of case
i_idx = o_idx = None
for idx, name in enumerate(header):
if name.lower() == "ibytes": i_idx = idx
if name.lower() == "obytes": o_idx = idx
if i_idx is None or o_idx is None:
return None
for row in lines[1:]:
cols = row.split()
if not cols or cols[0] != iface:
continue
try:
return int(cols[i_idx]), int(cols[o_idx])
except Exception:
continue
return None
def _fmt_rate(bps: float) -> str:
if bps <= 0:
return "0"
units = ["b","Kb","Mb","Gb","Tb"]
val = float(bps)
idx = 0
while val >= 1000.0 and idx < len(units)-1:
val /= 1000.0; idx += 1
if idx >= 2:
return f"{val:.1f}{units[idx]}"
return f"{val:.0f}{units[idx]}"
def wan_sampler_tick(wan_if: str):
global _net_prev_if, _net_prev_ibytes, _net_prev_obytes, _net_prev_t, _net_rx_bps, _net_tx_bps
now = time.monotonic()
if not wan_if:
_net_prev_if = None
_net_rx_bps = _net_tx_bps = 0.0
return
if _net_prev_if != wan_if:
_net_prev_if = wan_if
stats = _netstat_bytes(wan_if)
if not stats:
_net_rx_bps = _net_tx_bps = 0.0
return
_net_prev_ibytes, _net_prev_obytes = stats
_net_prev_t = now
_net_rx_bps = _net_tx_bps = 0.0
return
if now - _net_prev_t < NET_SAMPLE:
return
stats = _netstat_bytes(wan_if)
if not stats:
return
iby, oby = stats
dt = max(0.001, now - _net_prev_t)
di = max(0, iby - _net_prev_ibytes)
do = max(0, oby - _net_prev_obytes)
_net_rx_bps = (di * 8.0) / dt
_net_tx_bps = (do * 8.0) / dt
_net_prev_ibytes, _net_prev_obytes, _net_prev_t = iby, oby, now
def wan_rates():
return _net_rx_bps, _net_tx_bps
# ---- Disk usage ----
def total_disk_usage_percent():
zp = run_cmd(["/sbin/zpool","list","-Hp","-o","size,alloc"])
if zp:
size_b=0; alloc_b=0
for line in zp.splitlines():
parts=line.split()
if len(parts)>=2:
try: size_b+=int(parts[0]); alloc_b+=int(parts[1])
except: pass
if size_b>0: return 100.0*alloc_b/size_b
out = run_cmd(["/bin/df","-kP","/"]).splitlines()
if len(out)>=2:
cols = out[1].split()
if len(cols)>=3:
try:
total_k=int(cols[1]); used_k=int(cols[2])
return 100.0*used_k/total_k if total_k>0 else 0.0
except: pass
return 0.0
def _total_size_gib():
zp = run_cmd(["/sbin/zpool","list","-Hp","-o","size"])
if zp:
try: return max(1, round(sum(int(x) for x in zp.split())/(1<<30)))
except: pass
out = run_cmd(["/bin/df","-kP","/"]).splitlines()
if len(out)>=2:
try: return max(1, round((int(out[1].split()[1])*1024)/(1<<30)))
except: pass
return 0
# ---- OpenVPN (server clients) ----
def _ovpn_instances():
"""Return list of dicts: {'base':path_without_ext, 'conf':..., 'sock':..., 'stat':..., 'is_server':bool}"""
out=[]
for conf in sorted(glob.glob("/var/etc/openvpn/instance-*.conf")):
base = conf[:-5] # strip .conf
sock = base + ".sock"
stat = base + ".stat"
is_server = False
try:
with open(conf,"r",errors="ignore") as f: txt=f.read()
if re.search(r'^\s*mode\s+server\s*$', txt, re.M|re.I): is_server=True
if re.search(r'^\s*server(-bridge)?\b', txt, re.M|re.I): is_server=True
except Exception:
pass
out.append({"base":base,"conf":conf,"sock":sock,"stat":stat,"is_server":is_server})
return out
def _ovpn_count_from_socket(sock_path, timeout=3.0):
"""Try status 3 (CSV), then status 2, then text 'status'."""
try:
s=_sock.socket(_sock.AF_UNIX,_sock.SOCK_STREAM)
s.settimeout(timeout); s.connect(sock_path)
try: s.recv(1024)
except Exception: pass
for cmd in (b"status 3\n", b"status 2\n", b"status\n"):
try:
s.sendall(cmd); buf=b""
while True:
chunk=s.recv(4096)
if not chunk: break
buf+=chunk
if b"\nEND" in buf or b"\r\nEND" in buf: break
txt=buf.decode(errors="ignore")
if cmd!=b"status\n":
cnt=sum(1 for line in txt.splitlines() if line.startswith("CLIENT_LIST,"))
if cnt>0:
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
return cnt
# text format fallback parse
in_clients=False; c=0
for line in txt.splitlines():
L=line.strip()
if L.startswith("CLIENT LIST"):
in_clients=True; continue
if in_clients:
if not L or L.startswith(("Updated,","Common Name")): continue
if L.startswith(("ROUTING TABLE","GLOBAL STATS","END")): break
c+=1
if c>0:
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
return c
except Exception:
continue
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
except Exception as e:
logd(f"OpenVPN sock fail {sock_path}: {e}")
return 0
def _ovpn_count_from_status_file(path):
try:
with open(path,"r",errors="ignore") as f: lines=f.read().splitlines()
except Exception:
return 0
v2=[l for l in lines if l.startswith("CLIENT_LIST,")]
if v2: return len(v2)
in_clients=False; c=0
for l in lines:
t=l.strip()
if t.startswith("CLIENT LIST"): in_clients=True; continue
if in_clients:
if not t or t.startswith(("Updated,","Common Name")): continue
if t.startswith(("ROUTING TABLE","GLOBAL STATS","END")): break
c+=1
return c
def count_openvpn_clients():
total=0
insts=_ovpn_instances()
if not insts:
for sp in sorted(set(glob.glob("/var/etc/openvpn/*.sock")+glob.glob("/var/run/openvpn/*.sock"))):
total += _ovpn_count_from_socket(sp)
for sf in sorted(set(glob.glob("/var/etc/openvpn/*.stat"))):
total += _ovpn_count_from_status_file(sf)
return total
for inst in insts:
if not inst["is_server"]:
logd(f"skip client instance: {inst['conf']}")
continue
cnt=0
if os.path.exists(inst["sock"]):
cnt=_ovpn_count_from_socket(inst["sock"])
logd(f"sock {inst['sock']} -> {cnt}")
if cnt==0 and os.path.exists(inst["stat"]):
c2=_ovpn_count_from_status_file(inst["stat"])
logd(f"stat {inst['stat']} -> {c2}")
cnt += c2
total += cnt
return total
# ---- WireGuard ----
def _wg_cmd_output():
for cmd in ("/usr/local/bin/wg","/usr/local/sbin/wg","/usr/bin/wg","/sbin/wg"):
if os.path.exists(cmd): return run_cmd([cmd,"show","all","dump"])
return ""
def count_wireguard_active(window=WG_ACTIVE_WINDOW):
txt=_wg_cmd_output()
if not txt: return 0
now=int(time.time()); cnt=0
for line in txt.splitlines():
parts=line.split("\t");
if len(parts)<9: parts=line.split()
if len(parts)>=9:
try: last=int(parts[5])
except: continue
if last>0 and (now-last)<=window: cnt+=1
return cnt
# --------------------------- LCD CLIENT -------------------------------
class LCDClient:
def __init__(self, host=LCDD_HOST, port=LCDD_PORT, w=LCD_W, h=LCD_H):
self.host, self.port, self.w, self.h = host, port, w, h
self.sock=None; self.screen_id="scr"; self.w1_id="l1"; self.w2_id="l2"; self.current_title=None
def connect(self):
self.close()
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.settimeout(3.0); s.connect((self.host,self.port)); self.sock=s
self._read_banner(); self._send("hello")
self._send('client_set name "opn-lcd"')
self._send("client_set -backlight on"); self._send("client_set -heartbeat off")
self._send(f"screen_add {self.screen_id}")
self._send(f'screen_set {self.screen_id} -name "OPNsense" -priority foreground -heartbeat off')
self._send(f"widget_add {self.screen_id} {self.w1_id} string")
if self.h>=2: self._send(f"widget_add {self.screen_id} {self.w2_id} string")
self.current_title="OPNsense"
def close(self):
if self.sock:
try: self.sock.close()
except Exception: pass
self.sock=None
def _read_banner(self):
try: self.sock.settimeout(1.0); _=self.sock.recv(1024)
except Exception: pass
finally: self.sock.settimeout(3.0)
def _send(self,line:str):
if self.sock: self.sock.sendall((line+"\n").encode())
def set_title_once(self,title:str):
title=trunc(title,self.w)
if title!=self.current_title:
self._send(f'screen_set {self.screen_id} -name "{lcd_escape(title)}"')
self.current_title=title
def draw(self,l1:str,l2:str=""):
t1=trunc(l1,self.w); x1=center_x(t1,self.w)
self._send(f'widget_set {self.screen_id} {self.w1_id} {x1} 1 "{lcd_escape(t1)}"')
if self.h>=2:
t2=trunc(l2,self.w); x2=center_x(t2,self.w)
self._send(f'widget_set {self.screen_id} {self.w2_id} {x2} 2 "{lcd_escape(t2)}"')
# ---------------------------- MAIN LOOP ------------------------------
def main():
lcd=LCDClient()
next_refresh=time.monotonic()+1.0
refresh_period=30.0
screens=("FW","DATE","UP","CPU","LOAD","TEMP","MEM","DSK","LAN","WAN","NET","GW","USERS")
idx=0; next_switch=time.monotonic()+SCREEN_SECONDS
wan_if=WAN_IF_OVERRIDE; gateway=""; wan_ip=""; lan_if=""; lan_ip=""
sslvpn=0; wg=0
temp_max=None; temp_avg=None; temp_n=0
while True:
nowm=time.monotonic()
cpu_sampler_tick()
wan_sampler_tick(wan_if)
if lcd.sock is None:
try: lcd.connect()
except Exception:
time.sleep(0.5); continue
if nowm>=next_refresh:
if not WAN_IF_OVERRIDE:
wan_if, gateway = default_route_info()
else:
_, gateway = default_route_info()
wan_ip = iface_ipv4(wan_if) if wan_if else ""
if LAN_IF_OVERRIDE:
lan_if=LAN_IF_OVERRIDE
else:
lan_if = pick_lan_iface({wan_if,"lo0"})
lan_ip = iface_ipv4(lan_if) if lan_if else ""
sslvpn = count_openvpn_clients()
wg = count_wireguard_active(WG_ACTIVE_WINDOW)
temps = cpu_temps_c()
if temps:
temp_max = max(temps)
temp_avg = sum(temps)/len(temps)
temp_n = len(temps)
else:
temp_max = temp_avg = None
temp_n = 0
logd(f"counts: SSLVPN={sslvpn} WG={wg} temps={temps if temps else 'N/A'}")
next_refresh += refresh_period
if nowm>=next_switch:
idx=(idx+1)%len(screens)
while nowm>=next_switch: next_switch+=SCREEN_SECONDS
cur=screens[idx]
try:
if cur=="FW":
lcd.set_title_once("OPNSense FW"); lcd.draw("OPNSense FW", opnsense_version())
elif cur=="DATE":
lcd.set_title_once("Date/Time"); l1,l2=date_time_lines(); lcd.draw(l1,l2)
elif cur=="UP":
lcd.set_title_once("Uptime"); d,hm=uptime_tuple(); lcd.draw("System Uptime", f"{d} days {hm}")
elif cur=="CPU":
lcd.set_title_once("CPU"); lcd.draw("CPU Usage", f"{cpu_latest_percent():.1f}%")
elif cur=="LOAD":
lcd.set_title_once("Load Average");
lcd.draw("Load Average", loadavg_str())
elif cur=="TEMP":
lcd.set_title_once("CPU Temp")
if temp_max is None:
lcd.draw("CPU Temp", "Sensors N/A")
else:
line2 = (f"Max{temp_max:.0f}C Avg{temp_avg:.0f}C" if temp_n>1 else f"{temp_max:.0f}C")
lcd.draw("CPU Temp", line2)
elif cur=="MEM":
lcd.set_title_once("Memory"); lcd.draw("Memory Usage", f"{mem_usage_percent():.0f}% used")
elif cur=="DSK":
lcd.set_title_once("Disk Used"); d=total_disk_usage_percent(); total_g=_total_size_gib()
lcd.draw("Disk Used", f"{d:.0f}% of {total_g}G" if total_g>0 else f"{d:.0f}%")
elif cur=="LAN":
lcd.set_title_once("LAN IP"); lcd.draw("LAN IP", lan_ip or "N/A")
elif cur=="WAN":
lcd.set_title_once("WAN IP"); lcd.draw("WAN IP", wan_ip or "N/A")
elif cur=="NET":
lcd.set_title_once("WAN Rate")
if not wan_if:
lcd.draw("WAN Rate", "N/A")
else:
rx, tx = wan_rates()
lcd.draw("WAN Rate", f"D:{_fmt_rate(rx)} U:{_fmt_rate(tx)}")
elif cur=="GW":
lcd.set_title_once("Gateway"); lcd.draw("Gateway", gateway or "N/A")
elif cur=="USERS":
lcd.set_title_once("Live Users"); lcd.draw("Live Users", f"SSL:{sslvpn} WG:{wg}")
except (BrokenPipeError,OSError):
lcd.close()
time.sleep(0.1)
if __name__=="__main__":
try: main()
except KeyboardInterrupt: pass
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
opn-lcd.py — LCDproc client for OPNsense
Rotation:
Version → Date/Time → Uptime → CPU → CPU Temp → Memory → Disk Used → LAN IP → WAN IP → Gateway → Live Users
Highlights
- Even timing via time.monotonic()
- CPU% via non-blocking 1s sampler (matches `top -d 1`)
- CPU temps via dev.cpu.*.temperature (coretemp/amdtemp); fallback to ACPI tz
- Disk% via ZFS pools (zpool -Hp size,alloc), fallback to root (/)
- OpenVPN SSL users: parses /var/etc/openvpn/instance-*.conf to find **server** instances,
then counts via instance .sock (mgmt) or .stat (CSV/text)
- WireGuard active peers by recent handshake (default 180s)
"""
import os, re, glob, socket, socket as _sock, subprocess, time
from datetime import datetime
# ----------------------------- CONFIG ---------------------------------
LCDD_HOST = os.environ.get("OPN_LCD_HOST", "127.0.0.1")
LCDD_PORT = int(os.environ.get("OPN_LCD_PORT", "13666"))
LCD_W = int(os.environ.get("OPN_LCD_W", "16"))
LCD_H = int(os.environ.get("OPN_LCD_H", "2"))
SCREEN_SECONDS = float(os.environ.get("OPN_LCD_SCREEN_SEC", "4.0"))
CPU_SAMPLE = float(os.environ.get("OPN_LCD_CPU_SAMPLE", "1.0")) # 1s to match top
WAN_IF_OVERRIDE = os.environ.get("OPN_LCD_WAN_IF", "").strip()
LAN_IF_OVERRIDE = os.environ.get("OPN_LCD_LAN_IF", "").strip()
WG_ACTIVE_WINDOW = int(os.environ.get("OPN_LCD_WG_WINDOW", "180"))
NET_SAMPLE = float(os.environ.get("OPN_LCD_NET_SAMPLE", "1.0")) # 1s WAN rate sampler
DEBUG = os.environ.get("OPN_LCD_DEBUG", "0") not in ("0","false","False","")
def logd(msg):
if DEBUG:
try: print(f"[opn-lcd] {msg}")
except Exception: pass
# --------------------------- HELPERS ----------------------------------
def run_cmd(args, timeout=2.5) -> str:
try:
out = subprocess.check_output(args, timeout=timeout, stderr=subprocess.DEVNULL)
return out.decode(errors="ignore").strip()
except Exception:
return ""
def lcd_escape(s: str) -> str: return s.replace("\\","\\\\").replace('"','\\"')
def trunc(s: str, w: int) -> str: return s if len(s)<=w else s[:w]
def center_x(s: str, w: int) -> int: return 1 if len(s)>=w else 1+(w-len(s))//2
# --------------------------- METRICS ----------------------------------
def sysctl_values(*names):
if not names: return []
out = run_cmd(["/sbin/sysctl","-n",*names]).splitlines()
while len(out) < len(names): out.append("")
return out
# ---- CPU non-blocking sampler ----
def cpu_times():
raw = run_cmd(["/sbin/sysctl","-n","kern.cp_time"])
try:
u,n,s,inte,idle = map(int, raw.split());
return {"user":u,"nice":n,"sys":s,"intr":inte,"idle":idle}
except Exception:
return {"user":0,"nice":0,"sys":0,"intr":0,"idle":0}
_cpu_prev = None; _cpu_prev_t = 0.0; _cpu_pct = 0.0
def cpu_sampler_tick():
global _cpu_prev,_cpu_prev_t,_cpu_pct
now = time.monotonic()
if _cpu_prev is None:
_cpu_prev = cpu_times(); _cpu_prev_t = now; return
if now - _cpu_prev_t >= CPU_SAMPLE:
a = _cpu_prev; b = cpu_times()
d = {k:max(0,b[k]-a[k]) for k in b}
total = sum(d.values()); idle = d.get("idle",0)
_cpu_pct = 0.0 if total<=0 else 100.0*(total-idle)/total
_cpu_prev = b; _cpu_prev_t = now
def cpu_latest_percent(): return _cpu_pct
# ---- CPU temperatures ----
def _parse_celsius(text: str):
m = re.search(r"(-?\d+(?:\.[0-9]+)?)\s*C", text or "")
return float(m.group(1)) if m else None
def cpu_temps_c():
"""Return a list of CPU core/package temperatures in Celsius.
Prefer explicit per-core reads via kern.smp.cpus → dev.cpu.N.temperature.
Fallback to scanning dev.cpu and then ACPI thermal zones."""
temps = []
# 0) Fast path: explicit per-core sysctls
n_str = run_cmd(["/sbin/sysctl", "-n", "kern.smp.cpus"]) or ""
try:
n = max(1, int(n_str))
except Exception:
n = 16 # safe upper bound; we'll just skip missing ones
for i in range(n):
val = run_cmd(["/sbin/sysctl", "-n", f"dev.cpu.{i}.temperature"]) or ""
c = _parse_celsius(val)
if c is not None:
temps.append(c)
if temps:
return temps
# 1) Broader scan under dev.cpu.* (handles sparse indexes)
txt = run_cmd(["/sbin/sysctl", "-a", "dev.cpu"]) or ""
for line in txt.splitlines():
m = re.search(r"dev\.cpu\.(\d+)\.temperature:\s*([-0-9.]+)C", line)
if m:
try:
temps.append(float(m.group(2)))
except Exception:
pass
if temps:
return temps
# 2) ACPI thermal zones fallback
for tz in range(0, 8):
val = run_cmd(["/sbin/sysctl", "-n", f"hw.acpi.thermal.tz{tz}.temperature"]) or ""
c = _parse_celsius(val)
if c is not None:
temps.append(c)
return temps
# ---- Load average ----
def loadavg_values():
s = run_cmd(["/sbin/sysctl","-n","vm.loadavg"]) or ""
vals = re.findall(r"(-?\d+(?:\.\d+)?)", s)
if len(vals) >= 3:
try:
return [float(vals[0]), float(vals[1]), float(vals[2])]
except Exception:
pass
up = run_cmd(["/usr/bin/uptime"]) or ""
vals = re.findall(r"(\d+\.\d+)", up)
if len(vals) >= 3:
try:
return [float(vals[-3]), float(vals[-2]), float(vals[-1])]
except Exception:
pass
return []
def loadavg_str():
v = loadavg_values()
if not v:
return "N/A"
return f"{v[0]:.2f} {v[1]:.2f} {v[2]:.2f}"
# ---- Memory ----
def mem_usage_percent():
total_s, active_s, wired_s = sysctl_values(
"vm.stats.vm.v_page_count","vm.stats.vm.v_active_count","vm.stats.vm.v_wire_count")
try:
total = int(total_s) or 1; used = int(active_s)+int(wired_s)
return 100.0*used/total
except Exception: return 0.0
# ---- Uptime / Date ----
def uptime_tuple():
s = run_cmd(["/sbin/sysctl","-n","kern.boottime"])
m = re.search(r"sec\s*=\s*(\d+)", s)
boot = int(m.group(1)) if m else int(s or 0) if s.isdigit() else int(time.time())
delta = max(0, int(time.time())-boot)
return delta//86400, f"{(delta%86400)//3600:02d}:{(delta%3600)//60:02d}"
def date_time_lines():
now = datetime.now().astimezone()
return now.strftime("%a %d %b %Y"), now.strftime("%H:%M:%S %Z")
def opnsense_version():
for cmd in (["/usr/local/sbin/opnsense-version","-n","-v"],
["/usr/local/sbin/opnsense-version","-v"],
["/usr/local/sbin/opnsense-version"]):
out = run_cmd(cmd)
if out: return out.splitlines()[0].strip()
return run_cmd(["/usr/bin/uname","-sr"]) or "OPNsense"
# ---- Networking ----
def default_route_info():
txt = run_cmd(["/sbin/route","-n","get","-inet","default"])
iface = gw = ""
for line in txt.splitlines():
if "interface:" in line: iface = line.split(":",1)[1].strip()
elif "gateway:" in line: gw = line.split(":",1)[1].strip()
return iface, gw
def iface_ipv4(iface: str) -> str:
if not iface: return ""
out = run_cmd(["/sbin/ifconfig", iface])
m = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)\b", out)
return m.group(1) if m else ""
def list_ifaces():
return [n for n in run_cmd(["/sbin/ifconfig","-l"]).split() if n]
def is_private_ipv4(ip: str) -> bool:
parts = ip.split(".")
if len(parts)!=4: return False
try: p1=int(parts[0]); p2=int(parts[1])
except: return False
return (p1==10) or (p1==192 and p2==168) or (p1==172 and 16<=p2<=31)
def pick_lan_iface(exclude:set) -> str:
if LAN_IF_OVERRIDE: return LAN_IF_OVERRIDE
for iface in list_ifaces():
if iface in exclude or iface.startswith(("lo","enc","pflog")): continue
ip = iface_ipv4(iface)
if ip and is_private_ipv4(ip): return iface
return ""
# ---- WAN throughput sampler (bytes → bits/sec) ----
_net_prev_if = None
_net_prev_ibytes = 0
_net_prev_obytes = 0
_net_prev_t = 0.0
_net_rx_bps = 0.0
_net_tx_bps = 0.0
def _netstat_bytes(iface: str):
"""Return (ibytes, obytes) for iface via netstat. Robust to column order."""
if not iface:
return None
txt = run_cmd(["/usr/bin/netstat", "-I", iface, "-b", "-n", "-W"]) or ""
lines = [l for l in txt.splitlines() if l.strip()]
if len(lines) < 2:
return None
header = lines[0].split()
# find indices for Ibytes/Obytes regardless of case
i_idx = o_idx = None
for idx, name in enumerate(header):
if name.lower() == "ibytes": i_idx = idx
if name.lower() == "obytes": o_idx = idx
if i_idx is None or o_idx is None:
return None
for row in lines[1:]:
cols = row.split()
if not cols or cols[0] != iface:
continue
try:
return int(cols[i_idx]), int(cols[o_idx])
except Exception:
continue
return None
def _fmt_rate(bps: float) -> str:
if bps <= 0:
return "0"
units = ["b","Kb","Mb","Gb","Tb"]
val = float(bps)
idx = 0
while val >= 1000.0 and idx < len(units)-1:
val /= 1000.0; idx += 1
if idx >= 2:
return f"{val:.1f}{units[idx]}"
return f"{val:.0f}{units[idx]}"
def wan_sampler_tick(wan_if: str):
global _net_prev_if, _net_prev_ibytes, _net_prev_obytes, _net_prev_t, _net_rx_bps, _net_tx_bps
now = time.monotonic()
if not wan_if:
_net_prev_if = None
_net_rx_bps = _net_tx_bps = 0.0
return
if _net_prev_if != wan_if:
_net_prev_if = wan_if
stats = _netstat_bytes(wan_if)
if not stats:
_net_rx_bps = _net_tx_bps = 0.0
return
_net_prev_ibytes, _net_prev_obytes = stats
_net_prev_t = now
_net_rx_bps = _net_tx_bps = 0.0
return
if now - _net_prev_t < NET_SAMPLE:
return
stats = _netstat_bytes(wan_if)
if not stats:
return
iby, oby = stats
dt = max(0.001, now - _net_prev_t)
di = max(0, iby - _net_prev_ibytes)
do = max(0, oby - _net_prev_obytes)
_net_rx_bps = (di * 8.0) / dt
_net_tx_bps = (do * 8.0) / dt
_net_prev_ibytes, _net_prev_obytes, _net_prev_t = iby, oby, now
def wan_rates():
return _net_rx_bps, _net_tx_bps
# ---- Disk usage ----
def total_disk_usage_percent():
zp = run_cmd(["/sbin/zpool","list","-Hp","-o","size,alloc"])
if zp:
size_b=0; alloc_b=0
for line in zp.splitlines():
parts=line.split()
if len(parts)>=2:
try: size_b+=int(parts[0]); alloc_b+=int(parts[1])
except: pass
if size_b>0: return 100.0*alloc_b/size_b
out = run_cmd(["/bin/df","-kP","/"]).splitlines()
if len(out)>=2:
cols = out[1].split()
if len(cols)>=3:
try:
total_k=int(cols[1]); used_k=int(cols[2])
return 100.0*used_k/total_k if total_k>0 else 0.0
except: pass
return 0.0
def _total_size_gib():
zp = run_cmd(["/sbin/zpool","list","-Hp","-o","size"])
if zp:
try: return max(1, round(sum(int(x) for x in zp.split())/(1<<30)))
except: pass
out = run_cmd(["/bin/df","-kP","/"]).splitlines()
if len(out)>=2:
try: return max(1, round((int(out[1].split()[1])*1024)/(1<<30)))
except: pass
return 0
# ---- OpenVPN (server clients) ----
def _ovpn_instances():
"""Return list of dicts: {'base':path_without_ext, 'conf':..., 'sock':..., 'stat':..., 'is_server':bool}"""
out=[]
for conf in sorted(glob.glob("/var/etc/openvpn/instance-*.conf")):
base = conf[:-5] # strip .conf
sock = base + ".sock"
stat = base + ".stat"
is_server = False
try:
with open(conf,"r",errors="ignore") as f: txt=f.read()
if re.search(r'^\s*mode\s+server\s*$', txt, re.M|re.I): is_server=True
if re.search(r'^\s*server(-bridge)?\b', txt, re.M|re.I): is_server=True
except Exception:
pass
out.append({"base":base,"conf":conf,"sock":sock,"stat":stat,"is_server":is_server})
return out
def _ovpn_count_from_socket(sock_path, timeout=3.0):
"""Try status 3 (CSV), then status 2, then text 'status'."""
try:
s=_sock.socket(_sock.AF_UNIX,_sock.SOCK_STREAM)
s.settimeout(timeout); s.connect(sock_path)
try: s.recv(1024)
except Exception: pass
for cmd in (b"status 3\n", b"status 2\n", b"status\n"):
try:
s.sendall(cmd); buf=b""
while True:
chunk=s.recv(4096)
if not chunk: break
buf+=chunk
if b"\nEND" in buf or b"\r\nEND" in buf: break
txt=buf.decode(errors="ignore")
if cmd!=b"status\n":
cnt=sum(1 for line in txt.splitlines() if line.startswith("CLIENT_LIST,"))
if cnt>0:
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
return cnt
# text format fallback parse
in_clients=False; c=0
for line in txt.splitlines():
L=line.strip()
if L.startswith("CLIENT LIST"):
in_clients=True; continue
if in_clients:
if not L or L.startswith(("Updated,","Common Name")): continue
if L.startswith(("ROUTING TABLE","GLOBAL STATS","END")): break
c+=1
if c>0:
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
return c
except Exception:
continue
try: s.sendall(b"quit\n")
except Exception: pass
s.close()
except Exception as e:
logd(f"OpenVPN sock fail {sock_path}: {e}")
return 0
def _ovpn_count_from_status_file(path):
try:
with open(path,"r",errors="ignore") as f: lines=f.read().splitlines()
except Exception:
return 0
v2=[l for l in lines if l.startswith("CLIENT_LIST,")]
if v2: return len(v2)
in_clients=False; c=0
for l in lines:
t=l.strip()
if t.startswith("CLIENT LIST"): in_clients=True; continue
if in_clients:
if not t or t.startswith(("Updated,","Common Name")): continue
if t.startswith(("ROUTING TABLE","GLOBAL STATS","END")): break
c+=1
return c
def count_openvpn_clients():
total=0
insts=_ovpn_instances()
if not insts:
for sp in sorted(set(glob.glob("/var/etc/openvpn/*.sock")+glob.glob("/var/run/openvpn/*.sock"))):
total += _ovpn_count_from_socket(sp)
for sf in sorted(set(glob.glob("/var/etc/openvpn/*.stat"))):
total += _ovpn_count_from_status_file(sf)
return total
for inst in insts:
if not inst["is_server"]:
logd(f"skip client instance: {inst['conf']}")
continue
cnt=0
if os.path.exists(inst["sock"]):
cnt=_ovpn_count_from_socket(inst["sock"])
logd(f"sock {inst['sock']} -> {cnt}")
if cnt==0 and os.path.exists(inst["stat"]):
c2=_ovpn_count_from_status_file(inst["stat"])
logd(f"stat {inst['stat']} -> {c2}")
cnt += c2
total += cnt
return total
# ---- WireGuard ----
def _wg_cmd_output():
for cmd in ("/usr/local/bin/wg","/usr/local/sbin/wg","/usr/bin/wg","/sbin/wg"):
if os.path.exists(cmd): return run_cmd([cmd,"show","all","dump"])
return ""
def count_wireguard_active(window=WG_ACTIVE_WINDOW):
txt=_wg_cmd_output()
if not txt: return 0
now=int(time.time()); cnt=0
for line in txt.splitlines():
parts=line.split("\t");
if len(parts)<9: parts=line.split()
if len(parts)>=9:
try: last=int(parts[5])
except: continue
if last>0 and (now-last)<=window: cnt+=1
return cnt
# --------------------------- LCD CLIENT -------------------------------
class LCDClient:
def __init__(self, host=LCDD_HOST, port=LCDD_PORT, w=LCD_W, h=LCD_H):
self.host, self.port, self.w, self.h = host, port, w, h
self.sock=None; self.screen_id="scr"; self.w1_id="l1"; self.w2_id="l2"; self.current_title=None
def connect(self):
self.close()
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.settimeout(3.0); s.connect((self.host,self.port)); self.sock=s
self._read_banner(); self._send("hello")
self._send('client_set name "opn-lcd"')
self._send("client_set -backlight on"); self._send("client_set -heartbeat off")
self._send(f"screen_add {self.screen_id}")
self._send(f'screen_set {self.screen_id} -name "OPNsense" -priority foreground -heartbeat off')
self._send(f"widget_add {self.screen_id} {self.w1_id} string")
if self.h>=2: self._send(f"widget_add {self.screen_id} {self.w2_id} string")
self.current_title="OPNsense"
def close(self):
if self.sock:
try: self.sock.close()
except Exception: pass
self.sock=None
def _read_banner(self):
try: self.sock.settimeout(1.0); _=self.sock.recv(1024)
except Exception: pass
finally: self.sock.settimeout(3.0)
def _send(self,line:str):
if self.sock: self.sock.sendall((line+"\n").encode())
def set_title_once(self,title:str):
title=trunc(title,self.w)
if title!=self.current_title:
self._send(f'screen_set {self.screen_id} -name "{lcd_escape(title)}"')
self.current_title=title
def draw(self,l1:str,l2:str=""):
t1=trunc(l1,self.w); x1=center_x(t1,self.w)
self._send(f'widget_set {self.screen_id} {self.w1_id} {x1} 1 "{lcd_escape(t1)}"')
if self.h>=2:
t2=trunc(l2,self.w); x2=center_x(t2,self.w)
self._send(f'widget_set {self.screen_id} {self.w2_id} {x2} 2 "{lcd_escape(t2)}"')
# ---------------------------- MAIN LOOP ------------------------------
def main():
lcd=LCDClient()
next_refresh=time.monotonic()+1.0
refresh_period=30.0
screens=("FW","DATE","UP","CPU","LOAD","TEMP","MEM","DSK","LAN","WAN","NET","GW","USERS")
idx=0; next_switch=time.monotonic()+SCREEN_SECONDS
wan_if=WAN_IF_OVERRIDE; gateway=""; wan_ip=""; lan_if=""; lan_ip=""
sslvpn=0; wg=0
temp_max=None; temp_avg=None; temp_n=0
while True:
nowm=time.monotonic()
cpu_sampler_tick()
wan_sampler_tick(wan_if)
if lcd.sock is None:
try: lcd.connect()
except Exception:
time.sleep(0.5); continue
if nowm>=next_refresh:
if not WAN_IF_OVERRIDE:
wan_if, gateway = default_route_info()
else:
_, gateway = default_route_info()
wan_ip = iface_ipv4(wan_if) if wan_if else ""
if LAN_IF_OVERRIDE:
lan_if=LAN_IF_OVERRIDE
else:
lan_if = pick_lan_iface({wan_if,"lo0"})
lan_ip = iface_ipv4(lan_if) if lan_if else ""
sslvpn = count_openvpn_clients()
wg = count_wireguard_active(WG_ACTIVE_WINDOW)
temps = cpu_temps_c()
if temps:
temp_max = max(temps)
temp_avg = sum(temps)/len(temps)
temp_n = len(temps)
else:
temp_max = temp_avg = None
temp_n = 0
logd(f"counts: SSLVPN={sslvpn} WG={wg} temps={temps if temps else 'N/A'}")
next_refresh += refresh_period
if nowm>=next_switch:
idx=(idx+1)%len(screens)
while nowm>=next_switch: next_switch+=SCREEN_SECONDS
cur=screens[idx]
try:
if cur=="FW":
lcd.set_title_once("OPNSense FW"); lcd.draw("OPNSense FW", opnsense_version())
elif cur=="DATE":
lcd.set_title_once("Date/Time"); l1,l2=date_time_lines(); lcd.draw(l1,l2)
elif cur=="UP":
lcd.set_title_once("Uptime"); d,hm=uptime_tuple(); lcd.draw("System Uptime", f"{d} days {hm}")
elif cur=="CPU":
lcd.set_title_once("CPU"); lcd.draw("CPU Usage", f"{cpu_latest_percent():.1f}%")
elif cur=="LOAD":
lcd.set_title_once("Load Average");
lcd.draw("Load Average", loadavg_str())
elif cur=="TEMP":
lcd.set_title_once("CPU Temp")
if temp_max is None:
lcd.draw("CPU Temp", "Sensors N/A")
else:
line2 = (f"Max{temp_max:.0f}C Avg{temp_avg:.0f}C" if temp_n>1 else f"{temp_max:.0f}C")
lcd.draw("CPU Temp", line2)
elif cur=="MEM":
lcd.set_title_once("Memory"); lcd.draw("Memory Usage", f"{mem_usage_percent():.0f}% used")
elif cur=="DSK":
lcd.set_title_once("Disk Used"); d=total_disk_usage_percent(); total_g=_total_size_gib()
lcd.draw("Disk Used", f"{d:.0f}% of {total_g}G" if total_g>0 else f"{d:.0f}%")
elif cur=="LAN":
lcd.set_title_once("LAN IP"); lcd.draw("LAN IP", lan_ip or "N/A")
elif cur=="WAN":
lcd.set_title_once("WAN IP"); lcd.draw("WAN IP", wan_ip or "N/A")
elif cur=="NET":
lcd.set_title_once("WAN Rate")
if not wan_if:
lcd.draw("WAN Rate", "N/A")
else:
rx, tx = wan_rates()
lcd.draw("WAN Rate", f"D:{_fmt_rate(rx)} U:{_fmt_rate(tx)}")
elif cur=="GW":
lcd.set_title_once("Gateway"); lcd.draw("Gateway", gateway or "N/A")
elif cur=="USERS":
lcd.set_title_once("Live Users"); lcd.draw("Live Users", f"SSL:{sslvpn} WG:{wg}")
except (BrokenPipeError,OSError):
lcd.close()
time.sleep(0.1)
if __name__=="__main__":
try: main()
except KeyboardInterrupt: pass