Not a member of GistPad yet?
Sign Up,
it unlocks many cool features!
- """
- Grand Vision Secretary — Local Flask Application
- Runs on localhost:5000
- Handles: Word Count Tracking, Brief Implementation, Workflow Registry,
- Session Log, Usage Dashboard, PDF to Text Converter
- """
- import os
- import json
- import re
- import sqlite3
- import fitz # PyMuPDF
- from datetime import datetime
- from flask import Flask, render_template, request, jsonify, redirect, url_for
# Resolve the templates folder relative to this file (not the process working
# directory) so Flask looks for templates next to app.py.
_this_file = os.path.abspath(__file__)
current_dir = os.path.dirname(_this_file)
template_dir = os.path.join(current_dir, 'templates')
app = Flask(__name__, template_folder=template_dir)
# --- PATHS ---
# Root of the Grand Vision tree. Defaults to the original V: drive location;
# set the GRANDVISION_BASE_DIR environment variable to relocate it without
# editing this file.
BASE_DIR = os.environ.get("GRANDVISION_BASE_DIR") or r"V:\GrandVision"
# Agent scripts listed and edited by the brief implementer routes.
AGENTS_DIR = os.path.join(BASE_DIR, "Secretary", "agents")
# Folder holding the workflow registry file.
WORKFLOW_DIR = os.path.join(BASE_DIR, "Secretary", "workflow")
# Append-only registry, one JSON object per line.
REGISTRY_PATH = os.path.join(WORKFLOW_DIR, "workflow_registry.jsonl")
# SQLite database opened by get_db().
DB_PATH = os.path.join(BASE_DIR, "Secretary", "secretary.db")
# --- DATABASE SETUP ---
def get_db():
    """Open a new SQLite connection to DB_PATH.

    Rows are returned as ``sqlite3.Row`` objects, so columns can be read by
    name as well as by index. The caller is responsible for closing the
    returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the application's tables if they do not already exist.

    Creates ``word_count_sessions``, ``session_log`` and
    ``brief_implementations``. Safe to call repeatedly because every
    statement uses ``CREATE TABLE IF NOT EXISTS``.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema script
            fails. The connection is closed either way.
    """
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS word_count_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_date TEXT NOT NULL,
                agent_identifier TEXT NOT NULL,
                session_number INTEGER,
                author_word_count INTEGER NOT NULL,
                agent_word_count INTEGER NOT NULL,
                author_token_estimate REAL NOT NULL,
                agent_token_estimate REAL NOT NULL,
                claude_usage_panel_tokens INTEGER,
                notes TEXT,
                created_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS session_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                action_type TEXT NOT NULL,
                description TEXT NOT NULL,
                agent TEXT,
                details TEXT
            );
            CREATE TABLE IF NOT EXISTS brief_implementations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                target_script TEXT NOT NULL,
                action TEXT NOT NULL,
                authorised INTEGER NOT NULL,
                diff TEXT
            );
        """)
        conn.commit()
    finally:
        # Release the connection even when the schema script fails.
        conn.close()
def log_action(action_type, description, agent=None, details=None):
    """Append one row to the ``session_log`` table, stamped with the current local time.

    Args:
        action_type: Short category label stored in ``action_type``
            (callers in this file pass e.g. 'WORD_COUNT', 'REGISTRY').
        description: Human-readable summary of what happened.
        agent: Optional agent identifier associated with the action.
        details: Optional free-form extra text.

    Raises:
        sqlite3.Error: if the insert fails. The connection is closed either way.
    """
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO session_log (timestamp, action_type, description, agent, details) VALUES (?,?,?,?,?)",
            (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), action_type, description, agent, details)
        )
        conn.commit()
    finally:
        # Do not leak the connection when the insert or commit raises.
        conn.close()
# --- WORD COUNT UTILITIES ---
def _block_word_count(block_lines):
    """Return the number of whitespace-separated words across ``block_lines``."""
    return len(' '.join(block_lines).split())


def parse_dialogue(text):
    """Split pasted session dialogue into Author and Agent turns and count words.

    A turn starts on a line beginning with a speaker label followed by a
    colon (matched case-insensitively): 'Human', 'You' or 'Author' for the
    author; 'Claude', 'Assistant' or one of the agent codes (optionally
    followed by one of the agent symbols) for the agent. Lines without a
    label belong to the turn in progress; lines before the first label are
    ignored.

    If no speaker label is found at all, the total word count of ``text`` is
    split roughly 50/50 (the agent receives the odd word). The fallback is
    applied only in that case: labelled dialogue whose turns are all empty
    reports zero words rather than counting the labels themselves.

    Args:
        text: The pasted dialogue.

    Returns:
        dict with integer values for 'author_words', 'agent_words',
        'author_tokens', 'agent_tokens', 'total_words' and 'total_tokens'.
        Token figures are word counts multiplied by 1.33 and rounded.
    """
    tokens_per_word = 1.33
    # Compiled once per call instead of once per line.
    author_label = re.compile(r'^(Human|You|Author)\s*:', re.IGNORECASE)
    agent_label = re.compile(
        r'^(Claude|Assistant|CA|CS|PIT|PIF|DOT|CIO|MRA|MD|Dean|Secretary)'
        r'\s*[\u26c8\u03a3\u2b21\u25ce\u25c7\u2699\u25b3\u229e\u03a8]?\s*:',
        re.IGNORECASE,
    )

    word_totals = {'author': 0, 'agent': 0}
    current_speaker = None
    current_block = []

    for line in text.strip().split('\n'):
        # Author labels take precedence, matching the original check order.
        label = author_label.match(line)
        speaker = 'author'
        if label is None:
            label = agent_label.match(line)
            speaker = 'agent'

        if label is not None:
            # A new turn begins: bank the words of the turn that just ended.
            if current_speaker is not None:
                word_totals[current_speaker] += _block_word_count(current_block)
            current_speaker = speaker
            content = line[label.end():].strip()
            current_block = [content] if content else []
        elif current_speaker is not None:
            current_block.append(line)

    # Flush the final turn.
    if current_speaker is not None:
        word_totals[current_speaker] += _block_word_count(current_block)

    author_words = word_totals['author']
    agent_words = word_totals['agent']

    # Fallback: no speaker labels anywhere, so split the text 50/50.
    if current_speaker is None:
        total = len(text.split())
        author_words = total // 2
        agent_words = total - author_words

    return {
        'author_words': author_words,
        'agent_words': agent_words,
        'author_tokens': round(author_words * tokens_per_word),
        'agent_tokens': round(agent_words * tokens_per_word),
        'total_words': author_words + agent_words,
        'total_tokens': round((author_words + agent_words) * tokens_per_word)
    }
# --- ROUTES ---
@app.route('/')
def index():
    """Redirect the site root to the word-count page."""
    landing_url = url_for('word_count')
    return redirect(landing_url)
def _optional_int(raw):
    """Return ``raw`` as an int when it consists solely of decimal digits, else None.

    ``str.isdecimal`` is used instead of ``str.isdigit`` because ``isdigit``
    also accepts characters (for example superscript digits) that ``int()``
    rejects with ValueError.
    """
    return int(raw) if raw.isdecimal() else None


@app.route('/word-count', methods=['GET', 'POST'])
def word_count():
    """Render the word-count page and, on POST, record a new session.

    POST form fields: 'dialogue' (required), 'agent' (required),
    'session_number', 'claude_usage' and 'notes'. The dialogue is parsed with
    parse_dialogue(), stored in ``word_count_sessions`` and noted in the
    session log. Non-numeric 'session_number' / 'claude_usage' values are
    stored as NULL.

    The page always shows the ten most recent sessions and per-agent totals.
    Validation problems are passed to the template as ``error``.
    """
    result = None
    error = None

    if request.method == 'POST':
        dialogue = request.form.get('dialogue', '').strip()
        agent = request.form.get('agent', '').strip()
        session_number = request.form.get('session_number', '').strip()
        claude_usage = request.form.get('claude_usage', '').strip()
        notes = request.form.get('notes', '').strip()

        if not dialogue:
            error = 'No dialogue provided.'
        elif not agent:
            error = 'Agent identifier required.'
        else:
            parsed = parse_dialogue(dialogue)
            # The template receives the parsed counts plus the submitted fields.
            result = dict(
                parsed,
                agent=agent,
                session_number=session_number,
                notes=notes,
                claude_usage=claude_usage,
            )

            # One timestamp for both columns, so the date and created_at
            # cannot disagree across midnight.
            now = datetime.now()
            conn = get_db()
            try:
                conn.execute("""
                    INSERT INTO word_count_sessions
                    (session_date, agent_identifier, session_number, author_word_count,
                    agent_word_count, author_token_estimate, agent_token_estimate,
                    claude_usage_panel_tokens, notes, created_at)
                    VALUES (?,?,?,?,?,?,?,?,?,?)
                """, (
                    now.strftime("%Y-%m-%d"),
                    agent,
                    _optional_int(session_number),
                    parsed['author_words'],
                    parsed['agent_words'],
                    parsed['author_tokens'],
                    parsed['agent_tokens'],
                    _optional_int(claude_usage),
                    notes,
                    now.strftime("%Y-%m-%d %H:%M:%S")
                ))
                conn.commit()
            finally:
                conn.close()
            log_action('WORD_COUNT', f'Session logged for {agent}', agent=agent)

    conn = get_db()
    try:
        # Most recent sessions
        recent = conn.execute("""
            SELECT * FROM word_count_sessions
            ORDER BY created_at DESC LIMIT 10
        """).fetchall()
        # Totals per agent
        agent_totals = conn.execute("""
            SELECT agent_identifier,
            SUM(author_word_count) as total_author_words,
            SUM(agent_word_count) as total_agent_words,
            SUM(author_token_estimate) as total_author_tokens,
            SUM(agent_token_estimate) as total_agent_tokens,
            COUNT(*) as session_count
            FROM word_count_sessions
            GROUP BY agent_identifier
            ORDER BY total_agent_tokens DESC
        """).fetchall()
    finally:
        conn.close()

    return render_template('word_count.html',
                           result=result,
                           error=error,
                           recent=recent,
                           agent_totals=agent_totals)
@app.route('/brief-implementer')
def brief_implementer():
    """Render the brief implementer page.

    Passes the template the ``.py`` file names found directly in AGENTS_DIR
    (an empty list when that directory does not exist) and the 20 most
    recent rows of ``brief_implementations``.
    """
    scripts = []
    # isdir rather than exists: listdir would raise if the path were a file.
    if os.path.isdir(AGENTS_DIR):
        scripts = [f for f in os.listdir(AGENTS_DIR) if f.endswith('.py')]

    conn = get_db()
    try:
        implementations = conn.execute("""
            SELECT * FROM brief_implementations
            ORDER BY timestamp DESC LIMIT 20
        """).fetchall()
    finally:
        conn.close()

    return render_template('brief_implementer.html', scripts=scripts,
                           implementations=implementations)
@app.route('/brief-implementer/parse', methods=["POST"])
def brief_implementer_parse():
    """Preview the change a brief would make to an agent script.

    Expects a JSON object with 'script' (a ``.py`` file name inside
    AGENTS_DIR) and 'brief' (text made of ``KEY: value`` lines). The brief
    must contain ``ACTION: ADD|REPLACE|DELETE`` plus a locator line
    (``AFTER:`` for ADD, ``REPLACE:`` for REPLACE, ``DELETE:`` for DELETE);
    new text comes from ``INSERT:`` or, failing that, ``WITH:``.

    Nothing is written to disk. On success the JSON response carries the
    action, a description, the matched target text, the new content, a
    unified diff (without the ---/+++ header lines), the script name and
    path, and the script's original content. Every failure is reported as
    ``{"error": ...}``.
    """
    import difflib

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object."})

    script_name = data.get("script") or ""
    brief_text = data.get("brief") or ""
    if not isinstance(script_name, str) or not isinstance(brief_text, str):
        return jsonify({"error": "Fields 'script' and 'brief' must be strings."})
    brief_text = brief_text.strip()

    # Accept only a bare *.py file name so a crafted value (path separators,
    # '..', drive prefixes) cannot make this route read outside AGENTS_DIR.
    if os.path.basename(script_name) != script_name or not script_name.endswith('.py'):
        return jsonify({"error": f"Script not found: {script_name}"})
    script_path = os.path.join(AGENTS_DIR, script_name)
    if not os.path.isfile(script_path):
        return jsonify({"error": f"Script not found: {script_name}"})

    with open(script_path, "r", encoding="utf-8") as f:
        script_content = f.read()

    # Parse brief fields: text before the first colon is the (upper-cased)
    # key, the remainder is the value. Later duplicates override earlier ones.
    fields = {}
    for raw_line in brief_text.splitlines():
        key, colon, value = raw_line.partition(":")
        if colon:
            fields[key.strip().upper()] = value.strip()

    action = fields.get("ACTION", "").upper()
    if action not in ("ADD", "REPLACE", "DELETE"):
        return jsonify({"error": "Brief must contain ACTION: ADD, REPLACE, or DELETE."})

    # Locate target — AFTER, REPLACE, or DELETE field
    locator_key = {"ADD": "AFTER", "REPLACE": "REPLACE", "DELETE": "DELETE"}[action]
    locator = fields.get(locator_key, "").strip()
    new_content = fields.get("INSERT", fields.get("WITH", "")).strip()
    if not locator:
        return jsonify({"error": f"Brief must contain {locator_key}: <exact step text to locate>"})

    if locator not in script_content:
        # Partial match: find the step by its "action" value alone. If the
        # locator has no quoted "action" value, treat the whole locator as
        # that value.
        quoted = re.search(r'"action"\s*:\s*"([^"]*)"', locator)
        action_value = quoted.group(1) if quoted else locator
        action_match = re.search(
            r'"action"\s*:\s*"' + re.escape(action_value) + r'"',
            script_content,
        )
        if not action_match:
            return jsonify({
                "error": f"Locator not found in script: '{locator[:80]}'. Copy the exact step text from the script."
            })
        # Widen the match to the whole line that contains it. A match on the
        # final line of a file with no trailing newline runs to end of text.
        start = script_content.rfind('\n', 0, action_match.start()) + 1
        newline_at = script_content.find('\n', action_match.end())
        end = len(script_content) if newline_at == -1 else newline_at + 1
        locator = script_content[start:end].rstrip()

    # Apply change to the first occurrence only
    if action == "DELETE":
        after = script_content.replace(locator, "", 1)
        description = f"DELETE step: {locator[:60]}"
    elif action == "ADD":
        after = script_content.replace(locator, locator + ",\n " + new_content, 1)
        description = f"ADD after: {locator[:60]}"
    else:  # REPLACE
        after = script_content.replace(locator, new_content, 1)
        description = f"REPLACE: {locator[:60]}"

    # Generate diff
    diff_lines = difflib.unified_diff(
        script_content.splitlines(),
        after.splitlines(),
        lineterm='',
        n=3
    )
    diff_display = [entry for entry in diff_lines
                    if not entry.startswith('---') and not entry.startswith('+++')]
    if not diff_display:
        return jsonify({"error": "No difference detected. Check the locator matches exactly."})

    return jsonify({
        "action": action,
        "description": description,
        "target_match": locator,
        "new_content": new_content,
        "diff": diff_display,
        "script": script_name,
        "script_path": script_path,
        "original_content": script_content
    })
@app.route("/brief-implementer/authorise", methods=["POST"])
def brief_implementer_authorise():
    """Apply a previewed brief change to an agent script and record it.

    Expects the JSON object produced by the parse route: 'script_path',
    'original_content', 'target_match', 'new_content', 'action',
    'description', 'script' and optionally 'diff'.

    The first occurrence of ``target_match`` in ``original_content`` is
    deleted, extended or replaced (the same single-occurrence rule the
    preview uses, so the written file matches the diff that was shown). The
    result is written to the script, a row is added to
    ``brief_implementations`` and the action is logged.

    Returns JSON ``{"success": True}`` or ``{"success": False, "error": ...}``.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"success": False, "error": "Request body must be a JSON object."})

    script_path = data.get("script_path", "")
    original_content = data.get("original_content", "")
    target_match = data.get("target_match", "")
    new_content = data.get("new_content", "")
    action = data.get("action", "")
    description = data.get("description", "")
    script_name = data.get("script", "")

    # The path comes from the client, so only accept an existing .py file
    # located directly inside AGENTS_DIR; anything else could overwrite an
    # arbitrary file on the host.
    if not isinstance(script_path, str) or not script_path:
        return jsonify({"success": False, "error": "Script path invalid."})
    resolved_path = os.path.abspath(script_path)
    agents_root = os.path.normcase(os.path.abspath(AGENTS_DIR))
    inside_agents = os.path.normcase(os.path.dirname(resolved_path)) == agents_root
    if not (inside_agents and resolved_path.endswith('.py') and os.path.isfile(resolved_path)):
        return jsonify({"success": False, "error": "Script path invalid."})

    if action not in ("ADD", "REPLACE", "DELETE"):
        return jsonify({"success": False, "error": f"Unknown action: {action}"})
    if not all(isinstance(value, str) for value in (original_content, target_match, new_content)):
        return jsonify({"success": False, "error": "Content fields must be strings."})
    # An empty target would "match" at position 0 and corrupt the script.
    if not target_match or target_match not in original_content:
        return jsonify({"success": False, "error": "Target match string not found in script."})

    if action == "DELETE":
        replacement = ""
    elif action == "ADD":
        replacement = target_match + ",\n " + new_content
    else:  # REPLACE
        replacement = new_content
    updated = original_content.replace(target_match, replacement, 1)

    # NOTE(review): the file is rewritten from the client's copy of the
    # content, so edits made on disk since the preview would be lost —
    # confirm whether a staleness check is wanted.
    try:
        with open(resolved_path, "w", encoding="utf-8") as f:
            f.write(updated)

        conn = get_db()
        try:
            # The 'action' column receives the description text, as before.
            conn.execute(
                "INSERT INTO brief_implementations (timestamp, target_script, action, authorised, diff) VALUES (?,?,?,?,?)",
                (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), script_name, description, 1, json.dumps(data.get("diff", [])))
            )
            conn.commit()
        finally:
            conn.close()

        log_action("BRIEF_IMPL", f"Authorised: {description}", details=script_name)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
@app.route('/brief-compiler/authorise', methods=['POST'])
def brief_compiler_authorise():
    """Save a compiled brief to the ``Secretary/briefs`` folder under BASE_DIR.

    Expects a JSON object with string fields 'title' and 'compiled'. The
    file is named ``<title>_<YYYYmmdd_HHMMSS>.json``, with the characters
    ``\\ / * ? : " < > |`` in the title replaced by underscores, and
    contains the 'compiled' text verbatim.

    Returns:
        200 with ``{"status": "ok", "filename", "message"}`` on success;
        400 when the payload is missing fields or has the wrong types;
        500 with the error text when the folder or file cannot be written.
    """
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict) or 'compiled' not in data or 'title' not in data:
        return jsonify({"status": "error", "message": "Missing compiled plan or title"}), 400

    title = data['title']
    compiled = data['compiled']
    # Reject wrong types up front: a non-string title would crash re.sub, and
    # a non-string plan would fail only after an empty file had been created.
    if not isinstance(title, str) or not isinstance(compiled, str):
        return jsonify({"status": "error", "message": "Compiled plan and title must be strings"}), 400

    clean_title = re.sub(r'[\\/*?:"<>|]', "_", title)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{clean_title}_{timestamp}.json"

    briefs_dir = os.path.join(BASE_DIR, "Secretary", "briefs")
    filepath = os.path.join(briefs_dir, filename)
    try:
        os.makedirs(briefs_dir, exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(compiled)
        return jsonify({"status": "ok", "filename": filename, "message": "Saved successfully"})
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/workflow-registry', methods=['GET', 'POST'])
def workflow_registry():
    """Render the workflow registry and, on POST, append a new entry.

    POST form fields: 'document_type' and 'issuing_agent' (both required),
    'receiving_agent', 'brief_reference' and 'filename'. A valid entry is
    appended to REGISTRY_PATH as one JSON object per line and noted in the
    session log.

    The page lists all registry entries, newest first. Lines in the registry
    file that are not valid JSON are skipped so one corrupt line does not
    make the whole page fail.
    """
    entries = []
    error = None
    success = None

    if request.method == 'POST':
        entry = {
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'document_type': request.form.get('document_type', ''),
            'issuing_agent': request.form.get('issuing_agent', ''),
            'receiving_agent': request.form.get('receiving_agent', ''),
            'brief_reference': request.form.get('brief_reference', ''),
            'filename': request.form.get('filename', '')
        }
        if not entry['document_type'] or not entry['issuing_agent']:
            error = 'Document type and issuing agent are required.'
        else:
            os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True)
            with open(REGISTRY_PATH, 'a', encoding='utf-8') as f:
                f.write(json.dumps(entry) + '\n')
            log_action('REGISTRY', f'Entry added: {entry["document_type"]} from {entry["issuing_agent"]}')
            success = 'Registry entry added.'

    if os.path.exists(REGISTRY_PATH):
        with open(REGISTRY_PATH, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    # Skip a corrupt line rather than fail the whole page.
                    continue
        # The file is append-only, so reversing puts the newest entry first.
        entries.reverse()

    return render_template('workflow_registry.html',
                           entries=entries,
                           error=error,
                           success=success)
@app.route('/session-log')
def session_log():
    """Render the session log page with the 100 most recent ``session_log`` rows."""
    conn = get_db()
    try:
        logs = conn.execute("""
            SELECT * FROM session_log
            ORDER BY timestamp DESC LIMIT 100
        """).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return render_template('session_log.html', logs=logs)
@app.route('/usage-dashboard')
def usage_dashboard():
    """Render the usage dashboard from ``word_count_sessions``.

    The template receives ``sessions`` (estimated and usage-panel token sums
    per session date and agent, newest date first) and ``grand_totals`` (one
    row of word, token and session totals across all rows).
    """
    conn = get_db()
    try:
        sessions = conn.execute("""
            SELECT session_date, agent_identifier,
            SUM(author_token_estimate + agent_token_estimate) as total_tokens,
            SUM(claude_usage_panel_tokens) as claude_tokens
            FROM word_count_sessions
            GROUP BY session_date, agent_identifier
            ORDER BY session_date DESC
        """).fetchall()
        grand_totals = conn.execute("""
            SELECT
            SUM(author_word_count) as total_author_words,
            SUM(agent_word_count) as total_agent_words,
            SUM(author_token_estimate) as total_author_tokens,
            SUM(agent_token_estimate) as total_agent_tokens,
            SUM(claude_usage_panel_tokens) as total_claude_tokens,
            COUNT(*) as total_sessions
            FROM word_count_sessions
        """).fetchone()
    finally:
        # Close the connection even if a query raises.
        conn.close()
    return render_template('usage_dashboard.html',
                           sessions=sessions,
                           grand_totals=grand_totals)
@app.route('/pdf-converter')
def pdf_converter():
    """Serve the PDF-to-text converter page."""
    page = render_template('pdf_converter.html')
    return page
@app.route('/pdf-converter/convert', methods=['POST'])
def pdf_converter_convert():
    """Convert uploaded PDFs to plain-text files under ``Secretary/plaintext``.

    Reads every upload in the multipart field 'file', extracts the text of
    each page with PyMuPDF, collapses runs of three or more newlines to two,
    and writes the result as ``<sanitised name>.txt``.

    Returns:
        400 when no file was supplied; otherwise 200 with
        ``{"status": "ok", "results": [...]}``, one result per upload with
        'filename', 'output_filename', 'word_count', 'char_count', 'path',
        'status' ('ok' or 'error') and 'error'. A failure on one file does
        not stop the others.
    """
    if 'file' not in request.files:
        return jsonify({"status": "error", "message": "No file part"}), 400
    files = request.files.getlist('file')
    if not files or all(not f.filename for f in files):
        return jsonify({"status": "error", "message": "No selected files"}), 400

    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    os.makedirs(plaintext_dir, exist_ok=True)

    results = []
    for upload in files:
        if not upload.filename:
            continue
        try:
            # 1. Extract text
            file_bytes = upload.read()
            doc = fitz.open(stream=file_bytes, filetype="pdf")
            try:
                text_blocks = [page.get_text() for page in doc]
            finally:
                # Release the document even when a page fails to extract.
                doc.close()
            full_text = "\n".join(text_blocks)

            # 2. Clean text: at most one blank line between paragraphs
            cleaned_text = re.sub(r'\n{3,}', '\n\n', full_text).strip()

            # 3. Derive filename: keep only letters, digits, '_', '-' and whitespace
            base_name, _ = os.path.splitext(str(upload.filename))
            safe_base_name = re.sub(r'[^a-zA-Z0-9_\-\s]', '', base_name).strip()
            if not safe_base_name:
                # Names made up entirely of stripped characters would
                # otherwise produce a file called just ".txt".
                safe_base_name = "untitled"
            output_filename = f"{safe_base_name}.txt"

            # 4. Save file
            save_path = os.path.join(plaintext_dir, output_filename)
            with open(save_path, 'w', encoding='utf-8') as f:
                f.write(cleaned_text)

            # 5. Record result
            results.append({
                "filename": upload.filename,
                "output_filename": output_filename,
                "word_count": len(cleaned_text.split()),
                "char_count": len(cleaned_text),
                "path": save_path,
                "status": "ok",
                "error": None
            })
        except Exception as e:
            # Report the failure for this file and carry on with the rest.
            results.append({
                "filename": upload.filename,
                "output_filename": "",
                "word_count": 0,
                "char_count": 0,
                "path": "",
                "status": "error",
                "error": str(e)
            })

    return jsonify({"status": "ok", "results": results})
@app.route('/pdf-converter/clear', methods=['POST'])
def pdf_converter_clear():
    """Delete every ``.txt`` file in the ``Secretary/plaintext`` folder.

    Deletion is best-effort: a file that cannot be removed is skipped and
    its name is reported instead of being silently ignored.

    Returns:
        JSON ``{"status": "ok", "deleted": <count>, "failed": [<names>]}``.
    """
    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    deleted_count = 0
    failed = []
    if os.path.isdir(plaintext_dir):
        for filename in os.listdir(plaintext_dir):
            if not filename.endswith('.txt'):
                continue
            try:
                os.remove(os.path.join(plaintext_dir, filename))
                deleted_count += 1
            except OSError:
                # e.g. the file is locked or already gone; keep going.
                failed.append(filename)
    return jsonify({"status": "ok", "deleted": deleted_count, "failed": failed})
@app.route('/pdf-converter/files', methods=['GET'])
def pdf_converter_files():
    """List the ``.txt`` files in the ``Secretary/plaintext`` folder.

    Each entry reports the file name, its word count (computed by reading
    the file), its modification time and its full path. Files that cannot
    be inspected or read are left out. Entries are ordered by modification
    time, newest first.
    """
    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    listing = []

    candidates = os.listdir(plaintext_dir) if os.path.exists(plaintext_dir) else []
    for name in candidates:
        if not name.endswith('.txt'):
            continue
        full_path = os.path.join(plaintext_dir, name)
        try:
            file_stats = os.stat(full_path)
            # Word count comes from the file contents themselves
            with open(full_path, 'r', encoding='utf-8') as handle:
                words = len(handle.read().split())
            modified = datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
            listing.append({
                "filename": name,
                "word_count": words,
                "modified": modified,
                "path": full_path
            })
        except Exception:
            # Best-effort listing: skip anything that cannot be read.
            continue

    # The timestamp format sorts chronologically as plain text.
    files_list = sorted(listing, key=lambda item: item['modified'], reverse=True)
    return jsonify({"status": "ok", "files": files_list})
if __name__ == '__main__':
    # Initialize database. A failure is reported but does not stop startup.
    try:
        init_db()
    except Exception as e:
        print(f"Database init failed: {e}")

    # PID registration: write this process's id next to app.py and remove
    # the file again at interpreter exit.
    import atexit

    current_file_dir = os.path.dirname(os.path.abspath(__file__))
    pid_path = os.path.join(current_file_dir, "secretary.pid")

    def cleanup_pid():
        """Remove the PID file if it is still present."""
        try:
            os.remove(pid_path)
        except FileNotFoundError:
            pass

    try:
        with open(pid_path, "w") as f:
            f.write(str(os.getpid()))
        atexit.register(cleanup_pid)
    except Exception as e:
        print(f"Could not write PID file: {e}")

    # Bind to the loopback interface by default, matching the "localhost"
    # description of this app. Binding to 0.0.0.0 makes the server reachable
    # from other machines, which must never be combined with debug=True
    # because the interactive debugger can execute arbitrary code. Set
    # SECRETARY_HOST to listen elsewhere; the debugger is then switched off.
    host = os.environ.get("SECRETARY_HOST", "127.0.0.1")
    loopback_only = host in ("127.0.0.1", "localhost", "::1")

    print("\n[SECRETARY] Grand Vision Secretary starting...")
    print(f"Accessible at: http://{host}:5000")

    # Run the app
    app.run(debug=loopback_only, port=5000, host=host, use_reloader=False)
RAW Paste Data
Copied
