Not a member of GistPad yet?
Sign Up,
it unlocks many cool features!
import os
import time
from collections import deque

import numpy as np
import requests
from smbus2 import SMBus
# =========================
# 1) SUPABASE CONFIG
# =========================
# The endpoint and API key can be overridden with the SUPABASE_URL and
# SUPABASE_ANON_KEY environment variables so deployments do not have to edit
# this file; the literals below are the original built-in defaults.
SUPABASE_URL = os.environ.get(
    "SUPABASE_URL",
    "https://bnrqoovyawxyjbdqmtix.supabase.co",
)
SUPABASE_ANON_KEY = os.environ.get(
    "SUPABASE_ANON_KEY",
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM",
)
TABLE = "heart_reading"  # Table name appended to the REST endpoint path
UPLOAD_INTERVAL = 5.0  # Upload every 5 seconds
def send_to_supabase(bpm, ir, red, spo2=98):
    """POST one reading to the Supabase REST endpoint for ``TABLE``.

    Args:
        bpm: Heart-rate estimate; rounded to an int, or sent as null when
            falsy (``None`` or 0).
        ir: Raw IR sample, sent as an int.
        red: Raw red sample, sent as an int.
        spo2: SpO2 value to store. Defaults to the fixed value 98; nothing
            in this function computes it.

    Returns:
        True when the server answered with a success status, False on an
        error status or on any network/request failure.
    """
    url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
    headers = {
        "apikey": SUPABASE_ANON_KEY,
        "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
        "Content-Type": "application/json",
        "Prefer": "return=minimal",
    }
    payload = {
        "bpm": int(round(bpm)) if bpm else None,
        "spo2": int(spo2),
        "ir": int(ir),
        "red": int(red),
    }
    try:
        r = requests.post(url, headers=headers, json=payload, timeout=5)
    except requests.RequestException:
        # Best-effort upload: connection errors and timeouts are reported as
        # a failed upload. Only request errors are caught so that Ctrl-C and
        # genuine programming errors are not silently swallowed.
        return False
    return r.ok
# =========================
# 2) MAX30102 CORE
# =========================
MAX30102_ADDR = 0x57  # I2C address used for every sensor transaction
REG_FIFO_DATA = 0x07  # Sample data is read from this register
REG_MODE_CONFIG = 0x09  # Reset and operating-mode selection
REG_SPO2_CONFIG = 0x0A  # SpO2 acquisition settings
REG_LED1_PA = 0x0C  # LED1 pulse amplitude
REG_LED2_PA = 0x0D  # LED2 pulse amplitude
def init_max30102(bus):
    """Reset the MAX30102 and configure it for SpO2-mode sampling.

    Args:
        bus: An open SMBus-like object providing
            ``write_byte_data(addr, register, value)``.

    The register values are written in a fixed order: reset first, then
    mode, acquisition settings and both LED amplitudes.
    """
    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x40)  # Reset
    time.sleep(0.1)  # Give the device time to finish the reset
    bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x03)  # SpO2 Mode
    # 0x27 selects a 100 Hz sample rate.
    # NOTE(review): main() does its lag math with fs = 50 -- confirm that the
    # effective rate seen by the read loop matches what the estimator assumes.
    bus.write_byte_data(MAX30102_ADDR, REG_SPO2_CONFIG, 0x27)  # 100Hz
    # Same pulse amplitude for both LEDs.
    bus.write_byte_data(MAX30102_ADDR, REG_LED1_PA, 0x24)
    bus.write_byte_data(MAX30102_ADDR, REG_LED2_PA, 0x24)
def read_fifo(bus):
    """Read one sample pair from the sensor's FIFO data register.

    Args:
        bus: An open SMBus-like object providing
            ``read_i2c_block_data(addr, register, length)``.

    Returns:
        A ``(red, ir)`` tuple of ints. Each value is assembled big-endian
        from three bytes and masked to its low 18 bits.
    """
    d = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
    # Bytes 0-2 form the red sample, bytes 3-5 the IR sample.
    red = ((d[0] << 16) | (d[1] << 8) | d[2]) & 0x3FFFF
    ir = ((d[3] << 16) | (d[4] << 8) | d[5]) & 0x3FFFF
    return red, ir
def estimate_bpm_autocorr(ir_signal, fs_hz):
    """Estimate heart rate from an IR signal using autocorrelation.

    The signal is mean-centred, lightly smoothed, and autocorrelated; the lag
    with the strongest correlation inside the 45-180 BPM search band is
    converted to beats per minute.

    Args:
        ir_signal: Sequence of IR samples taken at a uniform rate.
        fs_hz: Sampling rate of ``ir_signal`` in Hz.

    Returns:
        The estimated BPM as a float, or None when no estimate can be made:
        empty input, non-positive ``fs_hz``, a perfectly flat signal, or a
        signal too short to cover the slowest lag in the search band.
    """
    x = np.asarray(ir_signal, dtype=np.float64)
    if x.size == 0 or fs_hz <= 0:
        return None
    if x.max() == x.min():
        # A flat signal has an all-zero autocorrelation; argmax would then
        # pick the first lag in the band and report a meaningless rate.
        return None

    x = x - np.mean(x)
    w = 3  # Smaller smoothing window for smaller buffer
    x = np.convolve(x, np.ones(w) / w, mode="same")

    ac = np.correlate(x, x, mode="full")
    ac = ac[len(ac) // 2:]  # keep non-negative lags only

    # Lag bounds in samples: 180 BPM is the shortest period, 45 BPM the
    # longest. Lag 0 is never a valid period, hence the floor of 1.
    min_lag = max(1, int(fs_hz * 60 / 180))
    max_lag = int(fs_hz * 60 / 45)
    if max_lag >= len(ac):
        return None
    seg = ac[min_lag:max_lag]
    if seg.size == 0:
        return None

    best_lag = int(np.argmax(seg)) + min_lag
    return float(60.0 * fs_hz / best_lag)
# =========================
# 3) MAIN LOGIC
# =========================
def main():
    """Sample the sensor in a loop, print live readings and upload snapshots.

    While the raw IR value indicates a finger, samples are collected into a
    2-second sliding window; once the window is full a BPM estimate is
    computed on every sample and exponentially smoothed. Every
    ``UPLOAD_INTERVAL`` seconds the current smoothed BPM and latest raw
    values are uploaded if a finger is present. Runs until interrupted.
    """
    fs = 50  # Loop rate in Hz; also the rate assumed by the BPM estimator
    N = fs * 2  # <--- CHANGED: Buffer is now 2 seconds (100 samples)
    finger_threshold = 35000  # Raw IR above this counts as "finger detected"
    period = 1.0 / fs

    # A bounded deque drops the oldest sample automatically once full.
    ir_buf = deque(maxlen=N)
    bpm_smooth = None
    # Monotonic clock: interval checks must not be affected by wall-clock
    # adjustments.
    last_upload_time = time.monotonic()
    next_tick = time.monotonic()

    try:
        with SMBus(1) as bus:
            init_max30102(bus)
            print("System Online. Buffer: 2 Seconds. Uploading every 5 seconds.")
            while True:
                # Pace the loop against a deadline so the time spent reading,
                # printing and computing does not stretch the sample period.
                now = time.monotonic()
                if now < next_tick:
                    time.sleep(next_tick - now)
                    next_tick += period
                else:
                    # Fell behind (e.g. after a slow upload): restart the
                    # schedule instead of bursting to catch up.
                    next_tick = now + period

                try:
                    red_raw, ir_raw = read_fifo(bus)
                except OSError:
                    # Transient I2C failure: skip this sample. The pacing at
                    # the top of the loop keeps the retry from busy-spinning.
                    continue

                finger_present = ir_raw > finger_threshold
                if finger_present:  # Finger detected
                    ir_buf.append(ir_raw)
                    if len(ir_buf) == N:
                        bpm = estimate_bpm_autocorr(list(ir_buf), fs)
                        if bpm and 40 < bpm < 180:
                            # Smoothing factor (slightly faster response)
                            bpm_smooth = bpm if bpm_smooth is None else 0.7 * bpm_smooth + 0.3 * bpm
                    # Printed for every finger sample, so the placeholder is
                    # shown while the window is still filling.
                    print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {int(bpm_smooth) if bpm_smooth else 'Wait 2s...'}")
                else:
                    # Finger removed: discard the window and the estimate so
                    # stale data never leaks into the next measurement.
                    ir_buf.clear()
                    bpm_smooth = None
                    print("Searching for Finger...")

                # Upload Check
                current_time = time.monotonic()
                if (current_time - last_upload_time) >= UPLOAD_INTERVAL:
                    if finger_present and bpm_smooth is not None:
                        print(">>> Uploading snapshot...")
                        if not send_to_supabase(bpm_smooth, ir_raw, red_raw):
                            print(">>> Upload failed.")
                    # The interval restarts whether or not anything was sent.
                    last_upload_time = current_time
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; the SMBus context manager has
        # already closed the bus by the time control reaches here.
        print("Stopped.")
# Run the sampling loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
RAW Paste Data
Copied
