Guest

Untitled 1061

Apr 5th, 2026
11
0
Never
Not a member of GistPad yet? Sign Up, it unlocks many cool features!
None 10.39 KB | None | 0 0
  1. import time
  2. import numpy as np
  3. from smbus2 import SMBus
  4. import requests
  5.  
  6. # =========================
  7. # 1) SUPABASE CONFIG
  8. # =========================
  9. SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM"
  10. TABLE = "heart_reading"
  11.  
  12. SEND_EVERY_SECONDS = 1.0 # Upload rate
  13.  
  14. def send_to_supabase(bpm, ir, red, spo2=98):
  15. url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
  16. headers = {
  17. "apikey": SUPABASE_ANON_KEY,
  18. "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
  19. "Content-Type": "application/json",
  20. "Prefer": "return=minimal",
  21. }
  22. payload = {
  23. "bpm": int(round(bpm)) if bpm else None,
  24. "spo2": int(spo2),
  25. "ir": int(ir),
  26. "red": int(red)
  27. }
  28. try:
  29. r = requests.post(url, headers=headers, json=payload, timeout=2)
  30. return r.ok
  31. except:
  32. return False
  33.  
  34. # =========================
  35. # 2) MAX30102 REGISTERS
  36. # =========================
  37. MAX30102_ADDR = 0x57
  38. REG_FIFO_DATA = 0x07
  39. REG_FIFO_WR_PTR = 0x04
  40. REG_FIFO_RD_PTR = 0x06
  41. REG_MODE_CONFIG = 0x09
  42. REG_SPO2_CONFIG = 0x0A
  43. REG_LED1_PA = 0x0C # RED
  44. REG_LED2_PA = 0x0D # IR
  45.  
  46. def init_max30102(bus):
  47. bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x40) # Reset
  48. time.sleep(0.1)
  49. bus.write_byte_data(MAX30102_ADDR, REG_MODE_CONFIG, 0x03) # SpO2 Mode
  50. bus.write_byte_data(MAX30102_ADDR, REG_SPO2_CONFIG, 0x27) # 100Hz, 411us
  51. bus.write_byte_data(MAX30102_ADDR, REG_LED1_PA, 0x24) # Red LED
  52. bus.write_byte_data(MAX30102_ADDR, REG_LED2_PA, 0x24) # IR LED
  53. bus.write_byte_data(MAX30102_ADDR, REG_FIFO_WR_PTR, 0x00)
  54. bus.write_byte_data(MAX30102_ADDR, REG_FIFO_RD_PTR, 0x00)
  55.  
  56. def read_fifo(bus):
  57. d = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
  58. red = ((d[0] << 16) | (d[1] << 8) | d[2]) & 0x3FFFF
  59. ir = ((d[3] << 16) | (d[4] << 8) | d[5]) & 0x3FFFF
  60. return red, ir
  61.  
  62. # =========================
  63. # 3) BPM ALGORITHM
  64. # =========================
  65. def estimate_bpm_autocorr(ir_signal, fs_hz):
  66. x = np.array(ir_signal, dtype=np.float64)
  67. x = x - np.mean(x)
  68. # Bandpass filter simulation (Smoothing)
  69. w = 5
  70. x = np.convolve(x, np.ones(w)/w, mode='same')
  71. ac = np.correlate(x, x, mode='full')
  72. ac = ac[len(ac)//2:]
  73.  
  74. min_lag = int(fs_hz * 60 / 180) # 180 BPM
  75. max_lag = int(fs_hz * 60 / 45) # 45 BPM
  76.  
  77. if max_lag >= len(ac): return None
  78. seg = ac[min_lag:max_lag]
  79. if len(seg) == 0: return None
  80.  
  81. peak = np.argmax(seg) + min_lag
  82. return 60.0 * fs_hz / peak
  83.  
  84. # =========================
  85. # 4) MAIN LOOP
  86. # =========================
  87. def main():
  88. fs = 50
  89. N = fs * 4 # 4-second window for faster response
  90. ir_buf = []
  91. last_send = 0
  92. bpm_smooth = None
  93.  
  94. with SMBus(1) as bus:
  95. init_max30102(bus)
  96. print("System Ready. Place finger on sensor...")
  97.  
  98. while True:
  99. # Read sample
  100. try:
  101. red_raw, ir_raw = read_fifo(bus)
  102. except:
  103. continue
  104.  
  105. if ir_raw > 30000: # Finger detected
  106. ir_buf.append(ir_raw)
  107. if len(ir_buf) > N: ir_buf.pop(0)
  108.  
  109. if len(ir_buf) == N:
  110. bpm = estimate_bpm_autocorr(ir_buf, fs)
  111. if bpm and 40 < bpm < 180:
  112. bpm_smooth = bpm if bpm_smooth is None else 0.8*bpm_smooth + 0.2*bpm
  113.  
  114. print(f"RED: {red_raw} | IR: {ir_raw} | BPM: {int(bpm_smooth) if bpm_smooth else 'Calculating...'}")
  115. else:
  116. ir_buf = []
  117. bpm_smooth = None
  118. print("No Finger Detected")
  119.  
  120. # Supabase Upload
  121. now = time.time()
  122. if ir_raw > 30000 and (now - last_send) >= SEND_EVERY_SECONDS:
  123. success = send_to_supabase(bpm_smooth, ir_raw, red_raw)
  124. if success: print(">>> Sent to Supabase ✅")
  125. else: print(">>> Supabase Error ❌")
  126. last_send = now
  127.  
  128. time.sleep(1/fs)
  129.  
  130. if __name__ == "__main__":
  131. main()
RAW Paste Data Copied