Files
lawgit/update_laws.py
Michael Hermann b0c100f312 update_laws.py: Nutze RSS-Summary statt Title für Matching, 129 Gesetze aktualisiert
- RSS entry.summary enthält den Gesetzes-Namen, entry.title nur BGBl-Nr
- Kern-Gesetznamen werden aus Änderungs-Beschreibungen extrahiert
- ETag/Last-Modified Caching für effiziente Updates
- Metadaten werden in laws_metadata.json gespeichert
- last_update.txt auf Oktober 2025 zurückgesetzt für erneuten Test
2026-02-20 20:29:11 +01:00

310 lines
9.5 KiB
Python

import feedparser
import requests
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
import zipfile
import io
import os
import re
import json
import time
import hashlib
from datetime import datetime, timedelta
import difflib
RSS_URL = 'https://www.gesetze-im-internet.de/aktuDienst-rss-feed.xml'
TOC_URL = 'https://www.gesetze-im-internet.de/gii-toc.xml'
OUTPUT_DIR = 'laws'
LAST_UPDATE_FILE = 'last_update.txt'
METADATA_FILE = 'laws_metadata.json'
# Delay zwischen Downloads (Rate-Limiting vermeiden)
DOWNLOAD_DELAY = 0.5
def load_last_update():
"""Letztes Update-Datum laden"""
if os.path.exists(LAST_UPDATE_FILE):
with open(LAST_UPDATE_FILE, 'r') as f:
return datetime.fromisoformat(f.read().strip())
return datetime.now() - timedelta(days=7)
def save_last_update():
"""Aktuelles Datum als letztes Update speichern"""
with open(LAST_UPDATE_FILE, 'w') as f:
f.write(datetime.now().isoformat())
def load_metadata():
"""Gespeicherte Metadaten laden (ETags, Last-Modified, etc.)"""
if os.path.exists(METADATA_FILE):
with open(METADATA_FILE, 'r') as f:
return json.load(f)
return {}
def save_metadata(metadata):
"""Metadaten speichern"""
with open(METADATA_FILE, 'w') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
def get_abbrev_from_url(url):
"""Extrahiert die Gesetzesabkürzung aus der TOC-URL.
z.B. http://www.gesetze-im-internet.de/bgb/xml.zip -> bgb
"""
path = urlparse(url).path
# Pfad ist z.B. /bgb/xml.zip
parts = path.strip('/').split('/')
if parts:
return parts[0]
return None
def extract_core_law_name(summary):
"""Extrahiert den Kern-Gesetznamen aus dem RSS-Summary.
z.B. 'Drittes Gesetz zur Änderung des Gesetzes gegen den unlauteren Wettbewerb vom 12. Februar 2026'
-> 'Gesetzes gegen den unlauteren Wettbewerb'
"""
# Entferne Datum am Ende (z.B. "vom 12. Februar 2026")
text = re.sub(r'\s+vom\s+\d{1,2}\.\s+\w+\s+\d{4}\s*$', '', summary.strip())
# Versuche den Kern-Gesetznamen zu extrahieren
# Pattern: "... Änderung des/der [GESETZNAME]"
patterns = [
r'(?:Änderung|Neufassung|Aufhebung)\s+(?:des|der)\s+(.+?)(?:\s*$)',
r'(?:Änderung|Neufassung|Aufhebung)\s+(.+?)(?:\s*$)',
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip()
return text
def load_toc():
"""TOC (Inhaltsverzeichnis) aller Gesetze laden"""
print("Lade TOC von gesetze-im-internet.de...")
toc_response = requests.get(TOC_URL, timeout=30)
toc_response.raise_for_status()
toc_root = ET.fromstring(toc_response.content)
toc = {}
for item in toc_root.findall('item'):
title = item.find('title').text.strip()
link = item.find('link').text.strip()
abbrev = get_abbrev_from_url(link)
if abbrev:
toc[abbrev] = {
'title': title,
'link': link,
}
print(f" {len(toc)} Gesetze in der TOC gefunden")
return toc
def find_matching_law(summary, toc):
"""Findet das passende Gesetz in der TOC basierend auf dem RSS-Summary.
Gibt (abbrev, toc_entry) zurück oder (None, None).
"""
# 1. Versuch: Direkt den Summary gegen TOC-Titel matchen
best_abbrev = None
best_ratio = 0
for abbrev, entry in toc.items():
ratio = difflib.SequenceMatcher(None, entry['title'].lower(), summary.lower()).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_abbrev = abbrev
if best_ratio >= 0.8:
return best_abbrev, toc[best_abbrev]
# 2. Versuch: Kern-Gesetznamen extrahieren und matchen
core_name = extract_core_law_name(summary)
if core_name != summary:
for abbrev, entry in toc.items():
ratio = difflib.SequenceMatcher(None, entry['title'].lower(), core_name.lower()).ratio()
if ratio > best_ratio:
best_ratio = ratio
best_abbrev = abbrev
if best_ratio >= 0.7:
return best_abbrev, toc[best_abbrev]
return None, None
def download_and_save_law(abbrev, link, change_date, metadata):
"""Lädt ein Gesetz herunter und speichert es, wenn es sich geändert hat.
Gibt True zurück wenn das Gesetz aktualisiert wurde.
"""
try:
# Prüfe ob sich die Datei geändert hat (via ETag/Last-Modified)
headers = {}
if abbrev in metadata:
if 'etag' in metadata[abbrev]:
headers['If-None-Match'] = metadata[abbrev]['etag']
if 'last_modified' in metadata[abbrev]:
headers['If-Modified-Since'] = metadata[abbrev]['last_modified']
response = requests.get(link, headers=headers, timeout=60)
if response.status_code == 304:
# Nicht geändert
return False
if response.status_code != 200:
print(f" WARNUNG: HTTP {response.status_code} für {abbrev}")
return False
# Metadaten aktualisieren
metadata[abbrev] = {
'etag': response.headers.get('ETag', ''),
'last_modified': response.headers.get('Last-Modified', ''),
'last_checked': datetime.now().isoformat(),
}
# ZIP entpacken
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
xml_files = [f for f in z.namelist() if f.endswith('.xml')]
if not xml_files:
print(f" WARNUNG: Keine XML-Datei in {abbrev}/xml.zip")
return False
xml_content = z.read(xml_files[0])
# XML parsen und Stand prüfen
xml_root = ET.fromstring(xml_content)
content_hash = hashlib.md5(xml_content).hexdigest()[:8]
# Stand extrahieren
meta = xml_root.find('.//metadaten')
stand_comment = ''
if meta is not None:
stand_elem = meta.find('standangabe')
if stand_elem is not None:
comment_elem = stand_elem.find('standkommentar')
if comment_elem is not None:
stand_comment = ET.tostring(comment_elem, encoding='unicode', method='text').strip()
stand_hash = hashlib.md5(stand_comment.encode()).hexdigest()
# Lokalen Stand prüfen
law_dir = os.path.join(OUTPUT_DIR, abbrev)
os.makedirs(law_dir, exist_ok=True)
stand_file = os.path.join(law_dir, 'last_stand.txt')
old_stand_hash = ''
if os.path.exists(stand_file):
with open(stand_file, 'r') as f:
old_stand_hash = f.read().strip()
if stand_hash == old_stand_hash:
return False
# Speichern
file_path = os.path.join(law_dir, f"{abbrev}_{change_date}_{content_hash}.xml")
with open(file_path, 'wb') as f:
f.write(xml_content)
with open(stand_file, 'w') as f:
f.write(stand_hash)
return True
except Exception as e:
print(f" FEHLER bei {abbrev}: {e}")
return False
def main():
last_update = load_last_update()
metadata = load_metadata()
print(f"Letztes Update: {last_update.strftime('%Y-%m-%d %H:%M')}")
print()
# 1. RSS-Feed laden
print("Lade RSS-Feed...")
feed = feedparser.parse(RSS_URL)
new_entries = []
for entry in feed.entries:
pub_date = datetime(*entry.published_parsed[:6])
if pub_date > last_update:
summary = entry.get('summary', entry.title).strip()
new_entries.append({
'title': entry.title.strip(),
'summary': summary,
'date': pub_date.strftime('%Y-%m-%d'),
'link': entry.get('link', ''),
})
print(f" {len(new_entries)} neue Einträge seit letztem Update")
if not new_entries:
print("Keine neuen Änderungen.")
save_last_update()
save_metadata(metadata)
exit(0)
# 2. TOC laden
toc = load_toc()
print()
# 3. Gesetze matchen und herunterladen
updated_count = 0
skipped_count = 0
not_found_count = 0
seen_abbrevs = set() # Duplikate vermeiden
for i, entry in enumerate(new_entries):
summary = entry['summary']
change_date = entry['date']
# Gesetz in TOC finden
abbrev, toc_entry = find_matching_law(summary, toc)
if abbrev is None:
print(f" [{i+1}/{len(new_entries)}] NICHT GEFUNDEN: {summary[:70]}")
not_found_count += 1
continue
if abbrev in seen_abbrevs:
# Schon in diesem Run verarbeitet
continue
seen_abbrevs.add(abbrev)
print(f" [{i+1}/{len(new_entries)}] {abbrev}: {toc_entry['title'][:60]}")
# Herunterladen und speichern
updated = download_and_save_law(abbrev, toc_entry['link'], change_date, metadata)
if updated:
print(f" -> AKTUALISIERT")
updated_count += 1
else:
print(f" -> unverändert")
skipped_count += 1
time.sleep(DOWNLOAD_DELAY)
# 4. Ergebnis speichern
save_last_update()
save_metadata(metadata)
print()
print("=" * 50)
print("ZUSAMMENFASSUNG")
print("=" * 50)
print(f" Neue RSS-Einträge: {len(new_entries)}")
print(f" Aktualisiert: {updated_count}")
print(f" Unverändert: {skipped_count}")
print(f" Nicht gefunden: {not_found_count}")
if updated_count > 0:
print(f"\n{updated_count} Gesetze aktualisiert.")
else:
print("\nKeine Gesetze aktualisiert.")
if __name__ == "__main__":
main()