⚡
FLASH SALE
— 30% OFF Yearly Network Automation Memberships! Use code
SALE30
at checkout. Offer ends August 20th.
➜ Sign up here
Training
Bootcamp
Courses
Tech Sessions
Resources
Blog
Tips
GitHub
YouTube
Newsletter
Playgrounds
Pricing
Sign In
Become Member
TTP Parser
Validate and test the parsing of your TTP templates
Tools
J2 Renderer
TTP Parser
TextFSM Parser
XPath Tester
Members Only
JMESPath Validator
JSON Schema Validator
Data Format Converter
TTP Template
"""Source Adapter where we translate data from FortiManager Data model into Nautobots DM.""" from ipaddress import ip_network from os import getenv from pathlib import Path from typing import Any, override from diffsync import Adapter from nautobot_ssot_firewall_policies.diffsync.models.base import ( AddressObjectDiffSyncModel, AddressObjectGroupDiffSyncModel, FQDNDiffSyncModel, IPAddressDiffSyncModel, IPRangeDiffSyncModel, PolicyRuleDiffSyncModel, ServiceDiffSyncModel, ServiceObjectGroupDiffSyncModel, ZoneDiffSyncModel, ) _FW_DATA_JSON_PATH: str | None = getenv("FW_DATA_JSON_PATH") FORTIMANAGER_NAUTOBOT_NAMESPACE: str | None = getenv("FORTIMANAGER_NAUTOBOT_NAMESPACE") if _FW_DATA_JSON_PATH is None: raise ValueError("Environment Variable 'FW_DATA_JSON_PATH' is not set") if FORTIMANAGER_NAUTOBOT_NAMESPACE is None: raise ValueError("Environment Variable 'FORTIMANAGER_NAUTOBOT_NAMESPACE' is not set") FW_DATA_JSON_PATH: Path = Path(_FW_DATA_JSON_PATH) if not FW_DATA_JSON_PATH.exists(): raise FileNotFoundError(f"FW_DATA_JSON_PATH: '{_FW_DATA_JSON_PATH}' does not exists") if not FW_DATA_JSON_PATH.is_file(): raise ValueError(f"FW_DATA_JSON_PATH: '{_FW_DATA_JSON_PATH}' is not a file") class FortiManagerBaseAdapter(Adapter): def __init__( self, url: str | None = None, username: str | None = None, password: str | None = None, use_cache=False, *args, job=None, sync=None, client=None, **kwargs, ): super().__init__(*args, **kwargs) self.url = url self.username = username self.password = password self.use_cache = use_cache self.job = job self.sync = sync self.json_file = FW_DATA_JSON_PATH class FortiManagerFWRulesAdapter(FortiManagerBaseAdapter): address_object = AddressObjectDiffSyncModel address_object_group = AddressObjectGroupDiffSyncModel fqdn = FQDNDiffSyncModel ip_address = IPAddressDiffSyncModel ip_range = IPRangeDiffSyncModel policy_rule = PolicyRuleDiffSyncModel service = ServiceDiffSyncModel service_object_group = ServiceObjectGroupDiffSyncModel zone = ZoneDiffSyncModel top_level = [ "ip_address", "fqdn", "ip_range", "zone", "service", "service_object_group", "address_object", "address_object_group", "policy_rule", ] def read_json_file(self, path: str): """Read the JSON file and return its content.""" import json with open(path, "r") as file: data = json.load(file) return data def load_policy_rule(self, data: dict[str, dict[str, Any]]): for param in data.values(): if param.get("type") != "policy_rule": continue policy_rule = self.policy_rule( name=param.get("name"), status__name="Active", description=param.get("description"), action=param.get("policy_action"), source_zone__name=param.get("source_zone"), ) policy_rule.tags.append({"name": "Fortimanager"}) # Source fields source_addresses = param.get("source_addresses", []) source_addresses.sort() for source_address in source_addresses: policy_rule.source_addresses.append({"name": source_address}) source_address_groups = param.get("source_address_groups", []) source_address_groups.sort() for source_address_group in source_address_groups: policy_rule.source_address_groups.append({"name": source_address_group}) source_services = param.get("source_services", []) source_services.sort() for source_service in source_services: policy_rule.source_services.append({"name": source_service}) source_service_groups = param.get("source_service_groups", []) source_service_groups.sort() for source_service_group in source_service_groups: policy_rule.source_service_groups.append({"name": source_service_group}) self.add(policy_rule) def load_address_object(self, data: dict[str, dict[str, Any]]): for param in data.values(): if param.get("type") != "address_object": continue address_object = self.address_object( name=param.get("name"), status__name="Active", description=param.get("description"), ) if param.get("fqdn"): address_object.fqdn__name = param.get("fqdn") if param.get("ip_address"): ip_object = ip_network(param["ip_address"], strict=False) address_object.ip_address__host = str(ip_object.network_address) address_object.ip_address__mask_length = int(ip_object.prefixlen) if param.get("ip_range"): address_object.ip_range__start_address = param.get("ip_range")["start_range"] address_object.ip_range__end_address = param.get("ip_range")["end_range"] address_object.tags.append({"name": "Fortimanager"}) self.add(address_object) def load_address_object_group(self, data: dict[str, dict[str, Any]]): for param in data.values(): if param.get("type") != "address_object_group": continue address_object_group = self.address_object_group( name=param.get("name"), status__name="Active", description=param.get("description"), ) address_object_group.tags.append({"name": "Fortimanager"}) members = param.get("members", []) members.sort() for member in members: address_object_group.address_objects.append({"name": member}) self.add(address_object_group) def load_zone(self, data: dict[str, dict[str, Any]]): """ Processes the raw JSON data and extracts only the zones """ for _, param in data.items(): if param.get("type") != "zone": continue zone = self.zone( name=param.get("zone"), description=param.get("description"), status__name="Active" ) self.add(zone) def load_service(self, data: dict[str, dict[str, Any]]): """Extracts only the services and adds them from the raw data""" for _, param in data.items(): if param.get("type") != "service": continue service = self.service( ip_protocol=param.get("ip_protocol"), name=param.get("service"), description=param.get("description"), status__name="Active", ) if param.get("port"): service.port = param.get("port") service.tags.append({"name": "Fortimanager"}) self.add(service) def load_fqdn(self, data: dict[str, dict[str, Any]]): """ Processes the raw JSON data and extracts only the fqdn """ for _, param in data.items(): if param.get("type") != "fqdn": continue fqdn = self.fqdn( name=param.get("fqdn"), description=param.get("name"), status__name="Active", ) self.add(fqdn) def load_ip_range(self, data: dict[str, dict[str, Any]]): for _, param in data.items(): if param.get("type") != "ip_range": continue ip_range = self.ip_range( start_address=param["ip_range"]["start_range"], end_address=param["ip_range"]["end_range"], status__name="Active", description=param.get("name"), ) self.add(ip_range) def load_ip(self, ip_addresses: dict[str, dict[str, Any]]): """ Processes the raw JSON data, extracts only IP addresses, and creates IPAddressDiffSyncModel instances. """ for param in ip_addresses.values(): if param.get("type") != "ip_address": continue # Check if the address is an IP Addresss if not param.get("ip_address"): self.job.logger.debug("SKIPPED: Entry processed is not an IP. Entry %s", param) else: ip_object = ip_network(param["ip_address"], strict=False) ip_add: str = str(ip_object.network_address) ip_mask: int = ip_object.prefixlen ip = self.ip_address( host=ip_add, mask_length=ip_mask, description=param.get("description") or "N/A", parent__namespace__name=FORTIMANAGER_NAUTOBOT_NAMESPACE, status__name="Active", ) self.add(ip) def load_service_object_group(self, data: dict[str, dict[str, Any]]): for _, param in data.items(): if param.get("type") != "service_group": continue service_grp = self.service_object_group( name=param.get("name"), description=param.get("description"), status__name="Active" ) service_grp.tags.append({"name": "Fortimanager"}) members = param.get("members", []) members.sort() for member in members: service_grp.service_objects.append({"name": member}) self.add(service_grp) @override def load(self): json_data = self.read_json_file(path=self.json_file) if not json_data: self.job.logger.error("No data found in the JSON file.") return self.job.logger.debug("Source Data from JSON: %s", json_data) self.load_ip(ip_addresses=json_data) self.load_fqdn(data=json_data) self.load_ip_range(data=json_data) self.load_zone(data=json_data) self.load_service(data=json_data) self.load_address_object(data=json_data) self.load_address_object_group(data=json_data) self.load_service_object_group(data=json_data) self.load_policy_rule(data=json_data) self.job.logger.debug("Use cache: %s", self.use_cache) self.job.logger.debug("Reading data from JSON File at: %s", self.json_file)
0
/ 20000
Raw Text
{ "address1": { "name": "cloudflare-fqdn", "type": "fqdn", "fqdn": "www.cloudflare.com" }, "address2": { "name": "Cloudflare-IP", "type": "ip_address", "ip_address": "104.16.123.2/32", "description": "FortiManager - Cloudflare IP1" }, "address3": { "name": "LAN", "type": "ip_range", "ip_range": { "start_range": "10.1.1.1", "end_range": "10.1.1.254" } }, "address4": { "name": "All-Networks", "prefix": "0.0.0.0/0" }, "address5": { "name": "Cloudflare-IP2", "type": "ip_address", "ip_address": "104.16.123.5/32", "description": "FortiManager - Cloudflare IP2" }, "address6": { "name": "Cloudflare-IP3", "type": "ip_address", "ip_address": "104.16.123.119/32" }, "zone1": { "zone": "Outside", "description": "Internet Zone", "type": "zone" }, "zone2": { "zone": "DMZ", "description": "Web Lan", "type": "zone" }, "zone3": { "zone": "Inside", "description": "Internal Network", "type": "zone" }, "service1": { "service": "MYSQL", "ip_protocol": "TCP", "port": "3306", "description": "MySQL Port", "type": "service" }, "service2": { "service": "POSTGRES", "ip_protocol": "TCP", "port": "5532", "description": "Postgres Port", "type": "service" }, "service_obj_grp1": { "type": "service_group", "name": "DB-PORTS", "members": [ "MYSQL", "POSTGRES" ], "description": "Postgres Port" }, "add-obj1": { "type": "address_object", "name": "Cloudflare-FQDN1", "fqdn": "www.cloudflare.com", "description": "Cloudflare Address Object" }, "add-obj2": { "type": "address_object", "name": "Cloudflare-IP1", "ip_address": "104.16.123.119/32", "description": "Cloudflare-IP3" }, "add-obj3": { "name": "LAN", "type": "address_object", "description": "LAN Zone IP Range", "ip_range": { "start_range": "10.1.1.1", "end_range": "10.1.1.254" } }, "add-obj-grp1": { "name": "Branches", "type": "address_object_group", "description": "Address Object Group for Branches", "members": [ "LAN" ] }, "add-obj-grp2": { "name": "Cloudflare IPs", "type": "address_object_group", "description": "Cloudflare IPs", "members": [ "Cloudflare-FQDN1", "Cloudflare-IP1" ] }, "fw-policy1": { "name": "Branches to the Internet", "type": "policy_rule", "description": "Branches to the Internet", "policy_action": "deny", "source_addresses": [ "LAN" ], "source_address_groups" :[ "Branches" ], "source_zone": "Inside", "source_services": [ "MYSQL", "POSTGRES" ], "source_service_groups": [ "DB-PORTS" ], "destination_addresses": [ "Cloudflare-IP1" ] } }
0
/ 20000
Result
[ {} ]
Parse
Share
Auto-Sync