From 2f104442c321e583bcfb0dcf54258ed9aa693319 Mon Sep 17 00:00:00 2001 From: rizlas Date: Mon, 27 Dec 2021 17:37:56 +0100 Subject: [PATCH] Added sync for all components. WIP on PowerOutlets. --- .../compare_interfaces_button.html | 68 +++- netbox_interface_sync/urls.py | 41 ++- netbox_interface_sync/utils.py | 132 +++++++- netbox_interface_sync/views.py | 307 ++++++++++++++---- 4 files changed, 484 insertions(+), 64 deletions(-) diff --git a/netbox_interface_sync/templates/netbox_interface_sync/compare_interfaces_button.html b/netbox_interface_sync/templates/netbox_interface_sync/compare_interfaces_button.html index dd2c179..e095442 100644 --- a/netbox_interface_sync/templates/netbox_interface_sync/compare_interfaces_button.html +++ b/netbox_interface_sync/templates/netbox_interface_sync/compare_interfaces_button.html @@ -1,3 +1,65 @@ - - Interface sync - \ No newline at end of file +{% if perms.dcim.change_device %} + +{% endif %} \ No newline at end of file diff --git a/netbox_interface_sync/urls.py b/netbox_interface_sync/urls.py index cef81f2..b51488f 100644 --- a/netbox_interface_sync/urls.py +++ b/netbox_interface_sync/urls.py @@ -6,5 +6,44 @@ from . import views # Define a list of URL patterns to be imported by NetBox. Each pattern maps a URL to # a specific view so that it can be accessed by users. urlpatterns = ( - path("interface-comparison//", views.InterfaceComparisonView.as_view(), name="interface_comparison"), + path( + "interface-comparison//", + views.InterfaceComparisonView.as_view(), + name="interface_comparison", + ), + path( + "powerport-comparison//", + views.PowerPortComparisonView.as_view(), + name="powerport_comparison", + ), + path( + "consoleport-comparison//", + views.ConsolePortComparisonView.as_view(), + name="consoleport_comparison", + ), + path( + "consoleserverport-comparison//", + views.ConsoleServerPortComparisonView.as_view(), + name="consoleserverport_comparison", + ), + path( + "poweroutlet-comparison//", + views.PowerOutletComparisonView.as_view(), + name="poweroutlet_comparison", + ), + path( + "frontport-comparison//", + views.FrontPortComparisonView.as_view(), + name="frontport_comparison", + ), + path( + "rearport-comparison//", + views.RearPortComparisonView.as_view(), + name="rearport_comparison", + ), + path( + "devicebay-comparison//", + views.DeviceBayComparisonView.as_view(), + name="devicebay_comparison", + ), ) diff --git a/netbox_interface_sync/utils.py b/netbox_interface_sync/utils.py index 19d7fd1..97f3462 100644 --- a/netbox_interface_sync/utils.py +++ b/netbox_interface_sync/utils.py @@ -1,7 +1,8 @@ -import re +import re, copy from typing import Iterable from dataclasses import dataclass - +from django.shortcuts import render, redirect +from django.contrib import messages def split(s): for x, y in re.findall(r'(\d*)(\D*)', s): @@ -16,14 +17,124 @@ def natural_keys(c): def human_sorted(iterable: Iterable): return sorted(iterable, key=natural_keys) +def get_components(request, device, components, component_templates): + try: + unified_components = [UnifiedInterface(i.id, i.name, i.type, i.get_type_display()) for i in components] + except AttributeError: + unified_components = [UnifiedInterface(i.id, i.name) for i in components] + + try: + unified_component_templates = [ + UnifiedInterface(i.id, i.name, i.type, i.get_type_display(), is_template=True) for i in component_templates] + except AttributeError: + unified_component_templates = [ + UnifiedInterface(i.id, i.name, is_template=True) for i in component_templates] + + # List of interfaces and interface templates presented in the unified format + overall_powers = list(set(unified_component_templates + unified_components)) + overall_powers.sort(key=lambda o: natural_keys(o.name)) + + comparison_templates = [] + comparison_interfaces = [] + for i in overall_powers: + try: + comparison_templates.append(unified_component_templates[unified_component_templates.index(i)]) + except ValueError: + comparison_templates.append(None) + + try: + comparison_interfaces.append(unified_components[unified_components.index(i)]) + except ValueError: + comparison_interfaces.append(None) + + comparison_items = list(zip(comparison_templates, comparison_interfaces)) + return render( + request, "netbox_interface_sync/interface_comparison.html", + { + "comparison_items": comparison_items, + "templates_count": len(unified_component_templates), + "interfaces_count": len(components), + "device": device + } + ) + +def post_components(request, device, components, component_templates, ObjectType, ObjectTemplateType): + # Manually validating interfaces and interface templates lists + add_to_device = filter( + lambda i: i in component_templates.values_list("id", flat=True), + map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device"))) + ) + remove_from_device = filter( + lambda i: i in components.values_list("id", flat=True), + map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device"))) + ) + + # Remove selected interfaces from the device and count them + deleted = ObjectType.objects.filter(id__in=remove_from_device).delete()[0] + + # Add selected interfaces to the device and count them + add_to_device_component = ObjectTemplateType.objects.filter(id__in=add_to_device) + + bulk_create = [] + keys_to_avoid = ["id"] + + for i in add_to_device_component.values(): + tmp = ObjectType() + tmp.device = device + for k in i.keys(): + if k not in keys_to_avoid: + setattr(tmp, k, i[k]) + bulk_create.append(tmp) + + created = len(ObjectType.objects.bulk_create(bulk_create)) + + # Getting and validating a list of interfaces to rename + fix_name_components = filter(lambda i: str(i.id) in request.POST.getlist("fix_name"), components) + # Casting interface templates into UnifiedInterface objects for proper comparison with interfaces for renaming + try: + unified_component_templates = [ + UnifiedInterface(i.id, i.name, i.type, i.get_type_display()) for i in component_templates] + except AttributeError: + unified_component_templates = [ + UnifiedInterface(i.id, i.name) for i in component_templates] + + # Rename selected interfaces + fixed = 0 + for component in fix_name_components: + try: + unified_component = UnifiedInterface(component.id, component.name, component.type, component.get_type_display()) + except AttributeError: + unified_component = UnifiedInterface(component.id, component.name) + + try: + # Try to extract an interface template with the corresponding name + corresponding_template = unified_component_templates[unified_component_templates.index(unified_component)] + component.name = corresponding_template.name + component.save() + fixed += 1 + except ValueError: + pass + + # Generating result message + message = [] + if created > 0: + message.append(f"created {created} interfaces") + if deleted > 0: + message.append(f"deleted {deleted} interfaces") + if fixed > 0: + message.append(f"fixed {fixed} interfaces") + messages.success(request, "; ".join(message).capitalize()) + + return redirect(request.path) + @dataclass(frozen=True) class UnifiedInterface: """A unified way to represent the interface and interface template""" id: int name: str - type: str - type_display: str + type: str = "" + type_display: str = "" is_template: bool = False def __eq__(self, other): @@ -33,3 +144,16 @@ class UnifiedInterface: def __hash__(self): # Ignore some fields when hashing; ignore interface name case and whitespaces return hash((self.name.lower().replace(' ', ''), self.type)) + +@dataclass(frozen=True) +class ComparisonPowerOutlet(UnifiedInterface): + + power_port_name: str = "" + + def __eq__(self, other): + # Ignore some fields when comparing; ignore interface name case and whitespaces + return (self.name.lower().replace(' ', '') == other.name.lower().replace(' ', '')) and (self.type == other.type) and (self.power_port_name == other.power_port_name) + + def __hash__(self): + # Ignore some fields when hashing; ignore interface name case and whitespaces + return hash((self.name.lower().replace(' ', ''), self.type, self.power_port_name)) \ No newline at end of file diff --git a/netbox_interface_sync/views.py b/netbox_interface_sync/views.py index 55adfcd..0d092ad 100644 --- a/netbox_interface_sync/views.py +++ b/netbox_interface_sync/views.py @@ -1,11 +1,13 @@ +from django.db.models.fields.related import ForeignKey from django.shortcuts import get_object_or_404, render, redirect from django.views.generic import View -from dcim.models import Device, Interface, InterfaceTemplate +from dcim.choices import PortTypeChoices +from dcim.models import Device, Interface, InterfaceTemplate, PowerPort, PowerPortTemplate, ConsolePort, ConsolePortTemplate, ConsoleServerPort, ConsoleServerPortTemplate, DeviceBay, DeviceBayTemplate, FrontPort, FrontPortTemplate,PowerOutlet, PowerOutletTemplate, RearPort, RearPortTemplate from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMixin from django.conf import settings from django.contrib import messages -from .utils import UnifiedInterface, natural_keys +from .utils import ComparisonPowerOutlet, UnifiedInterface, natural_keys, get_components, post_components from .forms import InterfaceComparisonForm config = settings.PLUGINS_CONFIG['netbox_interface_sync'] @@ -22,37 +24,7 @@ class InterfaceComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View) interfaces = list(filter(lambda i: not i.is_virtual, interfaces)) interface_templates = InterfaceTemplate.objects.filter(device_type=device.device_type) - unified_interfaces = [UnifiedInterface(i.id, i.name, i.type, i.get_type_display()) for i in interfaces] - unified_interface_templates = [ - UnifiedInterface(i.id, i.name, i.type, i.get_type_display(), is_template=True) for i in interface_templates] - - # List of interfaces and interface templates presented in the unified format - overall_interfaces = list(set(unified_interface_templates + unified_interfaces)) - overall_interfaces.sort(key=lambda o: natural_keys(o.name)) - - comparison_templates = [] - comparison_interfaces = [] - for i in overall_interfaces: - try: - comparison_templates.append(unified_interface_templates[unified_interface_templates.index(i)]) - except ValueError: - comparison_templates.append(None) - - try: - comparison_interfaces.append(unified_interfaces[unified_interfaces.index(i)]) - except ValueError: - comparison_interfaces.append(None) - - comparison_items = list(zip(comparison_templates, comparison_interfaces)) - return render( - request, "netbox_interface_sync/interface_comparison.html", - { - "comparison_items": comparison_items, - "templates_count": len(interface_templates), - "interfaces_count": len(interfaces), - "device": device - } - ) + return get_components(request, device, interfaces, interface_templates) def post(self, request, device_id): form = InterfaceComparisonForm(request.POST) @@ -62,53 +34,276 @@ class InterfaceComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View) if config["exclude_virtual_interfaces"]: interfaces = interfaces.exclude(type__in=["virtual", "lag"]) interface_templates = InterfaceTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, interfaces, interface_templates, Interface, InterfaceTemplate) +class PowerPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + powerports = device.powerports.all() + powerports_templates = PowerPortTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, powerports, powerports_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + powerports = device.powerports.all() + powerports_templates = PowerPortTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, powerports, powerports_templates, PowerPort, PowerPortTemplate) + +class ConsolePortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + consoleports = device.consoleports.all() + consoleports_templates = ConsolePortTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, consoleports, consoleports_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + consoleports = device.consoleports.all() + consoleports_templates = ConsolePortTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, consoleports, consoleports_templates, ConsolePort, ConsolePortTemplate) + +class ConsoleServerPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + consoleserverports = device.consoleserverports.all() + consoleserverports_templates = ConsoleServerPortTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, consoleserverports, consoleserverports_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + consoleserverports = device.consoleserverports.all() + consoleserverports_templates = ConsoleServerPortTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, consoleserverports, consoleserverports_templates, ConsoleServerPort, ConsoleServerPortTemplate) + +class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + poweroutlets = device.poweroutlets.all() + poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type) + + unified_components = [ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets] + unified_component_templates = [ + ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), is_template=True, power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets_templates] + + # List of interfaces and interface templates presented in the unified format + overall_powers = list(set(unified_component_templates + unified_components)) + overall_powers.sort(key=lambda o: natural_keys(o.name)) + + comparison_templates = [] + comparison_interfaces = [] + for i in overall_powers: + try: + comparison_templates.append(unified_component_templates[unified_component_templates.index(i)]) + except ValueError: + comparison_templates.append(None) + + try: + comparison_interfaces.append(unified_components[unified_components.index(i)]) + except ValueError: + comparison_interfaces.append(None) + + comparison_items = list(zip(comparison_templates, comparison_interfaces)) + return render( + request, "netbox_interface_sync/interface_comparison.html", + { + "comparison_items": comparison_items, + "templates_count": len(unified_component_templates), + "interfaces_count": len(poweroutlets), + "device": device + } + ) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + poweroutlets = device.poweroutlets.all() + poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type) + + #se il template ha una power port che non ho nel device fisico stop + device_pp = PowerPort.objects.filter(device_id=device.id) + + matching = {} + mismatch = False + for i in poweroutlets_templates: + found = False + if i.power_port_id is not None: + ppt = PowerPortTemplate.objects.get(id=i.power_port_id) + for pp in device_pp: + if pp.name == ppt.name: + matching[i.id] = pp.id + found = True + + if not found: + mismatch = True + break + + if not mismatch: # Manually validating interfaces and interface templates lists + with open("/tmp/ciccio.log", "w") as f: + f.write(str(request.POST.getlist("add_to_device"))) + add_to_device = filter( - lambda i: i in interface_templates.values_list("id", flat=True), + lambda i: i in poweroutlets_templates.values_list("id", flat=True), map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device"))) ) remove_from_device = filter( - lambda i: i in interfaces.values_list("id", flat=True), + lambda i: i in poweroutlets.values_list("id", flat=True), map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device"))) ) # Remove selected interfaces from the device and count them - interfaces_deleted = Interface.objects.filter(id__in=remove_from_device).delete()[0] + deleted = PowerOutlet.objects.filter(id__in=remove_from_device).delete()[0] # Add selected interfaces to the device and count them - add_to_device_interfaces = InterfaceTemplate.objects.filter(id__in=add_to_device) - interfaces_created = len(Interface.objects.bulk_create([ - Interface(device=device, name=i.name, type=i.type) for i in add_to_device_interfaces - ])) + add_to_device_component = PowerOutletTemplate.objects.filter(id__in=add_to_device) + + bulk_create = [] + keys_to_avoid = ["id"] + + for i in add_to_device_component.values(): + tmp = PowerOutlet() + tmp.device = device + for k in i.keys(): + if k not in keys_to_avoid: + setattr(tmp, k, i[k]) + tmp.power_port_id = matching.get(i["id"], None) + bulk_create.append(tmp) + + created = len(PowerOutlet.objects.bulk_create(bulk_create)) # Getting and validating a list of interfaces to rename - fix_name_interfaces = filter(lambda i: str(i.id) in request.POST.getlist("fix_name"), interfaces) + fix_name_components = filter(lambda i: str(i.id) in request.POST.getlist("fix_name"), poweroutlets) + # Casting interface templates into UnifiedInterface objects for proper comparison with interfaces for renaming - unified_interface_templates = [ - UnifiedInterface(i.id, i.name, i.type, i.get_type_display()) for i in interface_templates] + unified_component_templates = [ + ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), is_template=True, power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets_templates] + # Rename selected interfaces - interfaces_fixed = 0 - for interface in fix_name_interfaces: - unified_interface = UnifiedInterface(interface.id, interface.name, interface.type, interface.get_type_display()) + fixed = 0 + for component in fix_name_components: + unified_component = [ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets] + try: # Try to extract an interface template with the corresponding name - corresponding_template = unified_interface_templates[unified_interface_templates.index(unified_interface)] - interface.name = corresponding_template.name - interface.save() - interfaces_fixed += 1 + corresponding_template = unified_component_templates[unified_component_templates.index(unified_component)] + component.name = corresponding_template.name + component.save() + fixed += 1 except ValueError: pass # Generating result message message = [] - if interfaces_created > 0: - message.append(f"created {interfaces_created} interfaces") - if interfaces_deleted > 0: - message.append(f"deleted {interfaces_deleted} interfaces") - if interfaces_fixed > 0: - message.append(f"fixed {interfaces_fixed} interfaces") + if created > 0: + message.append(f"created {created} interfaces") + if deleted > 0: + message.append(f"deleted {deleted} interfaces") + if fixed > 0: + message.append(f"fixed {fixed} interfaces") messages.success(request, "; ".join(message).capitalize()) return redirect(request.path) + else: + messages.error(request, "Fai prima le power ports") + return redirect(request.path) + +class FrontPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + frontports = device.frontports.all() + frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, frontports, frontports_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + frontports = device.frontports.all() + frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, frontports, frontports_templates, FrontPort, FrontPortTemplate) + +class RearPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + rearports = device.rearports.all() + rearports_templates = RearPortTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, rearports, rearports_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + rearports = device.rearports.all() + rearports_templates = RearPortTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, rearports, rearports_templates, RearPort, RearPortTemplate) + +class DeviceBayComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): + """Comparison of interfaces between a device and a device type and beautiful visualization""" + permission_required = ("dcim.view_interface", "dcim.add_interface", "dcim.change_interface", "dcim.delete_interface") + + def get(self, request, device_id): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + devicebays = device.devicebays.all() + devicebays_templates = DeviceBayTemplate.objects.filter(device_type=device.device_type) + + return get_components(request, device, devicebays, devicebays_templates) + + def post(self, request, device_id): + form = InterfaceComparisonForm(request.POST) + if form.is_valid(): + device = get_object_or_404(Device.objects.filter(id=device_id)) + + devicebays = device.devicebays.all() + devicebays_templates = DeviceBayTemplate.objects.filter(device_type=device.device_type) + + return post_components(request, device, devicebays, devicebays_templates, DeviceBay, DeviceBayTemplate)