wgctl/core/lib/importer.py
2026-05-27 16:46:09 +00:00

184 lines
No EOL
6.1 KiB
Python

import os
import json
import sys
import glob
def import_peer(file, data_key, name, clients_dir, meta_dir,
groups_dir, blocks_dir, force):
"""
Import a single peer from an export bundle.
data_key: 'data' for peer export, 'peers' for full backup
Returns: list of imported items as strings
"""
import base64, os
d = json.load(open(file))
if data_key == 'data':
peer = d['data']
else:
# Find peer in full backup peers array
peers = d['data'].get('peers', [])
peer = next((p for p in peers if p['name'] == name), None)
if not peer:
print(f"error: peer '{name}' not found in backup", file=sys.stderr)
sys.exit(1)
imported = []
# conf
conf_path = os.path.join(clients_dir, f"{name}.conf")
if os.path.exists(conf_path) and force != 'true':
print(f"error: peer '{name}' already exists, use --force to overwrite",
file=sys.stderr)
sys.exit(1)
os.makedirs(clients_dir, exist_ok=True)
conf = base64.b64decode(peer['conf']).decode()
open(conf_path, 'w').write(conf)
imported.append('conf')
# meta
meta = peer.get('meta', {})
if meta:
os.makedirs(meta_dir, exist_ok=True)
open(os.path.join(meta_dir, f"{name}.meta"), 'w').write(
json.dumps(meta, indent=2))
imported.append('meta')
# groups
for grp in peer.get('groups', []):
grp_file = os.path.join(groups_dir, f"{grp}.group")
if os.path.exists(grp_file):
try:
g = json.load(open(grp_file))
if name not in g.get('peers', []):
g.setdefault('peers', []).append(name)
open(grp_file, 'w').write(json.dumps(g, indent=2))
imported.append(f"group:{grp}")
except Exception:
pass
# blocks
blocks = peer.get('blocks', {})
if blocks.get('is_blocked') and blocks.get('block_file'):
os.makedirs(blocks_dir, exist_ok=True)
block_data = base64.b64decode(blocks['block_file'])
open(os.path.join(blocks_dir, f"{name}.block"), 'wb').write(block_data)
imported.append('block')
print('\n'.join(imported))
def import_identity(file, name, identities_dir, clients_dir, force):
"""Import an identity from an export bundle."""
import os
d = json.load(open(file))
id_data = d['data'].get('identity', d['data'])
# Check all referenced peers exist
peers = id_data.get('peers', [])
missing = [p for p in peers
if not os.path.exists(os.path.join(clients_dir, f"{p}.conf"))]
if missing:
print(f"error: missing peers: {' '.join(missing)}", file=sys.stderr)
sys.exit(1)
id_file = os.path.join(identities_dir, f"{name}.identity")
if os.path.exists(id_file) and force != 'true':
print(f"error: identity '{name}' already exists, use --force to overwrite",
file=sys.stderr)
sys.exit(1)
os.makedirs(identities_dir, exist_ok=True)
open(id_file, 'w').write(json.dumps(id_data, indent=2))
print('identity')
def import_full(file, clients_dir, meta_dir, rules_dir, identities_dir,
groups_dir, blocks_dir, policies_file, subnets_file,
net_file, hosts_file, force):
"""Import a full backup bundle."""
import base64, os, glob
d = json.load(open(file))
data = d['data']
results = []
# Peers
for peer in data.get('peers', []):
name = peer.get('name', '')
if not name:
continue
try:
conf_path = os.path.join(clients_dir, f"{name}.conf")
if os.path.exists(conf_path) and force != 'true':
results.append(f"skip:{name}")
continue
os.makedirs(clients_dir, exist_ok=True)
conf = base64.b64decode(peer['conf']).decode()
open(conf_path, 'w').write(conf)
meta = peer.get('meta', {})
if meta:
os.makedirs(meta_dir, exist_ok=True)
open(os.path.join(meta_dir, f"{name}.meta"), 'w').write(
json.dumps(meta, indent=2))
blocks = peer.get('blocks', {})
if blocks.get('is_blocked') and blocks.get('block_file'):
os.makedirs(blocks_dir, exist_ok=True)
block_data = base64.b64decode(blocks['block_file'])
open(os.path.join(blocks_dir, f"{name}.block"), 'wb').write(block_data)
results.append(f"peer:{name}")
except Exception as e:
results.append(f"error:{name}:{e}")
# Rules
os.makedirs(rules_dir, exist_ok=True)
for rule in data.get('rules', []):
name = rule.get('name', '')
if name:
open(os.path.join(rules_dir, f"{name}.rule"), 'w').write(
json.dumps(rule, indent=2))
results.append('rules')
# Identities
os.makedirs(identities_dir, exist_ok=True)
for identity in data.get('identities', []):
name = identity.get('name', '')
if name:
open(os.path.join(identities_dir, f"{name}.identity"), 'w').write(
json.dumps(identity, indent=2))
results.append('identities')
# Groups
os.makedirs(groups_dir, exist_ok=True)
for grp in data.get('groups', []):
name = grp.get('name', '')
if name:
open(os.path.join(groups_dir, f"{name}.group"), 'w').write(
json.dumps(grp, indent=2))
results.append('groups')
# Flat JSON files
for key, path in [('policies', policies_file), ('subnets', subnets_file),
('services', net_file), ('hosts', hosts_file)]:
section = data.get(key)
if section is not None:
open(path, 'w').write(json.dumps(section, indent=2))
results.append(key)
print('\n'.join(results))
def import_get_field(file, *keys):
"""Get a field from export JSON. Keys are dot-separated path."""
d = json.load(open(file))
val = d
for k in keys:
val = val.get(k, '')
if not val:
break
print(val if val else '')