Not a member of GistPad yet?
Sign Up,
it unlocks many cool features!
import time
from collections import deque

import numpy as np
import requests
from smbus2 import SMBus
# =========================
# 1) SUPABASE CONFIG
# =========================
# Base URL of the Supabase project; the REST path is appended when uploading.
SUPABASE_URL = "https://bnrqoovyawxyjbdqmtix.supabase.co"
# NOTE(review): this API key is committed in plain text. Confirm that the
# table's access rules make that acceptable, or load it from the environment
# and rotate the key.
SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM"
# Name of the table that receives one row per upload.
TABLE = "heart_reading"
SEND_EVERY_SECONDS = 1.0  # Upload rate
def send_to_supabase(bpm, ir, red, spo2=98):
    """Insert one reading into the Supabase table through its REST endpoint.

    Args:
        bpm: Heart-rate estimate. A falsy value (``None`` or ``0``) is sent
            as JSON null; anything else is rounded to the nearest integer.
        ir: Raw IR sample; converted with ``int()``.
        red: Raw red sample; converted with ``int()``.
        spo2: SpO2 value to store; converted with ``int()``.
            NOTE(review): the call in ``main`` never passes this argument,
            so the default of 98 is a placeholder, not a measurement.

    Returns:
        ``True`` when the server replies with a success status, ``False``
        when it replies with an error status or the request itself fails
        (connection error, timeout, ...).
    """
    url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }
    payload = {
        "bpm": int(round(bpm)) if bpm else None,
        "spo2": int(spo2),
        "ir": int(ir),
        "red": int(red),
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=2)
    except requests.RequestException:
        # Best-effort upload: a network failure must not stop the sampling
        # loop. Only request errors are caught, so Ctrl-C and genuine bugs
        # still propagate.
        return False
    return response.ok
# =========================
# 2) MAX30102 REGISTERS
# =========================
MAX30102_ADDR = 0x57  # I2C address used for every transfer to the sensor

# FIFO access
REG_FIFO_WR_PTR = 0x04
REG_FIFO_RD_PTR = 0x06
REG_FIFO_DATA = 0x07

# Configuration
REG_MODE_CONFIG = 0x09
REG_SPO2_CONFIG = 0x0A

# LED pulse amplitude
REG_LED1_PA = 0x0C  # RED
REG_LED2_PA = 0x0D  # IR
def init_max30102(bus):
    """Reset the MAX30102 and configure it for red + IR sampling.

    Args:
        bus: An open I2C bus object exposing ``write_byte_data(addr, reg,
            value)`` (``main`` passes an ``smbus2.SMBus``).

    NOTE(review): the SpO2 configuration below selects a 100 Hz sample rate,
    while ``main`` processes the data assuming ``fs = 50``. Confirm which
    rate is intended before changing either side.
    """
    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x40)  # Reset
    # Give the device time to complete the reset before reconfiguring it.
    time.sleep(0.1)

    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x03)  # SpO2 Mode
    bus.write_byte_data(MAX30102_ADDR, REG_SPO2_CONFIG, 0x27)  # 100Hz, 411us
    bus.write_byte_data(MAX30102_ADDR, REG_LED1_PA, 0x24)  # Red LED
    bus.write_byte_data(MAX30102_ADDR, REG_LED2_PA, 0x24)  # IR LED

    # Start with the FIFO write and read pointers both at zero.
    bus.write_byte_data(MAX30102_ADDR, REG_FIFO_WR_PTR, 0x00)
    bus.write_byte_data(MAX30102_ADDR, REG_FIFO_RD_PTR, 0x00)
def read_fifo(bus):
    """Read one red/IR sample pair from the sensor's FIFO data register.

    Args:
        bus: An open I2C bus object exposing ``read_i2c_block_data(addr,
            reg, length)``.

    Returns:
        A ``(red, ir)`` tuple of integers, each masked to 18 bits.
    """
    raw = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
    # Each channel arrives as three big-endian bytes; only the low 18 bits
    # are kept. The first triple is red, the second is IR.
    red = ((raw[0] << 16) | (raw[1] << 8) | raw[2]) & 0x3FFFF
    ir = ((raw[3] << 16) | (raw[4] << 8) | raw[5]) & 0x3FFFF
    return red, ir
# =========================
# 3) BPM ALGORITHM
# =========================
def estimate_bpm_autocorr(ir_signal, fs_hz):
    """Estimate heart rate from an IR sample window using autocorrelation.

    The signal is mean-centred, smoothed with a 5-tap moving average, and
    autocorrelated. The lag with the strongest correlation inside the
    45-180 BPM range is taken as the beat period.

    Args:
        ir_signal: Sequence of IR samples (any sequence numpy can convert).
        fs_hz: Sampling rate of ``ir_signal`` in Hz.

    Returns:
        The estimated rate in beats per minute, or ``None`` when the window
        is empty, too short to cover the slowest searched rate, or shows no
        positive correlation in the search range (e.g. a flat signal).
    """
    x = np.asarray(ir_signal, dtype=np.float64)
    if x.size == 0:
        return None
    x = x - np.mean(x)

    # Moving-average low-pass to suppress sample-to-sample noise.
    w = 5
    x = np.convolve(x, np.ones(w) / w, mode="same")

    # Keep only the non-negative lags of the autocorrelation.
    ac = np.correlate(x, x, mode="full")
    ac = ac[len(ac) // 2:]

    # Lag 0 is always the global maximum and would divide by zero below,
    # so the search never starts before lag 1.
    min_lag = max(1, int(fs_hz * 60 / 180))  # 180 BPM
    max_lag = int(fs_hz * 60 / 45)  # 45 BPM
    if max_lag >= len(ac):
        return None

    seg = ac[min_lag:max_lag]
    if len(seg) == 0:
        return None

    best = int(np.argmax(seg))
    # A non-positive maximum means there is no periodic component in range;
    # without this check a flat window would report the fastest rate.
    if seg[best] <= 0:
        return None

    peak = best + min_lag
    return 60.0 * fs_hz / peak
# =========================
# 4) MAIN LOOP
# =========================
def main():
    """Sample the sensor forever, print the heart rate, and upload readings.

    Each iteration reads one red/IR pair. While the IR level indicates a
    finger, samples fill a 4-second window; once the window is full the BPM
    is re-estimated and exponentially smoothed. Readings are uploaded at
    most once every ``SEND_EVERY_SECONDS``. Removing the finger clears the
    window and the smoothed value.

    NOTE: the upload is synchronous, so a slow request (up to the 2 s
    timeout in ``send_to_supabase``) stalls sampling for that long.
    """
    fs = 50
    N = fs * 4  # 4-second window for faster response
    finger_threshold = 30000  # IR level above which a finger is assumed
    period = 1 / fs

    ir_buf = deque(maxlen=N)  # Oldest sample drops automatically when full
    last_send = float("-inf")  # Guarantees the first eligible upload fires
    bpm_smooth = None

    with SMBus(1) as bus:
        init_max30102(bus)
        print("System Ready. Place finger on sensor...")

        next_tick = time.monotonic()
        while True:
            # Read sample
            try:
                red_raw, ir_raw = read_fifo(bus)
            except OSError:
                # Transient I2C failure: skip this sample, but wait one
                # period so a persistent fault does not spin the CPU.
                time.sleep(period)
                next_tick = time.monotonic()
                continue

            finger_present = ir_raw > finger_threshold
            if finger_present:  # Finger detected
                ir_buf.append(ir_raw)
                if len(ir_buf) == N:
                    bpm = estimate_bpm_autocorr(list(ir_buf), fs)
                    if bpm and 40 < bpm < 180:
                        bpm_smooth = bpm if bpm_smooth is None else 0.8*bpm_smooth + 0.2*bpm
                print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {int(bpm_smooth) if bpm_smooth else 'Calculating...'}")
            else:
                ir_buf.clear()
                bpm_smooth = None
                print("No Finger Detected")

            # Supabase Upload
            now = time.monotonic()
            if finger_present and (now - last_send) >= SEND_EVERY_SECONDS:
                success = send_to_supabase(bpm_smooth, ir_raw, red_raw)
                if success:
                    print(">>> Sent to Supabase ✅")
                else:
                    print(">>> Supabase Error ❌")
                last_send = now

            # Sleep until the next sample deadline, not for a fixed
            # duration, so time spent above does not lower the real rate
            # below the fs used in the BPM calculation.
            next_tick += period
            delay = next_tick - time.monotonic()
            if delay > 0:
                time.sleep(delay)
            else:
                # Fell behind (e.g. slow upload): resync instead of bursting.
                next_tick = time.monotonic()
# Run the acquisition loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
RAW Paste Data
Copied
