Guest

Untitled 1060

Apr 5th, 2026
10
0
Never
Not a member of GistPad yet? Sign Up, it unlocks many cool features!
None 17.99 KB | None | 0 0
  1. import time
  2. import numpy as np
  3. from smbus2 import SMBus
  4. import requests
  5.  
  6. # =========================
  7. # 1) SUPABASE CONFIG
  8. # =========================
  9. SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6ImJucnFvb3Z5YXd4eWpiZHFtdGl4Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjE2NjA3ODEsImV4cCI6MjA3NzIzNjc4MX0.2aXc6ZDLss-nXsj2WQNkvD9AnImr73xVGExKK1LQerM" # <-- put yours here
  10. TABLE = "heart_reading"
  11.  
  12. SEND_EVERY_SECONDS = 1.0 # upload rate (e.g. 1 row/sec). Set 5.0 for 1 row/5 sec
  13.  
  14.  
  15. def send_to_supabase(bpm, ir, red, spo2=None):
  16. url = f"{SUPABASE_URL}/rest/v1/{TABLE}"
  17. headers = {
  18. "apikey": SUPABASE_ANON_KEY,
  19. "Authorization": f"Bearer {SUPABASE_ANON_KEY}",
  20. "Content-Type": "application/json",
  21. "Prefer": "return=minimal",
  22. }
  23.  
  24. payload = {
  25. "bpm": int(round(bpm)) if bpm is not None else None, # INT bpm
  26. "spo2": int(round(spo2)) if spo2 is not None else None, # INT spo2 (if you have it)
  27. "ir": int(ir) if ir is not None else None,
  28. "red": int(red) if red is not None else None,
  29. # created_at auto by default now()
  30. }
  31.  
  32. payload = {k: v for k, v in payload.items() if v is not None}
  33.  
  34. r = requests.post(url, headers=headers, json=payload, timeout=8)
  35. if not r.ok:
  36. raise RuntimeError(f"Supabase insert failed: {r.status_code} {r.text}")
  37.  
  38.  
  39. # =========================
  40. # 2) MAX30102 REGISTERS
  41. # =========================
  42. MAX30102_ADDR = 0x57
  43.  
  44. REG_INTR_STATUS_1 = 0x00
  45. REG_INTR_STATUS_2 = 0x01
  46. REG_INTR_ENABLE_1 = 0x02
  47. REG_INTR_ENABLE_2 = 0x03
  48. REG_FIFO_WR_PTR = 0x04
  49. REG_OVF_COUNTER = 0x05
  50. REG_FIFO_RD_PTR = 0x06
  51. REG_FIFO_DATA = 0x07
  52. REG_FIFO_CONFIG = 0x08
  53. REG_MODE_CONFIG = 0x09
  54. REG_SPO2_CONFIG = 0x0A
  55. REG_LED1_PA = 0x0C # RED
  56. REG_LED2_PA = 0x0D # IR
  57. REG_PART_ID = 0xFF
  58.  
  59. MODE_RESET = 0x40
  60. MODE_SPO2 = 0x03 # Red + IR
  61.  
  62.  
  63. def write_reg(bus, reg, val):
  64. bus.write_byte_data(MAX30102_ADDR, reg, val & 0xFF)
  65.  
  66.  
  67. def read_reg(bus, reg):
  68. return bus.read_byte_data(MAX30102_ADDR, reg)
  69.  
  70.  
  71. def read_fifo_sample(bus):
  72. """
  73. 6 bytes per sample: RED(3) + IR(3) in SPO2 mode
  74. Values are 18-bit
  75. """
  76. data = bus.read_i2c_block_data(MAX30102_ADDR, REG_FIFO_DATA, 6)
  77. red = ((data[0] << 16) | (data[1] << 8) | data[2]) & 0x3FFFF
  78. ir = ((data[3] << 16) | (data[4] << 8) | data[5]) & 0x3FFFF
  79. return red, ir
  80.  
  81.  
  82. def init_max30102(bus, fs_hz=50):
  83. part = read_reg(bus, REG_PART_ID)
  84. print(f"MAX3010x PART ID: 0x{part:02X}")
  85.  
  86. # Reset
  87. write_reg(bus, REG_MODE_CONFIG, MODE_RESET)
  88. time.sleep(0.1)
  89.  
  90. # Disable interrupts (polling)
  91. write_reg(bus, REG_INTR_ENABLE_1, 0x00)
  92. write_reg(bus, REG_INTR_ENABLE_2, 0x00)
  93.  
  94. # FIFO config:
  95. # sample avg = 4, rollover disabled, almost full = 0x0F
  96. fifo_cfg = (0b010 << 5) | (0 << 4) | 0x0F
  97. write_reg(bus, REG_FIFO_CONFIG, fifo_cfg)
  98.  
  99. # LED currents (you can adjust 0x1F..0x7F)
  100. write_reg(bus, REG_LED1_PA, 0x3F) # RED
  101. write_reg(bus, REG_LED2_PA, 0x3F) # IR
  102.  
  103. # SPO2 config: ADC range + sample rate + LED pulse width (18-bit)
  104. # ADC range: 4096nA -> 0b01 << 5
  105. # Sample rate bits:
  106. # 50 Hz -> 0b010
  107. # 100 Hz -> 0b011
  108. # 25 Hz -> 0b001
  109. # Pulse width 18-bit -> 0b11
  110. if fs_hz == 25:
  111. sr = 0b001
  112. elif fs_hz == 50:
  113. sr = 0b010
  114. elif fs_hz == 100:
  115. sr = 0b011
  116. else:
  117. sr = 0b010 # default to 50
  118.  
  119. spo2_cfg = (0b01 << 5) | (sr << 2) | 0b11
  120. write_reg(bus, REG_SPO2_CONFIG, spo2_cfg)
  121.  
  122. # SPO2 mode (Red + IR)
  123. write_reg(bus, REG_MODE_CONFIG, MODE_SPO2)
  124.  
  125. # Clear FIFO pointers
  126. write_reg(bus, REG_FIFO_WR_PTR, 0x00)
  127. write_reg(bus, REG_OVF_COUNTER, 0x00)
  128. write_reg(bus, REG_FIFO_RD_PTR, 0x00)
  129.  
  130.  
  131. # =========================
  132. # 3) BPM ESTIMATION (Autocorrelation = stable)
  133. # =========================
  134. def estimate_bpm_autocorr(ir_signal, fs_hz):
  135. x = np.array(ir_signal, dtype=np.float64)
  136. x = x - np.mean(x)
  137.  
  138. # Smooth ~80ms
  139. w = max(3, int(fs_hz * 0.08))
  140. x = np.convolve(x, np.ones(w) / w, mode="same")
  141.  
  142. ac = np.correlate(x, x, mode="full")
  143. ac = ac[len(ac) // 2:] # positive lags
  144.  
  145. # Search plausible heart rates
  146. min_bpm, max_bpm = 45, 180
  147. min_lag = int(fs_hz * 60 / max_bpm)
  148. max_lag = int(fs_hz * 60 / min_bpm)
  149.  
  150. if max_lag >= len(ac):
  151. return None
  152.  
  153. seg = ac[min_lag:max_lag]
  154. if len(seg) < 5:
  155. return None
  156.  
  157. best = int(np.argmax(seg)) + min_lag
  158. if best <= 0:
  159. return None
  160.  
  161. bpm = 60.0 * fs_hz / best
  162. return bpm
  163.  
  164.  
  165. # =========================
  166. # 4) MAIN LOOP
  167. # =========================
  168. def main():
  169. fs = 50 # keep 50 for stable BPM
  170. window_seconds = 8
  171. N = fs * window_seconds
  172.  
  173. ir_buf = []
  174. red_buf = []
  175.  
  176. last_send = 0.0
  177. bpm_smooth = None
  178.  
  179. FINGER_STD_MIN = 2000 # if too strict, lower to 1000
  180.  
  181. with SMBus(1) as bus:
  182. init_max30102(bus, fs_hz=fs)
  183. print("Reading... (place finger firmly on sensor, cover from room light)")
  184.  
  185. # Prime buffer
  186. while len(ir_buf) < N:
  187. red, ir = read_fifo_sample(bus)
  188. red_buf.append(red)
  189. ir_buf.append(ir)
  190. time.sleep(1.0 / fs)
  191.  
  192. while True:
  193. red, ir = read_fifo_sample(bus)
  194. red_buf.append(red)
  195. ir_buf.append(ir)
  196.  
  197. red_buf = red_buf[-N:]
  198. ir_buf = ir_buf[-N:]
  199.  
  200. ir_mean = int(np.mean(ir_buf))
  201. ir_std = float(np.std(ir_buf))
  202.  
  203. # Finger detection gate
  204. if ir_std < FINGER_STD_MIN:
  205. bpm = None
  206. else:
  207. bpm = estimate_bpm_autocorr(ir_buf, fs)
  208.  
  209. # Optional smoothing
  210. if bpm is not None:
  211. bpm_smooth = bpm if bpm_smooth is None else 0.7 * bpm_smooth + 0.3 * bpm
  212.  
  213. if bpm_smooth is None:
  214. print(f"IR mean={ir_mean} std={int(ir_std)} | BPM=... (hold still / adjust finger)")
  215. else:
  216. print(f"IR mean={ir_mean} std={int(ir_std)} | BPM={bpm_smooth:.1f}")
  217.  
  218. # Send to Supabase at fixed rate
  219. now = time.time()
  220. if bpm_smooth is not None and (now - last_send) >= SEND_EVERY_SECONDS:
  221. try:
  222. send_to_supabase(
  223. bpm=bpm_smooth,
  224. ir=ir,
  225. red=red,
  226. spo2=98 # set real spo2 if you calculate it later
  227. )
  228. print("✅ Sent to Supabase")
  229. except Exception as e:
  230. print("❌ Supabase error:", e)
  231.  
  232. last_send = now
  233.  
  234. # IMPORTANT: sampling delay matches fs
  235. time.sleep(1.0 / fs)
  236.  
  237.  
  238. if __name__ == "__main__":
  239. main()
RAW Paste Data Copied