Menu

Show posts

This section allows you to view all posts made by this member. Note that you can only see posts made in areas you currently have access to.

Show posts Menu

Messages - hgarca089

#1
I have a script that shows some info, such as version, date/time, CPU usage and temperature, memory, disk, and more.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
opn-lcd.py — LCDproc client for OPNsense

Rotation:
  Version → Date/Time → Uptime → CPU → Load Average → CPU Temp → Memory → Disk Used → LAN IP → WAN IP → WAN Rate → Gateway → Live Users

Highlights
- Even timing via time.monotonic()
- CPU% via non-blocking 1s sampler (matches `top -d 1`)
- CPU temps via dev.cpu.*.temperature (coretemp/amdtemp); fallback to ACPI tz
- Disk% via ZFS pools (zpool -Hp size,alloc), fallback to root (/)
- OpenVPN SSL users: parses /var/etc/openvpn/instance-*.conf to find **server** instances,
  then counts via instance .sock (mgmt) or .stat (CSV/text)
- WireGuard active peers by recent handshake (default 180s)
"""

import os, re, glob, socket, socket as _sock, subprocess, time
from datetime import datetime

# ----------------------------- CONFIG ---------------------------------
# Every setting below is read from the environment once, at import time.
# The int()/float() conversions raise ValueError if a variable holds a
# non-numeric value.

# Host and TCP port of the LCD server this client connects to.
LCDD_HOST = os.environ.get("OPN_LCD_HOST", "127.0.0.1")
LCDD_PORT = int(os.environ.get("OPN_LCD_PORT", "13666"))
# Display size in characters; used to truncate and centre each line.
LCD_W = int(os.environ.get("OPN_LCD_W", "16"))
LCD_H = int(os.environ.get("OPN_LCD_H", "2"))
# Seconds each screen stays up before the rotation advances.
SCREEN_SECONDS = float(os.environ.get("OPN_LCD_SCREEN_SEC", "4.0"))
CPU_SAMPLE = float(os.environ.get("OPN_LCD_CPU_SAMPLE", "1.0"))  # 1s to match top
# Optional interface-name overrides; an empty string means auto-detect.
WAN_IF_OVERRIDE = os.environ.get("OPN_LCD_WAN_IF", "").strip()
LAN_IF_OVERRIDE = os.environ.get("OPN_LCD_LAN_IF", "").strip()
# A WireGuard peer counts as active when its last handshake is at most this
# many seconds old.
WG_ACTIVE_WINDOW = int(os.environ.get("OPN_LCD_WG_WINDOW", "180"))
NET_SAMPLE = float(os.environ.get("OPN_LCD_NET_SAMPLE", "1.0"))  # 1s WAN rate sampler
# Debug output is enabled for any value other than "0", "false", "False" or "".
DEBUG = os.environ.get("OPN_LCD_DEBUG", "0") not in ("0","false","False","")

def logd(msg):
    """Print *msg* prefixed with ``[opn-lcd]`` when DEBUG is on.

    Any error raised while formatting or printing is swallowed so that
    debug output can never take the display loop down.
    """
    if not DEBUG:
        return
    try:
        print(f"[opn-lcd] {msg}")
    except Exception:
        pass

# --------------------------- HELPERS ----------------------------------
def run_cmd(args, timeout=2.5) -> str:
    """Run *args* and return its stripped stdout, or "" on any failure.

    Failure covers a missing binary, a non-zero exit status and exceeding
    *timeout* seconds. stderr is discarded; undecodable bytes are dropped.
    """
    try:
        finished = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=True,
        )
        text = finished.stdout.decode(errors="ignore")
    except Exception:
        return ""
    return text.strip()

def lcd_escape(s: str) -> str:
    """Backslash-escape every backslash and double quote in *s*.

    The result can be placed between double quotes in a command line sent
    to the LCD server.
    """
    table = {ord("\\"): "\\\\", ord('"'): '\\"'}
    return s.translate(table)
def trunc(s: str, w: int) -> str:
    """Return *s* limited to its first *w* characters (unchanged if it already fits)."""
    # Slicing already returns the whole string when it is not longer than w.
    return s[:w]
def center_x(s: str, w: int) -> int:
    """Return the 1-based start column that centres *s* in a row of width *w*.

    Column 1 is returned when *s* is as wide as, or wider than, the row.
    """
    spare = w - len(s)
    if spare <= 0:
        return 1
    return 1 + spare // 2

# --------------------------- METRICS ----------------------------------
def sysctl_values(*names):
    """Query several sysctl names in one ``sysctl -n`` call.

    Returns the output lines as a list, padded with "" so that it holds at
    least ``len(names)`` entries. Returns [] when no names are given.
    """
    if not names:
        return []
    lines = run_cmd(["/sbin/sysctl", "-n", *names]).splitlines()
    shortfall = len(names) - len(lines)
    if shortfall > 0:
        lines.extend([""] * shortfall)
    return lines

# ---- CPU non-blocking sampler ----
def cpu_times():
    """Read ``kern.cp_time`` and return its five counters as a dict.

    Keys are user, nice, sys, intr and idle. If the output is not exactly
    five integers, every counter is reported as 0.
    """
    fields = ("user", "nice", "sys", "intr", "idle")
    raw = run_cmd(["/sbin/sysctl", "-n", "kern.cp_time"])
    try:
        user, nice, system, intr, idle = (int(tok) for tok in raw.split())
    except Exception:
        return dict.fromkeys(fields, 0)
    return dict(zip(fields, (user, nice, system, intr, idle)))

# Sampler state shared by cpu_sampler_tick() and cpu_latest_percent():
# the previous cpu_times() snapshot (None until the first tick), the
# monotonic time it was taken, and the most recently computed busy percentage.
_cpu_prev = None; _cpu_prev_t = 0.0; _cpu_pct = 0.0

def cpu_sampler_tick():
    """Advance the CPU sampler; intended to be called on every loop pass.

    The first call only records a baseline. Later calls do nothing until
    CPU_SAMPLE seconds have passed since the last snapshot; then the busy
    percentage is recomputed from the counter deltas and stored for
    cpu_latest_percent().
    """
    global _cpu_prev, _cpu_prev_t, _cpu_pct
    now = time.monotonic()

    if _cpu_prev is None:
        _cpu_prev = cpu_times()
        _cpu_prev_t = now
        return

    if now - _cpu_prev_t < CPU_SAMPLE:
        return

    before, after = _cpu_prev, cpu_times()
    # Clamp at zero so a counter that went backwards cannot yield a negative delta.
    delta = {name: max(0, after[name] - before[name]) for name in after}
    total = sum(delta.values())
    idle = delta.get("idle", 0)
    if total <= 0:
        _cpu_pct = 0.0
    else:
        _cpu_pct = 100.0 * (total - idle) / total
    _cpu_prev, _cpu_prev_t = after, now

def cpu_latest_percent():
    """Return the busy percentage last stored by cpu_sampler_tick() (0.0 before the first sample)."""
    return _cpu_pct

# ---- CPU temperatures ----

def _parse_celsius(text: str):
    """Extract the first ``<number>C`` reading from *text* as a float.

    Returns None when *text* is empty/None or contains no such reading.
    """
    found = re.search(r"(-?\d+(?:\.[0-9]+)?)\s*C", text or "")
    if found is None:
        return None
    return float(found.group(1))

def cpu_temps_c():
    """Collect CPU temperatures in Celsius, trying three sources in order.

    1. ``dev.cpu.N.temperature`` for N below ``kern.smp.cpus`` (16 indexes
       are probed when that count cannot be read).
    2. A scan of ``sysctl -a dev.cpu`` for any ``dev.cpu.N.temperature`` line.
    3. ACPI thermal zones ``hw.acpi.thermal.tz0`` .. ``tz7``.

    The first source that yields at least one reading wins. Returns a list
    of floats, empty when nothing could be read.
    """
    sysctl = "/sbin/sysctl"

    # Source 1: one explicit read per core.
    try:
        core_count = max(1, int(run_cmd([sysctl, "-n", "kern.smp.cpus"]) or ""))
    except Exception:
        core_count = 16  # upper bound; indexes that do not exist are skipped
    readings = []
    for core in range(core_count):
        celsius = _parse_celsius(run_cmd([sysctl, "-n", f"dev.cpu.{core}.temperature"]) or "")
        if celsius is not None:
            readings.append(celsius)
    if readings:
        return readings

    # Source 2: scan the whole dev.cpu subtree (copes with sparse indexes).
    listing = run_cmd([sysctl, "-a", "dev.cpu"]) or ""
    for row in listing.splitlines():
        hit = re.search(r"dev\.cpu\.(\d+)\.temperature:\s*([-0-9.]+)C", row)
        if not hit:
            continue
        try:
            readings.append(float(hit.group(2)))
        except Exception:
            pass
    if readings:
        return readings

    # Source 3: ACPI thermal zones.
    for zone in range(8):
        celsius = _parse_celsius(run_cmd([sysctl, "-n", f"hw.acpi.thermal.tz{zone}.temperature"]) or "")
        if celsius is not None:
            readings.append(celsius)
    return readings

# ---- Load average ----
def loadavg_values():
    """Return the three load averages as floats, or [] when unavailable.

    ``sysctl -n vm.loadavg`` is tried first (first three numbers), then the
    output of ``uptime`` (last three decimal numbers).
    """
    sysctl_text = run_cmd(["/sbin/sysctl", "-n", "vm.loadavg"]) or ""
    numbers = re.findall(r"(-?\d+(?:\.\d+)?)", sysctl_text)
    if len(numbers) >= 3:
        try:
            return [float(tok) for tok in numbers[:3]]
        except Exception:
            pass

    uptime_text = run_cmd(["/usr/bin/uptime"]) or ""
    numbers = re.findall(r"(\d+\.\d+)", uptime_text)
    if len(numbers) >= 3:
        try:
            return [float(tok) for tok in numbers[-3:]]
        except Exception:
            pass

    return []

def loadavg_str():
    """Format the load averages as ``"a.aa b.bb c.cc"``, or ``"N/A"`` when none are available."""
    loads = loadavg_values()
    if loads:
        one, five, fifteen = loads[0], loads[1], loads[2]
        return f"{one:.2f} {five:.2f} {fifteen:.2f}"
    return "N/A"

# ---- Memory ----
def mem_usage_percent():
    """Return (active + wired) pages as a percentage of all pages.

    Values come from the ``vm.stats.vm`` page counters; 0.0 is returned
    when they cannot be parsed.
    """
    counters = (
        "vm.stats.vm.v_page_count",
        "vm.stats.vm.v_active_count",
        "vm.stats.vm.v_wire_count",
    )
    total_s, active_s, wired_s = sysctl_values(*counters)
    try:
        page_total = int(total_s) or 1  # avoid dividing by zero
        in_use = int(active_s) + int(wired_s)
        return 100.0 * in_use / page_total
    except Exception:
        return 0.0

# ---- Uptime / Date ----
def uptime_tuple():
    """Return uptime as ``(days, "HH:MM")`` derived from ``kern.boottime``.

    The boot time is taken from a ``sec = N`` field, or from the whole
    output when it is purely digits; otherwise "now" is used, which gives
    an uptime of zero.
    """
    raw = run_cmd(["/sbin/sysctl", "-n", "kern.boottime"])
    found = re.search(r"sec\s*=\s*(\d+)", raw)
    if found:
        boot = int(found.group(1))
    elif raw.isdigit():
        boot = int(raw or 0)
    else:
        boot = int(time.time())

    elapsed = max(0, int(time.time()) - boot)
    days, within_day = divmod(elapsed, 86400)
    hours, within_hour = divmod(within_day, 3600)
    minutes = within_hour // 60
    return days, f"{hours:02d}:{minutes:02d}"

def date_time_lines():
    """Return the local date and time as two display lines.

    The first is formatted ``%a %d %b %Y`` and the second ``%H:%M:%S %Z``.
    """
    local_now = datetime.now().astimezone()
    date_line = local_now.strftime("%a %d %b %Y")
    clock_line = local_now.strftime("%H:%M:%S %Z")
    return date_line, clock_line

def opnsense_version():
    """Return a one-line version string for the display.

    ``opnsense-version`` is tried with ``-n -v``, then ``-v``, then no
    flags; the first line of the first non-empty output is used. If all of
    them fail, ``uname -sr`` is used, and finally the literal "OPNsense".
    """
    tool = "/usr/local/sbin/opnsense-version"
    for flags in (["-n", "-v"], ["-v"], []):
        text = run_cmd([tool, *flags])
        if text:
            return text.splitlines()[0].strip()
    return run_cmd(["/usr/bin/uname", "-sr"]) or "OPNsense"

# ---- Networking ----
def default_route_info():
    """Return ``(interface, gateway)`` of the IPv4 default route.

    Both are parsed from ``route -n get -inet default``; a field that is
    not present in the output is returned as "".
    """
    iface = gw = ""
    for row in run_cmd(["/sbin/route", "-n", "get", "-inet", "default"]).splitlines():
        # The text after the first colon is the value.
        value = row.partition(":")[2].strip()
        if "interface:" in row:
            iface = value
        elif "gateway:" in row:
            gw = value
    return iface, gw

def iface_ipv4(iface: str) -> str:
    """Return the first IPv4 address that ``ifconfig`` lists for *iface*.

    Returns "" when *iface* is empty or no ``inet`` address is found.
    """
    if not iface:
        return ""
    listing = run_cmd(["/sbin/ifconfig", iface])
    found = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)\b", listing)
    if found:
        return found.group(1)
    return ""

def list_ifaces():
    """Return the interface names printed by ``ifconfig -l`` as a list."""
    names = run_cmd(["/sbin/ifconfig", "-l"]).split()
    return list(filter(None, names))

def is_private_ipv4(ip: str) -> bool:
    """Return True when *ip* is a dotted-quad address in an RFC 1918 range.

    The ranges are 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16. Anything
    that is not four integer octets in 0..255 returns False.
    """
    parts = ip.split(".")
    if len(parts) != 4:
        return False
    try:
        octets = [int(part) for part in parts]
    except ValueError:
        return False
    # Reject out-of-range octets such as "10.0.0.999".
    if any(octet < 0 or octet > 255 for octet in octets):
        return False
    first, second = octets[0], octets[1]
    return (
        first == 10
        or (first == 192 and second == 168)
        or (first == 172 and 16 <= second <= 31)
    )

def pick_lan_iface(exclude:set) -> str:
    """Choose the LAN interface name.

    LAN_IF_OVERRIDE wins when set. Otherwise the first interface is
    returned that is not in *exclude*, does not start with ``lo``, ``enc``
    or ``pflog``, and carries a private IPv4 address. Returns "" when no
    interface qualifies.
    """
    if LAN_IF_OVERRIDE:
        return LAN_IF_OVERRIDE
    ignored_prefixes = ("lo", "enc", "pflog")
    for name in list_ifaces():
        if name in exclude:
            continue
        if name.startswith(ignored_prefixes):
            continue
        address = iface_ipv4(name)
        if address and is_private_ipv4(address):
            return name
    return ""

# ---- WAN throughput sampler (bytes → bits/sec) ----
# State owned by wan_sampler_tick() and read by wan_rates().
# Interface the stored counters belong to (None when nothing is tracked).
_net_prev_if = None
# Byte counters from the previous sample and the monotonic time it was taken.
_net_prev_ibytes = 0
_net_prev_obytes = 0
_net_prev_t = 0.0
# Most recently computed receive/transmit rates in bits per second.
_net_rx_bps = 0.0
_net_tx_bps = 0.0

def _netstat_bytes(iface: str):
    """Return ``(ibytes, obytes)`` for *iface* from ``netstat -I``, or None.

    The Ibytes/Obytes columns are located by header name (case-insensitive)
    and the first row whose name column equals *iface* and whose two cells
    parse as integers is used. None is returned for an empty *iface*,
    missing output, missing columns or no usable row.
    """
    if not iface:
        return None
    text = run_cmd(["/usr/bin/netstat", "-I", iface, "-b", "-n", "-W"]) or ""
    rows = [ln for ln in text.splitlines() if ln.strip()]
    if len(rows) < 2:
        return None

    # Locate the byte-counter columns by header name.
    in_col = out_col = None
    for pos, label in enumerate(rows[0].split()):
        lowered = label.lower()
        if lowered == "ibytes":
            in_col = pos
        elif lowered == "obytes":
            out_col = pos
    if in_col is None or out_col is None:
        return None

    for fields in (row.split() for row in rows[1:]):
        if not fields or fields[0] != iface:
            continue
        try:
            return int(fields[in_col]), int(fields[out_col])
        except Exception:
            continue
    return None

def _fmt_rate(bps: float) -> str:
    """Format a bit rate compactly with decimal (1000-based) units.

    Non-positive rates give "0". Rates in b and Kb are shown without
    decimals; Mb and above with one decimal place.
    """
    if bps <= 0:
        return "0"
    suffixes = ("b", "Kb", "Mb", "Gb", "Tb")
    top_tier = len(suffixes) - 1
    scaled = float(bps)
    tier = 0
    while tier < top_tier and scaled >= 1000.0:
        scaled /= 1000.0
        tier += 1
    decimals = 1 if tier >= 2 else 0
    return f"{scaled:.{decimals}f}{suffixes[tier]}"

def wan_sampler_tick(wan_if: str):
    """Advance the WAN throughput sampler; intended to be called on every loop pass.

    Args:
        wan_if: Name of the WAN interface, or "" when it is unknown.

    With an empty *wan_if* the sampler resets and reports zero rates. When
    the interface changes, or no baseline exists yet, the current byte
    counters are stored as a baseline and the rates are zeroed. Otherwise,
    once NET_SAMPLE seconds have passed since the previous sample, the
    rates (bits per second) are recomputed from the counter deltas. Results
    are read with wan_rates().
    """
    global _net_prev_if, _net_prev_ibytes, _net_prev_obytes, _net_prev_t, _net_rx_bps, _net_tx_bps
    now = time.monotonic()

    if not wan_if:
        _net_prev_if = None
        _net_rx_bps = _net_tx_bps = 0.0
        return

    if _net_prev_if != wan_if:
        _net_rx_bps = _net_tx_bps = 0.0
        stats = _netstat_bytes(wan_if)
        if not stats:
            # No baseline could be read. Leave the interface untracked so
            # the next tick retries here instead of diffing against stale
            # counters and a stale timestamp, which would show a bogus spike.
            _net_prev_if = None
            return
        _net_prev_if = wan_if
        _net_prev_ibytes, _net_prev_obytes = stats
        _net_prev_t = now
        return

    if now - _net_prev_t < NET_SAMPLE:
        return

    stats = _netstat_bytes(wan_if)
    if not stats:
        return  # keep the previous rates and retry on a later tick
    iby, oby = stats
    dt = max(0.001, now - _net_prev_t)
    # Clamp at zero so a counter reset cannot produce a negative rate.
    di = max(0, iby - _net_prev_ibytes)
    do = max(0, oby - _net_prev_obytes)
    _net_rx_bps = (di * 8.0) / dt
    _net_tx_bps = (do * 8.0) / dt
    _net_prev_ibytes, _net_prev_obytes, _net_prev_t = iby, oby, now

def wan_rates():
    """Return ``(rx_bps, tx_bps)`` as last computed by wan_sampler_tick()."""
    return (_net_rx_bps, _net_tx_bps)

# ---- Disk usage ----
def total_disk_usage_percent():
    """Return overall disk usage as a percentage.

    ZFS pools are preferred: allocated bytes summed over all pools divided
    by their summed size, from ``zpool list -Hp -o size,alloc``. When no
    pool yields a usable size, the root filesystem's used/total from
    ``df -kP /`` is used instead. Returns 0.0 when neither source can be
    parsed.
    """
    zp = run_cmd(["/sbin/zpool", "list", "-Hp", "-o", "size,alloc"])
    if zp:
        size_b = 0
        alloc_b = 0
        for line in zp.splitlines():
            parts = line.split()
            if len(parts) < 2:
                continue
            try:
                pool_size, pool_alloc = int(parts[0]), int(parts[1])
            except ValueError:
                # Skip the whole pool: counting its size without its
                # allocation would understate the usage ratio.
                continue
            size_b += pool_size
            alloc_b += pool_alloc
        if size_b > 0:
            return 100.0 * alloc_b / size_b

    out = run_cmd(["/bin/df", "-kP", "/"]).splitlines()
    if len(out) >= 2:
        cols = out[1].split()
        if len(cols) >= 3:
            try:
                total_k, used_k = int(cols[1]), int(cols[2])
            except ValueError:
                pass
            else:
                return 100.0 * used_k / total_k if total_k > 0 else 0.0
    return 0.0

def _total_size_gib():
    """Return total storage size in whole GiB (at least 1), or 0 if unknown.

    The summed size of all ZFS pools (``zpool list -Hp -o size``) is
    preferred; otherwise the size of the root filesystem from ``df -kP /``.
    """
    zp = run_cmd(["/sbin/zpool", "list", "-Hp", "-o", "size"])
    if zp:
        try:
            return max(1, round(sum(int(x) for x in zp.split()) / (1 << 30)))
        except ValueError:
            pass  # non-numeric pool size; fall back to df
    out = run_cmd(["/bin/df", "-kP", "/"]).splitlines()
    if len(out) >= 2:
        try:
            # df -k reports 1 KiB blocks.
            return max(1, round((int(out[1].split()[1]) * 1024) / (1 << 30)))
        except (ValueError, IndexError):
            pass
    return 0

# ---- OpenVPN (server clients) ----
def _ovpn_instances():
    """Describe every ``/var/etc/openvpn/instance-*.conf`` file found.

    Returns a list of dicts with the keys ``base`` (path without the
    ``.conf`` extension), ``conf``, ``sock``, ``stat`` (sibling paths built
    from ``base``) and ``is_server``.

    ``is_server`` is True when the config has a ``mode server`` line or a
    ``server`` / ``server-bridge`` / ``server-ipv6`` directive. The
    directive name must be followed by whitespace or end of line, so the
    client-only ``server-poll-timeout`` option does not mark an instance
    as a server. An unreadable config is reported with ``is_server`` False.
    """
    out = []
    for conf in sorted(glob.glob("/var/etc/openvpn/instance-*.conf")):
        base = conf[:-5]  # strip ".conf"
        is_server = False
        try:
            with open(conf, "r", errors="ignore") as f:
                txt = f.read()
        except OSError:
            txt = ""
        if re.search(r'^\s*mode\s+server\s*$', txt, re.M | re.I):
            is_server = True
        elif re.search(r'^\s*server(?:-bridge|-ipv6)?(?:\s|$)', txt, re.M | re.I):
            is_server = True
        out.append({
            "base": base,
            "conf": conf,
            "sock": base + ".sock",
            "stat": base + ".stat",
            "is_server": is_server,
        })
    return out

def _ovpn_count_from_socket(sock_path, timeout=3.0):
    """Count connected clients through a management unix socket.

    Args:
        sock_path: Path of the unix-domain management socket.
        timeout: Per-operation socket timeout in seconds.

    ``status 3``, ``status 2`` and plain ``status`` are tried in that
    order. For the first two, ``CLIENT_LIST,`` rows are counted; if none
    are found (and always for plain ``status``) the reply is parsed as the
    text format, counting rows between the client-list header and the next
    section. The first non-zero count is returned.

    Returns 0 when no command reports a client or on any socket error
    (socket errors are reported through logd). The socket is always closed.
    """
    try:
        # The context manager closes the socket on every path, including
        # a failed connect().
        with _sock.socket(_sock.AF_UNIX, _sock.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect(sock_path)
            try:
                s.recv(1024)  # discard whatever greeting is sent on connect
            except Exception:
                pass

            count = 0
            for cmd in (b"status 3\n", b"status 2\n", b"status\n"):
                try:
                    s.sendall(cmd)
                    buf = b""
                    while True:
                        chunk = s.recv(4096)
                        if not chunk:
                            break
                        buf += chunk
                        if b"\nEND" in buf:  # also matches "\r\nEND"
                            break
                except Exception:
                    continue  # e.g. timeout: try the next status variant
                txt = buf.decode(errors="ignore")

                if cmd != b"status\n":
                    count = sum(1 for line in txt.splitlines()
                                if line.startswith("CLIENT_LIST,"))

                if count == 0:
                    # Text-format fallback.
                    in_clients = False
                    for line in txt.splitlines():
                        row = line.strip()
                        # The text-format header reads "OpenVPN CLIENT LIST";
                        # the bare form is still accepted.
                        if row.startswith(("CLIENT LIST", "OpenVPN CLIENT LIST")):
                            in_clients = True
                            continue
                        if not in_clients:
                            continue
                        if not row or row.startswith(("Updated,", "Common Name")):
                            continue
                        if row.startswith(("ROUTING TABLE", "GLOBAL STATS", "END")):
                            break
                        count += 1

                if count > 0:
                    break

            try:
                s.sendall(b"quit\n")
            except Exception:
                pass
            return count
    except Exception as e:
        logd(f"OpenVPN sock fail {sock_path}: {e}")
    return 0

def _ovpn_count_from_status_file(path):
    """Count connected clients listed in an OpenVPN status file.

    CSV-style files are handled by counting ``CLIENT_LIST,`` rows. If there
    are none, the file is parsed as the text format: rows between the
    client-list header and the next section (``ROUTING TABLE``,
    ``GLOBAL STATS`` or ``END``) are counted, skipping the ``Updated,`` and
    ``Common Name`` lines. Returns 0 when the file cannot be read.
    """
    try:
        with open(path, "r", errors="ignore") as f:
            lines = f.read().splitlines()
    except OSError:
        return 0

    csv_clients = sum(1 for row in lines if row.startswith("CLIENT_LIST,"))
    if csv_clients:
        return csv_clients

    in_clients = False
    clients = 0
    for row in lines:
        text = row.strip()
        # The text-format header reads "OpenVPN CLIENT LIST"; the bare form
        # is still accepted.
        if text.startswith(("CLIENT LIST", "OpenVPN CLIENT LIST")):
            in_clients = True
            continue
        if not in_clients:
            continue
        if not text or text.startswith(("Updated,", "Common Name")):
            continue
        if text.startswith(("ROUTING TABLE", "GLOBAL STATS", "END")):
            break
        clients += 1
    return clients

def count_openvpn_clients():
    """Return the number of clients connected across OpenVPN server instances.

    When instance configs exist, each server instance is counted through
    its management socket, falling back to its status file if the socket
    reports zero; non-server instances are skipped. When no instance
    configs exist, every ``*.sock`` under /var/etc/openvpn and
    /var/run/openvpn and every ``*.stat`` under /var/etc/openvpn is counted.
    """
    instances = _ovpn_instances()

    if not instances:
        sock_paths = set(glob.glob("/var/etc/openvpn/*.sock") + glob.glob("/var/run/openvpn/*.sock"))
        stat_paths = set(glob.glob("/var/etc/openvpn/*.stat"))
        grand_total = 0
        for sock_path in sorted(sock_paths):
            grand_total += _ovpn_count_from_socket(sock_path)
        for stat_path in sorted(stat_paths):
            grand_total += _ovpn_count_from_status_file(stat_path)
        return grand_total

    grand_total = 0
    for entry in instances:
        if not entry["is_server"]:
            logd(f"skip client instance: {entry['conf']}")
            continue
        sock_path, stat_path = entry["sock"], entry["stat"]
        found = 0
        if os.path.exists(sock_path):
            found = _ovpn_count_from_socket(sock_path)
            logd(f"sock {sock_path} -> {found}")
        # The status file is consulted only when the socket gave nothing.
        if found == 0 and os.path.exists(stat_path):
            from_stat = _ovpn_count_from_status_file(stat_path)
            logd(f"stat {stat_path} -> {from_stat}")
            found += from_stat
        grand_total += found
    return grand_total

# ---- WireGuard ----
def _wg_cmd_output():
    """Return the output of ``wg show all dump`` from the first ``wg`` binary found, else ""."""
    candidates = ("/usr/local/bin/wg", "/usr/local/sbin/wg", "/usr/bin/wg", "/sbin/wg")
    binary = next((path for path in candidates if os.path.exists(path)), None)
    if binary is None:
        return ""
    return run_cmd([binary, "show", "all", "dump"])

def count_wireguard_active(window=WG_ACTIVE_WINDOW):
    """Count WireGuard peers whose latest handshake is recent.

    Args:
        window: Maximum handshake age in seconds for a peer to count.

    Parses the dump returned by ``_wg_cmd_output()``. Only rows with at
    least nine fields are considered; the sixth field is read as the
    handshake time in epoch seconds. Rows where it is zero or not an
    integer are ignored. Returns 0 when there is no output.
    """
    txt = _wg_cmd_output()
    if not txt:
        return 0
    now = int(time.time())
    cnt = 0
    for line in txt.splitlines():
        parts = line.split("\t")
        if len(parts) < 9:
            parts = line.split()  # tolerate space-separated output
        if len(parts) < 9:
            continue
        try:
            last = int(parts[5])
        except ValueError:
            continue
        if last > 0 and (now - last) <= window:
            cnt += 1
    return cnt

# --------------------------- LCD CLIENT -------------------------------
class LCDClient:
    """Small TCP client for an LCDproc server, driving one screen.

    The screen has one string widget per display row (at most two).
    ``sock`` is None whenever the client is disconnected; callers use that
    to decide when to call connect() again.
    """

    # Timeout in seconds applied to normal socket operations.
    IO_TIMEOUT = 3.0

    def __init__(self, host=LCDD_HOST, port=LCDD_PORT, w=LCD_W, h=LCD_H):
        """Store connection parameters and display size; no I/O happens here."""
        self.host, self.port, self.w, self.h = host, port, w, h
        self.sock = None
        self.screen_id = "scr"
        self.w1_id = "l1"
        self.w2_id = "l2"
        self.current_title = None

    def connect(self):
        """(Re)connect and set up the screen and its widgets.

        Raises:
            OSError: if connecting or any setup command fails. In that case
                the socket is closed and ``sock`` is left as None, so a
                half-initialised session is never reported as connected.
        """
        self.close()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.settimeout(self.IO_TIMEOUT)
            s.connect((self.host, self.port))
            self.sock = s
            # Greet first, then read the reply. Waiting for a banner before
            # saying hello would just sit out the read timeout.
            s.sendall(b"hello\n")
            self._read_banner()
            self._send('client_set name "opn-lcd"')
            self._send("client_set -backlight on")
            self._send("client_set -heartbeat off")
            self._send(f"screen_add {self.screen_id}")
            self._send(f'screen_set {self.screen_id} -name "OPNsense" -priority foreground -heartbeat off')
            self._send(f"widget_add {self.screen_id} {self.w1_id} string")
            if self.h >= 2:
                self._send(f"widget_add {self.screen_id} {self.w2_id} string")
        except Exception:
            self.sock = None
            try:
                s.close()
            except Exception:
                pass
            raise
        self.current_title = "OPNsense"

    def close(self):
        """Close the socket if open and mark the client as disconnected."""
        if self.sock:
            try:
                self.sock.close()
            except Exception:
                pass
        self.sock = None

    def _read_banner(self):
        """Read and discard up to 1024 bytes, waiting at most one second."""
        try:
            self.sock.settimeout(1.0)
            _ = self.sock.recv(1024)
        except Exception:
            pass
        finally:
            self.sock.settimeout(self.IO_TIMEOUT)

    def _drain(self):
        """Discard any replies already received, without blocking.

        Replies are not interpreted, but leaving them unread lets them pile
        up in the receive buffer for as long as the connection lives.

        Raises:
            ConnectionError: if the server has closed the connection.
        """
        if not self.sock:
            return
        self.sock.setblocking(False)
        try:
            while True:
                if not self.sock.recv(4096):
                    raise ConnectionAbortedError("LCD server closed the connection")
        except (BlockingIOError, InterruptedError):
            pass  # nothing more to read right now
        finally:
            self.sock.settimeout(self.IO_TIMEOUT)

    def _send(self, line: str):
        """Send one command line (a newline is appended); no-op when disconnected."""
        if not self.sock:
            return
        self.sock.sendall((line + "\n").encode())
        self._drain()

    def set_title_once(self, title: str):
        """Set the screen name to *title* (truncated to the display width) if it changed."""
        title = trunc(title, self.w)
        if title != self.current_title:
            self._send(f'screen_set {self.screen_id} -name "{lcd_escape(title)}"')
            self.current_title = title

    def draw(self, l1: str, l2: str = ""):
        """Show *l1* on row 1 and, on displays with two or more rows, *l2* on row 2.

        Each line is truncated to the display width and centred.
        """
        t1 = trunc(l1, self.w)
        x1 = center_x(t1, self.w)
        self._send(f'widget_set {self.screen_id} {self.w1_id} {x1} 1 "{lcd_escape(t1)}"')
        if self.h >= 2:
            t2 = trunc(l2, self.w)
            x2 = center_x(t2, self.w)
            self._send(f'widget_set {self.screen_id} {self.w2_id} {x2} 2 "{lcd_escape(t2)}"')

# ---------------------------- MAIN LOOP ------------------------------
def main():
    """Run the display loop forever.

    Each pass (about every 0.1 s) ticks the CPU and WAN samplers and
    reconnects to the LCD server if needed. Slow-changing data (routes,
    interface addresses, VPN user counts, temperatures) is refreshed every
    30 s. The rotation advances every SCREEN_SECONDS. The visible screen is
    rebuilt when it changes, after a reconnect, and otherwise twice per
    second, so the commands behind each screen are not spawned on every pass.
    """
    lcd = LCDClient()
    refresh_period = 30.0
    redraw_period = 0.5  # short enough for the clock's seconds to keep up
    # A non-positive period would make the catch-up loop below spin forever.
    screen_seconds = SCREEN_SECONDS if SCREEN_SECONDS > 0 else 4.0
    screens = ("FW", "DATE", "UP", "CPU", "LOAD", "TEMP", "MEM", "DSK", "LAN", "WAN", "NET", "GW", "USERS")

    next_refresh = time.monotonic() + 1.0
    next_switch = time.monotonic() + screen_seconds
    next_draw = 0.0  # draw on the first connected pass
    idx = 0

    wan_if = WAN_IF_OVERRIDE
    gateway = ""
    wan_ip = ""
    lan_if = ""
    lan_ip = ""
    sslvpn = 0
    wg = 0
    temp_max = None
    temp_avg = None
    temp_n = 0

    while True:
        nowm = time.monotonic()
        cpu_sampler_tick()
        wan_sampler_tick(wan_if)

        if lcd.sock is None:
            try:
                lcd.connect()
            except Exception:
                time.sleep(0.5)
                continue
            next_draw = nowm  # a new session starts with empty widgets

        if nowm >= next_refresh:
            if not WAN_IF_OVERRIDE:
                wan_if, gateway = default_route_info()
            else:
                _, gateway = default_route_info()
            wan_ip = iface_ipv4(wan_if) if wan_if else ""
            if LAN_IF_OVERRIDE:
                lan_if = LAN_IF_OVERRIDE
            else:
                lan_if = pick_lan_iface({wan_if, "lo0"})
            lan_ip = iface_ipv4(lan_if) if lan_if else ""

            sslvpn = count_openvpn_clients()
            wg = count_wireguard_active(WG_ACTIVE_WINDOW)

            temps = cpu_temps_c()
            if temps:
                temp_max = max(temps)
                temp_avg = sum(temps) / len(temps)
                temp_n = len(temps)
            else:
                temp_max = temp_avg = None
                temp_n = 0

            logd(f"counts: SSLVPN={sslvpn} WG={wg} temps={temps if temps else 'N/A'}")

            next_refresh += refresh_period
            if next_refresh <= nowm:
                # The schedule fell behind (e.g. LCD server was down a long
                # time). Restart it from now rather than refreshing
                # back-to-back until it has caught up.
                next_refresh = nowm + refresh_period

        if nowm >= next_switch:
            idx = (idx + 1) % len(screens)
            while nowm >= next_switch:
                next_switch += screen_seconds
            next_draw = nowm  # show the new screen immediately

        if nowm >= next_draw:
            next_draw = nowm + redraw_period
            cur = screens[idx]
            try:
                if cur == "FW":
                    lcd.set_title_once("OPNSense FW")
                    lcd.draw("OPNSense FW", opnsense_version())
                elif cur == "DATE":
                    lcd.set_title_once("Date/Time")
                    l1, l2 = date_time_lines()
                    lcd.draw(l1, l2)
                elif cur == "UP":
                    lcd.set_title_once("Uptime")
                    d, hm = uptime_tuple()
                    lcd.draw("System Uptime", f"{d} days {hm}")
                elif cur == "CPU":
                    lcd.set_title_once("CPU")
                    lcd.draw("CPU Usage", f"{cpu_latest_percent():.1f}%")
                elif cur == "LOAD":
                    lcd.set_title_once("Load Average")
                    lcd.draw("Load Average", loadavg_str())
                elif cur == "TEMP":
                    lcd.set_title_once("CPU Temp")
                    if temp_max is None:
                        lcd.draw("CPU Temp", "Sensors N/A")
                    else:
                        line2 = (f"Max{temp_max:.0f}C Avg{temp_avg:.0f}C" if temp_n > 1 else f"{temp_max:.0f}C")
                        lcd.draw("CPU Temp", line2)
                elif cur == "MEM":
                    lcd.set_title_once("Memory")
                    lcd.draw("Memory Usage", f"{mem_usage_percent():.0f}% used")
                elif cur == "DSK":
                    lcd.set_title_once("Disk Used")
                    d = total_disk_usage_percent()
                    total_g = _total_size_gib()
                    lcd.draw("Disk Used", f"{d:.0f}% of {total_g}G" if total_g > 0 else f"{d:.0f}%")
                elif cur == "LAN":
                    lcd.set_title_once("LAN IP")
                    lcd.draw("LAN IP", lan_ip or "N/A")
                elif cur == "WAN":
                    lcd.set_title_once("WAN IP")
                    lcd.draw("WAN IP", wan_ip or "N/A")
                elif cur == "NET":
                    lcd.set_title_once("WAN Rate")
                    if not wan_if:
                        lcd.draw("WAN Rate", "N/A")
                    else:
                        rx, tx = wan_rates()
                        lcd.draw("WAN Rate", f"D:{_fmt_rate(rx)} U:{_fmt_rate(tx)}")
                elif cur == "GW":
                    lcd.set_title_once("Gateway")
                    lcd.draw("Gateway", gateway or "N/A")
                elif cur == "USERS":
                    lcd.set_title_once("Live Users")
                    lcd.draw("Live Users", f"SSL:{sslvpn} WG:{wg}")
            except OSError:
                # Covers BrokenPipeError and socket timeouts. Drop the
                # connection so the next pass reconnects.
                lcd.close()

        time.sleep(0.1)

# Script entry point: run the display loop; Ctrl-C exits without a traceback.
if __name__=="__main__":
    try: main()
    except KeyboardInterrupt: pass