Files
crm.clientright.ru/crm_extensions/court_document_parser.py
Fedor dabcd43a00 Добавлены скрипты для парсинга судебных документов и обновления проектов в CRM
- court_document_parser.py: парсер судебных документов с извлечением ФИО, номера дела, УИД, суда
- court_parser_api.py: API для вызова парсера из n8n
- pdf_court_parser.py: парсер PDF документов с извлечением текста
- simple_project_updater.php: обновление проектов через CRM API
- simple_project_updater_v2.php: обновленная версия с прямыми SQL запросами и S3Client
- update_project_from_document.php: альтернативный скрипт обновления
- test_input.json: тестовые данные для парсера
- README файлы с документацией для всех скриптов

Скрипты поддерживают:
- Поиск проектов по ФИО, номеру дела, УИД, названию суда
- Создание документов в CRM с загрузкой в S3
- Привязку документов к проектам
- Логирование всех операций
- Работу с n8n через SSH команды
2025-09-30 19:54:37 +03:00

248 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Парсер судебных документов для извлечения структурированных данных
Использование: python3 court_document_parser.py
"""
import json
import re
import sys
import argparse
from typing import Dict, List, Optional, Any
from datetime import datetime
class CourtDocumentParser:
"""Парсер судебных документов для извлечения структурированных данных"""
def __init__(self):
# Паттерны для извлечения данных
self.patterns = {
# Номер дела
'case_number': [
r'\s*([А-Яа-я0-9\-/]+)',
r'дело\s*№\s*([А-Яа-я0-9\-/]+)',
r'\s*дела\s*([А-Яа-я0-9\-/]+)',
r'([А-Яа-я0-9\-/]+)\s*№\s*([А-Яа-я0-9\-/]+)',
],
# Номер исполнительного листа
'execution_number': [
r'исполнительный\s*лист\s*№\s*([А-Яа-я0-9\-/]+)',
r'\s*исполнительного\s*листа\s*([А-Яа-я0-9\-/]+)',
r'ИЛ\s*№\s*([А-Яа-я0-9\-/]+)',
],
# УИД
'uid': [
r'УИД\s*№\s*([0-9]{2}[А-Я]{2}[0-9]{3}-[0-9]{2}-[0-9]{4}-[0-9]{6}-[0-9]{2})',
r'уникальный\s*идентификатор\s*([0-9]{2}[А-Я]{2}[0-9]{3}-[0-9]{2}-[0-9]{4}-[0-9]{6}-[0-9]{2})',
r'([0-9]{2}[А-Я]{2}[0-9]{3}-[0-9]{2}-[0-9]{4}-[0-9]{6}-[0-9]{2})',
r'УИД\s*№\s*([А-Яа-я0-9\-/]+)',
],
# Суд
'court': [
r'([А-Яа-я\s]+городской\s*суд)',
r'([А-Яа-я\s]+районный\s*суд)',
r'([А-Яа-я\s]+областной\s*суд)',
r'([А-Яа-я\s]+суд)',
],
# ФИО истца
'plaintiff': [
r'в\s*интересах\s*([А-Я][а-я]+\s+[А-Я][а-я]+\s+[А-Я][а-я]+)',
r'истец[а\s]*([А-Я][а-я]+\s+[А-Я][а-я]+\s+[А-Я][а-я]+)',
r'([А-Я][а-я]+\s+[А-Я][а-я]+\s+[А-Я][а-я]+)\s*к\s*([А-Яа-я\s«»""]+)',
],
# ФИО ответчика
'defendant': [
r'к\s*([А-Яа-я\s«»""]+?)\s*о\s*взыскании',
r'ответчик[а\s]*([А-Яа-я\s«»""]+)',
r'([А-Яа-я\s«»""]+?)\s*о\s*взыскании',
],
# Название документа
'document_type': [
r'(ОПРЕДЕЛЕНИЕ|РЕШЕНИЕ|ПОСТАНОВЛЕНИЕ|ПРИКАЗ)',
r'([А-Яа-я\s]+)\s*от\s*\d+',
r'([А-Яа-я\s]+)\s*№\s*[А-Яа-я0-9\-/]+',
],
# Дата документа
'document_date': [
r'(\d{1,2}\s+[а-я]+\s+\d{4}\s+г\.)',
r'(\d{1,2}\.\d{1,2}\.\d{4})',
r'от\s*(\d{1,2}\s+[а-я]+\s+\d{4}\s+г\.)',
],
# Судья
'judge': [
r'судья[а\s]*([А-Я][а-я]+\s+[А-Я]\.[А-Я]\.)',
r'([А-Я][а-я]+\s+[А-Я]\.[А-Я]\.)\s*судья',
r'([А-Я][а-я]+\s+[А-Я]\.[А-Я]\.)',
r'([А-Я][а-я]+\s+[А-Я]\.[А-Я]\.)',
],
}
def extract_data(self, text: str) -> Dict[str, Any]:
"""Извлекает структурированные данные из текста документа"""
result = {
'case_number': None,
'execution_number': None,
'uid': None,
'court': None,
'plaintiff': None,
'defendant': None,
'document_type': None,
'document_date': None,
'judge': None,
'raw_text': text,
'extraction_timestamp': datetime.now().isoformat(),
}
# Нормализуем текст для поиска
normalized_text = re.sub(r'\s+', ' ', text.strip())
# Извлекаем данные по каждому паттерну
for field, patterns in self.patterns.items():
for pattern in patterns:
matches = re.findall(pattern, normalized_text, re.IGNORECASE)
if matches:
# Берем первое найденное совпадение
if isinstance(matches[0], tuple):
result[field] = matches[0][0] if matches[0][0] else matches[0][1]
else:
result[field] = matches[0]
break
# Дополнительная обработка для специфических случаев
self._post_process_result(result, normalized_text)
return result
def _post_process_result(self, result: Dict[str, Any], text: str):
"""Дополнительная обработка результатов"""
# Очистка и нормализация данных
for key in ['case_number', 'execution_number', 'uid', 'court', 'plaintiff', 'defendant', 'document_type', 'judge']:
if result[key]:
result[key] = result[key].strip()
# Специальная обработка для номера дела
if result['case_number']:
# Убираем лишние символы, оставляем только нужный формат
result['case_number'] = re.sub(r'[^\w\-/]', '', result['case_number'])
# Специальная обработка для УИД
if result['uid']:
# Убираем лишние символы, оставляем только нужный формат
result['uid'] = re.sub(r'[^\w\-]', '', result['uid'])
# Специальная обработка для суда
if result['court']:
# Убираем лишние слова и нормализуем
result['court'] = re.sub(r'\s+(суд|суды|суда|суду|судом|суде)\s*$', '', result['court'], flags=re.IGNORECASE)
result['court'] = re.sub(r'\s+судья[а\s]*', '', result['court'], flags=re.IGNORECASE)
result['court'] = result['court'].strip()
# Специальная обработка для ответчика
if result['defendant']:
# Убираем кавычки и лишние символы
result['defendant'] = re.sub(r'[«»""]', '', result['defendant'])
result['defendant'] = re.sub(r'\s+о\s+взыскании.*$', '', result['defendant'], flags=re.IGNORECASE)
result['defendant'] = result['defendant'].strip()
# Специальная обработка для истца
if result['plaintiff']:
# Убираем лишние слова
result['plaintiff'] = re.sub(r'\s+к\s+.*$', '', result['plaintiff'])
result['plaintiff'] = result['plaintiff'].strip()
# Специальная обработка для судьи
if result['judge']:
# Убираем лишние символы
result['judge'] = re.sub(r'[^\w\s\.]', '', result['judge'])
result['judge'] = result['judge'].strip()
def parse_documents(self, documents: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""Парсит список документов"""
results = []
for doc in documents:
if 'combinedText' in doc:
extracted_data = self.extract_data(doc['combinedText'])
results.append(extracted_data)
else:
# Если нет combinedText, пытаемся найти текст в других полях
text = None
for key in ['text', 'content', 'body', 'message']:
if key in doc:
text = doc[key]
break
if text:
extracted_data = self.extract_data(text)
results.append(extracted_data)
else:
results.append({
'error': 'No text found in document',
'raw_document': doc
})
return results
def main():
"""Основная функция для запуска из командной строки"""
parser = argparse.ArgumentParser(description='Парсер судебных документов')
parser.add_argument('--input', '-i', help='Входной JSON файл')
parser.add_argument('--output', '-o', help='Выходной JSON файл')
parser.add_argument('--stdin', action='store_true', help='Читать из stdin')
parser.add_argument('--stdout', action='store_true', help='Выводить в stdout')
args = parser.parse_args()
# Создаем парсер
document_parser = CourtDocumentParser()
try:
# Получаем входные данные
if args.stdin:
input_data = json.loads(sys.stdin.read())
elif args.input:
with open(args.input, 'r', encoding='utf-8') as f:
input_data = json.load(f)
else:
# По умолчанию читаем из stdin
input_data = json.loads(sys.stdin.read())
# Парсим документы
if isinstance(input_data, list):
results = document_parser.parse_documents(input_data)
else:
# Если передан один документ
results = document_parser.parse_documents([input_data])
# Выводим результаты
if args.stdout or not args.output:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return 0
except json.JSONDecodeError as e:
print(f"Ошибка парсинга JSON: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Ошибка: {e}", file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())