Files
cdata_onu_tools/enable_shell_access.py
Victor Golovanenko 117622257b Initial commit
2025-07-27 21:07:02 +03:00

174 lines
7.3 KiB
Python

import argparse
import hashlib
import io
import sys
import xml.etree.ElementTree as ET
from typing import Literal, Optional, Union
import httpx
# Single-byte XOR key passed to xor_xcrypt() when decoding the downloaded
# configuration backup and when re-encoding it for upload.
CONFIG_ENC_KEY = 0x26
def xor_xcrypt(data: bytes, key: int, result_as_string: bool = False) -> Union[bytes, str]:
    """XOR every byte of *data* with *key*.

    XOR is its own inverse, so the same call both obfuscates and restores data.

    Args:
        data: Input bytes.
        key: Integer XOR-ed with each byte.
        result_as_string: When true, return a ``str`` built from one character
            per resulting code point instead of a ``bytes`` object.

    Returns:
        The transformed data as ``bytes`` (default) or ``str``.
    """
    xored = (byte ^ key for byte in data)
    if not result_as_string:
        return bytes(xored)
    return ''.join(map(chr, xored))
def get_shell_access_creds(config: ET.Element, protocol: Literal['SSH', 'Telnet'] = 'SSH') -> str:
    """Format the shell credentials stored in the configuration tree.

    The credentials are read from ``RemoteManagement/<PROTOCOL>``; Telnet uses
    the ``TelnetUserName``/``TelnetPassword`` tags, SSH uses
    ``UserName``/``Password``.

    Args:
        config: Root element of the parsed configuration.
        protocol: ``'SSH'`` or ``'Telnet'``.

    Returns:
        A two-line ``username: ...`` / ``password: ...`` string. A value that
        is absent (missing section, missing element, or empty element) is
        rendered as ``None`` instead of raising.
    """
    section = config.find(f'.//RemoteManagement/{protocol.upper()}')
    is_telnet = protocol == 'Telnet'
    tags = (
        'TelnetUserName' if is_telnet else 'UserName',
        'TelnetPassword' if is_telnet else 'Password',
    )

    values = []
    for tag in tags:
        # Tolerate a missing section or field the same way the sibling
        # setters do, rather than failing with AttributeError on None.
        node = None if section is None else section.find(tag)
        values.append(None if node is None else node.text)
    username, password = values

    return (f'username: {username}\n'
            f'password: {password}\n')
def set_shell_access_creds(config: ET.Element, protocol: Literal['SSH', 'Telnet'] = 'SSH',
                           username: Optional[str] = None, password: Optional[str] = None) -> bool:
    """Overwrite the shell username and/or password in the configuration tree.

    Args:
        config: Root element of the parsed configuration; modified in place.
        protocol: ``'SSH'`` or ``'Telnet'``.
        username: New username; left unchanged when ``None`` or empty.
        password: New password; left unchanged when ``None`` or empty.

    Returns:
        ``True`` when every requested value was written. ``False`` when the
        ``RemoteManagement/<PROTOCOL>`` section or a required field element is
        missing; in that case the tree is left untouched.
    """
    section = config.find(f'.//RemoteManagement/{protocol.upper()}')
    if section is None:
        return False

    is_telnet = protocol == 'Telnet'
    requested = (
        ('TelnetUserName' if is_telnet else 'UserName', username),
        ('TelnetPassword' if is_telnet else 'Password', password),
    )

    # Resolve every target element before writing anything, so a missing
    # element cannot leave the configuration half-updated.
    pending = []
    for tag, value in requested:
        if not value:
            continue
        node = section.find(tag)
        if node is None:
            return False
        pending.append((node, value))

    for node, value in pending:
        node.text = value
    return True
def is_access_enabled(config: ET.Element,
                      interface: Literal['LAN', 'WAN'] = 'LAN', protocol: Literal['SSH', 'Telnet'] = 'SSH') -> bool:
    """Report whether the service-control rule for *protocol* on *interface* permits access.

    Args:
        config: Root element of the parsed configuration.
        interface: ``'LAN'`` or ``'WAN'``.
        protocol: ``'SSH'`` or ``'Telnet'``.

    Returns:
        ``True`` when the matching rule's ``Policy`` text is ``"Permit"``;
        ``False`` otherwise, including when no matching rule exists.
    """
    policy = config.find(
        f".//ServiceControl/*[ServiceList='{protocol.upper()}'][Interface='{interface}']/Policy")
    # A configuration without the rule cannot be granting access; report
    # False instead of failing with AttributeError on None.
    if policy is None:
        return False
    return policy.text == "Permit"
def enable_access(config: ET.Element, interface: Literal['LAN', 'WAN'] = 'LAN',
                  protocol: Literal['SSH', 'Telnet'] = 'SSH', disable: bool = False) -> bool:
    """Permit or discard *protocol* access on *interface* in the configuration tree.

    Args:
        config: Root element of the parsed configuration; modified in place.
        interface: ``'LAN'`` or ``'WAN'``.
        protocol: ``'SSH'`` or ``'Telnet'``.
        disable: When true, set the rule's policy to ``"Discard"``; otherwise
            set it to ``"Permit"`` and set the ``<protocol>Enable`` flag to ``'1'``.

    Returns:
        ``True`` when the tree was updated, ``False`` when either the
        service-control policy or the enable flag element is missing.
    """
    section_tag = protocol.upper()
    policy_node = config.find(
        f".//ServiceControl/*[ServiceList='{section_tag}'][Interface='{interface}']/Policy")
    enable_flag = config.find(f'.//RemoteManagement/{section_tag}/{protocol}Enable')

    if policy_node is None or enable_flag is None:
        return False

    if disable:
        # Only the per-interface policy is changed; the enable flag is left as is.
        policy_node.text = "Discard"
    else:
        policy_node.text = "Permit"
        enable_flag.text = '1'
    return True
def create_arg_parser():
    """Build the command-line parser for the tool.

    Returns:
        An ``argparse.ArgumentParser`` accepting an optional positional
        ``host`` plus the ``--authorization``, ``--disable``, ``--interface``,
        ``--protocol``, ``--change-username`` and ``--change-password`` options.
    """
    parser = argparse.ArgumentParser(
        description='Program to enable or disable Telnet or SSH access to the C-Data FD511G-X ONU')

    # (names/flags, add_argument keyword options), registered in this order.
    argument_specs = (
        (('host',), {
            'nargs': '?',
            'default': '192.168.101.1',
            'help': 'ONU host (IP address or domain). Default: %(default)s.',
        }),
        (('-a', '--authorization'), {
            'default': 'adminisp:adminisp',
            'help': 'Web interface authorization credentials, separated by colon. Default: %(default)s.',
        }),
        (('-d', '--disable'), {
            'action': 'store_true',
            'help': 'Disable access. If this argument is not passed, it enables access.',
        }),
        (('-i', '--interface'), {
            'choices': ('LAN', 'WAN'),
            'default': 'LAN',
            'type': str,
            'help': 'Interface for enabling access: LAN or WAN. Default: %(default)s.',
        }),
        (('-p', '--protocol'), {
            'choices': ('SSH', 'Telnet'),
            'default': 'SSH',
            'type': str,
            'help': 'Protocol for enabling access: SSH or Telnet. Default: %(default)s.',
        }),
        (('--change-username',), {
            'type': str,
            'help': 'Set new username',
        }),
        (('--change-password',), {
            'type': str,
            'help': 'Set new password',
        }),
    )
    for names, options in argument_specs:
        parser.add_argument(*names, **options)
    return parser
def _reboot_confirmed(answer: str) -> bool:
    """Return True unless the reply to the "(Y/n)" prompt is a refusal ("n" or "no")."""
    return answer.strip().lower() not in ('n', 'no')


def main() -> None:
    """Run the tool: log in, fetch the config, apply the requested changes, upload, reboot.

    Steps, in order:
      1. Parse the command line and log in to the web interface.
      2. Download the configuration backup and decode it with ``xor_xcrypt``.
      3. Optionally change the shell credentials and switch access on or off.
      4. If anything changed, re-encode and upload the configuration, ask the
         device to apply it, and offer to reboot.

    Exits via ``sys.exit`` with a message when the credentials are malformed
    or the device reports a non-zero ``code``.
    """
    arguments = create_arg_parser().parse_args()
    host = arguments.host
    disable = arguments.disable
    interface = arguments.interface
    protocol = arguments.protocol
    change_username = arguments.change_username
    change_password = arguments.change_password

    # Only the first colon separates the parts, so the password may contain colons.
    username, separator, password = arguments.authorization.partition(':')
    if not separator:
        sys.exit('Authorization error:\ncredentials must be given as "username:password"')

    # The context manager closes the HTTP client on every exit path,
    # including the sys.exit() calls below.
    with httpx.Client(base_url=f'http://{host}') as http:
        # The login request carries the MD5 hex digest of the password.
        hashed_password = hashlib.md5(password.encode('utf-8')).hexdigest()
        # The web password is deliberately not echoed to the terminal/logs.
        print(f'Logging in to http://{host} as {username}...', file=sys.stderr)
        resp = http.post(
            '/post.json',
            json={'module': 'login', 'username': username, 'encryPassword': hashed_password}
        )
        if resp.json()['code'] != 0:
            sys.exit(f'Authorization error:\n{resp.text}')

        resp = http.post('/boaform/formSaveConfig', data={'save_cs': 'backup_config'})
        resp.raise_for_status()
        encrypted_config = resp.content
        # The backup is XOR-encoded and uses commas where the XML has newlines.
        # NOTE(review): decoding one character per byte and re-encoding as UTF-8
        # on upload would alter non-ASCII bytes - confirm the config is ASCII-only.
        decrypted_config = xor_xcrypt(encrypted_config, CONFIG_ENC_KEY, result_as_string=True).replace(',', '\n')
        config = ET.fromstring(decrypted_config)
        config_changed = False

        print(f'Current {protocol} credentials:\n', get_shell_access_creds(config, protocol), sep='')
        if change_username or change_password:
            if set_shell_access_creds(config, protocol, change_username, change_password):
                print(f'{protocol} changed to:\n', get_shell_access_creds(config, protocol), sep='')
                config_changed = True
            else:
                print(f'Failed to change {protocol} credentials', file=sys.stderr)

        enabled = is_access_enabled(config, interface, protocol)
        # A change is needed only when the current state is the opposite of the
        # requested one: disabling while enabled, or enabling while disabled.
        if disable == enabled:
            if enable_access(config, interface, protocol, disable=disable):
                print(f'{protocol} access from {interface}', 'disabled' if disable else 'enabled')
                config_changed = True
            else:
                print(f'Failed to {"disable" if disable else "enable"} {protocol}. Check configuration',
                      file=sys.stderr)
        else:
            _tip = [f'{protocol} access from {interface} is already {"enabled" if enabled else "disabled"}.']
            if enabled:
                _tip.append(f"To disable, run \"{' '.join(sys.argv)} --disable\"")
            print(' '.join(_tip))

        if not config_changed:
            print('Configuration has not been changed.')
            return

        modified_config = ET.tostring(config, encoding='utf-8', xml_declaration=True, short_empty_elements=False)
        # Reverse of the download step: newlines back to commas, then XOR.
        encrypted_modified_config = xor_xcrypt(modified_config.replace(b'\n', b','), CONFIG_ENC_KEY)
        print('Uploading new configuration to the ONU...', file=sys.stderr)
        resp = http.post('/boaform/formSaveConfig', files={
            'binary': ('config.bin', io.BytesIO(encrypted_modified_config), 'application/octet-stream')})
        resp.raise_for_status()
        if resp.json()['code'] != 0:
            sys.exit(f'Error uploading config:\n{resp.text}')

        resp = http.get('/get.json', params={'module': 'restore_info'})
        resp.raise_for_status()
        if resp.json()['code'] != 0:
            sys.exit(f'Error applying config:\n{resp.text}')

        answer = input('Config successfully uploaded. Do you want to reboot ONU now? (Y/n) : ')
        if _reboot_confirmed(answer):
            try:
                resp = http.post('/post.json', json={'module': 'dev_config', 'reboot': 1})
            except httpx.TransportError:
                # Any transport-level failure here (refused/reset connection,
                # server disconnect, timeout) is treated as the device going down.
                print('ONU reboot error. The device is probably already rebooting.')
                sys.exit(0)
            else:
                resp.raise_for_status()
                print('Device is rebooting. Please wait.', file=sys.stderr)


if __name__ == '__main__':
    main()