import os
import time

import numpy as np
import requests
from smbus2 import SMBus
# =========================
# 1) SUPABASE CONFIG
# =========================
# SUPABASE_URL and SUPABASE_ANON_KEY can be overridden with environment
# variables of the same name, so credentials need not be edited into the source.
#
# SUPABASE_URL is read by send_to_supabase() but was never defined, which made
# every upload fail with NameError.
# NOTE(review): the default host below is built from the `ref` claim embedded
# in the anon key's JWT payload, assuming the usual
# https://<project-ref>.supabase.co layout - confirm it matches your project.
SUPABASE_URL = os.environ.get(
    "SUPABASE_URL", "https://bnrqoovyawxyjbdqmtix.supabase.co"
).rstrip("/")  # no trailing slash: callers append "/rest/v1/..."

# NOTE(review): a key committed to source is visible to anyone with the file;
# prefer supplying it through the SUPABASE_ANON_KEY environment variable.
SUPABASE_ANON_KEY = os.environ.get(
    "SUPABASE_ANON_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM",
)

# Table that receives one row per upload.
TABLE = "heart_reading"

# Minimum number of seconds between uploads (1.0 = one row per second,
# 5.0 = one row every five seconds).
SEND_EVERY_SECONDS = 1.0
def send_to_supabase(bpm, ir, red, spo2=None):
    """Insert one reading into the Supabase table named by TABLE.

    Args:
        bpm: Heart rate; rounded to the nearest integer before sending.
        ir: IR channel value; truncated to an integer.
        red: RED channel value; truncated to an integer.
        spo2: Optional oxygen saturation; rounded to the nearest integer.

    Any argument that is None is left out of the inserted row. The
    `created_at` column is not sent (the original note says it defaults to
    now() on the server side).

    Raises:
        RuntimeError: if the HTTP response status is not OK.
        Exceptions raised by `requests.post` itself propagate unchanged.
    """
    endpoint = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    request_headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }

    # Build the row field by field, skipping anything that was not supplied.
    row = {}
    if bpm is not None:
        row["bpm"] = int(round(bpm))
    if spo2 is not None:
        row["spo2"] = int(round(spo2))
    if ir is not None:
        row["ir"] = int(ir)
    if red is not None:
        row["red"] = int(red)

    response = requests.post(endpoint, headers=request_headers, json=row, timeout=8)
    if response.ok:
        return
    raise RuntimeError(f"Supabase insert failed: {response.status_code} {response.text}")
# =========================
# 2) MAX30102 REGISTERS
# =========================
# I2C address of the sensor.
MAX30102_ADDR = 0x57

# Interrupt status / enable registers.
REG_INTR_STATUS_1, REG_INTR_STATUS_2 = 0x00, 0x01
REG_INTR_ENABLE_1, REG_INTR_ENABLE_2 = 0x02, 0x03

# FIFO bookkeeping and data registers.
REG_FIFO_WR_PTR = 0x04
REG_OVF_COUNTER = 0x05
REG_FIFO_RD_PTR = 0x06
REG_FIFO_DATA = 0x07

# Configuration registers.
REG_FIFO_CONFIG = 0x08
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A

# LED pulse-amplitude registers.
REG_LED1_PA = 0x0C  # RED
REG_LED2_PA = 0x0D  # IR

REG_PART_ID = 0xFF

# Values written to REG_MODE_CONFIG.
MODE_RESET = 0x40
MODE_SPO2 = 0x03  # Red + IR
def write_reg(bus, reg, val):
    """Write the low 8 bits of `val` to sensor register `reg` over `bus`."""
    low_byte = val & 0xFF
    bus.write_byte_data(MAX30102_ADDR, reg, low_byte)
def read_reg(bus, reg):
    """Read sensor register `reg` over `bus` and return its value."""
    value = bus.read_byte_data(MAX30102_ADDR, reg)
    return value
def read_fifo_sample(bus):
    """Read one sample from the FIFO data register.

    Six bytes are fetched: three for RED followed by three for IR (the layout
    used in SPO2 mode). Each three-byte group is combined most-significant
    byte first and masked to 18 bits.

    Returns:
        (red, ir) as a tuple of integers.
    """
    raw = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)

    channels = []
    for start in (0, 3):
        high, middle, low = raw[start], raw[start + 1], raw[start + 2]
        channels.append(((high << 16) | (middle << 8) | low) & 0x3FFFF)

    red, ir = channels
    return red, ir
def init_max30102(bus, fs_hz=50):
    """Reset the sensor and configure it for polled Red + IR (SPO2) sampling.

    Args:
        bus: Open SMBus handle passed through to read_reg() / write_reg().
        fs_hz: Requested sample rate. 25, 50 and 100 are recognised; any
            other value falls back to the 50 setting.

    Prints the part ID read from the chip before configuring it.
    """
    part_id = read_reg(bus, REG_PART_ID)
    print(f"MAX3010x PART ID: 0x{part_id:02X}")

    # Reset, then wait before touching any other register.
    write_reg(bus, REG_MODE_CONFIG, MODE_RESET)
    time.sleep(0.1)

    # Sample-rate field of the SPO2 config register for each supported rate.
    # NOTE(review): the MAX30102 datasheet appears to list these codes
    # (0b001 / 0b010 / 0b011) as 100 / 200 / 400 samples per second; together
    # with the 4-sample averaging set in the FIFO config below that would give
    # 25 / 50 / 100 Hz at the FIFO output - confirm against the datasheet.
    rate_codes = ((25, 0b001), (50, 0b010), (100, 0b011))
    sr_bits = next((code for rate, code in rate_codes if fs_hz == rate), 0b010)

    # FIFO config: sample avg = 4, rollover disabled, almost full = 0x0F.
    fifo_cfg = (0b010 << 5) | (0 << 4) | 0x0F

    # SPO2 config: ADC range 4096nA (0b01), sample rate, 18-bit pulse width (0b11).
    adc_range_bits = 0b01
    pulse_width_bits = 0b11
    spo2_cfg = (adc_range_bits << 5) | (sr_bits << 2) | pulse_width_bits

    # Register writes, applied strictly in this order.
    setup_sequence = (
        (REG_INTR_ENABLE_1, 0x00),      # interrupts off: the caller polls
        (REG_INTR_ENABLE_2, 0x00),
        (REG_FIFO_CONFIG, fifo_cfg),
        (REG_LED1_PA, 0x3F),            # RED LED current (adjustable 0x1F..0x7F)
        (REG_LED2_PA, 0x3F),            # IR LED current
        (REG_SPO2_CONFIG, spo2_cfg),
        (REG_MODE_CONFIG, MODE_SPO2),   # Red + IR mode
        (REG_FIFO_WR_PTR, 0x00),        # clear FIFO pointers
        (REG_OVF_COUNTER, 0x00),
        (REG_FIFO_RD_PTR, 0x00),
    )
    for register, value in setup_sequence:
        write_reg(bus, register, value)
# =========================
# 3) BPM ESTIMATION (Autocorrelation = stable)
# =========================
def estimate_bpm_autocorr(ir_signal, fs_hz):
    """Estimate heart rate from an IR signal using autocorrelation.

    The signal is mean-centred, smoothed with a moving average of about 80 ms
    (at least 3 samples), and autocorrelated. The lag with the largest
    autocorrelation inside the 45-180 BPM range is converted to BPM.

    Args:
        ir_signal: Sequence of IR samples taken at a uniform rate.
        fs_hz: Sample rate of `ir_signal` in Hz.

    Returns:
        The estimated BPM as a float, or None when no estimate can be made:
        the signal is empty or not longer than the longest searched lag, the
        lag range is narrower than 5 samples, the signal is perfectly flat,
        or the best lag has no positive correlation.
    """
    # Search plausible heart rates
    min_bpm, max_bpm = 45, 180
    min_lag = int(fs_hz * 60 / max_bpm)
    max_lag = int(fs_hz * 60 / min_bpm)

    x = np.asarray(ir_signal, dtype=np.float64)

    # Check the length up front: numpy's convolve/correlate raise on empty
    # input, and a window shorter than max_lag cannot cover the search range.
    if x.size <= max_lag or (max_lag - min_lag) < 5:
        return None

    # A constant signal has an all-zero autocorrelation; argmax would then
    # pick min_lag and report a meaningless rate near max_bpm.
    if np.ptp(x) == 0:
        return None

    x = x - np.mean(x)

    # Smooth ~80ms
    w = max(3, int(fs_hz * 0.08))
    x = np.convolve(x, np.ones(w) / w, mode="same")

    ac = np.correlate(x, x, mode="full")
    ac = ac[len(ac) // 2:]  # positive lags

    best = int(np.argmax(ac[min_lag:max_lag])) + min_lag
    # A non-positive value at the best lag means nothing in the searched range
    # correlates with the signal, so there is no periodicity to report.
    if best <= 0 or ac[best] <= 0:
        return None

    return 60.0 * fs_hz / best
# =========================
# 4) MAIN LOOP
# =========================
def _fifo_samples_available(bus):
    """Return how many unread samples the sensor FIFO holds right now.

    NOTE(review): relies on the MAX30102 datasheet description of 5-bit
    write/read pointers over a 32-sample FIFO and of the overflow counter
    being non-zero only while samples are being lost - confirm on hardware.
    """
    wr_ptr = read_reg(bus, REG_FIFO_WR_PTR) & 0x1F
    rd_ptr = read_reg(bus, REG_FIFO_RD_PTR) & 0x1F
    pending = (wr_ptr - rd_ptr) & 0x1F  # modulo-32 distance handles wrap-around
    if pending == 0 and (read_reg(bus, REG_OVF_COUNTER) & 0x1F) > 0:
        # Equal pointers plus a non-zero overflow count: the FIFO is full,
        # not empty. Without this check a full FIFO would never be drained.
        return 32
    return pending


def main():
    """Stream BPM estimates from the sensor and upload them to Supabase.

    Samples are pulled from the sensor FIFO as they become available, kept in
    a sliding window of `window_seconds`, and turned into a smoothed BPM. The
    smoothed value is printed on every update and uploaded at most once every
    SEND_EVERY_SECONDS. Runs until interrupted.
    """
    fs = 50  # keep 50 for stable BPM
    window_seconds = 8
    N = fs * window_seconds
    ir_buf = []
    red_buf = []
    last_send = 0.0
    bpm_smooth = None
    FINGER_STD_MIN = 2000  # if too strict, lower to 1000
    poll_interval = 1.0 / fs

    with SMBus(1) as bus:
        init_max30102(bus, fs_hz=fs)
        print("Reading... (place finger firmly on sensor, cover from room light)")

        while True:
            # Only read what the sensor has actually produced. Pacing reads
            # with a fixed sleep lets processing and upload time stretch the
            # real sample spacing, which skews the BPM computed from `fs`.
            pending = _fifo_samples_available(bus)
            if pending == 0:
                time.sleep(poll_interval)
                continue

            for _ in range(pending):
                red, ir = read_fifo_sample(bus)
                red_buf.append(red)
                ir_buf.append(ir)
            red_buf = red_buf[-N:]
            ir_buf = ir_buf[-N:]

            # Prime buffer: wait for a full window before estimating.
            if len(ir_buf) < N:
                continue

            ir_mean = int(np.mean(ir_buf))
            ir_std = float(np.std(ir_buf))

            # Finger detection gate
            if ir_std < FINGER_STD_MIN:
                # No usable signal: forget the previous estimate so a stale
                # BPM is neither displayed nor uploaded after the finger
                # has been removed.
                bpm_smooth = None
            else:
                bpm = estimate_bpm_autocorr(ir_buf, fs)
                # Optional smoothing
                if bpm is not None:
                    bpm_smooth = bpm if bpm_smooth is None else 0.7 * bpm_smooth + 0.3 * bpm

            if bpm_smooth is None:
                print(f"IR mean={ir_mean} std={int(ir_std)} | BPM=... (hold still / adjust finger)")
            else:
                print(f"IR mean={ir_mean} std={int(ir_std)} | BPM={bpm_smooth:.1f}")

            # Send to Supabase at fixed rate
            now = time.time()
            if bpm_smooth is not None and (now - last_send) >= SEND_EVERY_SECONDS:
                try:
                    send_to_supabase(
                        bpm=bpm_smooth,
                        ir=ir,
                        red=red,
                        # NOTE(review): 98 is a placeholder, not a measurement;
                        # set real spo2 if you calculate it later.
                        spo2=98,
                    )
                    print("✅ Sent to Supabase")
                except Exception as e:
                    # Best effort: a failed upload must not stop sampling.
                    print("❌ Supabase error:", e)
                # Updated even after a failure so retries stay rate-limited.
                last_send = now


if __name__ == "__main__":
    main()