"""
Grand Vision Secretary — Local Flask Application
Runs on localhost:5000
Handles: Word Count Tracking, Brief Implementation, Workflow Registry,
Session Log, Usage Dashboard, PDF to Text Converter
"""
import os
import json
import re
import sqlite3
import fitz # PyMuPDF
from datetime import datetime
from flask import Flask, render_template, request, jsonify, redirect, url_for
# Fix: Force Flask to look for templates in the folder where app.py lives
current_dir = os.path.dirname(os.path.abspath(__file__))
template_dir = os.path.join(current_dir, 'templates')
app = Flask(__name__, template_folder=template_dir)
# --- PATHS ---
# NOTE(review): hard-coded Windows drive root — assumes V:\ is mounted; confirm
# before deploying anywhere else.
BASE_DIR = r"V:\GrandVision"
AGENTS_DIR = os.path.join(BASE_DIR, "Secretary", "agents")  # agent scripts edited by the brief implementer
WORKFLOW_DIR = os.path.join(BASE_DIR, "Secretary", "workflow")
REGISTRY_PATH = os.path.join(WORKFLOW_DIR, "workflow_registry.jsonl")  # append-only JSONL registry
DB_PATH = os.path.join(BASE_DIR, "Secretary", "secretary.db")  # SQLite store for sessions/logs/briefs
# --- DATABASE SETUP ---
def get_db():
    """Open a connection to the secretary database.

    Rows are returned as :class:`sqlite3.Row` so templates can access
    columns by name. Callers are responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the secretary tables if they do not already exist.

    Tables: word_count_sessions (per-session word/token tallies),
    session_log (audit trail), brief_implementations (applied script edits).
    """
    schema = """
        CREATE TABLE IF NOT EXISTS word_count_sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_date TEXT NOT NULL,
            agent_identifier TEXT NOT NULL,
            session_number INTEGER,
            author_word_count INTEGER NOT NULL,
            agent_word_count INTEGER NOT NULL,
            author_token_estimate REAL NOT NULL,
            agent_token_estimate REAL NOT NULL,
            claude_usage_panel_tokens INTEGER,
            notes TEXT,
            created_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS session_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            action_type TEXT NOT NULL,
            description TEXT NOT NULL,
            agent TEXT,
            details TEXT
        );
        CREATE TABLE IF NOT EXISTS brief_implementations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            target_script TEXT NOT NULL,
            action TEXT NOT NULL,
            authorised INTEGER NOT NULL,
            diff TEXT
        );
    """
    db = get_db()
    db.executescript(schema)
    db.commit()
    db.close()
def log_action(action_type, description, agent=None, details=None):
    """Append one audit row to the session_log table.

    The timestamp is taken at call time in local "YYYY-MM-DD HH:MM:SS" form.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    db = get_db()
    db.execute(
        "INSERT INTO session_log (timestamp, action_type, description, agent, details) VALUES (?,?,?,?,?)",
        (stamp, action_type, description, agent, details),
    )
    db.commit()
    db.close()
# --- WORD COUNT UTILITIES ---
def parse_dialogue(text):
    """Split a pasted session transcript into author vs. agent word counts.

    Speaker turns are detected from line prefixes such as 'Human:' / 'You:'
    (author) and 'Claude:' / 'Assistant:' / agent sigils (agent). Lines with
    no prefix continue the current speaker's block. If no speaker prefix is
    found anywhere, the total is split 50/50 as a fallback. Token estimates
    use the rough heuristic of 1.33 tokens per word.

    Returns a dict with author/agent word and token counts plus totals.
    """
    author_re = r'^(Human|You|Author)\s*:'
    agent_re = r'^(Claude|Assistant|CA|CS|PIT|PIF|DOT|CIO|MRA|MD|Dean|Secretary)\s*[\u26c8\u03a3\u2b21\u25ce\u25c7\u2699\u25b3\u229e\u03a8]?\s*:'
    counts = {'author': 0, 'agent': 0}
    speaker = None
    block = []

    def flush():
        # Credit the accumulated block to whoever was speaking.
        if speaker in counts and block:
            counts[speaker] += len(' '.join(block).split())

    for line in text.strip().split('\n'):
        if re.match(author_re, line, re.IGNORECASE):
            flush()
            speaker = 'author'
            remainder = re.sub(author_re, '', line, flags=re.IGNORECASE).strip()
            block = [remainder] if remainder else []
        elif re.match(agent_re, line, re.IGNORECASE):
            flush()
            speaker = 'agent'
            remainder = re.sub(agent_re, '', line, flags=re.IGNORECASE).strip()
            block = [remainder] if remainder else []
        else:
            block.append(line)
    flush()

    author_words = counts['author']
    agent_words = counts['agent']
    # Fallback: no speaker markers at all — split the total evenly.
    if author_words == 0 and agent_words == 0:
        total = len(text.split())
        author_words = total // 2
        agent_words = total - author_words
    return {
        'author_words': author_words,
        'agent_words': agent_words,
        'author_tokens': round(author_words * 1.33),
        'agent_tokens': round(agent_words * 1.33),
        'total_words': author_words + agent_words,
        'total_tokens': round((author_words + agent_words) * 1.33)
    }
# --- ROUTES ---
@app.route('/')
def index():
    """Landing page: send visitors straight to the word-count tool."""
    return redirect(url_for('word_count'))
@app.route('/word-count', methods=['GET', 'POST'])
def word_count():
    """Word-count tracker page.

    POST: parse the pasted dialogue into author/agent counts, persist a
    word_count_sessions row, and log the action. GET (and after a POST):
    render the 10 most recent sessions plus per-agent aggregate totals.
    """
    result = None
    error = None
    if request.method == 'POST':
        form = request.form
        dialogue = form.get('dialogue', '').strip()
        agent = form.get('agent', '').strip()
        session_number = form.get('session_number', '').strip()
        claude_usage = form.get('claude_usage', '').strip()
        notes = form.get('notes', '').strip()
        if not dialogue:
            error = 'No dialogue provided.'
        elif not agent:
            error = 'Agent identifier required.'
        else:
            result = parse_dialogue(dialogue)
            # Echo the submitted form fields back to the template alongside
            # the computed counts.
            result['agent'] = agent  # type: ignore
            result['session_number'] = session_number  # type: ignore
            result['notes'] = notes  # type: ignore
            result['claude_usage'] = claude_usage  # type: ignore
            db = get_db()
            db.execute(
                """
                INSERT INTO word_count_sessions
                    (session_date, agent_identifier, session_number, author_word_count,
                     agent_word_count, author_token_estimate, agent_token_estimate,
                     claude_usage_panel_tokens, notes, created_at)
                VALUES (?,?,?,?,?,?,?,?,?,?)
                """,
                (
                    datetime.now().strftime("%Y-%m-%d"),
                    agent,
                    # Non-numeric optional fields are stored as NULL.
                    int(session_number) if session_number.isdigit() else None,
                    result['author_words'],
                    result['agent_words'],
                    result['author_tokens'],
                    result['agent_tokens'],
                    int(claude_usage) if claude_usage.isdigit() else None,
                    notes,
                    datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                ),
            )
            db.commit()
            db.close()
            log_action('WORD_COUNT', f'Session logged for {agent}', agent=agent)
    # Always refresh the recent-sessions list and per-agent totals.
    db = get_db()
    recent = db.execute(
        "SELECT * FROM word_count_sessions ORDER BY created_at DESC LIMIT 10"
    ).fetchall()
    agent_totals = db.execute(
        """
        SELECT agent_identifier,
               SUM(author_word_count) as total_author_words,
               SUM(agent_word_count) as total_agent_words,
               SUM(author_token_estimate) as total_author_tokens,
               SUM(agent_token_estimate) as total_agent_tokens,
               COUNT(*) as session_count
        FROM word_count_sessions
        GROUP BY agent_identifier
        ORDER BY total_agent_tokens DESC
        """
    ).fetchall()
    db.close()
    return render_template('word_count.html',
                           result=result,
                           error=error,
                           recent=recent,
                           agent_totals=agent_totals)
@app.route('/brief-implementer')
def brief_implementer():
    """Brief-implementer page: list agent scripts and recent implementations."""
    if os.path.exists(AGENTS_DIR):
        scripts = [name for name in os.listdir(AGENTS_DIR) if name.endswith('.py')]
    else:
        scripts = []
    db = get_db()
    implementations = db.execute(
        "SELECT * FROM brief_implementations ORDER BY timestamp DESC LIMIT 20"
    ).fetchall()
    db.close()
    return render_template('brief_implementer.html', scripts=scripts,
                           implementations=implementations)
@app.route('/brief-implementer/parse', methods=["POST"])
def brief_implementer_parse():
    """Parse a change brief against an agent script and return a preview diff.

    Expects JSON ``{"script": <filename>, "brief": <text>}`` where the brief
    is "KEY: value" lines — ACTION (ADD/REPLACE/DELETE), a locator field named
    after the action (AFTER/REPLACE/DELETE), and INSERT or WITH for the new
    content. Nothing is written to disk here; /brief-implementer/authorise
    applies the change.
    """
    import difflib
    data = request.get_json()
    script_name = data.get("script", "")
    brief_text = data.get("brief", "").strip()
    script_path = os.path.join(AGENTS_DIR, script_name)
    if not os.path.exists(script_path):
        return jsonify({"error": f"Script not found: {script_name}"})
    with open(script_path, "r", encoding="utf-8") as f:
        script_content = f.read()
    # Parse "KEY: value" brief fields (later occurrences of a key win).
    lines = {line.split(":", 1)[0].strip().upper(): line.split(":", 1)[1].strip()
             for line in brief_text.splitlines() if ":" in line}
    action = lines.get("ACTION", "").upper()
    if action not in ("ADD", "REPLACE", "DELETE"):
        return jsonify({"error": "Brief must contain ACTION: ADD, REPLACE, or DELETE."})
    # The locator field shares its name with the action (AFTER for ADD).
    locator_key = "AFTER" if action == "ADD" else ("REPLACE" if action == "REPLACE" else "DELETE")
    locator = lines.get(locator_key, "").strip()
    new_content = lines.get("INSERT", lines.get("WITH", "")).strip()
    if not locator:
        return jsonify({"error": f"Brief must contain {locator_key}: <exact step text to locate>"})
    # Find locator in script
    if locator not in script_content:
        # Fallback: match on the "action" value embedded in the locator
        # (e.g. a JSON step dict). Fix: the previous index-based extraction
        # (split('"action"')[1].split('"')[2]) could raise IndexError on
        # unexpected locator shapes and indexed past the quoted value; a
        # regex capture is both safe and correct.
        value_match = re.search(r'"action"\s*:\s*"([^"]+)"', locator)
        token = value_match.group(1) if value_match else locator
        action_match = re.search(r'"action"\s*:\s*"' + re.escape(token) + r'"', script_content)
        if action_match:
            # Expand the match to the full line that contains it.
            start = script_content.rfind('\n', 0, action_match.start()) + 1
            end = script_content.find('\n', action_match.end()) + 1
            locator = script_content[start:end].rstrip()
        else:
            return jsonify({
                "error": f"Locator not found in script: '{locator[:80]}'. Copy the exact step text from the script."
            })
    # Apply the change to an in-memory copy (first occurrence only) so a
    # preview diff can be generated.
    if action == "DELETE":
        after = script_content.replace(locator, "", 1)
        description = f"DELETE step: {locator[:60]}"
    elif action == "ADD":
        after = script_content.replace(locator, locator + ",\n        " + new_content, 1)
        description = f"ADD after: {locator[:60]}"
    else:  # REPLACE
        after = script_content.replace(locator, new_content, 1)
        description = f"REPLACE: {locator[:60]}"
    # Generate a unified diff for the preview pane.
    diff_lines = list(difflib.unified_diff(
        script_content.splitlines(),
        after.splitlines(),
        lineterm='',
        n=3
    ))
    # Drop the ---/+++ file headers; only hunks are displayed.
    diff_display = [l for l in diff_lines
                    if not l.startswith('---') and not l.startswith('+++')]
    if not diff_display:
        return jsonify({"error": "No difference detected. Check the locator matches exactly."})
    return jsonify({
        "action": action,
        "description": description,
        "target_match": locator,
        "new_content": new_content,
        "diff": diff_display,
        "script": script_name,
        "script_path": script_path,
        "original_content": script_content
    })
@app.route("/brief-implementer/authorise", methods=["POST"])
def brief_implementer_authorise():
    """Apply a previously previewed brief change to the target script.

    Re-applies the same single-occurrence edit that /brief-implementer/parse
    previewed, writes the file, records the implementation in the DB, and
    logs the action. Returns {"success": bool, "error": str?}.
    """
    data = request.get_json()
    script_path = data.get("script_path", "")
    original_content = data.get("original_content", "")
    target_match = data.get("target_match", "")
    new_content = data.get("new_content", "")
    action = data.get("action", "")
    description = data.get("description", "")
    script_name = data.get("script", "")
    if not script_path or not os.path.exists(script_path):
        return jsonify({"success": False, "error": "Script path invalid."})
    try:
        if action not in ("ADD", "REPLACE", "DELETE"):
            return jsonify({"success": False, "error": f"Unknown action: {action}"})
        # Fix: validate the target for every action — previously DELETE
        # skipped this check and silently no-opped on a missing target.
        if target_match not in original_content:
            return jsonify({"success": False, "error": "Target match string not found in script."})
        # Fix: replace only the FIRST occurrence for every action, so the
        # applied edit matches the preview diff from the parse endpoint
        # (DELETE and REPLACE previously replaced all occurrences).
        if action == "DELETE":
            updated = original_content.replace(target_match, "", 1)
        elif action == "ADD":
            updated = original_content.replace(target_match, target_match + ",\n        " + new_content, 1)
        else:  # REPLACE
            updated = original_content.replace(target_match, new_content, 1)
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(updated)
        conn = get_db()
        conn.execute(
            "INSERT INTO brief_implementations (timestamp, target_script, action, authorised, diff) VALUES (?,?,?,?,?)",
            (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), script_name, description, 1, json.dumps(data.get("diff", [])))
        )
        conn.commit()
        conn.close()
        log_action("BRIEF_IMPL", f"Authorised: {description}", details=script_name)
        return jsonify({"success": True})
    except Exception as e:
        # Boundary handler: report the failure to the client instead of a 500.
        return jsonify({"success": False, "error": str(e)})
@app.route('/brief-compiler/authorise', methods=['POST'])
def brief_compiler_authorise():
    """Persist a compiled brief plan to the briefs folder.

    Expects JSON with 'compiled' (plan text) and 'title'; the title is
    sanitised and timestamped to build the output filename.
    """
    payload = request.get_json(force=True, silent=True)
    if not payload or 'compiled' not in payload or 'title' not in payload:
        return jsonify({"status": "error", "message": "Missing compiled plan or title"}), 400
    # Strip characters that are illegal in Windows filenames.
    clean_title = re.sub(r'[\\/*?:"<>|]', "_", payload['title'])
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{clean_title}_{stamp}.json"
    # Path: V:\GrandVision\Secretary\briefs
    briefs_dir = os.path.join(BASE_DIR, "Secretary", "briefs")
    os.makedirs(briefs_dir, exist_ok=True)
    target = os.path.join(briefs_dir, filename)
    try:
        with open(target, 'w', encoding='utf-8') as fh:
            fh.write(payload['compiled'])
        return jsonify({"status": "ok", "filename": filename, "message": "Saved successfully"})
    except Exception as exc:
        return jsonify({"status": "error", "message": str(exc)}), 500
@app.route('/workflow-registry', methods=['GET', 'POST'])
def workflow_registry():
    """Append to and display the JSONL workflow registry.

    POST appends one entry line; GET (and after POST) renders all entries,
    newest first.
    """
    entries = []
    error = None
    success = None
    if request.method == 'POST':
        entry = {
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'document_type': request.form.get('document_type', ''),
            'issuing_agent': request.form.get('issuing_agent', ''),
            'receiving_agent': request.form.get('receiving_agent', ''),
            'brief_reference': request.form.get('brief_reference', ''),
            'filename': request.form.get('filename', '')
        }
        if not entry['document_type'] or not entry['issuing_agent']:
            error = 'Document type and issuing agent are required.'
        else:
            os.makedirs(os.path.dirname(REGISTRY_PATH), exist_ok=True)
            with open(REGISTRY_PATH, 'a', encoding='utf-8') as f:
                f.write(json.dumps(entry) + '\n')
            log_action('REGISTRY', f'Entry added: {entry["document_type"]} from {entry["issuing_agent"]}')
            success = 'Registry entry added.'
    if os.path.exists(REGISTRY_PATH):
        with open(REGISTRY_PATH, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    # Fix: a single corrupt line previously crashed the whole
                    # registry page; skip it and keep the readable entries.
                    continue
        entries = list(reversed(entries))  # newest first
    return render_template('workflow_registry.html',
                           entries=entries,
                           error=error,
                           success=success)
@app.route('/session-log')
def session_log():
    """Show the 100 most recent session-log entries."""
    db = get_db()
    rows = db.execute(
        "SELECT * FROM session_log ORDER BY timestamp DESC LIMIT 100"
    ).fetchall()
    db.close()
    return render_template('session_log.html', logs=rows)
@app.route('/usage-dashboard')
def usage_dashboard():
    """Usage dashboard: token totals per day/agent plus grand totals."""
    db = get_db()
    sessions = db.execute(
        """
        SELECT session_date, agent_identifier,
               SUM(author_token_estimate + agent_token_estimate) as total_tokens,
               SUM(claude_usage_panel_tokens) as claude_tokens
        FROM word_count_sessions
        GROUP BY session_date, agent_identifier
        ORDER BY session_date DESC
        """
    ).fetchall()
    grand_totals = db.execute(
        """
        SELECT
            SUM(author_word_count) as total_author_words,
            SUM(agent_word_count) as total_agent_words,
            SUM(author_token_estimate) as total_author_tokens,
            SUM(agent_token_estimate) as total_agent_tokens,
            SUM(claude_usage_panel_tokens) as total_claude_tokens,
            COUNT(*) as total_sessions
        FROM word_count_sessions
        """
    ).fetchone()
    db.close()
    return render_template('usage_dashboard.html',
                           sessions=sessions,
                           grand_totals=grand_totals)
@app.route('/pdf-converter')
def pdf_converter():
    """Render the PDF-to-text converter page."""
    return render_template('pdf_converter.html')
@app.route('/pdf-converter/convert', methods=['POST'])
def pdf_converter_convert():
    """Convert uploaded PDF files to cleaned plaintext files on disk.

    Extracts text with PyMuPDF, collapses excess blank lines, and writes one
    .txt per upload into the plaintext folder. Returns per-file results; a
    failure on one file does not abort the rest.
    """
    if 'file' not in request.files:
        return jsonify({"status": "error", "message": "No file part"}), 400
    files = request.files.getlist('file')
    if not files or all(f.filename == '' for f in files):
        return jsonify({"status": "error", "message": "No selected files"}), 400
    # Output directory defined using os.path.join
    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    os.makedirs(plaintext_dir, exist_ok=True)
    results = []
    for file in files:
        if file.filename == '':
            continue
        try:
            # 1. Extract text page by page.
            file_bytes = file.read()
            doc = fitz.open(stream=file_bytes, filetype="pdf")
            text_blocks = [page.get_text() for page in doc]
            doc.close()
            full_text = "\n".join(text_blocks)
            # 2. Clean text: collapse runs of 3+ newlines to one blank line.
            cleaned_text = re.sub(r'\n{3,}', '\n\n', full_text).strip()
            # 3. Derive a filesystem-safe output filename.
            base_name, _ = os.path.splitext(str(file.filename))
            safe_base_name = re.sub(r'[^a-zA-Z0-9_\-\s]', '', base_name).strip()
            if not safe_base_name:
                # Fix: a name with no ASCII-safe characters previously
                # produced the hidden file ".txt"; use a generic fallback.
                safe_base_name = "converted"
            output_filename = f"{safe_base_name}.txt"
            # 4. Save file
            save_path = os.path.join(plaintext_dir, output_filename)
            with open(save_path, 'w', encoding='utf-8') as f:
                f.write(cleaned_text)
            # 5. Record result
            results.append({
                "filename": file.filename,
                "output_filename": output_filename,
                "word_count": len(cleaned_text.split()),
                "char_count": len(cleaned_text),
                "path": save_path,
                "status": "ok",
                "error": None
            })
        except Exception as e:
            # Per-file failure is reported in the result list, not raised.
            results.append({
                "filename": file.filename,
                "output_filename": "",
                "word_count": 0,
                "char_count": 0,
                "path": "",
                "status": "error",
                "error": str(e)
            })
    return jsonify({"status": "ok", "results": results})
@app.route('/pdf-converter/clear', methods=['POST'])
def pdf_converter_clear():
    """Delete every .txt file in the plaintext output folder."""
    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    deleted = 0
    if os.path.exists(plaintext_dir):
        for name in os.listdir(plaintext_dir):
            if not name.endswith('.txt'):
                continue
            try:
                os.remove(os.path.join(plaintext_dir, name))
                deleted += 1
            except Exception:
                # Best-effort sweep: one locked file must not abort the rest.
                pass
    return jsonify({"status": "ok", "deleted": deleted})
@app.route('/pdf-converter/files', methods=['GET'])
def pdf_converter_files():
    """List converted .txt files with word counts and modification times."""
    plaintext_dir = os.path.join(BASE_DIR, "Secretary", "plaintext")
    listing = []
    if os.path.exists(plaintext_dir):
        for name in os.listdir(plaintext_dir):
            if not name.endswith('.txt'):
                continue
            full_path = os.path.join(plaintext_dir, name)
            try:
                info = os.stat(full_path)
                # Word count comes from reading the file contents directly.
                with open(full_path, 'r', encoding='utf-8') as fh:
                    words = len(fh.read().split())
                listing.append({
                    "filename": name,
                    "word_count": words,
                    "modified": datetime.fromtimestamp(info.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                    "path": full_path
                })
            except Exception:
                # Skip unreadable files rather than failing the whole listing.
                pass
    # Newest first (the timestamp format sorts lexicographically).
    listing.sort(key=lambda item: item['modified'], reverse=True)
    return jsonify({"status": "ok", "files": listing})
if __name__ == '__main__':
    # Initialize database; a failure is reported but does not stop startup.
    try:
        init_db()
    except Exception as e:
        print(f"Database init failed: {e}")
    # PID Registration
    import atexit
    # We use a more robust path detection for the PID file
    current_file_dir = os.path.dirname(os.path.abspath(__file__))
    pid_path = os.path.join(current_file_dir, "secretary.pid")
    try:
        with open(pid_path, "w") as f:
            f.write(str(os.getpid()))
        def cleanup_pid():
            # Remove the PID file on interpreter exit so a stale PID
            # does not linger after shutdown.
            if os.path.exists(pid_path):
                os.remove(pid_path)
        atexit.register(cleanup_pid)
    except Exception as e:
        print(f"Could not write PID file: {e}")
    print("\n[SECRETARY] Grand Vision Secretary starting...")
    # Run the app
    # host='0.0.0.0' ensures it listens on all local addresses
    # NOTE(review): debug=True with 0.0.0.0 exposes the Werkzeug debugger to
    # the local network — confirm this is intentional for a localhost tool.
    app.run(debug=True, port=5000, host='0.0.0.0', use_reloader=False)