# - RSS entry.summary contains the law name; entry.title only the BGBl number
# - Core law names are extracted from change descriptions
# - ETag/Last-Modified caching for efficient updates
# - Metadata is stored in laws_metadata.json
# - last_update.txt reset to October 2025 for a renewed test
import feedparser
|
|
import requests
|
|
import xml.etree.ElementTree as ET
|
|
from urllib.parse import urlparse
|
|
import zipfile
|
|
import io
|
|
import os
|
|
import re
|
|
import json
|
|
import time
|
|
import hashlib
|
|
from datetime import datetime, timedelta
|
|
import difflib
|
|
|
|
# Feed of recent legislative changes published by gesetze-im-internet.de.
RSS_URL = 'https://www.gesetze-im-internet.de/aktuDienst-rss-feed.xml'
# Table of contents listing every law together with its xml.zip download link.
TOC_URL = 'https://www.gesetze-im-internet.de/gii-toc.xml'
# Directory where downloaded law XML files are stored (one subdir per law).
OUTPUT_DIR = 'laws'
# Timestamp (ISO format) of the last successful run.
LAST_UPDATE_FILE = 'last_update.txt'
# Per-law HTTP cache metadata (ETag, Last-Modified, last_checked).
METADATA_FILE = 'laws_metadata.json'

# Delay between downloads in seconds (avoids rate limiting)
DOWNLOAD_DELAY = 0.5
|
|
|
|
|
|
def load_last_update():
    """Load the timestamp of the previous run.

    Falls back to one week ago when no state file exists yet.
    """
    if not os.path.exists(LAST_UPDATE_FILE):
        return datetime.now() - timedelta(days=7)
    with open(LAST_UPDATE_FILE, 'r') as f:
        stamp = f.read()
    return datetime.fromisoformat(stamp.strip())
|
|
|
|
|
|
def save_last_update():
    """Persist the current time as the last-update marker."""
    stamp = datetime.now().isoformat()
    with open(LAST_UPDATE_FILE, 'w') as f:
        f.write(stamp)
|
|
|
|
|
|
def load_metadata():
    """Load the cached per-law metadata (ETags, Last-Modified, etc.).

    Returns an empty dict on the very first run.
    """
    if not os.path.exists(METADATA_FILE):
        return {}
    with open(METADATA_FILE, 'r') as f:
        return json.load(f)
|
|
|
|
|
|
def save_metadata(metadata):
    """Write the metadata cache to disk as pretty-printed JSON."""
    payload = json.dumps(metadata, indent=2, ensure_ascii=False)
    with open(METADATA_FILE, 'w') as f:
        f.write(payload)
|
|
|
|
|
|
def get_abbrev_from_url(url):
    """Extract the law abbreviation from a TOC download URL.

    e.g. http://www.gesetze-im-internet.de/bgb/xml.zip -> 'bgb'

    Returns None when the URL has no usable path segment.
    """
    path = urlparse(url).path
    # Path looks like /bgb/xml.zip; the first segment is the abbreviation.
    parts = path.strip('/').split('/')
    # ''.split('/') yields [''], so the old `if parts:` check was always
    # true and returned '' for URLs without a path. Guard the segment too.
    if parts and parts[0]:
        return parts[0]
    return None
|
|
|
|
|
|
def extract_core_law_name(summary):
    """Extract the core law name from an RSS summary.

    e.g. 'Drittes Gesetz zur Änderung des Gesetzes gegen den unlauteren
    Wettbewerb vom 12. Februar 2026'
    -> 'Gesetzes gegen den unlauteren Wettbewerb'
    """
    # Strip a trailing date such as "vom 12. Februar 2026".
    text = re.sub(r'\s+vom\s+\d{1,2}\.\s+\w+\s+\d{4}\s*$', '', summary.strip())

    # Try the article form "... Änderung des/der <NAME>" first, then the
    # looser variant without the article; the first hit wins.
    candidate_patterns = (
        r'(?:Änderung|Neufassung|Aufhebung)\s+(?:des|der)\s+(.+?)(?:\s*$)',
        r'(?:Änderung|Neufassung|Aufhebung)\s+(.+?)(?:\s*$)',
    )
    for candidate in candidate_patterns:
        hit = re.search(candidate, text)
        if hit is not None:
            return hit.group(1).strip()

    # No change keyword found: fall back to the date-stripped summary.
    return text
|
|
|
|
|
|
def load_toc():
    """Download and parse the TOC (index of all laws).

    Returns a dict mapping law abbreviation -> {'title': ..., 'link': ...}.

    Raises requests.HTTPError when the TOC endpoint answers with an error
    status (via raise_for_status).
    """
    print("Lade TOC von gesetze-im-internet.de...")
    toc_response = requests.get(TOC_URL, timeout=30)
    toc_response.raise_for_status()
    toc_root = ET.fromstring(toc_response.content)

    toc = {}
    for item in toc_root.findall('item'):
        title_elem = item.find('title')
        link_elem = item.find('link')
        # Skip malformed items instead of crashing the whole run with an
        # AttributeError on a missing element or empty text node.
        if title_elem is None or link_elem is None:
            continue
        if not title_elem.text or not link_elem.text:
            continue
        title = title_elem.text.strip()
        link = link_elem.text.strip()
        abbrev = get_abbrev_from_url(link)
        if abbrev:
            toc[abbrev] = {
                'title': title,
                'link': link,
            }
    print(f" {len(toc)} Gesetze in der TOC gefunden")
    return toc
|
|
|
|
|
|
def find_matching_law(summary, toc):
    """Locate the TOC entry that matches an RSS summary via fuzzy matching.

    Returns (abbrev, toc_entry), or (None, None) when no candidate scores
    high enough.
    """
    top_score = 0
    top_key = None

    def similarity(candidate_title, needle):
        # Case-insensitive fuzzy ratio between a TOC title and the needle.
        return difflib.SequenceMatcher(
            None, candidate_title.lower(), needle.lower()
        ).ratio()

    # Pass 1: compare the raw summary against every TOC title.
    for key, info in toc.items():
        score = similarity(info['title'], summary)
        if score > top_score:
            top_score, top_key = score, key

    if top_score >= 0.8:
        return top_key, toc[top_key]

    # Pass 2: retry with the extracted core law name. The best score from
    # pass 1 is deliberately carried over, so a 0.7+ candidate from either
    # pass can win below.
    core_name = extract_core_law_name(summary)
    if core_name != summary:
        for key, info in toc.items():
            score = similarity(info['title'], core_name)
            if score > top_score:
                top_score, top_key = score, key

    if top_score >= 0.7:
        return top_key, toc[top_key]

    return None, None
|
|
|
|
|
|
def download_and_save_law(abbrev, link, change_date, metadata):
    """Download a law's xml.zip and save the XML if its content changed.

    Returns True when the law was updated on disk, False when it is
    unchanged (HTTP 304, same "Stand" hash) or an error occurred. Mutates
    `metadata[abbrev]` with fresh ETag/Last-Modified/last_checked values.
    """
    try:
        # Check whether the upstream file changed (via ETag/Last-Modified
        # conditional request headers from the previous run).
        headers = {}
        if abbrev in metadata:
            if 'etag' in metadata[abbrev]:
                headers['If-None-Match'] = metadata[abbrev]['etag']
            if 'last_modified' in metadata[abbrev]:
                headers['If-Modified-Since'] = metadata[abbrev]['last_modified']

        response = requests.get(link, headers=headers, timeout=60)

        if response.status_code == 304:
            # Not modified upstream
            return False

        if response.status_code != 200:
            print(f" WARNUNG: HTTP {response.status_code} für {abbrev}")
            return False

        # Refresh cache metadata (written even when the content later turns
        # out to be unchanged, so the next run can send conditional headers)
        metadata[abbrev] = {
            'etag': response.headers.get('ETag', ''),
            'last_modified': response.headers.get('Last-Modified', ''),
            'last_checked': datetime.now().isoformat(),
        }

        # Unpack the ZIP archive; only the first .xml member is used
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            xml_files = [f for f in z.namelist() if f.endswith('.xml')]
            if not xml_files:
                print(f" WARNUNG: Keine XML-Datei in {abbrev}/xml.zip")
                return False

            xml_content = z.read(xml_files[0])

        # Parse the XML and check the "Stand" (version state)
        xml_root = ET.fromstring(xml_content)
        content_hash = hashlib.md5(xml_content).hexdigest()[:8]

        # Extract the version state comment (metadaten/standangabe/
        # standkommentar); empty string when any level is missing
        meta = xml_root.find('.//metadaten')
        stand_comment = ''
        if meta is not None:
            stand_elem = meta.find('standangabe')
            if stand_elem is not None:
                comment_elem = stand_elem.find('standkommentar')
                if comment_elem is not None:
                    stand_comment = ET.tostring(comment_elem, encoding='unicode', method='text').strip()
        # md5 here is a change detector, not a security hash
        stand_hash = hashlib.md5(stand_comment.encode()).hexdigest()

        # Compare with the locally stored state hash
        law_dir = os.path.join(OUTPUT_DIR, abbrev)
        os.makedirs(law_dir, exist_ok=True)
        stand_file = os.path.join(law_dir, 'last_stand.txt')

        old_stand_hash = ''
        if os.path.exists(stand_file):
            with open(stand_file, 'r') as f:
                old_stand_hash = f.read().strip()

        if stand_hash == old_stand_hash:
            return False

        # Save the law XML and record the new state hash
        file_path = os.path.join(law_dir, f"{abbrev}_{change_date}_{content_hash}.xml")
        with open(file_path, 'wb') as f:
            f.write(xml_content)
        with open(stand_file, 'w') as f:
            f.write(stand_hash)

        return True

    except Exception as e:
        # Best-effort per-law handling: log and continue with the next law
        print(f" FEHLER bei {abbrev}: {e}")
        return False
|
|
|
|
|
|
def main():
    """Check the RSS feed for new law changes and download updated laws."""
    last_update = load_last_update()
    metadata = load_metadata()

    print(f"Letztes Update: {last_update.strftime('%Y-%m-%d %H:%M')}")
    print()

    # 1. Load the RSS feed
    print("Lade RSS-Feed...")
    feed = feedparser.parse(RSS_URL)
    new_entries = []
    for entry in feed.entries:
        # Entries without a parsable publication date cannot be compared
        # against last_update -- skip them instead of raising a TypeError
        # on `datetime(*None[:6])`.
        published = entry.get('published_parsed')
        if published is None:
            continue
        pub_date = datetime(*published[:6])
        if pub_date > last_update:
            summary = entry.get('summary', entry.title).strip()
            new_entries.append({
                'title': entry.title.strip(),
                'summary': summary,
                'date': pub_date.strftime('%Y-%m-%d'),
                'link': entry.get('link', ''),
            })

    print(f" {len(new_entries)} neue Einträge seit letztem Update")
    if not new_entries:
        print("Keine neuen Änderungen.")
        save_last_update()
        save_metadata(metadata)
        # `return` instead of exit(0): exit() is a site-injected builtin not
        # guaranteed to exist, and returning keeps main() importable.
        return

    # 2. Load the TOC
    toc = load_toc()
    print()

    # 3. Match and download the laws
    updated_count = 0
    skipped_count = 0
    not_found_count = 0
    seen_abbrevs = set()  # avoid processing the same law twice per run

    for i, entry in enumerate(new_entries):
        summary = entry['summary']
        change_date = entry['date']

        # Find the law in the TOC
        abbrev, toc_entry = find_matching_law(summary, toc)

        if abbrev is None:
            print(f" [{i+1}/{len(new_entries)}] NICHT GEFUNDEN: {summary[:70]}")
            not_found_count += 1
            continue

        if abbrev in seen_abbrevs:
            # Already handled in this run
            continue
        seen_abbrevs.add(abbrev)

        print(f" [{i+1}/{len(new_entries)}] {abbrev}: {toc_entry['title'][:60]}")

        # Download and save
        updated = download_and_save_law(abbrev, toc_entry['link'], change_date, metadata)
        if updated:
            print(f" -> AKTUALISIERT")
            updated_count += 1
        else:
            print(f" -> unverändert")
            skipped_count += 1

        time.sleep(DOWNLOAD_DELAY)

    # 4. Persist the run state
    save_last_update()
    save_metadata(metadata)

    print()
    print("=" * 50)
    print("ZUSAMMENFASSUNG")
    print("=" * 50)
    print(f" Neue RSS-Einträge: {len(new_entries)}")
    print(f" Aktualisiert: {updated_count}")
    print(f" Unverändert: {skipped_count}")
    print(f" Nicht gefunden: {not_found_count}")

    if updated_count > 0:
        print(f"\n{updated_count} Gesetze aktualisiert.")
    else:
        print("\nKeine Gesetze aktualisiert.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run only when executed as a script, not on import.
    main()
|