319 lines
11 KiB
Python
319 lines
11 KiB
Python
"""DataUpdateCoordinator for MedMate integration."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta, time, date
|
|
from typing import Any
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.storage import Store
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
|
|
|
from .const import (
|
|
DOMAIN,
|
|
STORAGE_KEY,
|
|
STORAGE_VERSION,
|
|
CONF_MEDICINE_NAME,
|
|
CONF_ACTIVE_INGREDIENT,
|
|
CONF_STRENGTH,
|
|
CONF_PACK_SIZE,
|
|
CONF_SCHEDULE,
|
|
CONF_PRESCRIPTION,
|
|
CONF_DAYS,
|
|
CONF_TIMES,
|
|
CONF_EXPIRY_DATE,
|
|
CONF_REPEATS_LEFT,
|
|
CONF_ISSUE_DATE,
|
|
CONF_DOCTOR,
|
|
CONF_TOTAL_REPEATS,
|
|
TIME_SLOTS,
|
|
)
|
|
|
|
# Module-level logger used by the coordinator below.
_LOGGER = logging.getLogger(__name__)

# Refresh period handed to the coordinator as its ``update_interval``.
UPDATE_INTERVAL = timedelta(minutes=5)

# Clock time assumed for each named time slot of a schedule.
DEFAULT_TIMES = {
    "morning": time(hour=8),
    "lunchtime": time(hour=12),
    "dinner": time(hour=18),
    "night": time(hour=22),
}
|
|
|
|
|
|
class MedMateDataUpdateCoordinator(DataUpdateCoordinator):
    """Coordinator that loads one medicine from storage and derives its status.

    Each refresh reads the store, picks the record whose id matches the config
    entry's ``medicine_id``, drops unexpected keys and adds the derived values
    ``prescription_active``, ``next_dose`` and ``dose_due``.
    """

    # Schedule day names mapped to ``datetime.weekday()`` numbers (Monday=0).
    _WEEKDAYS: dict[str, int] = {
        "monday": 0,
        "tuesday": 1,
        "wednesday": 2,
        "thursday": 3,
        "friday": 4,
        "saturday": 5,
        "sunday": 6,
    }

    # A dose counts as due within this many minutes before or after its slot.
    _DUE_WINDOW_MINUTES = 30

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Initialize the coordinator.

        Args:
            hass: The Home Assistant instance.
            entry: Config entry whose ``data["medicine_id"]`` selects the
                medicine this coordinator manages.
        """
        super().__init__(
            hass,
            _LOGGER,
            name=DOMAIN,
            update_interval=UPDATE_INTERVAL,
        )
        self.entry = entry
        self.store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
        self.medicine_id = entry.data.get("medicine_id")

    async def _async_update_data(self) -> dict[str, Any]:
        """Load this medicine from storage and return it with derived values.

        Returns:
            The cleaned medicine record plus derived keys, or an empty dict
            when the medicine id is not present in storage.
        """
        data = await self.store.async_load() or {}
        medicines = data.get("medicines", {})

        if self.medicine_id not in medicines:
            _LOGGER.warning("Medicine %s not found in storage", self.medicine_id)
            return {}

        cleaned_data = self._clean_medicine_data(medicines[self.medicine_id])
        return self._calculate_derived_data(cleaned_data)

    def _clean_medicine_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``medicine_data`` limited to the expected keys.

        Args:
            medicine_data: Raw medicine record as read from storage.

        Returns:
            A new dict holding only the basic medicine fields, the schedule
            (days/times lists), the prescription fields, ``inventory`` and
            ``last_taken``. An empty input yields an empty dict.
        """
        if not medicine_data:
            return {}

        cleaned: dict[str, Any] = {
            key: medicine_data[key]
            for key in (
                CONF_MEDICINE_NAME,
                CONF_ACTIVE_INGREDIENT,
                CONF_STRENGTH,
                CONF_PACK_SIZE,
            )
            if key in medicine_data
        }

        # Schedule: keep days/times only when they are actually lists.
        schedule = medicine_data.get(CONF_SCHEDULE, {})
        if isinstance(schedule, dict):
            cleaned[CONF_SCHEDULE] = {
                key: schedule[key]
                for key in (CONF_DAYS, CONF_TIMES)
                if isinstance(schedule.get(key), list)
            }

        # Prescription: keep only the known prescription fields.
        prescription = medicine_data.get(CONF_PRESCRIPTION, {})
        if isinstance(prescription, dict):
            cleaned[CONF_PRESCRIPTION] = {
                key: prescription[key]
                for key in (
                    CONF_ISSUE_DATE,
                    CONF_EXPIRY_DATE,
                    CONF_DOCTOR,
                    CONF_TOTAL_REPEATS,
                    CONF_REPEATS_LEFT,
                )
                if key in prescription
            }

        for key in ("inventory", "last_taken"):
            if key in medicine_data:
                cleaned[key] = medicine_data[key]

        _LOGGER.debug("Cleaned medicine data: %s", cleaned)
        return cleaned

    @staticmethod
    def _is_prescription_active(prescription: Any, today: date) -> bool:
        """Return True when the prescription has repeats left and is not expired.

        Malformed input (non-dict prescription, non-numeric repeats, missing or
        unparsable expiry date) is treated as "not active" instead of raising,
        so one bad record cannot break a whole refresh.
        """
        if not isinstance(prescription, dict):
            return False

        repeats_left = prescription.get(CONF_REPEATS_LEFT, 0)
        # bool is an int subclass; a stored True/False is not a repeat count.
        if (
            isinstance(repeats_left, bool)
            or not isinstance(repeats_left, (int, float))
            or repeats_left <= 0
        ):
            return False

        expiry_date = prescription.get(CONF_EXPIRY_DATE)
        if not expiry_date:
            return False

        if isinstance(expiry_date, str):
            try:
                expiry_date = datetime.fromisoformat(expiry_date).date()
            except ValueError:
                _LOGGER.warning("Invalid prescription expiry date: %s", expiry_date)
                return False
        elif isinstance(expiry_date, datetime):
            # datetime is a date subclass but cannot be compared with a date.
            expiry_date = expiry_date.date()
        elif not isinstance(expiry_date, date):
            return False

        return today <= expiry_date

    @classmethod
    def _scheduled_weekdays(cls, days: list[Any]) -> set[int]:
        """Translate schedule day names into ``datetime.weekday()`` numbers."""
        return {
            cls._WEEKDAYS[day]
            for day in days
            if isinstance(day, str) and day in cls._WEEKDAYS
        }

    def _calculate_derived_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]:
        """Add ``prescription_active``, ``next_dose`` and ``dose_due`` in place.

        Args:
            medicine_data: Cleaned medicine record; it is mutated and returned.

        Returns:
            The same dict with the three derived keys set.
        """
        now = datetime.now()

        medicine_data["prescription_active"] = self._is_prescription_active(
            medicine_data.get(CONF_PRESCRIPTION, {}), now.date()
        )
        medicine_data["next_dose"] = self._calculate_next_dose(medicine_data, now)
        medicine_data["dose_due"] = self._is_dose_due(medicine_data, now)

        return medicine_data

    def _calculate_next_dose(self, medicine_data: dict[str, Any], now: datetime) -> datetime | None:
        """Return the next scheduled dose strictly after ``now``.

        Args:
            medicine_data: Medicine record containing the schedule.
            now: Reference moment.

        Returns:
            The datetime of the next dose (carrying ``now``'s tzinfo), or None
            when the schedule has no recognised days or time slots.
        """
        schedule = medicine_data.get(CONF_SCHEDULE, {})
        days = schedule.get(CONF_DAYS, [])
        times = schedule.get(CONF_TIMES, [])

        if not days or not times or not isinstance(days, list) or not isinstance(times, list):
            return None

        scheduled_weekdays = self._scheduled_weekdays(days)
        if not scheduled_weekdays:
            return None

        dose_times = sorted(
            DEFAULT_TIMES[slot]
            for slot in times
            if isinstance(slot, str) and slot in DEFAULT_TIMES
        )
        if not dose_times:
            return None

        current_date = now.date()

        # A dose later today wins over any future day.
        if now.weekday() in scheduled_weekdays:
            current_time = now.time()
            for dose_time in dose_times:
                if dose_time > current_time:
                    return datetime.combine(current_date, dose_time, tzinfo=now.tzinfo)

        # Otherwise take the first slot of the next scheduled day; seven days
        # ahead covers the case where today is the only scheduled weekday.
        for days_ahead in range(1, 8):
            next_date = current_date + timedelta(days=days_ahead)
            if next_date.weekday() in scheduled_weekdays:
                return datetime.combine(next_date, dose_times[0], tzinfo=now.tzinfo)

        return None

    def _is_dose_due(self, medicine_data: dict[str, Any], now: datetime) -> bool:
        """Return True when an untaken dose is due around ``now``.

        A dose is due when today is a scheduled day, ``now`` lies within
        ``_DUE_WINDOW_MINUTES`` of a scheduled slot's time, and ``last_taken``
        has no ``"<iso date>_<slot>"`` entry for that slot today.
        """
        schedule = medicine_data.get(CONF_SCHEDULE, {})
        days = schedule.get(CONF_DAYS, [])
        times = schedule.get(CONF_TIMES, [])
        last_taken = medicine_data.get("last_taken") or {}

        if not days or not times or not isinstance(days, list) or not isinstance(times, list):
            return False

        # Use weekday numbers rather than strftime("%A"), whose output depends
        # on the process locale and would never match the English day names.
        if now.weekday() not in self._scheduled_weekdays(days):
            return False

        today = now.date()
        today_key = today.isoformat()

        for time_slot in times:
            if not isinstance(time_slot, str) or time_slot not in DEFAULT_TIMES:
                continue

            dose_datetime = datetime.combine(
                today, DEFAULT_TIMES[time_slot], tzinfo=now.tzinfo
            )
            minutes_off = abs((now - dose_datetime).total_seconds()) / 60

            if (
                minutes_off <= self._DUE_WINDOW_MINUTES
                and f"{today_key}_{time_slot}" not in last_taken
            ):
                return True

        return False

    async def async_take_dose(self, time_slot: str) -> bool:
        """Record that a dose was taken for ``time_slot`` and persist it.

        Stores the current timestamp under ``"<iso date>_<time_slot>"`` in the
        medicine's ``last_taken`` map, decrements a positive inventory by one,
        saves the store and requests a refresh.

        Returns:
            True on success; False when the medicine is not in storage or an
            error occurred (the error is logged).
        """
        try:
            data = await self.store.async_load() or {}
            medicines = data.get("medicines", {})

            if self.medicine_id not in medicines:
                return False

            medicine_data = medicines[self.medicine_id]
            last_taken = medicine_data.get("last_taken") or {}

            now = datetime.now()
            last_taken[f"{now.date().isoformat()}_{time_slot}"] = now.isoformat()
            medicine_data["last_taken"] = last_taken

            # Never let the inventory drop below zero.
            current_inventory = medicine_data.get("inventory") or 0
            if current_inventory > 0:
                medicine_data["inventory"] = current_inventory - 1

            medicines[self.medicine_id] = medicine_data

            # Write back the whole document so other top-level keys survive.
            data["medicines"] = medicines
            await self.store.async_save(data)
            await self.async_request_refresh()

            return True

        except Exception as err:
            _LOGGER.error("Error recording dose: %s", err)
            return False

    async def async_fill_prescription(self) -> bool:
        """Fill the prescription: use one repeat and add a pack to inventory.

        Returns:
            True on success; False when the medicine is not in storage, the
            prescription is expired or has no repeats left, or an error
            occurred (the error is logged).
        """
        try:
            data = await self.store.async_load() or {}
            medicines = data.get("medicines", {})

            if self.medicine_id not in medicines:
                return False

            medicine_data = medicines[self.medicine_id]
            prescription = medicine_data.get(CONF_PRESCRIPTION, {})

            # "prescription_active" is derived during refresh and never saved,
            # so it must be recomputed from the stored prescription here.
            if not self._is_prescription_active(prescription, datetime.now().date()):
                return False

            # The check above guarantees a positive numeric repeat count.
            prescription[CONF_REPEATS_LEFT] = prescription[CONF_REPEATS_LEFT] - 1

            pack_size = medicine_data.get(CONF_PACK_SIZE, 30)
            current_inventory = medicine_data.get("inventory") or 0
            medicine_data["inventory"] = current_inventory + pack_size

            medicine_data[CONF_PRESCRIPTION] = prescription
            medicines[self.medicine_id] = medicine_data

            # Write back the whole document so other top-level keys survive.
            data["medicines"] = medicines
            await self.store.async_save(data)
            await self.async_request_refresh()

            return True

        except Exception as err:
            _LOGGER.error("Error filling prescription: %s", err)
            return False