import time import numpy as np from smbus2 import SMBus import requests # ========================= # 1) SUPABASE CONFIG # ========================= SUPABASE_URL = "https://bnrqoovyawxyjbdqmtix.supabase.co" SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM" # <-- put yours here TABLE = "heart_reading" SEND_EVERY_SECONDS = 1.0 # upload rate (e.g. 1 row/sec). Set 5.0 for 1 row/5 sec def send_to_supabase(bpm, ir, red, spo2=None): url = f"{SUPABASE_URL}/rest/v1/{TABLE}" headers = { "apikey": SUPABASE_ANON_KEY, "Authorization": f"Bearer {SUPABASE_ANON_KEY}", "Content-Type": "application/json", "Prefer": "return=minimal", } payload = { "bpm": int(round(bpm)) if bpm is not None else None, # INT bpm "spo2": int(round(spo2)) if spo2 is not None else None, # INT spo2 (if you have it) "ir": int(ir) if ir is not None else None, "red": int(red) if red is not None else None, # created_at auto by default now() } payload = {k: v for k, v in payload.items() if v is not None} r = requests.post(url, headers=headers, json=payload, timeout=8) if not r.ok: raise RuntimeError(f"Supabase insert failed: {r.status_code} {r.text}") # ========================= # 2) MAX30102 REGISTERS # ========================= MAX30102_ADDR = 0x57 REG_INTR_STATUS_1 = 0x00 REG_INTR_STATUS_2 = 0x01 REG_INTR_ENABLE_1 = 0x02 REG_INTR_ENABLE_2 = 0x03 REG_FIFO_WR_PTR = 0x04 REG_OVF_COUNTER = 0x05 REG_FIFO_RD_PTR = 0x06 REG_FIFO_DATA = 0x07 REG_FIFO_CONFIG = 0x08 REG_MODE_CONFIG = 0x09 REG_SPO2_CONFIG = 0x0A REG_LED1_PA = 0x0C # RED REG_LED2_PA = 0x0D # IR REG_PART_ID = 0xFF MODE_RESET = 0x40 MODE_SPO2 = 0x03 # Red + IR def write_reg(bus, reg, val): bus.write_byte_data(MAX30102_ADDR, reg, val & 0xFF) def read_reg(bus, reg): return bus.read_byte_data(MAX30102_ADDR, reg) def read_fifo_sample(bus): """ 6 bytes per sample: RED(3) + IR(3) in SPO2 mode Values are 18-bit """ data = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6) red = ((data[0] << 16) | (data[1] << 8) | data[2]) & 0x3FFFF ir = ((data[3] << 16) | (data[4] << 8) | data[5]) & 0x3FFFF return red, ir def init_max30102(bus, fs_hz=50): part = read_reg(bus, REG_PART_ID) print(f"MAX3010x PART ID: 0x{part:02X}") # Reset write_reg(bus, REG_MODE_CONFIG, MODE_RESET) time.sleep(0.1) # Disable interrupts (polling) write_reg(bus, REG_INTR_ENABLE_1, 0x00) write_reg(bus, REG_INTR_ENABLE_2, 0x00) # FIFO config: # sample avg = 4, rollover disabled, almost full = 0x0F fifo_cfg = (0b010 << 5) | (0 << 4) | 0x0F write_reg(bus, REG_FIFO_CONFIG, fifo_cfg) # LED currents (you can adjust 0x1F..0x7F) write_reg(bus, REG_LED1_PA, 0x3F) # RED write_reg(bus, REG_LED2_PA, 0x3F) # IR # SPO2 config: ADC range + sample rate + LED pulse width (18-bit) # ADC range: 4096nA -> 0b01 << 5 # Sample rate bits: # 50 Hz -> 0b010 # 100 Hz -> 0b011 # 25 Hz -> 0b001 # Pulse width 18-bit -> 0b11 if fs_hz == 25: sr = 0b001 elif fs_hz == 50: sr = 0b010 elif fs_hz == 100: sr = 0b011 else: sr = 0b010 # default to 50 spo2_cfg = (0b01 << 5) | (sr << 2) | 0b11 write_reg(bus, REG_SPO2_CONFIG, spo2_cfg) # SPO2 mode (Red + IR) write_reg(bus, REG_MODE_CONFIG, MODE_SPO2) # Clear FIFO pointers write_reg(bus, REG_FIFO_WR_PTR, 0x00) write_reg(bus, REG_OVF_COUNTER, 0x00) write_reg(bus, REG_FIFO_RD_PTR, 0x00) # ========================= # 3) BPM ESTIMATION (Autocorrelation = stable) # ========================= def estimate_bpm_autocorr(ir_signal, fs_hz): x = np.array(ir_signal, dtype=np.float64) x = x - np.mean(x) # Smooth ~80ms w = max(3, int(fs_hz * 0.08)) x = np.convolve(x, np.ones(w) / w, mode="same") ac = np.correlate(x, x, mode="full") ac = ac[len(ac) // 2:] # positive lags # Search plausible heart rates min_bpm, max_bpm = 45, 180 min_lag = int(fs_hz * 60 / max_bpm) max_lag = int(fs_hz * 60 / min_bpm) if max_lag >= len(ac): return None seg = ac[min_lag:max_lag] if len(seg) < 5: return None best = int(np.argmax(seg)) + min_lag if best <= 0: return None bpm = 60.0 * fs_hz / best return bpm # ========================= # 4) MAIN LOOP # ========================= def main(): fs = 50 # keep 50 for stable BPM window_seconds = 8 N = fs * window_seconds ir_buf = [] red_buf = [] last_send = 0.0 bpm_smooth = None FINGER_STD_MIN = 2000 # if too strict, lower to 1000 with SMBus(1) as bus: init_max30102(bus, fs_hz=fs) print("Reading... (place finger firmly on sensor, cover from room light)") # Prime buffer while len(ir_buf) < N: red, ir = read_fifo_sample(bus) red_buf.append(red) ir_buf.append(ir) time.sleep(1.0 / fs) while True: red, ir = read_fifo_sample(bus) red_buf.append(red) ir_buf.append(ir) red_buf = red_buf[-N:] ir_buf = ir_buf[-N:] ir_mean = int(np.mean(ir_buf)) ir_std = float(np.std(ir_buf)) # Finger detection gate if ir_std < FINGER_STD_MIN: bpm = None else: bpm = estimate_bpm_autocorr(ir_buf, fs) # Optional smoothing if bpm is not None: bpm_smooth = bpm if bpm_smooth is None else 0.7 * bpm_smooth + 0.3 * bpm if bpm_smooth is None: print(f"IR mean={ir_mean} std={int(ir_std)} | BPM=... (hold still / adjust finger)") else: print(f"IR mean={ir_mean} std={int(ir_std)} | BPM={bpm_smooth:.1f}") # Send to Supabase at fixed rate now = time.time() if bpm_smooth is not None and (now - last_send) >= SEND_EVERY_SECONDS: try: send_to_supabase( bpm=bpm_smooth, ir=ir, red=red, spo2=98 # set real spo2 if you calculate it later ) print("✅ Sent to Supabase") except Exception as e: print("❌ Supabase error:", e) last_send = now # IMPORTANT: sampling delay matches fs time.sleep(1.0 / fs) if __name__ == "__main__": main()