Skip to content

Commit 258fb16

Browse files
authored
Merge pull request #312 from itsDNNS/feat/surfboard-html-fallback
Add HTML fallback for SB8200 with broken HNAP firmware
2 parents 7f93de4 + 866a0c7 commit 258fb16

6 files changed

Lines changed: 993 additions & 198 deletions

File tree

app/drivers/arris_html.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
"""Shared Arris HTML channel-table parser for DOCSight.
2+
3+
Parses the ``/cmconnectionstatus.html`` status page used by Arris cable
4+
modems (CM8200A, SB8200 HTML fallback, and similar) into the standard
5+
DOCSight channel data format.
6+
7+
The page contains two HTML tables:
8+
- "Downstream Bonded Channels" (8 columns)
9+
- "Upstream Bonded Channels" (7 columns)
10+
11+
DOCSIS version is inferred from modulation / channel type:
12+
- DS: modulation "Other" = OFDM (3.1), anything else = SC-QAM (3.0)
13+
- US: "OFDM" in type without "SC-QAM" = OFDMA (3.1), else SC-QAM (3.0)
14+
"""
15+
16+
import logging
17+
18+
from bs4 import BeautifulSoup, Tag
19+
20+
log = logging.getLogger("docsis.arris_html")
21+
22+
23+
# ---------------------------------------------------------------------------
24+
# Public API
25+
# ---------------------------------------------------------------------------
26+
27+
def parse_arris_channel_tables(html: str) -> dict:
28+
"""Parse Arris modem status page HTML into DOCSight channel format.
29+
30+
Returns::
31+
32+
{"channelDs": {"docsis30": [...], "docsis31": [...]},
33+
"channelUs": {"docsis30": [...], "docsis31": [...]}}
34+
"""
35+
soup = BeautifulSoup(html, "html.parser")
36+
ds_table, us_table = _find_channel_tables(soup)
37+
38+
ds30, ds31 = _parse_downstream(ds_table)
39+
us30, us31 = _parse_upstream(us_table)
40+
41+
return {
42+
"channelDs": {"docsis30": ds30, "docsis31": ds31},
43+
"channelUs": {"docsis30": us30, "docsis31": us31},
44+
}
45+
46+
47+
# ---------------------------------------------------------------------------
48+
# Table discovery
49+
# ---------------------------------------------------------------------------
50+
51+
def _find_channel_tables(soup: BeautifulSoup) -> tuple:
52+
"""Find downstream and upstream channel tables by header text.
53+
54+
Returns ``(ds_table, us_table)`` where either may be ``None``.
55+
"""
56+
ds_table = None
57+
us_table = None
58+
59+
for table in soup.find_all("table"):
60+
header = table.find("tr")
61+
if not header:
62+
continue
63+
text = header.get_text(strip=True).lower()
64+
if "downstream bonded" in text:
65+
ds_table = table
66+
elif "upstream bonded" in text:
67+
us_table = table
68+
69+
return ds_table, us_table
70+
71+
72+
# ---------------------------------------------------------------------------
73+
# Row classification
74+
# ---------------------------------------------------------------------------
75+
76+
def _is_header_row(row: Tag) -> bool:
77+
"""True if *row* is a table title or column-header row (not data)."""
78+
if row.find("th"):
79+
return True
80+
if row.find("strong"):
81+
return True
82+
return False
83+
84+
85+
# ---------------------------------------------------------------------------
86+
# Downstream parser
87+
# ---------------------------------------------------------------------------
88+
89+
def _parse_downstream(table) -> tuple:
90+
"""Parse downstream table into ``(docsis30, docsis31)`` channel lists.
91+
92+
Expected 8 columns per data row:
93+
Channel ID | Lock Status | Modulation | Frequency |
94+
Power | SNR/MER | Corrected | Uncorrectables
95+
"""
96+
ds30: list[dict] = []
97+
ds31: list[dict] = []
98+
if not table:
99+
return ds30, ds31
100+
101+
for row in table.find_all("tr"):
102+
if _is_header_row(row):
103+
continue
104+
cells = [td.get_text(strip=True) for td in row.find_all("td")]
105+
if len(cells) < 8:
106+
continue
107+
108+
lock_status = cells[1]
109+
if lock_status != "Locked":
110+
continue
111+
112+
try:
113+
channel_id = int(cells[0])
114+
modulation = cells[2]
115+
frequency = _parse_freq_hz(cells[3])
116+
power = _parse_value(cells[4])
117+
snr = _parse_value(cells[5])
118+
corrected = int(cells[6])
119+
uncorrectables = int(cells[7])
120+
121+
channel: dict = {
122+
"channelID": channel_id,
123+
"frequency": frequency,
124+
"powerLevel": power,
125+
"modulation": modulation,
126+
"corrErrors": corrected,
127+
"nonCorrErrors": uncorrectables,
128+
}
129+
130+
if modulation == "Other":
131+
# OFDM channel (DOCSIS 3.1)
132+
channel["type"] = "OFDM"
133+
channel["mer"] = snr
134+
channel["mse"] = None
135+
ds31.append(channel)
136+
else:
137+
# SC-QAM channel (DOCSIS 3.0)
138+
channel["mer"] = snr
139+
channel["mse"] = -snr if snr is not None else None
140+
ds30.append(channel)
141+
except (ValueError, TypeError, IndexError) as e:
142+
log.warning("Failed to parse DS row: %s", e)
143+
144+
return ds30, ds31
145+
146+
147+
# ---------------------------------------------------------------------------
148+
# Upstream parser
149+
# ---------------------------------------------------------------------------
150+
151+
def _parse_upstream(table) -> tuple:
152+
"""Parse upstream table into ``(docsis30, docsis31)`` channel lists.
153+
154+
Expected 7 columns per data row:
155+
Channel | Channel ID | Lock Status | US Channel Type |
156+
Frequency | Width | Power
157+
"""
158+
us30: list[dict] = []
159+
us31: list[dict] = []
160+
if not table:
161+
return us30, us31
162+
163+
for row in table.find_all("tr"):
164+
if _is_header_row(row):
165+
continue
166+
cells = [td.get_text(strip=True) for td in row.find_all("td")]
167+
if len(cells) < 7:
168+
continue
169+
170+
lock_status = cells[2]
171+
if lock_status != "Locked":
172+
continue
173+
174+
try:
175+
channel_id = int(cells[1])
176+
channel_type = cells[3]
177+
frequency = _parse_freq_hz(cells[4])
178+
power = _parse_value(cells[6])
179+
180+
channel: dict = {
181+
"channelID": channel_id,
182+
"frequency": frequency,
183+
"powerLevel": power,
184+
"modulation": channel_type,
185+
}
186+
187+
if "OFDM" in channel_type and "SC-QAM" not in channel_type:
188+
# OFDMA channel (DOCSIS 3.1)
189+
channel["type"] = "OFDMA"
190+
channel["multiplex"] = ""
191+
us31.append(channel)
192+
else:
193+
# SC-QAM channel (DOCSIS 3.0)
194+
channel["multiplex"] = "SC-QAM"
195+
us30.append(channel)
196+
except (ValueError, TypeError, IndexError) as e:
197+
log.warning("Failed to parse US row: %s", e)
198+
199+
return us30, us31
200+
201+
202+
# ---------------------------------------------------------------------------
203+
# Value helpers
204+
# ---------------------------------------------------------------------------
205+
206+
def _parse_freq_hz(freq_str: str) -> str:
207+
"""Convert ``'795000000 Hz'`` to ``'795 MHz'``."""
208+
if not freq_str:
209+
return ""
210+
parts = freq_str.strip().split()
211+
try:
212+
hz = float(parts[0])
213+
mhz = hz / 1_000_000
214+
if mhz == int(mhz):
215+
return f"{int(mhz)} MHz"
216+
return f"{mhz:.1f} MHz"
217+
except (ValueError, IndexError):
218+
return freq_str
219+
220+
221+
def _parse_value(val_str: str):
222+
"""Parse ``'8.2 dBmV'`` or ``'43.0 dB'`` to float."""
223+
if not val_str:
224+
return None
225+
parts = val_str.strip().split()
226+
try:
227+
return float(parts[0])
228+
except (ValueError, IndexError):
229+
return None

0 commit comments

Comments
 (0)