# [paste-site banner, not part of the script] Not a member of GistPad yet? Sign up; it unlocks many cool features!
import os
import time

import numpy as np
import requests
from smbus2 import SMBus
# =========================
# 1) SUPABASE CONFIG
# =========================
# The endpoint URL and anon key may be overridden with the SUPABASE_URL and
# SUPABASE_ANON_KEY environment variables, so credentials do not have to be
# edited into (or committed with) this file. The literals below are only the
# fallback defaults used when the variables are not set.
SUPABASE_URL = os.environ.get(
    "SUPABASE_URL",
    "https://bnrqoovyawxyjbdqmtix.supabase.co",
)
SUPABASE_ANON_KEY = os.environ.get(
    "SUPABASE_ANON_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM",
)  # <-- put yours here (or export SUPABASE_ANON_KEY)
TABLE = "heart_reading"  # table name used in the REST path /rest/v1/<TABLE>
SEND_EVERY_SECONDS = 1.0  # upload rate (e.g. 1 row/sec). Set 5.0 for 1 row/5 sec
def send_to_supabase(bpm, ir, red, spo2=None):
    """Insert one reading as a row in the Supabase table over its REST endpoint.

    ``bpm`` and ``spo2`` are rounded to the nearest integer; ``ir`` and ``red``
    are converted with ``int()``. Any field whose value is ``None`` is left
    out of the JSON body altogether. No ``created_at`` is sent (the original
    note says the table fills it in with a ``now()`` default).

    Raises:
        RuntimeError: if the HTTP response is not OK; the message carries the
            status code and the response text.
    """

    def _rounded(value):
        return None if value is None else int(round(value))

    def _as_int(value):
        return None if value is None else int(value)

    endpoint = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    request_headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }

    candidate_fields = {
        "bpm": _rounded(bpm),  # INT bpm
        "spo2": _rounded(spo2),  # INT spo2 (if you have it)
        "ir": _as_int(ir),
        "red": _as_int(red),
    }
    # Only send the columns we actually have a value for.
    body = {
        column: value
        for column, value in candidate_fields.items()
        if value is not None
    }

    response = requests.post(endpoint, headers=request_headers, json=body, timeout=8)
    if response.ok:
        return
    raise RuntimeError(f"Supabase insert failed: {response.status_code} {response.text}")
# =========================
# 2) MAX30102 REGISTERS
# =========================
# I2C address used for every register access in this file.
MAX30102_ADDR = 0x57

# Interrupt status / enable registers.
REG_INTR_STATUS_1 = 0x00
REG_INTR_STATUS_2 = 0x01
REG_INTR_ENABLE_1 = 0x02
REG_INTR_ENABLE_2 = 0x03

# FIFO pointers, overflow counter, data port and configuration.
REG_FIFO_WR_PTR = 0x04
REG_OVF_COUNTER = 0x05
REG_FIFO_RD_PTR = 0x06
REG_FIFO_DATA = 0x07
REG_FIFO_CONFIG = 0x08

# Mode and SpO2 (ADC range / sample rate / pulse width) configuration.
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A

# LED pulse-amplitude (current) registers.
REG_LED1_PA = 0x0C  # RED
REG_LED2_PA = 0x0D  # IR

REG_PART_ID = 0xFF

# Values written to REG_MODE_CONFIG.
MODE_RESET = 0x40
MODE_SPO2 = 0x03  # Red + IR
def write_reg(bus, reg, val):
    """Write the low 8 bits of ``val`` to register ``reg`` of the sensor."""
    low_byte = val & 0xFF  # anything above one byte is dropped
    bus.write_byte_data(MAX30102_ADDR, reg, low_byte)
def read_reg(bus, reg):
    """Read register ``reg`` of the sensor and return the value from the bus."""
    value = bus.read_byte_data(MAX30102_ADDR, reg)
    return value
def read_fifo_sample(bus):
    """Read one 6-byte sample from the FIFO data register.

    In SPO2 mode a sample is RED (3 bytes) followed by IR (3 bytes), each
    sent most-significant byte first. Both values are masked to 18 bits.

    Returns:
        A ``(red, ir)`` tuple of integers.
    """

    def _unpack_18bit(three_bytes):
        # Fold the bytes big-endian, then keep only the 18 meaningful bits.
        value = 0
        for byte in three_bytes:
            value = (value << 8) | byte
        return value & 0x3FFFF

    raw = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
    return _unpack_18bit(raw[0:3]), _unpack_18bit(raw[3:6])
def init_max30102(bus, fs_hz=50):
    """Reset the sensor and configure it for polled Red + IR sampling.

    Steps, in order: print the part ID, issue a reset, disable both interrupt
    enable registers, program the FIFO configuration, set both LED currents,
    program the SPO2 configuration, enter SPO2 mode, and zero the FIFO
    pointers and overflow counter.

    Args:
        bus: open SMBus-like object used by ``read_reg`` / ``write_reg``.
        fs_hz: requested rate; 25, 50 and 100 are recognised, any other value
            silently uses the 50 Hz setting.
    """
    part = read_reg(bus, REG_PART_ID)
    print(f"MAX3010x PART ID: 0x{part:02X}")

    # Reset, then wait for the chip to come back.
    write_reg(bus, REG_MODE_CONFIG, MODE_RESET)
    time.sleep(0.1)

    # Disable interrupts (the main loop polls instead).
    for intr_reg in (REG_INTR_ENABLE_1, REG_INTR_ENABLE_2):
        write_reg(bus, intr_reg, 0x00)

    # FIFO config: sample avg = 4, rollover disabled, almost full = 0x0F
    sample_avg_bits = 0b010
    rollover_bit = 0
    almost_full = 0x0F
    write_reg(
        bus,
        REG_FIFO_CONFIG,
        (sample_avg_bits << 5) | (rollover_bit << 4) | almost_full,
    )

    # LED currents (you can adjust 0x1F..0x7F); same value for RED then IR.
    for led_reg in (REG_LED1_PA, REG_LED2_PA):
        write_reg(bus, led_reg, 0x3F)

    # SPO2 config = ADC range | sample-rate bits | LED pulse width.
    #   ADC range 4096nA  -> 0b01 << 5
    #   pulse width 18-bit -> 0b11
    # NOTE(review): verify the sample-rate codes against the MAX30102
    # datasheet. The SR field appears to select the raw ADC rate
    # (0b001/0b010/0b011 = 100/200/400 sps), which the 4-sample averaging
    # above would divide down to 25/50/100 Hz at the FIFO -- confirm.
    rate_bits_by_hz = {25: 0b001, 50: 0b010, 100: 0b011}
    sr = rate_bits_by_hz.get(fs_hz, 0b010)  # unknown rates default to 50
    adc_range_bits = 0b01
    pulse_width_bits = 0b11
    write_reg(
        bus,
        REG_SPO2_CONFIG,
        (adc_range_bits << 5) | (sr << 2) | pulse_width_bits,
    )

    # SPO2 mode (Red + IR)
    write_reg(bus, REG_MODE_CONFIG, MODE_SPO2)

    # Clear FIFO pointers
    for fifo_reg in (REG_FIFO_WR_PTR, REG_OVF_COUNTER, REG_FIFO_RD_PTR):
        write_reg(bus, fifo_reg, 0x00)
# =========================
# 3) BPM ESTIMATION (Autocorrelation = stable)
# =========================
def estimate_bpm_autocorr(ir_signal, fs_hz, min_bpm=45, max_bpm=180):
    """Estimate heart rate from an IR sample window using autocorrelation.

    The signal is mean-removed, smoothed with a ~80 ms moving average, and
    autocorrelated. The lag of the strongest autocorrelation value inside the
    lag range corresponding to ``max_bpm`` .. ``min_bpm`` is converted to BPM.

    Args:
        ir_signal: 1-D sequence of IR samples.
        fs_hz: rate at which the samples were taken, in Hz.
        min_bpm: slowest heart rate searched for (default 45).
        max_bpm: fastest heart rate searched for (default 180).

    Returns:
        The estimated BPM as a float, or ``None`` when no estimate can be
        made: empty or too-short window, a flat or non-finite signal, or no
        genuine autocorrelation peak inside the search range.

    Raises:
        ValueError: if the BPM bounds are not ``0 < min_bpm < max_bpm``.
    """
    if min_bpm <= 0 or max_bpm <= min_bpm:
        raise ValueError("expected 0 < min_bpm < max_bpm")

    x = np.asarray(ir_signal, dtype=np.float64)
    if x.size == 0:
        # np.mean / np.convolve cannot handle an empty window.
        return None
    x = x - np.mean(x)

    # Smooth ~80ms
    w = max(3, int(fs_hz * 0.08))
    x = np.convolve(x, np.ones(w) / w, mode="same")

    ac = np.correlate(x, x, mode="full")
    ac = ac[len(ac) // 2:]  # positive lags

    # ac[0] is the signal energy. Zero means a perfectly flat window and NaN
    # means bad samples; argmax would otherwise just pick the first lag and
    # report it as a heart rate.
    if not np.isfinite(ac[0]) or ac[0] <= 0:
        return None

    # Search plausible heart rates
    min_lag = int(fs_hz * 60 / max_bpm)
    max_lag = int(fs_hz * 60 / min_bpm)
    if max_lag >= len(ac):
        return None
    seg = ac[min_lag:max_lag]
    if len(seg) < 5:
        return None

    best = int(np.argmax(seg)) + min_lag
    if best <= 0:
        return None

    # A real periodicity shows up as a positive local maximum. If the maximum
    # sits on the edge of the search window and the autocorrelation is still
    # falling into it (left edge) or still rising out of it (right edge), the
    # window only clipped a slope -- there is no peak in range.
    if ac[best] <= 0:
        return None
    if ac[best - 1] >= ac[best]:
        return None
    if best == max_lag - 1 and ac[max_lag] > ac[best]:
        return None

    bpm = 60.0 * fs_hz / best
    return bpm
# =========================
# 4) MAIN LOOP
# =========================
def main():
    """Poll the sensor, estimate BPM over a sliding window, and upload it.

    Opens I2C bus 1, initialises the sensor, fills an 8-second window of
    samples, then loops forever. Each iteration reads one sample, re-estimates
    BPM from the IR window, prints a status line, and uploads the smoothed
    BPM at most once every ``SEND_EVERY_SECONDS``. Upload failures are printed
    and do not stop the loop. Ctrl+C ends the loop cleanly.
    """
    fs = 50  # keep 50 for stable BPM
    window_seconds = 8
    N = fs * window_seconds
    ir_buf = []
    red_buf = []  # kept in step with ir_buf; not used by the BPM estimate
    last_send = None  # monotonic time of the last upload attempt
    bpm_smooth = None
    FINGER_STD_MIN = 2000  # if too strict, lower to 1000

    with SMBus(1) as bus:
        init_max30102(bus, fs_hz=fs)
        print("Reading... (place finger firmly on sensor, cover from room light)")

        try:
            # Prime buffer
            while len(ir_buf) < N:
                red, ir = read_fifo_sample(bus)
                red_buf.append(red)
                ir_buf.append(ir)
                time.sleep(1.0 / fs)

            while True:
                red, ir = read_fifo_sample(bus)
                red_buf.append(red)
                ir_buf.append(ir)
                # Keep only the newest N samples (trim in place).
                del red_buf[:-N]
                del ir_buf[:-N]

                ir_mean = int(np.mean(ir_buf))
                ir_std = float(np.std(ir_buf))

                # Finger detection gate
                if ir_std < FINGER_STD_MIN:
                    bpm = None
                    # Drop the smoothed value too; otherwise the last BPM
                    # keeps being printed and uploaded after the finger is
                    # removed.
                    bpm_smooth = None
                else:
                    bpm = estimate_bpm_autocorr(ir_buf, fs)

                # Optional smoothing
                if bpm is not None:
                    bpm_smooth = bpm if bpm_smooth is None else 0.7 * bpm_smooth + 0.3 * bpm

                if bpm_smooth is None:
                    print(f"IR mean={ir_mean} std={int(ir_std)} | BPM=... (hold still / adjust finger)")
                else:
                    print(f"IR mean={ir_mean} std={int(ir_std)} | BPM={bpm_smooth:.1f}")

                # Send to Supabase at fixed rate. A monotonic clock is used so
                # wall-clock adjustments cannot stall or burst the uploads.
                now = time.monotonic()
                due = last_send is None or (now - last_send) >= SEND_EVERY_SECONDS
                if bpm_smooth is not None and due:
                    try:
                        send_to_supabase(
                            bpm=bpm_smooth,
                            ir=ir,
                            red=red,
                            # NOTE: placeholder constant, not a measurement.
                            spo2=98,  # set real spo2 if you calculate it later
                        )
                        print("✅ Sent to Supabase")
                    except Exception as e:
                        # Best effort: report and keep sampling.
                        print("❌ Supabase error:", e)
                    # Failed attempts are rate-limited as well.
                    last_send = now

                # IMPORTANT: sampling delay matches fs
                time.sleep(1.0 / fs)
        except KeyboardInterrupt:
            print("\nStopped.")


if __name__ == "__main__":
    main()
# [paste-site footer, not part of the script] RAW Paste Data / Copied
