import time import numpy as np from smbus2 import SMBus import requests # ========================= # 1) SUPABASE CONFIG # ========================= SUPABASE_URL = "https://bnrqoovyawxyjbdqmtix.supabase.co" SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM" TABLE = "heart_reading" UPLOAD_INTERVAL = 5.0 # Upload every 5 seconds def send_to_supabase(bpm, ir, red, spo2=98): url = f"{SUPABASE_URL}/rest/v1/{TABLE}" headers = { "apikey": SUPABASE_ANON_KEY, "Authorization": f"Bearer {SUPABASE_ANON_KEY}", "Content-Type": "application/json", "Prefer": "return=minimal", } payload = { "bpm": int(round(bpm)) if bpm else None, "spo2": int(spo2), "ir": int(ir), "red": int(red) } try: r = requests.post(url, headers=headers, json=payload, timeout=5) return r.ok except: return False # ========================= # 2) MAX30102 CORE # ========================= MAX30102_ADDR = 0x57 REG_FIFO_DATA = 0x07 REG_MODE_CONFIG = 0x09 REG_SPO2_CONFIG = 0x0A REG_LED1_PA = 0x0C REG_LED2_PA = 0x0D def init_max30102(bus): bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x40) # Reset time.sleep(0.1) bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x03) # SpO2 Mode bus.write_byte_data(MAX30102_ADDR, REG_SPO2_CONFIG, 0x27) # 100Hz bus.write_byte_data(MAX30102_ADDR, REG_LED1_PA, 0x24) bus.write_byte_data(MAX30102_ADDR, REG_LED2_PA, 0x24) def read_fifo(bus): d = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6) red = ((d[0] << 16) | (d[1] << 8) | d[2]) & 0x3FFFF ir = ((d[3] << 16) | (d[4] << 8) | d[5]) & 0x3FFFF return red, ir def estimate_bpm_autocorr(ir_signal, fs_hz): x = np.array(ir_signal, dtype=np.float64) x = x - np.mean(x) w = 3 # Smaller smoothing window for smaller buffer x = np.convolve(x, np.ones(w)/w, mode='same') ac = np.correlate(x, x, mode='full') ac = ac[len(ac)//2:] min_lag = int(fs_hz * 60 / 180) max_lag = int(fs_hz * 60 / 45) if max_lag >= len(ac): return None seg = ac[min_lag:max_lag] if len(seg) == 0: return None return 60.0 * fs_hz / (np.argmax(seg) + min_lag) # ========================= # 3) MAIN LOGIC # ========================= def main(): fs = 50 N = fs * 2 # <--- CHANGED: Buffer is now 2 seconds (100 samples) ir_buf = [] last_upload_time = time.time() bpm_smooth = None with SMBus(1) as bus: init_max30102(bus) print("System Online. Buffer: 2 Seconds. Uploading every 5 seconds.") while True: try: red_raw, ir_raw = read_fifo(bus) except: continue if ir_raw > 35000: # Finger detected ir_buf.append(ir_raw) if len(ir_buf) > N: ir_buf.pop(0) if len(ir_buf) == N: bpm = estimate_bpm_autocorr(ir_buf, fs) if bpm and 40 < bpm < 180: # Smoothing factor (slightly faster response) bpm_smooth = bpm if bpm_smooth is None else 0.7*bpm_smooth + 0.3*bpm print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {int(bpm_smooth) if bpm_smooth else 'Wait 2s...'}") else: ir_buf = [] bpm_smooth = None print("Searching for Finger...") # Upload Check current_time = time.time() if (current_time - last_upload_time) >= UPLOAD_INTERVAL: if ir_raw > 35000 and bpm_smooth is not None: print(">>> Uploading snapshot...") send_to_supabase(bpm_smooth, ir_raw, red_raw) last_upload_time = current_time time.sleep(1/fs) if __name__ == "__main__": main()