"""Read pulse data from a MAX30102 over I2C, estimate BPM, and upload it to Supabase."""

import os
import time
from collections import deque

import numpy as np

# The hardware and HTTP libraries are optional at import time so that the
# signal-processing helpers can be imported (and tested) on a machine that
# has neither; main() and send_to_supabase() check for them before use.
try:
    from smbus2 import SMBus
except ImportError:
    SMBus = None

try:
    import requests
except ImportError:
    requests = None

# =========================
# 1) SUPABASE CONFIG
# =========================
# Both values can be overridden through environment variables of the same
# name; the literals below are only the fallback defaults.
SUPABASE_URL = os.environ.get(
    "SUPABASE_URL", "https://bnrqoovyawxyjbdqmtix.supabase.co"
)
SUPABASE_ANON_KEY = os.environ.get(
    "SUPABASE_ANON_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM",
)
TABLE = "heart_reading"
SEND_EVERY_SECONDS = 1.0  # Upload rate

# Raw IR readings above this value are treated as "finger on the sensor".
FINGER_IR_THRESHOLD = 30000


def send_to_supabase(bpm, ir, red, spo2=98):
    """POST one reading to the Supabase REST endpoint for ``TABLE``.

    Args:
        bpm: Heart-rate estimate; a falsy value (``None`` or 0) is uploaded
            as ``None``, anything else is rounded to an int.
        ir: Raw IR sample, converted with ``int()``.
        red: Raw red sample, converted with ``int()``.
        spo2: SpO2 value to store. The default of 98 is a fixed placeholder;
            nothing in this file measures SpO2.

    Returns:
        ``True`` if the server answered with a success status, ``False`` if
        the request failed or the server answered with an error status.

    Raises:
        RuntimeError: If the ``requests`` package is not installed.
    """
    if requests is None:
        raise RuntimeError("The 'requests' package is required to upload readings")

    url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }
    payload = {
        "bpm": int(round(bpm)) if bpm else None,
        "spo2": int(spo2),
        "ir": int(ir),
        "red": int(red),
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=2)
    except requests.RequestException:
        # Uploading is best-effort: network problems are reported to the
        # caller as a failed send rather than aborting the sampling loop.
        return False
    return response.ok


# =========================
# 2) MAX30102 REGISTERS
# =========================
MAX30102_ADDR = 0x57
REG_FIFO_DATA = 0x07
REG_FIFO_WR_PTR = 0x04
REG_FIFO_RD_PTR = 0x06
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A
REG_LED1_PA = 0x0C  # RED
REG_LED2_PA = 0x0D  # IR


def init_max30102(bus):
    """Reset the sensor and write its mode, sampling, LED and FIFO-pointer registers.

    Args:
        bus: An open I2C bus object exposing ``write_byte_data(addr, reg, value)``.
    """
    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x40)  # Reset
    time.sleep(0.1)  # give the reset time to complete before reconfiguring
    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x03)  # SpO2 Mode
    bus.write_byte_data(MAX30102_ADDR, REG_SPO2_CONFIG, 0x27)  # 100Hz, 411us
    bus.write_byte_data(MAX30102_ADDR, REG_LED1_PA, 0x24)  # Red LED
    bus.write_byte_data(MAX30102_ADDR, REG_LED2_PA, 0x24)  # IR LED
    # Start with empty FIFO pointers.
    bus.write_byte_data(MAX30102_ADDR, REG_FIFO_WR_PTR, 0x00)
    bus.write_byte_data(MAX30102_ADDR, REG_FIFO_RD_PTR, 0x00)


def read_fifo(bus):
    """Read one red/IR sample pair from the sensor FIFO.

    Six bytes are read from ``REG_FIFO_DATA``: the first three form the red
    value and the last three the IR value, each big-endian and masked to
    18 bits.

    Args:
        bus: An open I2C bus object exposing
            ``read_i2c_block_data(addr, reg, length)``.

    Returns:
        A ``(red, ir)`` tuple of ints.
    """
    d = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
    red = ((d[0] << 16) | (d[1] << 8) | d[2]) & 0x3FFFF
    ir = ((d[3] << 16) | (d[4] << 8) | d[5]) & 0x3FFFF
    return red, ir


# =========================
# 3) BPM ALGORITHM
# =========================
def estimate_bpm_autocorr(ir_signal, fs_hz):
    """Estimate heart rate from an IR window using autocorrelation.

    The signal is mean-centred, smoothed with a 5-sample moving average and
    autocorrelated; the lag with the strongest correlation inside the
    45-180 BPM search range is converted to beats per minute.

    Args:
        ir_signal: Sequence of IR samples, assumed evenly spaced.
        fs_hz: Sampling rate of ``ir_signal`` in Hz.

    Returns:
        The estimated BPM as a float, or ``None`` when no estimate can be
        made: empty input, a window too short to cover the 45 BPM lag, a
        sampling rate too low to resolve the 180 BPM lag, or a signal with
        no positive correlation in the search range (e.g. a flat line).
    """
    x = np.asarray(ir_signal, dtype=np.float64)
    if x.size == 0:
        return None
    x = x - x.mean()

    # Moving-average smoothing to suppress sample-to-sample noise.
    w = 5
    x = np.convolve(x, np.ones(w) / w, mode="same")

    # Keep only the non-negative lags of the full autocorrelation.
    ac = np.correlate(x, x, mode="full")
    ac = ac[len(ac) // 2:]

    min_lag = int(fs_hz * 60 / 180)  # 180 BPM
    max_lag = int(fs_hz * 60 / 45)  # 45 BPM
    # min_lag == 0 would allow a zero-lag "peak" and a division by zero below.
    if min_lag < 1 or max_lag >= len(ac):
        return None

    seg = ac[min_lag:max_lag]
    if len(seg) == 0:
        return None

    best = int(np.argmax(seg))
    if seg[best] <= 0:
        # No positive correlation at any candidate lag: there is no periodic
        # component to report, so do not invent a rate from argmax's default.
        return None

    peak = best + min_lag
    return 60.0 * fs_hz / peak


# =========================
# 4) MAIN LOOP
# =========================
def main():
    """Sample the sensor forever, print readings and upload them periodically.

    Raises:
        RuntimeError: If ``smbus2`` or ``requests`` is not installed.
    """
    if SMBus is None or requests is None:
        raise RuntimeError(
            "The 'smbus2' and 'requests' packages are required to run the monitor"
        )

    fs = 50
    window = fs * 4  # 4-second window for faster response
    # NOTE(review): init_max30102() configures the sensor for 100 samples/s,
    # while this loop assumes fs = 50 and reads a single sample per
    # iteration. The upload below is also synchronous (2 s timeout), so
    # sampling pauses while a request is in flight. Confirm the effective
    # sample spacing on hardware before trusting the BPM value.
    ir_buf = deque(maxlen=window)  # sliding window; old samples drop off
    last_send = 0.0
    bpm_smooth = None

    with SMBus(1) as bus:
        init_max30102(bus)
        print("System Ready. Place finger on sensor...")

        while True:
            # Read sample
            try:
                red_raw, ir_raw = read_fifo(bus)
            except OSError:
                # Transient I2C failure: skip this sample, but keep the loop
                # paced instead of hammering the bus.
                time.sleep(1 / fs)
                continue

            finger_present = ir_raw > FINGER_IR_THRESHOLD
            if finger_present:
                ir_buf.append(ir_raw)
                if len(ir_buf) == window:
                    bpm = estimate_bpm_autocorr(ir_buf, fs)
                    if bpm is not None and 40 < bpm < 180:
                        # Exponential smoothing to steady the displayed rate.
                        if bpm_smooth is None:
                            bpm_smooth = bpm
                        else:
                            bpm_smooth = 0.8 * bpm_smooth + 0.2 * bpm
                print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {int(bpm_smooth) if bpm_smooth else 'Calculating...'}")
            else:
                # Finger removed: discard the window and the running estimate.
                ir_buf.clear()
                bpm_smooth = None
                print("No Finger Detected")

            # Supabase Upload
            now = time.monotonic()
            if finger_present and (now - last_send) >= SEND_EVERY_SECONDS:
                success = send_to_supabase(bpm_smooth, ir_raw, red_raw)
                if success:
                    print(">>> Sent to Supabase ✅")
                else:
                    print(">>> Supabase Error ❌")
                last_send = now

            time.sleep(1 / fs)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nStopped.")