"""Enable or disable Telnet/SSH access on a C-Data FD511G-X ONU.

The script logs in to the ONU web interface, downloads the configuration
backup, de-obfuscates it with a single-byte XOR, edits the XML (shell
credentials and/or the service-control policy), uploads the modified
configuration and optionally reboots the device.
"""
import argparse
import hashlib
import io
import sys
import xml.etree.ElementTree as ET
from typing import Literal, Optional, Tuple, Union

# Single-byte XOR key used to (de)obfuscate the configuration backup.
CONFIG_ENC_KEY = 0x26


def xor_xcrypt(data: bytes, key: int, result_as_string: bool = False) -> Union[bytes, str]:
    """XOR every byte of *data* with *key*.

    XOR is its own inverse, so the same call both encrypts and decrypts.

    Args:
        data: Input bytes.
        key: Value XOR-ed with each byte.
        result_as_string: When true, return a ``str`` built from the
            resulting code points instead of ``bytes``.

    Returns:
        The transformed data as ``bytes`` (default) or ``str``.
    """
    if result_as_string:
        return ''.join(chr(c ^ key) for c in data)
    return bytes(c ^ key for c in data)


def _credential_tags(protocol: str) -> Tuple[str, str]:
    """Return the (username, password) element names used for *protocol*."""
    if protocol.upper() == 'TELNET':
        return 'TelnetUserName', 'TelnetPassword'
    return 'UserName', 'Password'


def _child_text(parent: Optional[ET.Element], tag: str) -> Optional[str]:
    """Return the text of child *tag*, or ``None`` if parent/child is absent."""
    if parent is None:
        return None
    child = parent.find(tag)
    return None if child is None else child.text


def get_shell_access_creds(config: ET.Element,
                           protocol: Literal['SSH', 'Telnet'] = 'SSH') -> str:
    """Format the shell credentials stored in *config* for *protocol*.

    Missing sections or elements are reported as ``None`` instead of raising
    ``AttributeError``, so the caller can still continue with other changes.

    Args:
        config: Root element of the parsed configuration.
        protocol: ``'SSH'`` or ``'Telnet'``.

    Returns:
        A two-line string: ``username: ...`` and ``password: ...``.
    """
    remote_management = config.find(f'.//RemoteManagement/{protocol.upper()}')
    username_tag, password_tag = _credential_tags(protocol)
    username = _child_text(remote_management, username_tag)
    password = _child_text(remote_management, password_tag)
    return (f'username: {username}\n'
            f'password: {password}\n')


def set_shell_access_creds(config: ET.Element,
                           protocol: Literal['SSH', 'Telnet'] = 'SSH',
                           username: Optional[str] = None,
                           password: Optional[str] = None) -> bool:
    """Overwrite the shell username and/or password in *config*.

    Args:
        config: Root element of the parsed configuration (modified in place).
        protocol: ``'SSH'`` or ``'Telnet'``.
        username: New username; left untouched when empty or ``None``.
        password: New password; left untouched when empty or ``None``.

    Returns:
        ``True`` on success, ``False`` when the protocol section or one of
        the elements that has to be changed does not exist. Nothing is
        modified when ``False`` is returned.
    """
    remote_management = config.find(f'.//RemoteManagement/{protocol.upper()}')
    if remote_management is None:
        return False

    username_tag, password_tag = _credential_tags(protocol)
    # Resolve every target element first so a missing one cannot leave the
    # configuration half-updated.
    pending = []
    for tag, value in ((username_tag, username), (password_tag, password)):
        if not value:
            continue
        element = remote_management.find(tag)
        if element is None:
            return False
        pending.append((element, value))

    for element, value in pending:
        element.text = value
    return True


def is_shell_access_enabled(config: ET.Element,
                            interface: Literal['LAN', 'WAN'] = 'LAN',
                            protocol: Literal['SSH', 'Telnet'] = 'SSH') -> bool:
    """Tell whether *protocol* is permitted on *interface*.

    Returns:
        ``True`` only if a matching service-control entry has a ``Policy``
        element whose text is ``"Permit"``.
    """
    service_policy = config.find(
        f".//ServiceControl/*[ServiceList='{protocol.upper()}'][Interface='{interface}']/Policy")
    return (service_policy is not None) and (service_policy.text == "Permit")


def enable_shell_access(config: ET.Element,
                        interface: Literal['LAN', 'WAN'] = 'LAN',
                        protocol: Literal['SSH', 'Telnet'] = 'SSH',
                        disable: bool = False) -> bool:
    """Set the service-control policy for *protocol* on *interface*.

    Args:
        config: Root element of the parsed configuration (modified in place).
        interface: ``'LAN'`` or ``'WAN'``.
        protocol: ``'SSH'`` or ``'Telnet'``.
        disable: Write ``"Discard"`` instead of ``"Permit"``.

    Returns:
        ``True`` if the policy was written, ``False`` if no matching
        service-control entry exists.
    """
    service = config.find(
        f".//ServiceControl/*[ServiceList='{protocol.upper()}'][Interface='{interface}']")
    if service is None:
        return False

    service_policy = service.find('Policy')
    if service_policy is None:
        # The entry exists but has no policy yet: create one.
        service_policy = ET.SubElement(service, 'Policy', attrib={'PARAMETER': 'configured'})
    service_policy.text = "Discard" if disable else "Permit"
    return True


def create_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the script."""
    arg_parser = argparse.ArgumentParser(
        description='Program to enable or disable Telnet or SSH access to the C-Data FD511G-X ONU')
    arg_parser.add_argument(
        'host',
        nargs='?',
        default='192.168.101.1',
        help='ONU host (IP address or domain). Default: %(default)s.')
    arg_parser.add_argument(
        '-a', '--authorization',
        default='adminisp:adminisp',
        help='Web interface authorization credentials, separated by colon. Default: %(default)s.')
    arg_parser.add_argument(
        '-d', '--disable',
        action='store_true',
        help='Disable access. If this argument is not passed, it enables access.')
    arg_parser.add_argument(
        '-i', '--interface',
        choices=('LAN', 'WAN'),
        default='LAN',
        type=str,
        help='Interface for enabling access: LAN or WAN. Default: %(default)s.')
    arg_parser.add_argument(
        '-p', '--protocol',
        choices=('SSH', 'Telnet'),
        default='SSH',
        type=str,
        help='Protocol for enabling access: SSH or Telnet. Default: %(default)s.')
    arg_parser.add_argument(
        '--change-username',
        type=str,
        help='Set new username')
    arg_parser.add_argument(
        '--change-password',
        type=str,
        help='Set new password')
    return arg_parser


def main() -> None:
    """Run the command-line workflow: log in, edit the config, upload, reboot."""
    # Imported here so the XML helpers above stay importable without the
    # third-party HTTP client installed.
    import httpx

    arg_parser = create_arg_parser()
    arguments = arg_parser.parse_args()

    host = arguments.host
    username, separator, password = arguments.authorization.partition(':')
    if not separator:
        # Exit with a usage message instead of an unpacking traceback.
        arg_parser.error('authorization must have the form USERNAME:PASSWORD')
    disable = arguments.disable
    interface = arguments.interface
    protocol = arguments.protocol
    change_username = arguments.change_username
    change_password = arguments.change_password

    # The context manager closes the connection pool on every exit path,
    # including the sys.exit() calls below.
    with httpx.Client(base_url=f'http://{host}') as http:
        # The login request sends the MD5 hex digest of the password.
        hashed_password = hashlib.md5(password.encode('utf-8')).hexdigest()

        print(f'Logging in to http://{host} with {username}:{password}...', file=sys.stderr)
        resp = http.post(
            '/post.json',
            json={'module': 'login', 'username': username, 'encryPassword': hashed_password}
        )
        if resp.json()['code'] != 0:
            sys.exit(f'Authorization error:\n{resp.text}')

        resp = http.post('/boaform/formSaveConfig', data={'save_cs': 'backup_config'})
        resp.raise_for_status()

        encrypted_config = resp.content
        # In the decrypted backup commas stand in for line breaks; the
        # reverse substitution is applied before re-encrypting below.
        decrypted_config = xor_xcrypt(
            encrypted_config, CONFIG_ENC_KEY, result_as_string=True).replace(',', '\n')
        config = ET.fromstring(decrypted_config)
        config_changed = False

        print(f'Current {protocol} credentials:\n', get_shell_access_creds(config, protocol), sep='')
        if change_username or change_password:
            if set_shell_access_creds(config, protocol, change_username, change_password):
                print(f'{protocol} changed to:\n', get_shell_access_creds(config, protocol), sep='')
                config_changed = True
            else:
                print(f'Failed to change {protocol} credentials', file=sys.stderr)

        enabled = is_shell_access_enabled(config, interface, protocol)
        # A change is needed only when the current state is the opposite of
        # the requested one (disable requested while enabled, or vice versa).
        if disable == enabled:
            if enable_shell_access(config, interface, protocol, disable=disable):
                print(f'{protocol} access from {interface}', 'disabled' if disable else 'enabled')
                config_changed = True
            else:
                print(f'Failed to {"disable" if disable else "enable"} {protocol}. Check configuration',
                      file=sys.stderr)
        else:
            _tip = [f'{protocol} access from {interface} is already {"enabled" if enabled else "disabled"}.']
            if enabled:
                _tip.append(f"To disable, run \"{' '.join(sys.argv)} --disable\"")
            print(' '.join(_tip))

        if config_changed:
            modified_config = ET.tostring(
                config, encoding='utf-8', xml_declaration=True, short_empty_elements=False)
            encrypted_modified_config = xor_xcrypt(
                modified_config.replace(b'\n', b','), CONFIG_ENC_KEY)

            print('Uploading new configuration to the ONU...', file=sys.stderr)
            resp = http.post('/boaform/formSaveConfig', files={
                'binary': ('config.bin', io.BytesIO(encrypted_modified_config), 'application/octet-stream')})
            resp.raise_for_status()
            if resp.json()['code'] != 0:
                sys.exit(f'Error uploading config:\n{resp.text}')

            resp = http.get('/get.json', params={'module': 'restore_info'})
            resp.raise_for_status()
            if resp.json()['code'] != 0:
                sys.exit(f'Error applying config:\n{resp.text}')

            if input('Config successfully uploaded. Do you want to reboot ONU now? (Y/n) : ').lower() != 'n':
                try:
                    resp = http.post('/post.json', json={'module': 'dev_config', 'reboot': 1})
                except (httpx.TimeoutException, httpx.NetworkError):
                    # A dropped connection here is treated as the device
                    # already going down for the reboot.
                    print('ONU reboot error. The device is probably already rebooting.')
                    sys.exit(0)
                else:
                    resp.raise_for_status()
                    print('Device is rebooting. Please wait.', file=sys.stderr)
        else:
            print('Configuration has not been changed.')


if __name__ == '__main__':
    main()