Initial commit

This commit is contained in:
2025-08-17 18:51:29 +10:00
parent 214d7f8228
commit 02380cd0d9
12 changed files with 1877 additions and 1 deletions

260
coordinator.py Normal file
View File

@@ -0,0 +1,260 @@
"""DataUpdateCoordinator for MedMate integration."""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, time, date
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import (
DOMAIN,
STORAGE_KEY,
STORAGE_VERSION,
CONF_SCHEDULE,
CONF_PRESCRIPTION,
CONF_DAYS,
CONF_TIMES,
CONF_EXPIRY_DATE,
CONF_REPEATS_LEFT,
TIME_SLOTS,
)
_LOGGER = logging.getLogger(__name__)
UPDATE_INTERVAL = timedelta(minutes=5)
# Default times for time slots
DEFAULT_TIMES = {
"morning": time(8, 0),
"lunchtime": time(12, 0),
"dinner": time(18, 0),
"night": time(22, 0),
}
class MedMateDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching MedMate data."""
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=UPDATE_INTERVAL,
)
self.entry = entry
self.store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
self.medicine_id = entry.data.get("medicine_id")
async def _async_update_data(self) -> dict[str, Any]:
"""Fetch data from storage."""
data = await self.store.async_load() or {}
medicines = data.get("medicines", {})
if self.medicine_id not in medicines:
_LOGGER.warning("Medicine %s not found in storage", self.medicine_id)
return {}
medicine_data = medicines[self.medicine_id]
# Calculate derived values
medicine_data = self._calculate_derived_data(medicine_data)
return medicine_data
def _calculate_derived_data(self, medicine_data: dict[str, Any]) -> dict[str, Any]:
"""Calculate derived data for the medicine."""
now = datetime.now()
today = now.date()
# Check if prescription is still active
prescription = medicine_data.get(CONF_PRESCRIPTION, {})
expiry_date = prescription.get(CONF_EXPIRY_DATE)
repeats_left = prescription.get(CONF_REPEATS_LEFT, 0)
prescription_active = False
if expiry_date and repeats_left > 0:
if isinstance(expiry_date, str):
expiry_date = datetime.fromisoformat(expiry_date).date()
prescription_active = today <= expiry_date
medicine_data["prescription_active"] = prescription_active
# Calculate next dose time
next_dose = self._calculate_next_dose(medicine_data, now)
medicine_data["next_dose"] = next_dose
# Check if due for dose
medicine_data["dose_due"] = self._is_dose_due(medicine_data, now)
return medicine_data
def _calculate_next_dose(self, medicine_data: dict[str, Any], now: datetime) -> datetime | None:
"""Calculate the next dose time."""
schedule = medicine_data.get(CONF_SCHEDULE, {})
days = schedule.get(CONF_DAYS, [])
times = schedule.get(CONF_TIMES, [])
if not days or not times:
return None
# Get current day of week (0=Monday, 6=Sunday)
current_weekday = now.weekday()
current_date = now.date()
current_time = now.time()
# Convert day names to weekday numbers
weekday_map = {
"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3,
"friday": 4, "saturday": 5, "sunday": 6
}
scheduled_weekdays = [weekday_map[day] for day in days if day in weekday_map]
if not scheduled_weekdays:
return None
# Get dose times for today and future
dose_times = []
for time_slot in times:
if time_slot in DEFAULT_TIMES:
dose_times.append(DEFAULT_TIMES[time_slot])
if not dose_times:
return None
dose_times.sort()
# Check for remaining doses today
if current_weekday in scheduled_weekdays:
for dose_time in dose_times:
if dose_time > current_time:
return datetime.combine(current_date, dose_time)
# Find next scheduled day
days_ahead = 1
while days_ahead <= 7:
next_date = current_date + timedelta(days=days_ahead)
next_weekday = next_date.weekday()
if next_weekday in scheduled_weekdays:
return datetime.combine(next_date, dose_times[0])
days_ahead += 1
return None
def _is_dose_due(self, medicine_data: dict[str, Any], now: datetime) -> bool:
"""Check if a dose is currently due."""
schedule = medicine_data.get(CONF_SCHEDULE, {})
days = schedule.get(CONF_DAYS, [])
times = schedule.get(CONF_TIMES, [])
last_taken = medicine_data.get("last_taken", {})
if not days or not times:
return False
current_weekday = now.strftime("%A").lower()
current_time = now.time()
current_date = now.date().isoformat()
if current_weekday not in days:
return False
# Check each scheduled time slot
for time_slot in times:
if time_slot not in DEFAULT_TIMES:
continue
slot_time = DEFAULT_TIMES[time_slot]
# Check if we're within 30 minutes of the dose time
dose_datetime = datetime.combine(now.date(), slot_time)
time_diff = abs((now - dose_datetime).total_seconds() / 60) # minutes
if time_diff <= 30: # Within 30 minutes
# Check if already taken today for this time slot
dose_key = f"{current_date}_{time_slot}"
if dose_key not in last_taken:
return True
return False
async def async_take_dose(self, time_slot: str) -> bool:
"""Record that a dose was taken."""
try:
data = await self.store.async_load() or {}
medicines = data.get("medicines", {})
if self.medicine_id not in medicines:
return False
medicine_data = medicines[self.medicine_id]
last_taken = medicine_data.get("last_taken", {})
# Record the dose
now = datetime.now()
dose_key = f"{now.date().isoformat()}_{time_slot}"
last_taken[dose_key] = now.isoformat()
# Reduce inventory
current_inventory = medicine_data.get("inventory", 0)
if current_inventory > 0:
medicine_data["inventory"] = current_inventory - 1
medicine_data["last_taken"] = last_taken
medicines[self.medicine_id] = medicine_data
await self.store.async_save({"medicines": medicines})
await self.async_request_refresh()
return True
except Exception as err:
_LOGGER.error("Error recording dose: %s", err)
return False
async def async_fill_prescription(self) -> bool:
"""Fill the prescription (add inventory, reduce repeats)."""
try:
data = await self.store.async_load() or {}
medicines = data.get("medicines", {})
if self.medicine_id not in medicines:
return False
medicine_data = medicines[self.medicine_id]
prescription = medicine_data.get(CONF_PRESCRIPTION, {})
# Check if prescription is still active
if not medicine_data.get("prescription_active", False):
return False
# Reduce repeats left
repeats_left = prescription.get(CONF_REPEATS_LEFT, 0)
if repeats_left <= 0:
return False
prescription[CONF_REPEATS_LEFT] = repeats_left - 1
# Add to inventory
pack_size = medicine_data.get("pack_size", 30)
current_inventory = medicine_data.get("inventory", 0)
medicine_data["inventory"] = current_inventory + pack_size
medicine_data[CONF_PRESCRIPTION] = prescription
medicines[self.medicine_id] = medicine_data
await self.store.async_save({"medicines": medicines})
await self.async_request_refresh()
return True
except Exception as err:
_LOGGER.error("Error filling prescription: %s", err)
return False