Comparison classes in different files. Inerithance with a parent comparison class. Check all fields in poweroutlet object for sync. Added update strategy if an object already exists.

This commit is contained in:
rizlas 2021-12-28 15:43:30 +01:00
parent 2f104442c3
commit b6bdbf9028
3 changed files with 141 additions and 70 deletions

View File

@ -0,0 +1,79 @@
from dataclasses import dataclass
@dataclass(frozen=True)
class ParentComparison:
id: int
name: str
label: str
description: str
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return self.name.lower().replace(" ", "") == other.name.lower().replace(" ", "")
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash(self.name.lower().replace(" ", ""))
@dataclass(frozen=True)
class ParentTypedComparison(ParentComparison):
type: str
type_display: str
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return (
self.name.lower().replace(" ", "") == other.name.lower().replace(" ", "")
) and (self.type == other.type)
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash((self.name.lower().replace(" ", ""), self.type))
@dataclass(frozen=True)
class UnifiedInterface:
"""A unified way to represent the interface and interface template"""
id: int
name: str
type: str = ""
type_display: str = ""
is_template: bool = False
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return (
self.name.lower().replace(" ", "") == other.name.lower().replace(" ", "")
) and (self.type == other.type)
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash((self.name.lower().replace(" ", ""), self.type))
@dataclass(frozen=True)
class ComparisonPowerOutlet(ParentTypedComparison):
power_port_name: str = ""
feed_leg: str = ""
is_template: bool = False
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return (
(self.name.lower().replace(" ", "") == other.name.lower().replace(" ", ""))
and (self.label == other.label)
and (self.type == other.type)
and (self.power_port_name == other.power_port_name)
and (self.feed_leg == other.feed_leg)
and (self.description == other.description)
)
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash(
(self.name.lower().replace(" ", ""), self.type, self.power_port_name)
)

View File

@ -1,8 +1,8 @@
import re, copy import re
from typing import Iterable from typing import Iterable
from dataclasses import dataclass
from django.shortcuts import render, redirect from django.shortcuts import render, redirect
from django.contrib import messages from django.contrib import messages
from .comparison import UnifiedInterface, ComparisonPowerOutlet
def split(s): def split(s):
for x, y in re.findall(r'(\d*)(\D*)', s): for x, y in re.findall(r'(\d*)(\D*)', s):
@ -126,34 +126,3 @@ def post_components(request, device, components, component_templates, ObjectType
messages.success(request, "; ".join(message).capitalize()) messages.success(request, "; ".join(message).capitalize())
return redirect(request.path) return redirect(request.path)
@dataclass(frozen=True)
class UnifiedInterface:
"""A unified way to represent the interface and interface template"""
id: int
name: str
type: str = ""
type_display: str = ""
is_template: bool = False
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return (self.name.lower().replace(' ', '') == other.name.lower().replace(' ', '')) and (self.type == other.type)
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash((self.name.lower().replace(' ', ''), self.type))
@dataclass(frozen=True)
class ComparisonPowerOutlet(UnifiedInterface):
power_port_name: str = ""
def __eq__(self, other):
# Ignore some fields when comparing; ignore interface name case and whitespaces
return (self.name.lower().replace(' ', '') == other.name.lower().replace(' ', '')) and (self.type == other.type) and (self.power_port_name == other.power_port_name)
def __hash__(self):
# Ignore some fields when hashing; ignore interface name case and whitespaces
return hash((self.name.lower().replace(' ', ''), self.type, self.power_port_name))

View File

@ -7,7 +7,8 @@ from django.contrib.auth.mixins import LoginRequiredMixin, PermissionRequiredMix
from django.conf import settings from django.conf import settings
from django.contrib import messages from django.contrib import messages
from .utils import ComparisonPowerOutlet, UnifiedInterface, natural_keys, get_components, post_components from .utils import natural_keys, get_components, post_components
from .comparison import ComparisonPowerOutlet, UnifiedInterface
from .forms import InterfaceComparisonForm from .forms import InterfaceComparisonForm
config = settings.PLUGINS_CONFIG['netbox_interface_sync'] config = settings.PLUGINS_CONFIG['netbox_interface_sync']
@ -113,9 +114,10 @@ class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, Vie
poweroutlets = device.poweroutlets.all() poweroutlets = device.poweroutlets.all()
poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type) poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type)
unified_components = [ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets]
unified_components = [ComparisonPowerOutlet(i.id, i.name, i.label, i.description, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "", feed_leg=i.feed_leg) for i in poweroutlets]
unified_component_templates = [ unified_component_templates = [
ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), is_template=True, power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets_templates] ComparisonPowerOutlet(i.id, i.name, i.label, i.description, i.type, i.get_type_display(), power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "", feed_leg=i.feed_leg, is_template=True) for i in poweroutlets_templates]
# List of interfaces and interface templates presented in the unified format # List of interfaces and interface templates presented in the unified format
overall_powers = list(set(unified_component_templates + unified_components)) overall_powers = list(set(unified_component_templates + unified_components))
@ -153,7 +155,21 @@ class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, Vie
poweroutlets = device.poweroutlets.all() poweroutlets = device.poweroutlets.all()
poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type) poweroutlets_templates = PowerOutletTemplate.objects.filter(device_type=device.device_type)
#se il template ha una power port che non ho nel device fisico stop # Generating result message
message = []
created = 0
updated = 0
fixed = 0
remove_from_device = filter(
lambda i: i in poweroutlets.values_list("id", flat=True),
map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device")))
)
# Remove selected interfaces from the device and count them
deleted = PowerOutlet.objects.filter(id__in=remove_from_device).delete()[0]
# Get device power ports to check dependency between power outlets
device_pp = PowerPort.objects.filter(device_id=device.id) device_pp = PowerPort.objects.filter(device_id=device.id)
matching = {} matching = {}
@ -164,44 +180,52 @@ class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, Vie
ppt = PowerPortTemplate.objects.get(id=i.power_port_id) ppt = PowerPortTemplate.objects.get(id=i.power_port_id)
for pp in device_pp: for pp in device_pp:
if pp.name == ppt.name: if pp.name == ppt.name:
# Save matching to add the correct power port later
matching[i.id] = pp.id matching[i.id] = pp.id
found = True found = True
# If at least on power port is found there is a dependency
# Better not to sync at all
if not found: if not found:
mismatch = True mismatch = True
break break
if not mismatch: if not mismatch:
# Manually validating interfaces and interface templates lists # Manually validating interfaces and interface templates lists
with open("/tmp/ciccio.log", "w") as f:
f.write(str(request.POST.getlist("add_to_device")))
add_to_device = filter( add_to_device = filter(
lambda i: i in poweroutlets_templates.values_list("id", flat=True), lambda i: i in poweroutlets_templates.values_list("id", flat=True),
map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device"))) map(int, filter(lambda x: x.isdigit(), request.POST.getlist("add_to_device")))
) )
remove_from_device = filter(
lambda i: i in poweroutlets.values_list("id", flat=True),
map(int, filter(lambda x: x.isdigit(), request.POST.getlist("remove_from_device")))
)
# Remove selected interfaces from the device and count them
deleted = PowerOutlet.objects.filter(id__in=remove_from_device).delete()[0]
# Add selected interfaces to the device and count them # Add selected interfaces to the device and count them
add_to_device_component = PowerOutletTemplate.objects.filter(id__in=add_to_device) add_to_device_component = PowerOutletTemplate.objects.filter(id__in=add_to_device)
bulk_create = [] bulk_create = []
updated = 0
keys_to_avoid = ["id"] keys_to_avoid = ["id"]
for i in add_to_device_component.values(): for i in add_to_device_component.values():
tmp = PowerOutlet() to_create = False
tmp.device = device
try:
# If power outlets already exists, update and do not recreate
po = device.poweroutlets.get(name=i["name"])
except PowerOutlet.DoesNotExist:
po = PowerOutlet()
po.device = device
to_create = True
# Copy all fields from template
for k in i.keys(): for k in i.keys():
if k not in keys_to_avoid: if k not in keys_to_avoid:
setattr(tmp, k, i[k]) setattr(po, k, i[k])
tmp.power_port_id = matching.get(i["id"], None) po.power_port_id = matching.get(i["id"], None)
bulk_create.append(tmp)
if to_create:
bulk_create.append(po)
else:
po.save()
updated += 1
created = len(PowerOutlet.objects.bulk_create(bulk_create)) created = len(PowerOutlet.objects.bulk_create(bulk_create))
@ -210,13 +234,12 @@ class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, Vie
# Casting interface templates into UnifiedInterface objects for proper comparison with interfaces for renaming # Casting interface templates into UnifiedInterface objects for proper comparison with interfaces for renaming
unified_component_templates = [ unified_component_templates = [
ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), is_template=True, power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets_templates] ComparisonPowerOutlet(i.id, i.name, i.label, i.description, i.type, i.get_type_display(), power_port_name=PowerPortTemplate.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "", feed_leg=i.feed_leg, is_template=True) for i in poweroutlets_templates]
# Rename selected interfaces # Rename selected interfaces
fixed = 0 fixed = 0
for component in fix_name_components: for component in fix_name_components:
unified_component = [ComparisonPowerOutlet(i.id, i.name, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "") for i in poweroutlets] unified_component = [ComparisonPowerOutlet(i.id, i.name, i.label, i.description, i.type, i.get_type_display(), power_port_name=PowerPort.objects.get(id=i.power_port_id).name if i.power_port_id is not None else "", feed_leg=i.feed_leg) for i in poweroutlets]
try: try:
# Try to extract an interface template with the corresponding name # Try to extract an interface template with the corresponding name
@ -226,21 +249,21 @@ class PowerOutletComparisonView(LoginRequiredMixin, PermissionRequiredMixin, Vie
fixed += 1 fixed += 1
except ValueError: except ValueError:
pass pass
# Generating result message
message = []
if created > 0:
message.append(f"created {created} interfaces")
if deleted > 0:
message.append(f"deleted {deleted} interfaces")
if fixed > 0:
message.append(f"fixed {fixed} interfaces")
messages.success(request, "; ".join(message).capitalize())
return redirect(request.path)
else: else:
messages.error(request, "Fai prima le power ports") message.append("Dependecy detected, sync power ports first!")
return redirect(request.path)
if created > 0:
message.append(f"created {created} interfaces")
if updated > 0:
message.append(f"updated {updated} interfaces")
if deleted > 0:
message.append(f"deleted {deleted} interfaces")
if fixed > 0:
message.append(f"fixed {fixed} interfaces")
messages.info(request, "; ".join(message).capitalize())
return redirect(request.path)
class FrontPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View): class FrontPortComparisonView(LoginRequiredMixin, PermissionRequiredMixin, View):
"""Comparison of interfaces between a device and a device type and beautiful visualization""" """Comparison of interfaces between a device and a device type and beautiful visualization"""