Files
noteshop-webapp/main.py
Désiré Werner Menrath a7d88be89c feat(realtime): Implement real-time updates for shopping list
Introduces WebSocket-based real-time updates for the shopping list.
Changes to items (add, mark, delete) are now instantly reflected
across all connected user sessions without requiring a page refresh.

This commit:
- Extends the WebSocket ConnectionManager to broadcast item updates.
- Modifies item manipulation endpoints (add, mark, delete) to trigger broadcasts.
- Updates the frontend to listen for update broadcasts and refresh the list.
- Updates README.md to reflect the new real-time update feature.
2025-11-05 13:42:07 +01:00

689 lines
23 KiB
Python

from collections import defaultdict
from passlib.context import CryptContext
import os
import requests
import sqlite3
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from translations import get_standard_items, get_all_translations
from fastapi import FastAPI, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
app = FastAPI()
# --- Configuration ---
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 43200 # 30 days
DB_FILE = "data/shoppinglist.db"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# --- Brute-Force Protection ---
MAX_FAILED_ATTEMPTS_USER = 5
MAX_FAILED_ATTEMPTS_ADMIN = 3
LOCKOUT_PERIOD_MINUTES = 30
failed_login_attempts_ip = defaultdict(
lambda: {'count': 0, 'timestamp': datetime.utcnow()})
# --- Pydantic Models ---
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
is_admin: bool
class UserInDB(User):
id: int
password_hash: str
failed_login_attempts: int
is_locked: bool
locked_at: Optional[datetime]
class Item(BaseModel):
names: List[str]
class NewUser(BaseModel):
username: str
password: str
is_admin: bool = False
# --- WebSocket Connection Manager ---
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
self.user_map: dict[WebSocket, User] = {}
async def connect(self, websocket: WebSocket, user: User):
await websocket.accept()
self.active_connections.append(websocket)
self.user_map[websocket] = user
await self.broadcast_user_list()
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
if websocket in self.user_map:
del self.user_map[websocket]
async def broadcast_user_list(self):
user_list = sorted([user.username for user in self.user_map.values()])
for connection in self.active_connections:
await connection.send_json({"type": "user_list", "users": user_list})
async def broadcast_update(self):
for connection in self.active_connections:
await connection.send_json({"type": "update"})
manager = ConnectionManager()
# --- Auth Helper Functions ---
def get_user(username: str):
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
user_data = cursor.fetchone()
conn.close()
if user_data:
user_dict = dict(user_data)
if user_dict.get('locked_at'):
try:
user_dict['locked_at'] = datetime.fromisoformat(
user_dict['locked_at'])
except (TypeError, ValueError):
user_dict['locked_at'] = None
return UserInDB(**user_dict)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_active_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(username=token_data.username)
if user is None:
raise credentials_exception
return user
# --- Gotify Konfiguration ---
GOTIFY_URL = os.getenv("GOTIFY_URL")
def send_gotify_notification(title: str, message: str) -> str:
if not GOTIFY_URL:
return "GOTIFY_URL is not set. Skipping notification."
try:
payload = {"title": title, "message": message}
response = requests.post(GOTIFY_URL, json=payload)
response.raise_for_status()
return "Gotify notification sent successfully."
except requests.exceptions.RequestException as e:
print(f"Error sending Gotify notification: {e}")
return "Error sending Gotify notification."
# --- Datenbank-Setup ---
def column_exists(cursor, table_name, column_name):
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [info[1] for info in cursor.fetchall()]
return column_name in columns
def init_db():
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
is_admin BOOLEAN NOT NULL DEFAULT 0
)
""")
if not column_exists(cursor, "users", "failed_login_attempts"):
cursor.execute(
"ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER NOT NULL DEFAULT 0")
if not column_exists(cursor, "users", "is_locked"):
cursor.execute(
"ALTER TABLE users ADD COLUMN is_locked BOOLEAN NOT NULL DEFAULT 0")
if not column_exists(cursor, "users", "locked_at"):
cursor.execute("ALTER TABLE users ADD COLUMN locked_at TIMESTAMP")
cursor.execute("""
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
is_standard BOOLEAN NOT NULL DEFAULT 0
)
""")
if not column_exists(cursor, "items", "created_by_user_id"):
cursor.execute("ALTER TABLE items ADD COLUMN created_by_user_id INTEGER REFERENCES users(id)")
if not column_exists(cursor, "items", "marked"):
cursor.execute(
"ALTER TABLE items ADD COLUMN marked BOOLEAN NOT NULL DEFAULT 0")
cursor.execute('''
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT
)
''')
cursor.execute("SELECT COUNT(*) FROM users")
if cursor.fetchone()[0] == 0:
admin_password = "admin"
hashed_password = pwd_context.hash(admin_password)
cursor.execute(
"INSERT INTO users (username, password_hash, is_admin) VALUES (?, ?, ?)",
("admin", hashed_password, True)
)
print("Default admin user 'admin' with password 'admin' created.")
print("Please change this password in a production environment.")
conn.commit()
conn.close()
print("Datenbank initialisiert.")
class DeletionRequest(BaseModel):
password: Optional[str] = None
@app.get("/api/standard-items/{lang}")
async def get_standard_items_route(lang: str):
return get_standard_items(lang)
@app.get("/api/translations/{lang}")
async def get_translations_route(lang: str):
return get_all_translations(lang)
# --- API Endpunkte ---
@app.post("/token", response_model=Token)
async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
client_ip = request.client.host
now = datetime.utcnow()
ip_failures = failed_login_attempts_ip[client_ip]
if ip_failures['count'] >= 15 and (now - ip_failures['timestamp']) < timedelta(minutes=LOCKOUT_PERIOD_MINUTES):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Too many failed login attempts from this IP. Please try again later."
)
user = get_user(form_data.username)
if user and user.is_locked:
lockout_expiry = user.locked_at + \
timedelta(minutes=LOCKOUT_PERIOD_MINUTES) if user.locked_at else now
if now < lockout_expiry:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is locked due to too many failed login attempts."
)
else:
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE username = ?",
(user.username,))
conn.commit()
conn.close()
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user.password_hash):
if user:
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
new_attempts = user.failed_login_attempts + 1
max_attempts = MAX_FAILED_ATTEMPTS_ADMIN if user.is_admin else MAX_FAILED_ATTEMPTS_USER
if new_attempts >= max_attempts:
cursor.execute("UPDATE users SET failed_login_attempts = ?, is_locked = 1, locked_at = ? WHERE username = ?",
(new_attempts, now, user.username))
else:
cursor.execute(
"UPDATE users SET failed_login_attempts = ? WHERE username = ?", (new_attempts, user.username))
conn.commit()
conn.close()
ip_failures['count'] += 1
ip_failures['timestamp'] = now
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if user.failed_login_attempts > 0 or user.is_locked:
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("UPDATE users SET failed_login_attempts = 0, is_locked = 0, locked_at = NULL WHERE username = ?",
(user.username,))
conn.commit()
conn.close()
failed_login_attempts_ip[client_ip] = {'count': 0, 'timestamp': now}
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/api/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.post("/api/users", status_code=status.HTTP_201_CREATED)
async def create_user(user: NewUser, current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can create users")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered")
hashed_password = pwd_context.hash(user.password)
cursor.execute(
"INSERT INTO users (username, password_hash, is_admin) VALUES (?, ?, ?)",
(user.username, hashed_password, user.is_admin)
)
conn.commit()
conn.close()
return {"status": f"User {user.username} created"}
class PasswordChangeRequest(BaseModel):
current_password: str
new_password: str
@app.put("/api/users/me/password", status_code=status.HTTP_200_OK)
async def change_password(
request: PasswordChangeRequest,
current_user: UserInDB = Depends(get_current_active_user)
):
if not verify_password(request.current_password, current_user.password_hash):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect current password",
)
new_hashed_password = pwd_context.hash(request.new_password)
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET password_hash = ? WHERE id = ?",
(new_hashed_password, current_user.id)
)
conn.commit()
conn.close()
return {"status": "Password updated successfully"}
class UserListUser(BaseModel):
id: int
username: str
is_admin: bool
is_locked: bool
@app.get("/api/users", response_model=List[UserListUser])
async def get_users(current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can access the user list")
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT id, username, is_admin, is_locked FROM users")
users = cursor.fetchall()
conn.close()
return [dict(row) for row in users]
@app.delete("/api/users/{user_id}", status_code=status.HTTP_200_OK)
async def delete_user(
user_id: int,
current_user: UserInDB = Depends(get_current_active_user)
):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete users")
if current_user.id == user_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail="Admins cannot delete themselves")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
if cursor.fetchone() is None:
conn.close()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
conn.close()
return {"status": "User deleted successfully"}
@app.get("/api/users/locked", response_model=List[UserListUser])
async def get_locked_users(current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can access this resource")
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"SELECT id, username, is_admin, is_locked FROM users WHERE is_locked = 1")
locked_users = cursor.fetchall()
conn.close()
return [dict(row) for row in locked_users]
@app.put("/api/users/{user_id}/unlock", status_code=status.HTTP_200_OK)
async def unlock_user(user_id: int, current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can perform this action")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE id = ?", (user_id,))
conn.commit()
conn.close()
return {"status": "User unlocked successfully"}
@app.get("/api/items")
async def get_items(current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"SELECT id, name, is_standard, created_by_user_id, marked FROM items ORDER BY name")
items = cursor.fetchall()
conn.close()
return {"items": [dict(row) for row in items]}
@app.post("/api/items")
async def add_item(item: Item, current_user: UserInDB = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
added_count = 0
for name in item.names:
cursor.execute("SELECT * FROM items WHERE name = ?", (name,))
if cursor.fetchone():
continue
standard_items_de = get_standard_items('de')
standard_items_en = get_standard_items('en')
is_standard = name in standard_items_de or name in standard_items_en
try:
cursor.execute("INSERT INTO items (name, is_standard, created_by_user_id) VALUES (?, ?, ?)",
(name, is_standard, current_user.id))
added_count += 1
except sqlite3.IntegrityError:
continue
conn.commit()
conn.close()
if added_count > 0:
await manager.broadcast_update()
conn.close()
return {"status": "ok", "added": added_count}
else:
conn.close()
return {"status": "exists"}
@app.put("/api/items/{item_id}/mark")
async def mark_item(item_id: int, current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("UPDATE items SET marked = NOT marked WHERE id = ?", (item_id,))
conn.commit()
await manager.broadcast_update()
conn.close()
return {"status": "ok"}
@app.post("/api/items/delete-marked")
async def delete_marked_items(request: Optional[DeletionRequest] = None, current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute(
"SELECT value FROM app_settings WHERE key = 'deletion_password'")
result = cursor.fetchone()
conn.close()
# If a deletion password is set in app_settings
if result:
# And no password was provided in the request, or the provided password is incorrect
if not request or not request.password or not pwd_context.verify(request.password, result[0]):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect password")
# If no deletion password is set in app_settings, then no password is required for deletion.
# The request.password is ignored in this case.
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("DELETE FROM items WHERE marked = 1")
conn.commit()
await manager.broadcast_update()
conn.close()
return {"status": "ok"}
@app.get("/api/settings/deletion-password")
async def get_deletion_password(current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute(
"SELECT value FROM app_settings WHERE key = 'deletion_password'")
result = cursor.fetchone()
conn.close()
return {"is_set": result is not None}
@app.post("/api/settings/deletion-password")
async def set_deletion_password(request: DeletionRequest, current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can set the deletion password")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
if not request.password:
cursor.execute("DELETE FROM app_settings WHERE key = 'deletion_password'")
else:
hashed_password = pwd_context.hash(request.password)
cursor.execute("INSERT OR REPLACE INTO app_settings (key, value) VALUES (?, ?)",
('deletion_password', hashed_password))
conn.commit()
conn.close()
return {"status": "ok"}
@app.delete("/api/items/{item_id}")
async def delete_item(item_id: int, current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete items")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
conn.commit()
conn.close()
await manager.broadcast_update()
return {"status": "ok"}
@app.post("/api/notify")
async def trigger_notification(lang: str = 'en', current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM items ORDER BY name")
items = cursor.fetchall()
conn.close()
item_names = [dict(row)["name"] for row in items]
translations = get_all_translations(lang)
if not item_names:
message = translations.get(
"empty_list_message", "The shopping list is empty.")
else:
message = ", ".join(item_names)
status = send_gotify_notification(
translations.get("notification_title", "Shopping List Updated"),
message
)
return {"status": status}
@app.get("/api/db-status")
async def get_db_status(current_user: User = Depends(get_current_active_user)):
try:
conn = sqlite3.connect(DB_FILE)
version = sqlite3.sqlite_version
conn.close()
return {"version": version, "error": None}
except sqlite3.Error as e:
return {"version": None, "error": str(e)}
@app.websocket("/ws/{token}")
async def websocket_endpoint(websocket: WebSocket, token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
user = get_user(username=username)
if user is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
except JWTError:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket, user)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast_user_list()
# --- Statische Dateien (Frontend) ---
# Create data directory if it doesn't exist
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)
init_db()
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/{full_path:path}", response_class=HTMLResponse)
async def catch_all(request: Request, full_path: str):
lang = "de" # default
accept_language = request.headers.get("accept-language")
if accept_language:
langs = accept_language.split(",")[0]
if "de" in langs:
lang = "de"
elif "en" in langs:
lang = "en"
with open("static/index.html") as f:
html_content = f.read()
# Replace lang and favicon path
html_content = html_content.replace('lang="de"', f'lang="{lang}"')
html_content = html_content.replace(
'href="/favicon.png"', 'href="/static/favicon.png"')
return HTMLResponse(content=html_content)