# Files
# MedMate-Medicine-Tracker/coordinator.py
# 2025-08-17 23:02:10 +10:00
#
# 319 lines
# 11 KiB
# Python

"""DataUpdateCoordinator for MedMate integration."""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, time, date
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
DOMAIN,
STORAGE_KEY,
STORAGE_VERSION,
CONF_MEDICINE_NAME,
CONF_ACTIVE_INGREDIENT,
CONF_STRENGTH,
CONF_PACK_SIZE,
CONF_SCHEDULE,
CONF_PRESCRIPTION,
CONF_DAYS,
CONF_TIMES,
CONF_EXPIRY_DATE,
CONF_REPEATS_LEFT,
CONF_ISSUE_DATE,
CONF_DOCTOR,
CONF_TOTAL_REPEATS,
TIME_SLOTS,
)
_LOGGER = logging.getLogger(__name__)

# Passed to the coordinator as its polling interval.
UPDATE_INTERVAL = timedelta(minutes=5)

# Clock time assumed for each named time slot of a schedule.
DEFAULT_TIMES = {
    "morning": time(hour=8, minute=0),
    "lunchtime": time(hour=12, minute=0),
    "dinner": time(hour=18, minute=0),
    "night": time(hour=22, minute=0),
}
class MedMateDataUpdateCoordinator(DataUpdateCoordinator):
    """Class to manage fetching MedMate data.

    Each coordinator serves the single medicine whose id is stored under
    ``"medicine_id"`` in the config entry data. The record is read from the
    store's ``"medicines"`` mapping, reduced to the expected keys and then
    extended with the derived values ``prescription_active``, ``next_dose``
    and ``dose_due``.

    NOTE(review): all calculations use naive ``datetime.now()`` (system local
    time). Confirm this matches the time zone users expect before switching
    to a timezone-aware clock, because stored ``last_taken`` keys and the
    ``next_dose`` value would change shape.
    """

    # Lower-case day names indexed by ``datetime.weekday()`` (0 = Monday).
    _WEEKDAYS = (
        "monday",
        "tuesday",
        "wednesday",
        "thursday",
        "friday",
        "saturday",
        "sunday",
    )

    # A dose counts as due within this many minutes either side of its slot.
    _DOSE_WINDOW_MINUTES = 30

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Initialize the coordinator."""
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=UPDATE_INTERVAL,
        )
        self.entry = entry
        self.store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
        self.medicine_id = entry.data.get("medicine_id")

    async def _async_update_data(self) -> dict[str, Any]:
        """Fetch data from storage.

        Returns:
            The cleaned medicine record plus derived values, or an empty dict
            when this coordinator's medicine is not present in storage.
        """
        data = await self.store.async_load() or {}
        medicines = data.get("medicines", {})

        if self.medicine_id not in medicines:
            _LOGGER.warning("Medicine %s not found in storage", self.medicine_id)
            return {}

        # Work on a cleaned copy so derived keys never leak into the stored record.
        cleaned_data = self._clean_medicine_data(medicines[self.medicine_id])
        return self._calculate_derived_data(cleaned_data)

    def _clean_medicine_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]:
        """Clean medicine data to remove any unwanted keys from config flow.

        Args:
            medicine_data: Raw record for one medicine as loaded from storage.

        Returns:
            A new dict holding only the expected top-level, schedule and
            prescription keys. Empty input yields an empty dict.
        """
        if not medicine_data:
            return {}

        # Basic medicine info
        cleaned: dict[str, Any] = {
            key: medicine_data[key]
            for key in (
                CONF_MEDICINE_NAME,
                CONF_ACTIVE_INGREDIENT,
                CONF_STRENGTH,
                CONF_PACK_SIZE,
            )
            if key in medicine_data
        }

        # Schedule: keep days/times only when they are lists.
        schedule = medicine_data.get(CONF_SCHEDULE, {})
        if isinstance(schedule, dict):
            cleaned[CONF_SCHEDULE] = {
                key: schedule[key]
                for key in (CONF_DAYS, CONF_TIMES)
                if isinstance(schedule.get(key), list)
            }

        # Prescription: keep only the expected keys.
        prescription = medicine_data.get(CONF_PRESCRIPTION, {})
        if isinstance(prescription, dict):
            cleaned[CONF_PRESCRIPTION] = {
                key: prescription[key]
                for key in (
                    CONF_ISSUE_DATE,
                    CONF_EXPIRY_DATE,
                    CONF_DOCTOR,
                    CONF_TOTAL_REPEATS,
                    CONF_REPEATS_LEFT,
                )
                if key in prescription
            }

        # Other expected keys
        for key in ("inventory", "last_taken"):
            if key in medicine_data:
                cleaned[key] = medicine_data[key]

        _LOGGER.debug("Cleaned medicine data: %s", cleaned)
        return cleaned

    @staticmethod
    def _is_prescription_active(prescription: Any, today: date) -> bool:
        """Return True when the prescription has repeats left and is unexpired.

        Malformed input (not a dict, non-numeric repeats, unparsable expiry
        date) is treated as an inactive prescription instead of raising, so a
        bad record cannot fail the whole coordinator update.

        Args:
            prescription: The stored prescription mapping (may be missing).
            today: The date to compare the expiry date against.
        """
        if not isinstance(prescription, dict):
            return False

        repeats_left = prescription.get(CONF_REPEATS_LEFT, 0)
        if not isinstance(repeats_left, (int, float)) or repeats_left <= 0:
            return False

        expiry_date = prescription.get(CONF_EXPIRY_DATE)
        if not expiry_date:
            return False

        if isinstance(expiry_date, str):
            try:
                expiry_date = datetime.fromisoformat(expiry_date).date()
            except ValueError:
                _LOGGER.warning("Invalid prescription expiry date: %s", expiry_date)
                return False
        elif isinstance(expiry_date, datetime):
            # datetime is a date subclass but cannot be ordered against a date.
            expiry_date = expiry_date.date()

        if not isinstance(expiry_date, date):
            return False
        return today <= expiry_date

    def _schedule_parts(
        self, medicine_data: dict[str, Any]
    ) -> tuple[set[int], list[tuple[str, time]]]:
        """Return the scheduled weekday numbers and the recognised time slots.

        Returns:
            ``(weekdays, slots)`` where ``weekdays`` holds ``datetime.weekday()``
            numbers and ``slots`` holds ``(slot_name, clock_time)`` pairs for
            every slot name found in ``DEFAULT_TIMES``. Both are empty when the
            schedule, its days or its times are missing or not lists.
        """
        schedule = medicine_data.get(CONF_SCHEDULE)
        if not isinstance(schedule, dict):
            return set(), []

        days = schedule.get(CONF_DAYS)
        times = schedule.get(CONF_TIMES)
        if not isinstance(days, list) or not isinstance(times, list):
            return set(), []

        weekdays = {
            self._WEEKDAYS.index(day)
            for day in days
            if isinstance(day, str) and day in self._WEEKDAYS
        }
        slots = [
            (slot, DEFAULT_TIMES[slot])
            for slot in times
            if isinstance(slot, str) and slot in DEFAULT_TIMES
        ]
        return weekdays, slots

    def _calculate_derived_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]:
        """Calculate derived data for the medicine.

        Adds ``prescription_active``, ``next_dose`` and ``dose_due`` to
        ``medicine_data`` in place and returns the same dict.
        """
        now = datetime.now()

        medicine_data["prescription_active"] = self._is_prescription_active(
            medicine_data.get(CONF_PRESCRIPTION), now.date()
        )
        medicine_data["next_dose"] = self._calculate_next_dose(medicine_data, now)
        medicine_data["dose_due"] = self._is_dose_due(medicine_data, now)
        return medicine_data

    def _calculate_next_dose(
        self, medicine_data: dict[str, Any], now: datetime
    ) -> datetime | None:
        """Calculate the next dose time.

        Args:
            medicine_data: Medicine record containing the schedule.
            now: Reference moment; only doses strictly after it are returned.

        Returns:
            The next scheduled dose within the coming seven days, or None when
            the schedule has no recognised days or time slots.
        """
        weekdays, slots = self._schedule_parts(medicine_data)
        if not weekdays or not slots:
            return None

        dose_times = sorted(slot_time for _, slot_time in slots)
        today = now.date()

        # Remaining doses today
        if now.weekday() in weekdays:
            current_time = now.time()
            for dose_time in dose_times:
                if dose_time > current_time:
                    return datetime.combine(today, dose_time, tzinfo=now.tzinfo)

        # Otherwise the first dose of the next scheduled day. Day 7 is today's
        # weekday again, which covers a once-a-week schedule already taken today.
        for days_ahead in range(1, 8):
            candidate = today + timedelta(days=days_ahead)
            if candidate.weekday() in weekdays:
                return datetime.combine(candidate, dose_times[0], tzinfo=now.tzinfo)

        return None

    def _is_dose_due(self, medicine_data: dict[str, Any], now: datetime) -> bool:
        """Check if a dose is currently due.

        A dose is due when today is a scheduled day, ``now`` is within the
        dose window of a scheduled slot, and no ``"<date>_<slot>"`` entry for
        today exists in ``last_taken``.
        """
        weekdays, slots = self._schedule_parts(medicine_data)

        # weekday() is used instead of strftime("%A"), which is locale-dependent.
        if now.weekday() not in weekdays:
            return False

        last_taken = medicine_data.get("last_taken") or {}
        today = now.date()
        today_iso = today.isoformat()

        for slot, slot_time in slots:
            dose_datetime = datetime.combine(today, slot_time, tzinfo=now.tzinfo)
            minutes_off = abs((now - dose_datetime).total_seconds()) / 60
            if minutes_off > self._DOSE_WINDOW_MINUTES:
                continue
            if f"{today_iso}_{slot}" not in last_taken:
                return True

        return False

    async def _async_load_medicine(
        self,
    ) -> tuple[dict[str, Any], dict[str, Any] | None]:
        """Load the store and return ``(payload, this medicine's record)``.

        The record is the object held inside ``payload``, so changing it and
        saving ``payload`` persists the change together with everything else
        in the store. The record is None when the medicine is not stored.
        """
        data = await self.store.async_load() or {}
        medicines = data.get("medicines") or {}
        return data, medicines.get(self.medicine_id)

    async def async_take_dose(self, time_slot: str) -> bool:
        """Record that a dose was taken.

        Stores the current time under ``"<today>_<time_slot>"`` in
        ``last_taken`` and lowers a positive inventory by one.

        Args:
            time_slot: Name of the slot the dose belongs to.

        Returns:
            True when the dose was saved, False when the medicine is not in
            storage or an error occurred (the error is logged).
        """
        # Boundary for callers that expect a bool, so failures are logged, not raised.
        try:
            data, medicine_data = await self._async_load_medicine()
            if medicine_data is None:
                return False

            # Record the dose
            now = datetime.now()
            last_taken = medicine_data.get("last_taken") or {}
            last_taken[f"{now.date().isoformat()}_{time_slot}"] = now.isoformat()
            medicine_data["last_taken"] = last_taken

            # Reduce inventory, never below zero
            current_inventory = medicine_data.get("inventory") or 0
            if current_inventory > 0:
                medicine_data["inventory"] = current_inventory - 1

            # Save the full payload so unrelated top-level keys survive.
            await self.store.async_save(data)
            await self.async_request_refresh()
            return True
        except Exception as err:
            _LOGGER.exception("Error recording dose: %s", err)
            return False

    async def async_fill_prescription(self) -> bool:
        """Fill the prescription (add inventory, reduce repeats).

        Returns:
            True when one repeat was consumed and a pack was added to the
            inventory. False when the medicine is not in storage, the
            prescription is expired or has no repeats left, or an error
            occurred (the error is logged).
        """
        # Boundary for callers that expect a bool, so failures are logged, not raised.
        try:
            data, medicine_data = await self._async_load_medicine()
            if medicine_data is None:
                return False

            # "prescription_active" is a derived value that is never written to
            # storage, so it is recomputed from the stored prescription here.
            prescription = medicine_data.get(CONF_PRESCRIPTION)
            if not self._is_prescription_active(prescription, datetime.now().date()):
                return False

            # Compute both new values before changing anything.
            repeats_left = prescription[CONF_REPEATS_LEFT] - 1
            pack_size = medicine_data.get(CONF_PACK_SIZE, 30)
            new_inventory = (medicine_data.get("inventory") or 0) + pack_size

            prescription[CONF_REPEATS_LEFT] = repeats_left
            medicine_data["inventory"] = new_inventory

            # Save the full payload so unrelated top-level keys survive.
            await self.store.async_save(data)
            await self.async_request_refresh()
            return True
        except Exception as err:
            _LOGGER.exception("Error filling prescription: %s", err)
            return False