mirror of
https://github.com/drygdryg/netbox-plugin-interface-sync
synced 2024-11-29 18:20:52 +03:00
462 lines
21 KiB
Python
462 lines
21 KiB
Python
from collections import namedtuple
|
|
from typing import Type, Tuple
|
|
|
|
from django.db.models import QuerySet
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.views.generic import View
|
|
from dcim.models import (Device, Interface, InterfaceTemplate, PowerPort, PowerPortTemplate, ConsolePort,
|
|
ConsolePortTemplate, ConsoleServerPort, ConsoleServerPortTemplate, DeviceBay,
|
|
DeviceBayTemplate, FrontPort, FrontPortTemplate, PowerOutlet, PowerOutletTemplate, RearPort,
|
|
RearPortTemplate)
|
|
from django.contrib.auth.mixins import PermissionRequiredMixin
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
|
|
from netbox.models import PrimaryModel
|
|
from dcim.constants import VIRTUAL_IFACE_TYPES
|
|
|
|
from . import comparison
|
|
from .utils import get_permissions_for_model, make_integer_list, human_sorted
|
|
|
|
# Settings of this plugin, looked up in the PLUGINS_CONFIG mapping of the Django settings
config = settings.PLUGINS_CONFIG['netbox_interface_sync']

# One row of the comparison table: a component template paired with the device component
# matched to it (either side is None when it has no counterpart)
ComparisonTableRow = namedtuple('ComparisonTableRow', ['component_template', 'component'])
|
|
|
|
|
|
class GenericComparisonView(PermissionRequiredMixin, View):
    """
    Generic object comparison view

    GET renders a table comparing the components of a device with the component templates
    of its device type; POST applies the add / remove / sync actions selected in that table.

    obj_model: Model of the object involved in the comparison (for example, Interface)
    obj_template_model: Model of the object template involved in the comparison (for example, InterfaceTemplate)
    """
    obj_model: Type[PrimaryModel] = None
    obj_template_model: Type[PrimaryModel] = None

    def get_permission_required(self):
        """Return the list of permissions the user must hold to use this view."""
        # User must have permission to view the device whose components are being compared
        permissions = ["dcim.view_device"]

        # Resolve permissions related to the object and the object template
        permissions.extend(get_permissions_for_model(self.obj_model, ("view", "add", "change", "delete")))
        permissions.extend(get_permissions_for_model(self.obj_template_model, ("view",)))

        return permissions

    @staticmethod
    def filter_comparison_components(component_templates: QuerySet, components: QuerySet) -> Tuple[QuerySet, QuerySet]:
        """Override this in the inherited View to implement special comparison objects filtering logic"""
        return component_templates, components

    def _fetch_comparison_objects(self, device_id: int):
        """
        Load the device and build the comparison table for it.

        Sets ``self.device``, ``self.component_templates``, ``self.components``,
        ``self.comparison_component_templates``, ``self.comparison_components`` and
        ``self.comparison_table`` (a tuple of ComparisonTableRow sorted by component name).
        A missing device results in a 404 via ``get_object_or_404``.
        """
        self.device = get_object_or_404(Device, id=device_id)
        # device_type_id is already loaded with the device; no need to fetch the device type itself
        component_templates = self.obj_template_model.objects.filter(device_type_id=self.device.device_type_id)
        components = self.obj_model.objects.filter(device_id=device_id)
        self.component_templates, self.components = self.filter_comparison_components(component_templates, components)
        self.comparison_component_templates = [comparison.from_netbox_object(obj) for obj in self.component_templates]
        self.comparison_components = [comparison.from_netbox_object(obj) for obj in self.components]

        # Read the name normalisation flags once instead of once per name
        name_comparison_config = config['name_comparison']
        ignore_case = name_comparison_config.get('case-insensitive')
        ignore_spaces = name_comparison_config.get('space-insensitive')

        def name_key(obj_name: str) -> str:
            """Normalise a component name according to the plugin's name comparison settings."""
            name = obj_name
            if ignore_case:
                name = name.lower()
            if ignore_spaces:
                name = name.replace(' ', '')
            return name

        # Templates and components are paired by their normalised name
        component_templates_dict = {name_key(obj.name): obj for obj in self.comparison_component_templates}
        components_dict = {name_key(obj.name): obj for obj in self.comparison_components}

        self.comparison_table = tuple(
            ComparisonTableRow(
                component_template=component_templates_dict.get(component_name),
                component=components_dict.get(component_name)
            )
            for component_name in human_sorted(component_templates_dict.keys() | components_dict.keys())
        )

    def get(self, request, device_id):
        """Render the comparison table for the device with the given ID."""
        self._fetch_comparison_objects(device_id)

        return render(request, "netbox_interface_sync/components_comparison.html", {
            "component_type_name": self.obj_model._meta.verbose_name_plural,
            "comparison_items": self.comparison_table,
            "templates_count": len(self.comparison_component_templates),
            "components_count": len(self.comparison_components),
            "device": self.device,
        })

    def post(self, request, device_id):
        """
        Apply the actions selected in the comparison table.

        The POST lists "add" (template IDs), "remove" and "sync" (component IDs) are only
        honoured for rows that are part of this device's comparison table. The user is then
        redirected back to the same page with a flash message summarising the result.
        """
        # Sets give O(1) membership tests in the per-row loop below
        components_to_add = set(make_integer_list(request.POST.getlist("add")))
        components_to_delete = set(make_integer_list(request.POST.getlist("remove")))
        components_to_sync = set(make_integer_list(request.POST.getlist("sync")))
        if not (components_to_add or components_to_delete or components_to_sync):
            messages.warning(request, "No actions selected")
            return redirect(request.path)

        self._fetch_comparison_objects(device_id)

        component_ids_to_delete = []
        components_to_bulk_create = []
        synced_count = 0
        for template, component in self.comparison_table:
            if template and (template.id in components_to_add):
                # Add component to the device from the template
                components_to_bulk_create.append(
                    self.obj_model(device=self.device, **template.get_fields_for_netbox_component())
                )
            elif component and (component.id in components_to_delete):
                # Delete component from the device
                component_ids_to_delete.append(component.id)
            elif (template and component) and (component.id in components_to_sync):
                # Update component attributes from the template
                synced_count += self.components.filter(id=component.id).update(
                    **template.get_fields_for_netbox_component(sync=True)
                )

        # delete() returns (total, per-model counts); the total also includes cascade-deleted
        # related objects, so report only the rows of the component model itself
        deleted_per_model = self.obj_model.objects.filter(id__in=component_ids_to_delete).delete()[1]
        deleted_count = deleted_per_model.get(self.obj_model._meta.label, 0)
        created_count = len(self.obj_model.objects.bulk_create(components_to_bulk_create))

        # Generating result message
        component_type_name = self.obj_model._meta.verbose_name_plural
        message = []
        if synced_count > 0:
            message.append(f"synced {synced_count} {component_type_name}")
        if created_count > 0:
            message.append(f"created {created_count} {component_type_name}")
        if deleted_count > 0:
            message.append(f"deleted {deleted_count} {component_type_name}")

        if message:
            messages.success(request, "; ".join(message).capitalize())
        else:
            # The posted IDs did not match any row: avoid flashing an empty success message
            messages.warning(request, "No changes were made")

        return redirect(request.path)
|
|
|
|
|
|
class ConsolePortComparisonView(GenericComparisonView):
    """Compare the console ports of a device with the console port templates of its device type."""

    obj_model = ConsolePort
    obj_template_model = ConsolePortTemplate
|
|
|
|
|
|
class ConsoleServerPortComparisonView(GenericComparisonView):
    """Compare the console server ports of a device with the console server port templates of its device type."""

    obj_model = ConsoleServerPort
    obj_template_model = ConsoleServerPortTemplate
|
|
|
|
|
|
class InterfaceComparisonView(GenericComparisonView):
    """Compare the interfaces of a device with the interface templates of its device type."""

    obj_model = Interface
    obj_template_model = InterfaceTemplate

    @staticmethod
    def filter_comparison_components(component_templates: QuerySet, components: QuerySet) -> Tuple[QuerySet, QuerySet]:
        """Drop virtual interfaces from both querysets when the plugin is configured to exclude them."""
        if not config["exclude_virtual_interfaces"]:
            return component_templates, components

        physical_templates = component_templates.exclude(type__in=VIRTUAL_IFACE_TYPES)
        physical_components = components.exclude(type__in=VIRTUAL_IFACE_TYPES)
        return physical_templates, physical_components
|
|
|
|
|
|
class PowerPortComparisonView(GenericComparisonView):
    """Compare the power ports of a device with the power port templates of its device type."""

    obj_model = PowerPort
    obj_template_model = PowerPortTemplate
|
|
|
|
|
|
class PowerOutletComparisonView(GenericComparisonView):
    """Comparison of power outlets between a device and a device type and beautiful visualization"""
    obj_model = PowerOutlet
    obj_template_model = PowerOutletTemplate

    @staticmethod
    def _posted_ids(request, field_name):
        """Return the set of integer IDs posted under ``field_name``, ignoring non-numeric values."""
        # str.isdecimal() only accepts characters int() can parse; str.isdigit() also accepts
        # characters such as "²" on which int() raises ValueError
        return {int(value) for value in request.POST.getlist(field_name) if value.isdecimal()}

    def post(self, request, device_id):
        """
        Apply the selected power outlet actions to the device.

        Reads the POST lists "remove", "add" and "fix_name". Removal is always performed;
        adding/updating and renaming are skipped with an error message when a power outlet
        template references a power port that has no same-named power port on the device.
        Redirects back to the same page with a flash message summarising the result.
        """
        device = get_object_or_404(Device.objects.filter(id=device_id))

        poweroutlets = device.poweroutlets.all()
        # select_related: the name of the referenced power port template is needed for every template
        poweroutlets_templates = PowerOutletTemplate.objects.filter(
            device_type=device.device_type
        ).select_related("power_port")

        created = 0
        updated = 0
        fixed = 0

        # Only outlets that really belong to this device may be removed; query their IDs once
        device_outlet_ids = set(poweroutlets.values_list("id", flat=True))
        remove_from_device = self._posted_ids(request, "remove") & device_outlet_ids

        # Remove selected power outlets from the device and count them.
        # delete() returns (total, per-model counts); the total would also include
        # cascade-deleted related objects, so only the PowerOutlet rows are counted.
        deleted = PowerOutlet.objects.filter(id__in=remove_from_device).delete()[1].get(PowerOutlet._meta.label, 0)

        # Get device power ports to check dependency between power outlets
        device_power_port_ids = {pp.name: pp.id for pp in PowerPort.objects.filter(device_id=device.id)}

        # Maps power outlet template ID -> ID of the device power port with the same name
        matching = {}
        mismatch = False
        for template in poweroutlets_templates:
            if template.power_port_id is None:
                continue
            power_port_id = device_power_port_ids.get(template.power_port.name)
            if power_port_id is None:
                # If at least one power port is not found in device there is a dependency
                # Better not to sync at all
                mismatch = True
                break
            # Save matching to add the correct power port later
            matching[template.id] = power_port_id

        if mismatch:
            messages.error(request, "Dependency detected, sync power ports first!")
        else:
            # Only templates of this device's type may be added; the loop above already
            # evaluated the queryset, so this reuses its cached results
            template_ids = {template.id for template in poweroutlets_templates}
            add_to_device = self._posted_ids(request, "add") & template_ids

            # Add selected component to the device and count them
            add_to_device_component = PowerOutletTemplate.objects.filter(id__in=add_to_device)

            bulk_create = []
            keys_to_avoid = ["id"]
            if not config["compare_description"]:
                keys_to_avoid.append("description")

            for template_fields in add_to_device_component.values():
                try:
                    # If power outlets already exists, update and do not recreate
                    po = device.poweroutlets.get(name=template_fields["name"])
                    to_create = False
                except PowerOutlet.DoesNotExist:
                    po = PowerOutlet()
                    po.device = device
                    to_create = True

                # Copy all fields from template
                for key, value in template_fields.items():
                    if key not in keys_to_avoid:
                        setattr(po, key, value)
                # The template's power_port_id refers to a template; point to the device's port instead
                po.power_port_id = matching.get(template_fields["id"])

                if to_create:
                    bulk_create.append(po)
                else:
                    po.save()
                    updated += 1

            created = len(PowerOutlet.objects.bulk_create(bulk_create))

            # Getting and validating a list of components to rename
            fix_name_ids = set(request.POST.getlist("fix_name"))
            if fix_name_ids:
                # NOTE(review): PowerOutletComparison is not imported anywhere in this module, so
                # this rename path raises NameError as written; it presumably belongs to the
                # `comparison` module -- confirm the class and its constructor signature.
                # Casting component templates into Unified objects for proper comparison with component for renaming
                unified_component_templates = [
                    PowerOutletComparison(i.id, i.name, i.label, i.description, i.type, i.get_type_display(),
                                          power_port_name=i.power_port.name
                                          if i.power_port_id is not None else "",
                                          feed_leg=i.feed_leg, is_template=True)
                    for i in poweroutlets_templates]

                # Rename selected power outlets
                for component in poweroutlets:
                    if str(component.id) not in fix_name_ids:
                        continue
                    unified_poweroutlet = PowerOutletComparison(
                        component.id, component.name, component.label, component.description, component.type,
                        component.get_type_display(),
                        power_port_name=component.power_port.name
                        if component.power_port_id is not None else "",
                        feed_leg=component.feed_leg
                    )
                    try:
                        # Try to extract a component template with the corresponding name
                        corresponding_template = unified_component_templates[
                            unified_component_templates.index(unified_poweroutlet)
                        ]
                    except ValueError:
                        # No template compares equal to this outlet: nothing to rename it to
                        continue
                    component.name = corresponding_template.name
                    component.save()
                    fixed += 1

        # Generating result message
        message = []
        if created > 0:
            message.append(f"created {created} power outlets")
        if updated > 0:
            message.append(f"updated {updated} power outlets")
        if deleted > 0:
            message.append(f"deleted {deleted} power outlets")
        if fixed > 0:
            message.append(f"fixed {fixed} power outlets")

        if message:
            messages.info(request, "; ".join(message).capitalize())
        elif not mismatch:
            # Nothing was changed: avoid flashing an empty info message
            messages.warning(request, "No changes were made")

        return redirect(request.path)
|
|
|
|
|
|
class RearPortComparisonView(GenericComparisonView):
    """Compare the rear ports of a device with the rear port templates of its device type."""

    obj_model = RearPort
    obj_template_model = RearPortTemplate
|
|
|
|
|
|
class DeviceBayComparisonView(GenericComparisonView):
    """Compare the device bays of a device with the device bay templates of its device type."""

    obj_model = DeviceBay
    obj_template_model = DeviceBayTemplate
|
|
#
|
|
#
|
|
# class FrontPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View):
|
|
# """Comparison of front ports between a device and a device type and beautiful visualization"""
|
|
# permission_required = get_permissions_for_object("dcim", "frontport")
|
|
#
|
|
# def get(self, request, device_id):
|
|
#
|
|
# device = get_object_or_404(Device.objects.filter(id=device_id))
|
|
#
|
|
# frontports = device.frontports.all()
|
|
# frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type)
|
|
#
|
|
# unified_frontports = [
|
|
# FrontPortComparison(
|
|
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(), i.color, i.rear_port_position)
|
|
# for i in frontports]
|
|
# unified_frontports_templates = [
|
|
# FrontPortComparison(
|
|
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(), i.color,
|
|
# i.rear_port_position, is_template=True)
|
|
# for i in frontports_templates]
|
|
#
|
|
# return get_components(request, device, frontports, unified_frontports, unified_frontports_templates)
|
|
#
|
|
# def post(self, request, device_id):
|
|
# form = ComponentComparisonForm(request.POST)
|
|
# if form.is_valid():
|
|
# device = get_object_or_404(Device.objects.filter(id=device_id))
|
|
#
|
|
# frontports = device.frontports.all()
|
|
# frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type)
|
|
#
|
|
# # Generating result message
|
|
# message = []
|
|
# created = 0
|
|
# updated = 0
|
|
# fixed = 0
|
|
#
|
|
# remove_from_device = filter(
|
|
# lambda i: i in frontports.values_list("id", flat=True),
|
|
# map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device")))
|
|
# )
|
|
#
|
|
# # Remove selected front ports from the device and count them
|
|
# deleted = FrontPort.objects.filter(id__in=remove_from_device).delete()[0]
|
|
#
|
|
# # Get device rear ports to check dependency between front ports
|
|
# device_rp = RearPort.objects.filter(device_id=device.id)
|
|
#
|
|
# matching = {}
|
|
# mismatch = False
|
|
# for i in frontports_templates:
|
|
# found = False
|
|
# if i.rear_port_id is not None:
|
|
# rpt = RearPortTemplate.objects.get(id=i.rear_port_id)
|
|
# for rp in device_rp:
|
|
# if rp.name == rpt.name:
|
|
# # Save matching to add the correct rear port later
|
|
# matching[i.id] = rp.id
|
|
# found = True
|
|
#
|
|
# # If at least one rear port is not found in device there is a dependency
|
|
# # Better not to sync at all
|
|
# if not found:
|
|
# mismatch = True
|
|
# break
|
|
#
|
|
# if not mismatch:
|
|
# add_to_device = filter(
|
|
# lambda i: i in frontports_templates.values_list("id", flat=True),
|
|
# map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device")))
|
|
# )
|
|
#
|
|
# # Add selected component to the device and count them
|
|
# add_to_device_component = FrontPortTemplate.objects.filter(id__in=add_to_device)
|
|
#
|
|
# bulk_create = []
|
|
# updated = 0
|
|
# keys_to_avoid = ["id"]
|
|
#
|
|
# if not config["compare_description"]:
|
|
# keys_to_avoid.append("description")
|
|
#
|
|
# for i in add_to_device_component.values():
|
|
# to_create = False
|
|
#
|
|
# try:
|
|
# # If front port already exists, update and do not recreate
|
|
# fp = device.frontports.get(name=i["name"])
|
|
# except FrontPort.DoesNotExist:
|
|
# fp = FrontPort()
|
|
# fp.device = device
|
|
# to_create = True
|
|
#
|
|
# # Copy all fields from template
|
|
# for k in i.keys():
|
|
# if k not in keys_to_avoid:
|
|
# setattr(fp, k, i[k])
|
|
# fp.rear_port_id = matching.get(i["id"], None)
|
|
#
|
|
# if to_create:
|
|
# bulk_create.append(fp)
|
|
# else:
|
|
# fp.save()
|
|
# updated += 1
|
|
#
|
|
# created = len(FrontPort.objects.bulk_create(bulk_create))
|
|
#
|
|
# # Getting and validating a list of components to rename
|
|
# fix_name_components = filter(lambda i: str(i.id) in request.POST.getlist("fix_name"), frontports)
|
|
#
|
|
# # Casting component templates into Unified objects for proper comparison with component for renaming
|
|
# unified_frontports_templates = [
|
|
# FrontPortComparison(
|
|
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(),
|
|
# i.color, i.rear_port_position, is_template=True)
|
|
# for i in frontports_templates]
|
|
# # Rename selected front ports
|
|
# fixed = 0
|
|
# for component in fix_name_components:
|
|
# unified_frontport = FrontPortComparison(
|
|
# component.id, component.name, component.label, component.description, component.type,
|
|
# component.get_type_display(), component.color, component.rear_port_position
|
|
# )
|
|
#
|
|
# try:
|
|
# # Try to extract a component template with the corresponding name
|
|
# corresponding_template = unified_frontports_templates[
|
|
# unified_frontports_templates.index(unified_frontport)
|
|
# ]
|
|
# component.name = corresponding_template.name
|
|
# component.save()
|
|
# fixed += 1
|
|
# except ValueError:
|
|
# pass
|
|
# else:
|
|
# messages.error(request, "Dependency detected, sync rear ports first!")
|
|
#
|
|
# if created > 0:
|
|
# message.append(f"created {created} front ports")
|
|
# if updated > 0:
|
|
# message.append(f"updated {updated} front ports")
|
|
# if deleted > 0:
|
|
# message.append(f"deleted {deleted} front ports")
|
|
# if fixed > 0:
|
|
# message.append(f"fixed {fixed} front ports")
|
|
#
|
|
# messages.info(request, "; ".join(message).capitalize())
|
|
#
|
|
# return redirect(request.path)
|