import os
import time
from collections import deque

import numpy as np
import requests
from smbus2 import SMBus
# =========================
# 1) SUPABASE CONFIG
# =========================
# Base URL of the Supabase project; send_to_supabase() appends "/rest/v1/<TABLE>".
# It can be overridden with the SUPABASE_URL environment variable. The default
# is built from the project `ref` claim embedded in the anon key below.
# TODO(review): confirm the default points at the intended project.
SUPABASE_URL = os.environ.get(
    "SUPABASE_URL", "https://bnrqoovyawxyjbdqmtix.supabase.co"
).rstrip("/")  # no trailing slash, so the joined path has exactly one "/"

# Anonymous API key sent as both the `apikey` and `Authorization` headers.
# Prefer supplying it through the SUPABASE_ANON_KEY environment variable; the
# literal is kept only as a backward-compatible fallback.
SUPABASE_ANON_KEY = os.environ.get(
    "SUPABASE_ANON_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM",
)

TABLE = "heart_reading"  # Table that receives one row per upload
SEND_EVERY_SECONDS = 1.0  # Upload rate
def send_to_supabase(bpm, ir, red, spo2=98):
    """POST one reading to the Supabase REST endpoint for ``TABLE``.

    Args:
        bpm: Heart rate estimate. A falsy value (``None`` or 0) is uploaded
            as JSON null; anything else is rounded to the nearest integer.
        ir: Raw IR sample; converted with ``int()``.
        red: Raw red sample; converted with ``int()``.
        spo2: SpO2 value to store. The default of 98 is a fixed placeholder;
            the caller in this file never passes a measured value.

    Returns:
        True when the server answered with a status below 400, False when
        it answered with an error status or the request itself failed
        (connection error, timeout, malformed URL, ...).
    """
    url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        # Ask the server not to echo the inserted row back.
        "Prefer": "return=minimal",
    }
    payload = {
        "bpm": int(round(bpm)) if bpm else None,
        "spo2": int(spo2),
        "ir": int(ir),
        "red": int(red),
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=2)
    except requests.RequestException:
        # Uploading is best-effort: report failure instead of raising.
        # Only request-level errors are caught, so KeyboardInterrupt and
        # programming errors are no longer silently swallowed.
        return False
    return response.ok
# =========================
# 2) MAX30102 REGISTERS
# =========================
# I2C address passed to every bus call that talks to the sensor.
MAX30102_ADDR = 0x57

# FIFO pointer and data registers.
REG_FIFO_WR_PTR = 0x04
REG_FIFO_RD_PTR = 0x06
REG_FIFO_DATA = 0x07

# Mode and sampling configuration registers.
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A

# LED pulse-amplitude registers.
REG_LED1_PA = 0x0C  # RED
REG_LED2_PA = 0x0D  # IR
def init_max30102(bus):
    """Reset the MAX30102 and write its start-up configuration.

    Args:
        bus: Open I2C bus object exposing ``write_byte_data(addr, reg, value)``.

    The function first writes the reset value to the mode register, waits
    100 ms, and then writes the remaining registers in a fixed order.
    """

    def write_register(register, value):
        bus.write_byte_data(MAX30102_ADDR, register, value)

    write_register(REG_MODE_CONFIG, 0x40)  # Reset
    time.sleep(0.1)  # Give the reset time to complete before configuring

    startup_writes = (
        (REG_MODE_CONFIG, 0x03),  # SpO2 Mode
        (REG_SPO2_CONFIG, 0x27),  # 100Hz, 411us
        (REG_LED1_PA, 0x24),      # Red LED
        (REG_LED2_PA, 0x24),      # IR LED
        (REG_FIFO_WR_PTR, 0x00),  # Clear FIFO write pointer
        (REG_FIFO_RD_PTR, 0x00),  # Clear FIFO read pointer
    )
    for register, value in startup_writes:
        write_register(register, value)
def read_fifo(bus):
    """Read one red/IR sample pair from the sensor's FIFO data register.

    Args:
        bus: Open I2C bus object exposing
            ``read_i2c_block_data(addr, reg, length)``.

    Returns:
        A ``(red, ir)`` tuple. Each value is assembled big-endian from three
        consecutive bytes and masked to its low 18 bits.
    """
    raw = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)

    def to_sample(high, middle, low):
        # Combine three bytes, most significant first, and keep 18 bits.
        return ((high << 16) | (middle << 8) | low) & 0x3FFFF

    red_sample = to_sample(raw[0], raw[1], raw[2])
    ir_sample = to_sample(raw[3], raw[4], raw[5])
    return red_sample, ir_sample
# =========================
# 3) BPM ALGORITHM
# =========================
def estimate_bpm_autocorr(ir_signal, fs_hz, min_bpm=45.0, max_bpm=180.0,
                          smooth_window=5):
    """Estimate heart rate from an IR signal via autocorrelation.

    The signal is mean-removed, smoothed with a moving average (a simple
    low-pass, not a true band-pass), and autocorrelated. The lag with the
    strongest autocorrelation inside the plausible heart-rate range is
    converted to beats per minute.

    Args:
        ir_signal: 1-D sequence of IR samples.
        fs_hz: Sample rate of ``ir_signal`` in Hz.
        min_bpm: Lowest heart rate searched for (defines the longest lag).
        max_bpm: Highest heart rate searched for (defines the shortest lag).
        smooth_window: Moving-average length in samples; values <= 1
            disable smoothing.

    Returns:
        The estimated BPM as a float, or ``None`` when no estimate can be
        made: invalid parameters, a window not longer than the longest lag,
        non-finite samples, a constant signal, or no positive correlation
        anywhere in the search range.
    """
    if fs_hz <= 0 or min_bpm <= 0 or max_bpm <= min_bpm:
        return None

    samples = np.asarray(ir_signal, dtype=np.float64)

    # Lag bounds in samples; the shortest lag is clamped to 1 so the final
    # division can never be by zero at very low sample rates.
    min_lag = max(int(fs_hz * 60 / max_bpm), 1)
    max_lag = int(fs_hz * 60 / min_bpm)
    if samples.size <= max_lag:
        return None

    if not np.isfinite(samples).all():
        return None
    if samples.max() == samples.min():
        # A flat signal has an all-zero autocorrelation whose argmax would
        # land on min_lag and report a bogus rate near max_bpm.
        return None

    centered = samples - samples.mean()
    if smooth_window > 1:
        kernel = np.full(smooth_window, 1.0 / smooth_window)
        centered = np.convolve(centered, kernel, mode="same")

    # Keep only the non-negative lags of the full autocorrelation.
    autocorr = np.correlate(centered, centered, mode="full")[centered.size - 1:]

    candidates = autocorr[min_lag:max_lag]
    if candidates.size == 0:
        return None

    peak_lag = int(np.argmax(candidates)) + min_lag
    if autocorr[peak_lag] <= 0:
        # Nothing in the search range correlates positively with the signal.
        return None
    return float(60.0 * fs_hz / peak_lag)
# =========================
# 4) MAIN LOOP
# =========================
def main():
    """Sample the sensor in a loop, estimate BPM, and upload readings.

    Opens I2C bus 1, initialises the sensor, then loops forever:
    reads one red/IR pair, keeps a sliding window of IR samples while a
    finger is detected, smooths the BPM estimate, prints the current state,
    and uploads a reading at most every ``SEND_EVERY_SECONDS`` seconds.
    The loop only ends when an exception (e.g. Ctrl-C) propagates.
    """
    # Loop rate in Hz; also the sample rate handed to the BPM estimator.
    # NOTE(review): init_max30102's comment says the sensor is configured
    # for 100Hz while this loop assumes 50 - confirm which is intended.
    fs = 50
    window_len = fs * 4  # 4-second window for faster response
    sample_period = 1.0 / fs
    finger_threshold = 30000  # IR level above which a finger is assumed present

    # A bounded deque drops the oldest sample automatically in O(1).
    ir_buf = deque(maxlen=window_len)
    last_send = float("-inf")  # Guarantees the first eligible upload is sent
    bpm_smooth = None

    with SMBus(1) as bus:
        init_max30102(bus)
        print("System Ready. Place finger on sensor...")

        next_tick = time.monotonic()
        while True:
            # Read sample
            try:
                red_raw, ir_raw = read_fifo(bus)
            except OSError:
                # I2C transfer failed. Wait one period before retrying so a
                # persistent bus fault does not turn into a busy loop, and
                # let everything else (including Ctrl-C) propagate.
                time.sleep(sample_period)
                next_tick = time.monotonic()
                continue

            finger_present = ir_raw > finger_threshold
            if finger_present:
                ir_buf.append(ir_raw)
                if len(ir_buf) == window_len:
                    bpm = estimate_bpm_autocorr(ir_buf, fs)
                    if bpm is not None and 40 < bpm < 180:
                        # Exponential smoothing; the first estimate seeds it.
                        if bpm_smooth is None:
                            bpm_smooth = bpm
                        else:
                            bpm_smooth = 0.8 * bpm_smooth + 0.2 * bpm
                shown = int(bpm_smooth) if bpm_smooth is not None else 'Calculating...'
                print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {shown}")
            else:
                # Finger removed: discard the window and the running estimate.
                ir_buf.clear()
                bpm_smooth = None
                print("No Finger Detected")

            # Supabase Upload
            # NOTE: the upload is synchronous, so a slow request stalls
            # sampling for up to the request timeout.
            now = time.monotonic()
            if finger_present and (now - last_send) >= SEND_EVERY_SECONDS:
                if send_to_supabase(bpm_smooth, ir_raw, red_raw):
                    print(">>> Sent to Supabase ✅")
                else:
                    print(">>> Supabase Error ❌")
                last_send = now

            # Pace against a deadline so time spent reading, computing and
            # printing does not stretch the sample period.
            next_tick += sample_period
            remaining = next_tick - time.monotonic()
            if remaining > 0:
                time.sleep(remaining)
            else:
                # Fell behind (e.g. slow upload): resynchronise instead of
                # bursting reads to catch up.
                next_tick = time.monotonic()
# Start the acquisition loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()