netbox-plugin-interface-sync/netbox_interface_sync/views.py

462 lines
21 KiB
Python
Raw Permalink Normal View History

2022-03-03 12:39:16 +03:00
from collections import namedtuple
from typing import Type, Tuple
from django.db.models import QuerySet
from django.shortcuts import get_object_or_404, redirect, render
2021-04-20 10:22:26 +03:00
from django.views.generic import View
2022-03-03 12:39:16 +03:00
from dcim.models import (Device, Interface, InterfaceTemplate, PowerPort, PowerPortTemplate, ConsolePort,
ConsolePortTemplate, ConsoleServerPort, ConsoleServerPortTemplate, DeviceBay,
DeviceBayTemplate, FrontPort, FrontPortTemplate, PowerOutlet, PowerOutletTemplate, RearPort,
RearPortTemplate)
from django.contrib.auth.mixins import PermissionRequiredMixin
2021-04-20 10:22:26 +03:00
from django.conf import settings
from django.contrib import messages
2022-03-03 12:39:16 +03:00
from netbox.models import PrimaryModel
from dcim.constants import VIRTUAL_IFACE_TYPES
2021-04-20 10:22:26 +03:00
2022-03-03 12:39:16 +03:00
from . import comparison
from .utils import get_permissions_for_model, make_integer_list, human_sorted
2021-04-20 10:22:26 +03:00
2022-03-03 12:39:16 +03:00
# Settings of this plugin, read from the PLUGINS_CONFIG entry keyed by the plugin name
config = settings.PLUGINS_CONFIG['netbox_interface_sync']
# One row of the comparison table: a component template paired with the device component
# shown next to it (the views fill either side with None when there is no counterpart)
ComparisonTableRow = namedtuple(
    'ComparisonTableRow',
    ['component_template', 'component'],
)
class GenericComparisonView(PermissionRequiredMixin, View):
    """
    Generic object comparison view

    GET renders a table that pairs, by name, the component templates of a device's type with the
    components of the device. POST applies the posted "add", "remove" and "sync" selections.

    obj_model: Model of the object involved in the comparison (for example, Interface)
    obj_template_model: Model of the object template involved in the comparison (for example, InterfaceTemplate)
    """

    obj_model: Type[PrimaryModel] = None
    obj_template_model: Type[PrimaryModel] = None

    def get_permission_required(self):
        """Return the list of permissions a user needs to use this view."""
        # User must have permission to view the device whose components are being compared
        permissions = ["dcim.view_device"]
        # Resolve permissions related to the object and the object template
        permissions.extend(get_permissions_for_model(self.obj_model, ("view", "add", "change", "delete")))
        permissions.extend(get_permissions_for_model(self.obj_template_model, ("view",)))
        return permissions

    @staticmethod
    def filter_comparison_components(component_templates: QuerySet, components: QuerySet) -> Tuple[QuerySet, QuerySet]:
        """Override this in the inherited View to implement special comparison objects filtering logic"""
        return component_templates, components

    def _fetch_comparison_objects(self, device_id: int):
        """
        Load the device and build the comparison table, storing the results on the view instance.

        Sets `device`, `component_templates`, `components`, `comparison_component_templates`,
        `comparison_components` and `comparison_table`. Responds with 404 if the device does not exist.
        """
        self.device = get_object_or_404(Device, id=device_id)
        component_templates = self.obj_template_model.objects.filter(device_type_id=self.device.device_type.id)
        components = self.obj_model.objects.filter(device_id=device_id)
        self.component_templates, self.components = self.filter_comparison_components(component_templates, components)
        self.comparison_component_templates = [comparison.from_netbox_object(obj) for obj in self.component_templates]
        self.comparison_components = [comparison.from_netbox_object(obj) for obj in self.components]

        name_comparison_config = config['name_comparison']

        def name_key(obj_name: str) -> str:
            """Normalize a name according to the configured case/space insensitivity."""
            name = obj_name
            if name_comparison_config.get('case-insensitive'):
                name = name.lower()
            if name_comparison_config.get('space-insensitive'):
                name = name.replace(' ', '')
            return name

        # NOTE: objects whose names normalize to the same key overwrite each other here,
        # so only the last one of them appears in the table
        component_templates_dict = {name_key(obj.name): obj for obj in self.comparison_component_templates}
        components_dict = {name_key(obj.name): obj for obj in self.comparison_components}

        # One row per distinct normalized name; a side without a counterpart is None
        all_names = component_templates_dict.keys() | components_dict.keys()
        self.comparison_table = tuple(
            ComparisonTableRow(
                component_template=component_templates_dict.get(component_name),
                component=components_dict.get(component_name)
            )
            for component_name in human_sorted(all_names)
        )

    def get(self, request, device_id):
        """Render the comparison table for the device with the given ID."""
        self._fetch_comparison_objects(device_id)

        return render(request, "netbox_interface_sync/components_comparison.html", {
            "component_type_name": self.obj_model._meta.verbose_name_plural,
            "comparison_items": self.comparison_table,
            "templates_count": len(self.comparison_component_templates),
            "components_count": len(self.comparison_components),
            "device": self.device,
        })

    def post(self, request, device_id):
        """
        Apply the selected actions to the device and redirect back to the comparison page.

        Reads three POST lists: "add" (component template IDs to create components from),
        "remove" (component IDs to delete) and "sync" (component IDs to update from the
        template in the same table row).
        """
        # Imported locally because the module-level imports do not provide `transaction`
        from django.db import transaction

        # Sets give constant-time membership tests in the loop below
        components_to_add = set(make_integer_list(request.POST.getlist("add")))
        components_to_delete = set(make_integer_list(request.POST.getlist("remove")))
        components_to_sync = set(make_integer_list(request.POST.getlist("sync")))
        if not any((components_to_add, components_to_delete, components_to_sync)):
            messages.warning(request, "No actions selected")
            return redirect(request.path)

        self._fetch_comparison_objects(device_id)

        component_ids_to_delete = []
        components_to_bulk_create = []
        synced_count = 0

        # All writes happen in one transaction so that a failure (for example, a constraint
        # violation in bulk_create) does not leave the device partially modified
        with transaction.atomic():
            # Only IDs present in the comparison table are acted upon, which restricts
            # the posted IDs to objects belonging to this device or its device type
            for template, component in self.comparison_table:
                if template and (template.id in components_to_add):
                    # Add component to the device from the template
                    components_to_bulk_create.append(
                        self.obj_model(device=self.device, **template.get_fields_for_netbox_component())
                    )
                elif component and (component.id in components_to_delete):
                    # Delete component from the device
                    component_ids_to_delete.append(component.id)
                elif (template and component) and (component.id in components_to_sync):
                    # Update component attributes from the template
                    synced_count += self.components.filter(id=component.id).update(
                        **template.get_fields_for_netbox_component(sync=True)
                    )

            # delete() returns (total, per-model dict); the total is used as the count
            deleted_count = self.obj_model.objects.filter(id__in=component_ids_to_delete).delete()[0]
            created_count = len(self.obj_model.objects.bulk_create(components_to_bulk_create))

        # Generating result message
        component_type_name = self.obj_model._meta.verbose_name_plural
        message = []
        if synced_count > 0:
            message.append(f"synced {synced_count} {component_type_name}")
        if created_count > 0:
            message.append(f"created {created_count} {component_type_name}")
        if deleted_count > 0:
            message.append(f"deleted {deleted_count} {component_type_name}")
        # Nothing to report when none of the posted IDs matched a table row
        if message:
            messages.success(request, "; ".join(message).capitalize())

        return redirect(request.path)
2022-03-03 12:39:16 +03:00
class ConsolePortComparisonView(GenericComparisonView):
    """Compare the console ports of a device with the console port templates of its device type."""

    # Template model first, then the device component model it is compared against
    obj_template_model = ConsolePortTemplate
    obj_model = ConsolePort
2022-03-03 12:39:16 +03:00
class ConsoleServerPortComparisonView(GenericComparisonView):
    """Comparison of console server ports between a device and a device type and beautiful visualization"""

    # Device component model and the device type template model it is compared against
    obj_model = ConsoleServerPort
    obj_template_model = ConsoleServerPortTemplate
2022-03-03 12:39:16 +03:00
class InterfaceComparisonView(GenericComparisonView):
    """Comparison of interfaces between a device and a device type and beautiful visualization"""

    # Device component model and the device type template model it is compared against
    obj_model = Interface
    obj_template_model = InterfaceTemplate

    @staticmethod
    def filter_comparison_components(component_templates: QuerySet, components: QuerySet) -> Tuple[QuerySet, QuerySet]:
        """
        Drop virtual interfaces from both querysets when the plugin is configured to exclude them.

        Returns the (component_templates, components) pair, unchanged if the
        "exclude_virtual_interfaces" setting is falsy.
        """
        if not config["exclude_virtual_interfaces"]:
            return component_templates, components

        return (
            component_templates.exclude(type__in=VIRTUAL_IFACE_TYPES),
            components.exclude(type__in=VIRTUAL_IFACE_TYPES),
        )
2022-03-03 12:39:16 +03:00
class PowerPortComparisonView(GenericComparisonView):
    """Compare the power ports of a device with the power port templates of its device type."""

    # Template model first, then the device component model it is compared against
    obj_template_model = PowerPortTemplate
    obj_model = PowerPort
2022-03-03 12:39:16 +03:00
class PowerOutletComparisonView(GenericComparisonView):
    """Comparison of power outlets between a device and a device type and beautiful visualization"""

    obj_model = PowerOutlet
    obj_template_model = PowerOutletTemplate

    @staticmethod
    def _posted_ids(request, field_name):
        """Return the set of integer IDs posted under `field_name`, ignoring non-numeric values."""
        # isdecimal() only accepts strings that int() can parse; isdigit() also accepts
        # characters such as "²", which would make int() raise ValueError
        return {int(value) for value in request.POST.getlist(field_name) if value.isdecimal()}

    @staticmethod
    def _match_power_ports(device, templates):
        """
        Map each outlet template ID to the ID of the device power port named like the template's power port.

        Returns (matching, mismatch). `mismatch` is True as soon as one template refers to
        a power port template that has no same-named power port on the device.
        """
        # Later ports overwrite earlier ones with the same name
        device_pp_ids = {pp.name: pp.id for pp in PowerPort.objects.filter(device_id=device.id)}
        # Names of all referenced power port templates, fetched with a single query
        template_pp_names = dict(
            PowerPortTemplate.objects.filter(
                id__in={t.power_port_id for t in templates if t.power_port_id is not None}
            ).values_list("id", "name")
        )

        matching = {}
        for template in templates:
            if template.power_port_id is None:
                continue
            power_port_id = device_pp_ids.get(template_pp_names[template.power_port_id])
            # If at least one power port is not found in device there is a dependency
            # Better not to sync at all
            if power_port_id is None:
                return matching, True
            # Save matching to add the correct power port later
            matching[template.id] = power_port_id
        return matching, False

    @staticmethod
    def _add_from_templates(device, template_ids, matching):
        """
        Create or update device power outlets from the given outlet template IDs.

        An outlet whose name already exists on the device is updated and saved; the others
        are created in bulk. Returns (created, updated) counts.
        """
        keys_to_avoid = ["id"]
        if not config["compare_description"]:
            keys_to_avoid.append("description")

        bulk_create = []
        updated = 0
        for fields in PowerOutletTemplate.objects.filter(id__in=template_ids).values():
            try:
                # If power outlets already exists, update and do not recreate
                po = device.poweroutlets.get(name=fields["name"])
                to_create = False
            except PowerOutlet.DoesNotExist:
                po = PowerOutlet()
                po.device = device
                to_create = True

            # Copy all fields from template
            for key, value in fields.items():
                if key not in keys_to_avoid:
                    setattr(po, key, value)
            # The template points to a power port template; use the matched device power port instead
            po.power_port_id = matching.get(fields["id"], None)

            if to_create:
                bulk_create.append(po)
            else:
                po.save()
                updated += 1

        created = len(PowerOutlet.objects.bulk_create(bulk_create))
        return created, updated

    @staticmethod
    def _fix_names(poweroutlets, templates, fix_name_ids):
        """
        Rename the selected device power outlets after the template they compare equal to.

        `fix_name_ids` holds the posted component IDs as strings. Returns the number of
        renamed power outlets.
        """
        # NOTE(review): `PowerOutletComparison` is not imported in this module (only the
        # `comparison` module is), so this code path presumably raises NameError - confirm
        # where the class lives and its signature before relying on renaming.
        # Casting component templates into Unified objects for proper comparison with component for renaming
        unified_component_templates = [
            PowerOutletComparison(
                t.id, t.name, t.label, t.description, t.type, t.get_type_display(),
                power_port_name=PowerPortTemplate.objects.get(id=t.power_port_id).name
                if t.power_port_id is not None else "",
                feed_leg=t.feed_leg, is_template=True)
            for t in templates
        ]

        fixed = 0
        for component in poweroutlets:
            if str(component.id) not in fix_name_ids:
                continue
            unified_poweroutlet = PowerOutletComparison(
                component.id, component.name, component.label, component.description, component.type,
                component.get_type_display(),
                power_port_name=PowerPort.objects.get(id=component.power_port_id).name
                if component.power_port_id is not None else "",
                feed_leg=component.feed_leg
            )
            # Try to extract a component template that compares equal to the component
            corresponding_template = next(
                (t for t in unified_component_templates if t == unified_poweroutlet), None
            )
            if corresponding_template is None:
                continue
            component.name = corresponding_template.name
            component.save()
            fixed += 1
        return fixed

    def post(self, request, device_id):
        """
        Apply the posted "remove", "add" and "fix_name" selections to the device's power outlets.

        Selected outlets are removed first. Adding and renaming are skipped, with an error
        message, when an outlet template depends on a power port that the device lacks.
        """
        device = get_object_or_404(Device.objects.filter(id=device_id))

        poweroutlets = device.poweroutlets.all()
        poweroutlets_templates = list(PowerOutletTemplate.objects.filter(device_type=device.device_type))

        created = 0
        updated = 0
        fixed = 0

        # Remove selected power outlets from the device and count them;
        # posted IDs that do not belong to this device are ignored
        device_outlet_ids = set(poweroutlets.values_list("id", flat=True))
        remove_from_device = self._posted_ids(request, "remove") & device_outlet_ids
        deleted = PowerOutlet.objects.filter(id__in=remove_from_device).delete()[0]

        # Get device power ports to check dependency between power outlets
        matching, mismatch = self._match_power_ports(device, poweroutlets_templates)

        if mismatch:
            messages.error(request, "Dependency detected, sync power ports first!")
        else:
            # Add selected component to the device and count them;
            # posted IDs that are not templates of this device type are ignored
            add_to_device = self._posted_ids(request, "add") & {t.id for t in poweroutlets_templates}
            created, updated = self._add_from_templates(device, add_to_device, matching)

            # Getting and validating a list of components to rename; the comparison
            # objects are only built when a rename was actually requested
            fix_name_ids = set(request.POST.getlist("fix_name"))
            if fix_name_ids:
                fixed = self._fix_names(poweroutlets, poweroutlets_templates, fix_name_ids)

        # Generating result message
        message = []
        if created > 0:
            message.append(f"created {created} power outlets")
        if updated > 0:
            message.append(f"updated {updated} power outlets")
        if deleted > 0:
            message.append(f"deleted {deleted} power outlets")
        if fixed > 0:
            message.append(f"fixed {fixed} power outlets")
        # Nothing to report when no outlet was changed
        if message:
            messages.info(request, "; ".join(message).capitalize())

        return redirect(request.path)
class RearPortComparisonView(GenericComparisonView):
    """Comparison of rear ports between a device and a device type and beautiful visualization"""

    # Device component model and the device type template model it is compared against
    obj_model = RearPort
    obj_template_model = RearPortTemplate
2022-01-11 15:47:43 +03:00
2022-03-03 12:39:16 +03:00
class DeviceBayComparisonView(GenericComparisonView):
    """Comparison of device bays between a device and a device type and beautiful visualization"""

    # Device component model and the device type template model it is compared against
    obj_model = DeviceBay
    obj_template_model = DeviceBayTemplate
#
#
# class FrontPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View):
# """Comparison of front ports between a device and a device type and beautiful visualization"""
# permission_required = get_permissions_for_object("dcim", "frontport")
#
# def get(self, request, device_id):
#
# device = get_object_or_404(Device.objects.filter(id=device_id))
#
# frontports = device.frontports.all()
# frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type)
#
# unified_frontports = [
# FrontPortComparison(
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(), i.color, i.rear_port_position)
# for i in frontports]
# unified_frontports_templates = [
# FrontPortComparison(
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(), i.color,
# i.rear_port_position, is_template=True)
# for i in frontports_templates]
#
# return get_components(request, device, frontports, unified_frontports, unified_frontports_templates)
#
# def post(self, request, device_id):
# form = ComponentComparisonForm(request.POST)
# if form.is_valid():
# device = get_object_or_404(Device.objects.filter(id=device_id))
#
# frontports = device.frontports.all()
# frontports_templates = FrontPortTemplate.objects.filter(device_type=device.device_type)
#
# # Generating result message
# message = []
# created = 0
# updated = 0
# fixed = 0
#
# remove_from_device = filter(
# lambda i: i in frontports.values_list("id", flat=True),
# map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device")))
# )
#
# # Remove selected front ports from the device and count them
# deleted = FrontPort.objects.filter(id__in=remove_from_device).delete()[0]
#
# # Get device rear ports to check dependency between front ports
# device_rp = RearPort.objects.filter(device_id=device.id)
#
# matching = {}
# mismatch = False
# for i in frontports_templates:
# found = False
# if i.rear_port_id is not None:
# rpt = RearPortTemplate.objects.get(id=i.rear_port_id)
# for rp in device_rp:
# if rp.name == rpt.name:
# # Save matching to add the correct rear port later
# matching[i.id] = rp.id
# found = True
#
# # If at least one rear port is not found in device there is a dependency
# # Better not to sync at all
# if not found:
# mismatch = True
# break
#
# if not mismatch:
# add_to_device = filter(
# lambda i: i in frontports_templates.values_list("id", flat=True),
# map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device")))
# )
#
# # Add selected component to the device and count them
# add_to_device_component = FrontPortTemplate.objects.filter(id__in=add_to_device)
#
# bulk_create = []
# updated = 0
# keys_to_avoid = ["id"]
#
# if not config["compare_description"]:
# keys_to_avoid.append("description")
#
# for i in add_to_device_component.values():
# to_create = False
#
# try:
# # If front port already exists, update and do not recreate
# fp = device.frontports.get(name=i["name"])
# except FrontPort.DoesNotExist:
# fp = FrontPort()
# fp.device = device
# to_create = True
#
# # Copy all fields from template
# for k in i.keys():
# if k not in keys_to_avoid:
# setattr(fp, k, i[k])
# fp.rear_port_id = matching.get(i["id"], None)
#
# if to_create:
# bulk_create.append(fp)
# else:
# fp.save()
# updated += 1
#
# created = len(FrontPort.objects.bulk_create(bulk_create))
#
# # Getting and validating a list of components to rename
# fix_name_components = filter(lambda i: str(i.id) in request.POST.getlist("fix_name"), frontports)
#
# # Casting component templates into Unified objects for proper comparison with component for renaming
# unified_frontports_templates = [
# FrontPortComparison(
# i.id, i.name, i.label, i.description, i.type, i.get_type_display(),
# i.color, i.rear_port_position, is_template=True)
# for i in frontports_templates]
# # Rename selected front ports
# fixed = 0
# for component in fix_name_components:
# unified_frontport = FrontPortComparison(
# component.id, component.name, component.label, component.description, component.type,
# component.get_type_display(), component.color, component.rear_port_position
# )
#
# try:
# # Try to extract a component template with the corresponding name
# corresponding_template = unified_frontports_templates[
# unified_frontports_templates.index(unified_frontport)
# ]
# component.name = corresponding_template.name
# component.save()
# fixed += 1
# except ValueError:
# pass
# else:
# messages.error(request, "Dependency detected, sync rear ports first!")
#
# if created > 0:
# message.append(f"created {created} front ports")
# if updated > 0:
# message.append(f"updated {updated} front ports")
# if deleted > 0:
# message.append(f"deleted {deleted} front ports")
# if fixed > 0:
# message.append(f"fixed {fixed} front ports")
#
# messages.info(request, "; ".join(message).capitalize())
#
# return redirect(request.path)