Introduces WebSocket-based real-time updates for the shopping list. Changes to items (add, mark, delete) are now instantly reflected across all connected user sessions without requiring a page refresh. This commit: - Extends the WebSocket ConnectionManager to broadcast item updates. - Modifies item manipulation endpoints (add, mark, delete) to trigger broadcasts. - Updates the frontend to listen for update broadcasts and refresh the list. - Updates README.md to reflect the new real-time update feature.
689 lines
23 KiB
Python
689 lines
23 KiB
Python
|
|
from collections import defaultdict
|
|
from passlib.context import CryptContext
|
|
import os
|
|
import requests
|
|
import sqlite3
|
|
from jose import JWTError, jwt
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, List
|
|
from pydantic import BaseModel
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from fastapi.staticfiles import StaticFiles
|
|
from translations import get_standard_items, get_all_translations
|
|
from fastapi import FastAPI, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Request
|
|
from fastapi.responses import HTMLResponse
|
|
|
|
app = FastAPI()
|
|
|
|
# --- Configuration ---
|
|
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 43200 # 30 days
|
|
DB_FILE = "data/shoppinglist.db"
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
|
|
# --- Brute-Force Protection ---
|
|
MAX_FAILED_ATTEMPTS_USER = 5
|
|
MAX_FAILED_ATTEMPTS_ADMIN = 3
|
|
LOCKOUT_PERIOD_MINUTES = 30
|
|
failed_login_attempts_ip = defaultdict(
|
|
lambda: {'count': 0, 'timestamp': datetime.utcnow()})
|
|
|
|
|
|
# --- Pydantic Models ---
|
|
class Token(BaseModel):
|
|
access_token: str
|
|
token_type: str
|
|
|
|
|
|
class TokenData(BaseModel):
|
|
username: Optional[str] = None
|
|
|
|
|
|
class User(BaseModel):
|
|
username: str
|
|
is_admin: bool
|
|
|
|
|
|
class UserInDB(User):
|
|
id: int
|
|
password_hash: str
|
|
failed_login_attempts: int
|
|
is_locked: bool
|
|
locked_at: Optional[datetime]
|
|
|
|
|
|
class Item(BaseModel):
|
|
names: List[str]
|
|
|
|
|
|
class NewUser(BaseModel):
|
|
username: str
|
|
password: str
|
|
is_admin: bool = False
|
|
|
|
|
|
# --- WebSocket Connection Manager ---
|
|
class ConnectionManager:
|
|
def __init__(self):
|
|
self.active_connections: List[WebSocket] = []
|
|
self.user_map: dict[WebSocket, User] = {}
|
|
|
|
async def connect(self, websocket: WebSocket, user: User):
|
|
await websocket.accept()
|
|
self.active_connections.append(websocket)
|
|
self.user_map[websocket] = user
|
|
await self.broadcast_user_list()
|
|
|
|
def disconnect(self, websocket: WebSocket):
|
|
if websocket in self.active_connections:
|
|
self.active_connections.remove(websocket)
|
|
if websocket in self.user_map:
|
|
del self.user_map[websocket]
|
|
|
|
async def broadcast_user_list(self):
|
|
user_list = sorted([user.username for user in self.user_map.values()])
|
|
for connection in self.active_connections:
|
|
await connection.send_json({"type": "user_list", "users": user_list})
|
|
|
|
async def broadcast_update(self):
|
|
for connection in self.active_connections:
|
|
await connection.send_json({"type": "update"})
|
|
|
|
|
|
manager = ConnectionManager()
|
|
|
|
|
|
# --- Auth Helper Functions ---
|
|
def get_user(username: str):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
|
|
user_data = cursor.fetchone()
|
|
conn.close()
|
|
if user_data:
|
|
user_dict = dict(user_data)
|
|
if user_dict.get('locked_at'):
|
|
try:
|
|
user_dict['locked_at'] = datetime.fromisoformat(
|
|
user_dict['locked_at'])
|
|
except (TypeError, ValueError):
|
|
user_dict['locked_at'] = None
|
|
return UserInDB(**user_dict)
|
|
|
|
|
|
def verify_password(plain_password, hashed_password):
|
|
return pwd_context.verify(plain_password, hashed_password)
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(minutes=15)
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encoded_jwt
|
|
|
|
|
|
async def get_current_active_user(token: str = Depends(oauth2_scheme)):
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise credentials_exception
|
|
token_data = TokenData(username=username)
|
|
except JWTError:
|
|
raise credentials_exception
|
|
user = get_user(username=token_data.username)
|
|
if user is None:
|
|
raise credentials_exception
|
|
return user
|
|
|
|
|
|
# --- Gotify Konfiguration ---
|
|
GOTIFY_URL = os.getenv("GOTIFY_URL")
|
|
|
|
|
|
def send_gotify_notification(title: str, message: str) -> str:
|
|
if not GOTIFY_URL:
|
|
return "GOTIFY_URL is not set. Skipping notification."
|
|
try:
|
|
payload = {"title": title, "message": message}
|
|
response = requests.post(GOTIFY_URL, json=payload)
|
|
response.raise_for_status()
|
|
return "Gotify notification sent successfully."
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Error sending Gotify notification: {e}")
|
|
return "Error sending Gotify notification."
|
|
|
|
|
|
# --- Datenbank-Setup ---
|
|
def column_exists(cursor, table_name, column_name):
|
|
cursor.execute(f"PRAGMA table_info({table_name})")
|
|
columns = [info[1] for info in cursor.fetchall()]
|
|
return column_name in columns
|
|
|
|
|
|
def init_db():
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT NOT NULL UNIQUE,
|
|
password_hash TEXT NOT NULL,
|
|
is_admin BOOLEAN NOT NULL DEFAULT 0
|
|
)
|
|
""")
|
|
|
|
if not column_exists(cursor, "users", "failed_login_attempts"):
|
|
cursor.execute(
|
|
"ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER NOT NULL DEFAULT 0")
|
|
if not column_exists(cursor, "users", "is_locked"):
|
|
cursor.execute(
|
|
"ALTER TABLE users ADD COLUMN is_locked BOOLEAN NOT NULL DEFAULT 0")
|
|
if not column_exists(cursor, "users", "locked_at"):
|
|
cursor.execute("ALTER TABLE users ADD COLUMN locked_at TIMESTAMP")
|
|
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS items (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL UNIQUE,
|
|
is_standard BOOLEAN NOT NULL DEFAULT 0
|
|
)
|
|
""")
|
|
|
|
if not column_exists(cursor, "items", "created_by_user_id"):
|
|
cursor.execute("ALTER TABLE items ADD COLUMN created_by_user_id INTEGER REFERENCES users(id)")
|
|
|
|
if not column_exists(cursor, "items", "marked"):
|
|
cursor.execute(
|
|
"ALTER TABLE items ADD COLUMN marked BOOLEAN NOT NULL DEFAULT 0")
|
|
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS app_settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT
|
|
)
|
|
''')
|
|
|
|
cursor.execute("SELECT COUNT(*) FROM users")
|
|
if cursor.fetchone()[0] == 0:
|
|
admin_password = "admin"
|
|
hashed_password = pwd_context.hash(admin_password)
|
|
cursor.execute(
|
|
"INSERT INTO users (username, password_hash, is_admin) VALUES (?, ?, ?)",
|
|
("admin", hashed_password, True)
|
|
)
|
|
print("Default admin user 'admin' with password 'admin' created.")
|
|
print("Please change this password in a production environment.")
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
print("Datenbank initialisiert.")
|
|
|
|
|
|
class DeletionRequest(BaseModel):
|
|
password: Optional[str] = None
|
|
|
|
|
|
@app.get("/api/standard-items/{lang}")
|
|
async def get_standard_items_route(lang: str):
|
|
return get_standard_items(lang)
|
|
|
|
|
|
@app.get("/api/translations/{lang}")
|
|
async def get_translations_route(lang: str):
|
|
return get_all_translations(lang)
|
|
|
|
|
|
# --- API Endpunkte ---
|
|
@app.post("/token", response_model=Token)
|
|
async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
|
|
client_ip = request.client.host
|
|
now = datetime.utcnow()
|
|
|
|
ip_failures = failed_login_attempts_ip[client_ip]
|
|
if ip_failures['count'] >= 15 and (now - ip_failures['timestamp']) < timedelta(minutes=LOCKOUT_PERIOD_MINUTES):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
|
detail="Too many failed login attempts from this IP. Please try again later."
|
|
)
|
|
|
|
user = get_user(form_data.username)
|
|
|
|
if user and user.is_locked:
|
|
lockout_expiry = user.locked_at + \
|
|
timedelta(minutes=LOCKOUT_PERIOD_MINUTES) if user.locked_at else now
|
|
if now < lockout_expiry:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Account is locked due to too many failed login attempts."
|
|
)
|
|
else:
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE username = ?",
|
|
(user.username,))
|
|
conn.commit()
|
|
conn.close()
|
|
user = get_user(form_data.username)
|
|
|
|
if not user or not verify_password(form_data.password, user.password_hash):
|
|
if user:
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
new_attempts = user.failed_login_attempts + 1
|
|
max_attempts = MAX_FAILED_ATTEMPTS_ADMIN if user.is_admin else MAX_FAILED_ATTEMPTS_USER
|
|
|
|
if new_attempts >= max_attempts:
|
|
cursor.execute("UPDATE users SET failed_login_attempts = ?, is_locked = 1, locked_at = ? WHERE username = ?",
|
|
(new_attempts, now, user.username))
|
|
else:
|
|
cursor.execute(
|
|
"UPDATE users SET failed_login_attempts = ? WHERE username = ?", (new_attempts, user.username))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
ip_failures['count'] += 1
|
|
ip_failures['timestamp'] = now
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if user.failed_login_attempts > 0 or user.is_locked:
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE users SET failed_login_attempts = 0, is_locked = 0, locked_at = NULL WHERE username = ?",
|
|
(user.username,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
failed_login_attempts_ip[client_ip] = {'count': 0, 'timestamp': now}
|
|
|
|
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": user.username}, expires_delta=access_token_expires
|
|
)
|
|
return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
@app.get("/api/users/me", response_model=User)
|
|
async def read_users_me(current_user: User = Depends(get_current_active_user)):
|
|
return current_user
|
|
|
|
|
|
@app.post("/api/users", status_code=status.HTTP_201_CREATED)
|
|
async def create_user(user: NewUser, current_user: User = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can create users")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT id FROM users WHERE username = ?", (user.username,))
|
|
if cursor.fetchone():
|
|
conn.close()
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Username already registered")
|
|
|
|
hashed_password = pwd_context.hash(user.password)
|
|
cursor.execute(
|
|
"INSERT INTO users (username, password_hash, is_admin) VALUES (?, ?, ?)",
|
|
(user.username, hashed_password, user.is_admin)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return {"status": f"User {user.username} created"}
|
|
|
|
|
|
class PasswordChangeRequest(BaseModel):
|
|
current_password: str
|
|
new_password: str
|
|
|
|
|
|
@app.put("/api/users/me/password", status_code=status.HTTP_200_OK)
|
|
async def change_password(
|
|
request: PasswordChangeRequest,
|
|
current_user: UserInDB = Depends(get_current_active_user)
|
|
):
|
|
if not verify_password(request.current_password, current_user.password_hash):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Incorrect current password",
|
|
)
|
|
|
|
new_hashed_password = pwd_context.hash(request.new_password)
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE users SET password_hash = ? WHERE id = ?",
|
|
(new_hashed_password, current_user.id)
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"status": "Password updated successfully"}
|
|
|
|
|
|
class UserListUser(BaseModel):
|
|
id: int
|
|
username: str
|
|
is_admin: bool
|
|
is_locked: bool
|
|
|
|
|
|
@app.get("/api/users", response_model=List[UserListUser])
|
|
async def get_users(current_user: UserInDB = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can access the user list")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT id, username, is_admin, is_locked FROM users")
|
|
users = cursor.fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in users]
|
|
|
|
|
|
@app.delete("/api/users/{user_id}", status_code=status.HTTP_200_OK)
|
|
async def delete_user(
|
|
user_id: int,
|
|
current_user: UserInDB = Depends(get_current_active_user)
|
|
):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can delete users")
|
|
|
|
if current_user.id == user_id:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Admins cannot delete themselves")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
|
|
if cursor.fetchone() is None:
|
|
conn.close()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
|
|
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"status": "User deleted successfully"}
|
|
|
|
|
|
@app.get("/api/users/locked", response_model=List[UserListUser])
|
|
async def get_locked_users(current_user: UserInDB = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can access this resource")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT id, username, is_admin, is_locked FROM users WHERE is_locked = 1")
|
|
locked_users = cursor.fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in locked_users]
|
|
|
|
|
|
@app.put("/api/users/{user_id}/unlock", status_code=status.HTTP_200_OK)
|
|
async def unlock_user(user_id: int, current_user: UserInDB = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can perform this action")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE id = ?", (user_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"status": "User unlocked successfully"}
|
|
|
|
|
|
@app.get("/api/items")
|
|
async def get_items(current_user: User = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT id, name, is_standard, created_by_user_id, marked FROM items ORDER BY name")
|
|
items = cursor.fetchall()
|
|
conn.close()
|
|
return {"items": [dict(row) for row in items]}
|
|
|
|
|
|
@app.post("/api/items")
|
|
async def add_item(item: Item, current_user: UserInDB = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
|
|
added_count = 0
|
|
for name in item.names:
|
|
cursor.execute("SELECT * FROM items WHERE name = ?", (name,))
|
|
if cursor.fetchone():
|
|
continue
|
|
|
|
standard_items_de = get_standard_items('de')
|
|
standard_items_en = get_standard_items('en')
|
|
is_standard = name in standard_items_de or name in standard_items_en
|
|
|
|
try:
|
|
cursor.execute("INSERT INTO items (name, is_standard, created_by_user_id) VALUES (?, ?, ?)",
|
|
(name, is_standard, current_user.id))
|
|
added_count += 1
|
|
except sqlite3.IntegrityError:
|
|
continue
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
if added_count > 0:
|
|
await manager.broadcast_update()
|
|
conn.close()
|
|
return {"status": "ok", "added": added_count}
|
|
else:
|
|
conn.close()
|
|
return {"status": "exists"}
|
|
|
|
|
|
@app.put("/api/items/{item_id}/mark")
|
|
async def mark_item(item_id: int, current_user: User = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute("UPDATE items SET marked = NOT marked WHERE id = ?", (item_id,))
|
|
conn.commit()
|
|
await manager.broadcast_update()
|
|
conn.close()
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.post("/api/items/delete-marked")
|
|
async def delete_marked_items(request: Optional[DeletionRequest] = None, current_user: User = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT value FROM app_settings WHERE key = 'deletion_password'")
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
|
|
# If a deletion password is set in app_settings
|
|
if result:
|
|
# And no password was provided in the request, or the provided password is incorrect
|
|
if not request or not request.password or not pwd_context.verify(request.password, result[0]):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect password")
|
|
# If no deletion password is set in app_settings, then no password is required for deletion.
|
|
# The request.password is ignored in this case.
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM items WHERE marked = 1")
|
|
conn.commit()
|
|
await manager.broadcast_update()
|
|
conn.close()
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.get("/api/settings/deletion-password")
|
|
async def get_deletion_password(current_user: User = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT value FROM app_settings WHERE key = 'deletion_password'")
|
|
result = cursor.fetchone()
|
|
conn.close()
|
|
return {"is_set": result is not None}
|
|
|
|
|
|
@app.post("/api/settings/deletion-password")
|
|
async def set_deletion_password(request: DeletionRequest, current_user: User = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can set the deletion password")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
|
|
if not request.password:
|
|
cursor.execute("DELETE FROM app_settings WHERE key = 'deletion_password'")
|
|
else:
|
|
hashed_password = pwd_context.hash(request.password)
|
|
cursor.execute("INSERT OR REPLACE INTO app_settings (key, value) VALUES (?, ?)",
|
|
('deletion_password', hashed_password))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.delete("/api/items/{item_id}")
|
|
async def delete_item(item_id: int, current_user: User = Depends(get_current_active_user)):
|
|
if not current_user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Only admins can delete items")
|
|
|
|
conn = sqlite3.connect(DB_FILE)
|
|
cursor = conn.cursor()
|
|
cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
await manager.broadcast_update()
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.post("/api/notify")
|
|
async def trigger_notification(lang: str = 'en', current_user: User = Depends(get_current_active_user)):
|
|
conn = sqlite3.connect(DB_FILE)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
cursor.execute("SELECT * FROM items ORDER BY name")
|
|
items = cursor.fetchall()
|
|
conn.close()
|
|
|
|
item_names = [dict(row)["name"] for row in items]
|
|
translations = get_all_translations(lang)
|
|
|
|
if not item_names:
|
|
message = translations.get(
|
|
"empty_list_message", "The shopping list is empty.")
|
|
else:
|
|
message = ", ".join(item_names)
|
|
|
|
status = send_gotify_notification(
|
|
translations.get("notification_title", "Shopping List Updated"),
|
|
message
|
|
)
|
|
return {"status": status}
|
|
|
|
|
|
@app.get("/api/db-status")
|
|
async def get_db_status(current_user: User = Depends(get_current_active_user)):
|
|
try:
|
|
conn = sqlite3.connect(DB_FILE)
|
|
version = sqlite3.sqlite_version
|
|
conn.close()
|
|
return {"version": version, "error": None}
|
|
except sqlite3.Error as e:
|
|
return {"version": None, "error": str(e)}
|
|
|
|
|
|
@app.websocket("/ws/{token}")
|
|
async def websocket_endpoint(websocket: WebSocket, token: str):
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
return
|
|
user = get_user(username=username)
|
|
if user is None:
|
|
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
return
|
|
except JWTError:
|
|
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
return
|
|
|
|
await manager.connect(websocket, user)
|
|
try:
|
|
while True:
|
|
await websocket.receive_text()
|
|
except WebSocketDisconnect:
|
|
manager.disconnect(websocket)
|
|
await manager.broadcast_user_list()
|
|
|
|
|
|
# --- Statische Dateien (Frontend) ---
|
|
# Create data directory if it doesn't exist
|
|
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)
|
|
init_db()
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
|
|
@app.get("/{full_path:path}", response_class=HTMLResponse)
|
|
async def catch_all(request: Request, full_path: str):
|
|
lang = "de" # default
|
|
accept_language = request.headers.get("accept-language")
|
|
if accept_language:
|
|
langs = accept_language.split(",")[0]
|
|
if "de" in langs:
|
|
lang = "de"
|
|
elif "en" in langs:
|
|
lang = "en"
|
|
|
|
with open("static/index.html") as f:
|
|
html_content = f.read()
|
|
|
|
# Replace lang and favicon path
|
|
html_content = html_content.replace('lang="de"', f'lang="{lang}"')
|
|
html_content = html_content.replace(
|
|
'href="/favicon.png"', 'href="/static/favicon.png"')
|
|
|
|
return HTMLResponse(content=html_content)
|