"""DataUpdateCoordinator for MedMate integration.""" from __future__ import annotations import logging from datetime import datetime, timedelta, time, date from typing import Any from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.helpers.storage import Store from homeassistant.helpers.update_coordinator import DataUpdateCoordinator from .const import ( DOMAIN, STORAGE_KEY, STORAGE_VERSION, CONF_SCHEDULE, CONF_PRESCRIPTION, CONF_DAYS, CONF_TIMES, CONF_EXPIRY_DATE, CONF_REPEATS_LEFT, TIME_SLOTS, ) _LOGGER = logging.getLogger(__name__) UPDATE_INTERVAL = timedelta(minutes=5) # Default times for time slots DEFAULT_TIMES = { "morning": time(8, 0), "lunchtime": time(12, 0), "dinner": time(18, 0), "night": time(22, 0), } class MedMateDataUpdateCoordinator(DataUpdateCoordinator): """Class to manage fetching MedMate data.""" def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: """Initialize the coordinator.""" super().__init__( hass, _LOGGER, name=DOMAIN, update_interval=UPDATE_INTERVAL, ) self.entry = entry self.store = Store(hass, STORAGE_VERSION, STORAGE_KEY) self.medicine_id = entry.data.get("medicine_id") async def _async_update_data(self) -> dict[str, Any]: """Fetch data from storage.""" data = await self.store.async_load() or {} medicines = data.get("medicines", {}) if self.medicine_id not in medicines: _LOGGER.warning("Medicine %s not found in storage", self.medicine_id) return {} medicine_data = medicines[self.medicine_id] # Calculate derived values medicine_data = self._calculate_derived_data(medicine_data) return medicine_data def _calculate_derived_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]: """Calculate derived data for the medicine.""" now = datetime.now() today = now.date() # Check if prescription is still active prescription = medicine_data.get(CONF_PRESCRIPTION, {}) expiry_date = prescription.get(CONF_EXPIRY_DATE) repeats_left = prescription.get(CONF_REPEATS_LEFT, 0) prescription_active = False if expiry_date and repeats_left > 0: if isinstance(expiry_date, str): expiry_date = datetime.fromisoformat(expiry_date).date() prescription_active = today <= expiry_date medicine_data["prescription_active"] = prescription_active # Calculate next dose time next_dose = self._calculate_next_dose(medicine_data, now) medicine_data["next_dose"] = next_dose # Check if due for dose medicine_data["dose_due"] = self._is_dose_due(medicine_data, now) return medicine_data def _calculate_next_dose(self, medicine_data: dict[str, Any], now: datetime) -> datetime | None: """Calculate the next dose time.""" schedule = medicine_data.get(CONF_SCHEDULE, {}) days = schedule.get(CONF_DAYS, []) times = schedule.get(CONF_TIMES, []) if not days or not times: return None # Get current day of week (0=Monday, 6=Sunday) current_weekday = now.weekday() current_date = now.date() current_time = now.time() # Convert day names to weekday numbers weekday_map = { "monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, "friday": 4, "saturday": 5, "sunday": 6 } scheduled_weekdays = [weekday_map[day] for day in days if day in weekday_map] if not scheduled_weekdays: return None # Get dose times for today and future dose_times = [] for time_slot in times: if time_slot in DEFAULT_TIMES: dose_times.append(DEFAULT_TIMES[time_slot]) if not dose_times: return None dose_times.sort() # Check for remaining doses today if current_weekday in scheduled_weekdays: for dose_time in dose_times: if dose_time > current_time: return datetime.combine(current_date, dose_time) # Find next scheduled day days_ahead = 1 while days_ahead <= 7: next_date = current_date + timedelta(days=days_ahead) next_weekday = next_date.weekday() if next_weekday in scheduled_weekdays: return datetime.combine(next_date, dose_times[0]) days_ahead += 1 return None def _is_dose_due(self, medicine_data: dict[str, Any], now: datetime) -> bool: """Check if a dose is currently due.""" schedule = medicine_data.get(CONF_SCHEDULE, {}) days = schedule.get(CONF_DAYS, []) times = schedule.get(CONF_TIMES, []) last_taken = medicine_data.get("last_taken", {}) if not days or not times: return False current_weekday = now.strftime("%A").lower() current_time = now.time() current_date = now.date().isoformat() if current_weekday not in days: return False # Check each scheduled time slot for time_slot in times: if time_slot not in DEFAULT_TIMES: continue slot_time = DEFAULT_TIMES[time_slot] # Check if we're within 30 minutes of the dose time dose_datetime = datetime.combine(now.date(), slot_time) time_diff = abs((now - dose_datetime).total_seconds() / 60) # minutes if time_diff <= 30: # Within 30 minutes # Check if already taken today for this time slot dose_key = f"{current_date}_{time_slot}" if dose_key not in last_taken: return True return False async def async_take_dose(self, time_slot: str) -> bool: """Record that a dose was taken.""" try: data = await self.store.async_load() or {} medicines = data.get("medicines", {}) if self.medicine_id not in medicines: return False medicine_data = medicines[self.medicine_id] last_taken = medicine_data.get("last_taken", {}) # Record the dose now = datetime.now() dose_key = f"{now.date().isoformat()}_{time_slot}" last_taken[dose_key] = now.isoformat() # Reduce inventory current_inventory = medicine_data.get("inventory", 0) if current_inventory > 0: medicine_data["inventory"] = current_inventory - 1 medicine_data["last_taken"] = last_taken medicines[self.medicine_id] = medicine_data await self.store.async_save({"medicines": medicines}) await self.async_request_refresh() return True except Exception as err: _LOGGER.error("Error recording dose: %s", err) return False async def async_fill_prescription(self) -> bool: """Fill the prescription (add inventory, reduce repeats).""" try: data = await self.store.async_load() or {} medicines = data.get("medicines", {}) if self.medicine_id not in medicines: return False medicine_data = medicines[self.medicine_id] prescription = medicine_data.get(CONF_PRESCRIPTION, {}) # Check if prescription is still active if not medicine_data.get("prescription_active", False): return False # Reduce repeats left repeats_left = prescription.get(CONF_REPEATS_LEFT, 0) if repeats_left <= 0: return False prescription[CONF_REPEATS_LEFT] = repeats_left - 1 # Add to inventory pack_size = medicine_data.get("pack_size", 30) current_inventory = medicine_data.get("inventory", 0) medicine_data["inventory"] = current_inventory + pack_size medicine_data[CONF_PRESCRIPTION] = prescription medicines[self.medicine_id] = medicine_data await self.store.async_save({"medicines": medicines}) await self.async_request_refresh() return True except Exception as err: _LOGGER.error("Error filling prescription: %s", err) return False