add .vscode dir on .gitignore and reformate pythonfiles

This commit is contained in:
2025-10-26 10:40:44 +01:00
parent 6a72e7b3d0
commit f9a3c0f28d
3 changed files with 64 additions and 43 deletions

3
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.env
data/
data/
.vscode/

93
main.py
View File

@@ -1,17 +1,17 @@
from fastapi import FastAPI, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Request
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta
from jose import JWTError, jwt
import sqlite3
import uvicorn
import requests
import os
from passlib.context import CryptContext
from collections import defaultdict
from passlib.context import CryptContext
import os
import requests
import sqlite3
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional, List
from pydantic import BaseModel
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
from translations import get_standard_items, get_all_translations
from fastapi import FastAPI, HTTPException, Depends, status, WebSocket, WebSocketDisconnect, Request
app = FastAPI()
@@ -28,7 +28,8 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
MAX_FAILED_ATTEMPTS_USER = 5
MAX_FAILED_ATTEMPTS_ADMIN = 3
LOCKOUT_PERIOD_MINUTES = 30
failed_login_attempts_ip = defaultdict(lambda: {'count': 0, 'timestamp': datetime.utcnow()})
failed_login_attempts_ip = defaultdict(
lambda: {'count': 0, 'timestamp': datetime.utcnow()})
# --- Pydantic Models ---
@@ -103,7 +104,8 @@ def get_user(username: str):
user_dict = dict(user_data)
if user_dict.get('locked_at'):
try:
user_dict['locked_at'] = datetime.fromisoformat(user_dict['locked_at'])
user_dict['locked_at'] = datetime.fromisoformat(
user_dict['locked_at'])
except (TypeError, ValueError):
user_dict['locked_at'] = None
return UserInDB(**user_dict)
@@ -161,9 +163,6 @@ def send_gotify_notification(title: str, message: str) -> str:
return "Error sending Gotify notification."
from translations import get_standard_items, get_all_translations
# --- Datenbank-Setup ---
def column_exists(cursor, table_name, column_name):
cursor.execute(f"PRAGMA table_info({table_name})")
@@ -185,9 +184,11 @@ def init_db():
""")
if not column_exists(cursor, "users", "failed_login_attempts"):
cursor.execute("ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER NOT NULL DEFAULT 0")
cursor.execute(
"ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER NOT NULL DEFAULT 0")
if not column_exists(cursor, "users", "is_locked"):
cursor.execute("ALTER TABLE users ADD COLUMN is_locked BOOLEAN NOT NULL DEFAULT 0")
cursor.execute(
"ALTER TABLE users ADD COLUMN is_locked BOOLEAN NOT NULL DEFAULT 0")
if not column_exists(cursor, "users", "locked_at"):
cursor.execute("ALTER TABLE users ADD COLUMN locked_at TIMESTAMP")
@@ -200,7 +201,8 @@ def init_db():
""")
if not column_exists(cursor, "items", "created_by_user_id"):
cursor.execute("ALTER TABLE items ADD COLUMN created_by_user_id INTEGER REFERENCES users(id)")
cursor.execute(
"ALTER TABLE items ADD COLUMN created_by_user_id INTEGER REFERENCES users(id)")
cursor.execute("SELECT COUNT(*) FROM users")
if cursor.fetchone()[0] == 0:
@@ -244,7 +246,8 @@ async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequ
user = get_user(form_data.username)
if user and user.is_locked:
lockout_expiry = user.locked_at + timedelta(minutes=LOCKOUT_PERIOD_MINUTES) if user.locked_at else now
lockout_expiry = user.locked_at + \
timedelta(minutes=LOCKOUT_PERIOD_MINUTES) if user.locked_at else now
if now < lockout_expiry:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@@ -270,7 +273,8 @@ async def login_for_access_token(request: Request, form_data: OAuth2PasswordRequ
cursor.execute("UPDATE users SET failed_login_attempts = ?, is_locked = 1, locked_at = ? WHERE username = ?",
(new_attempts, now, user.username))
else:
cursor.execute("UPDATE users SET failed_login_attempts = ? WHERE username = ?", (new_attempts, user.username))
cursor.execute(
"UPDATE users SET failed_login_attempts = ? WHERE username = ?", (new_attempts, user.username))
conn.commit()
conn.close()
@@ -309,7 +313,8 @@ async def read_users_me(current_user: User = Depends(get_current_active_user)):
@app.post("/api/users", status_code=status.HTTP_201_CREATED)
async def create_user(user: NewUser, current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can create users")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can create users")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
@@ -317,7 +322,8 @@ async def create_user(user: NewUser, current_user: User = Depends(get_current_ac
cursor.execute("SELECT id FROM users WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered")
hashed_password = pwd_context.hash(user.password)
cursor.execute(
@@ -369,7 +375,8 @@ class UserListUser(BaseModel):
@app.get("/api/users", response_model=List[UserListUser])
async def get_users(current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can access the user list")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can access the user list")
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
@@ -386,10 +393,12 @@ async def delete_user(
current_user: UserInDB = Depends(get_current_active_user)
):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can delete users")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete users")
if current_user.id == user_id:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Admins cannot delete themselves")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
detail="Admins cannot delete themselves")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
@@ -397,7 +406,8 @@ async def delete_user(
cursor.execute("SELECT id FROM users WHERE id = ?", (user_id,))
if cursor.fetchone() is None:
conn.close()
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
@@ -409,12 +419,14 @@ async def delete_user(
@app.get("/api/users/locked", response_model=List[UserListUser])
async def get_locked_users(current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can access this resource")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can access this resource")
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT id, username, is_admin, is_locked FROM users WHERE is_locked = 1")
cursor.execute(
"SELECT id, username, is_admin, is_locked FROM users WHERE is_locked = 1")
locked_users = cursor.fetchall()
conn.close()
return [dict(row) for row in locked_users]
@@ -423,11 +435,13 @@ async def get_locked_users(current_user: UserInDB = Depends(get_current_active_u
@app.put("/api/users/{user_id}/unlock", status_code=status.HTTP_200_OK)
async def unlock_user(user_id: int, current_user: UserInDB = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can perform this action")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can perform this action")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE id = ?", (user_id,))
cursor.execute(
"UPDATE users SET is_locked = 0, failed_login_attempts = 0, locked_at = NULL WHERE id = ?", (user_id,))
conn.commit()
conn.close()
@@ -439,7 +453,8 @@ async def get_items(current_user: User = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT id, name, is_standard, created_by_user_id FROM items ORDER BY name")
cursor.execute(
"SELECT id, name, is_standard, created_by_user_id FROM items ORDER BY name")
items = cursor.fetchall()
conn.close()
return {"items": [dict(row) for row in items]}
@@ -449,7 +464,7 @@ async def get_items(current_user: User = Depends(get_current_active_user)):
async def add_item(item: Item, current_user: UserInDB = Depends(get_current_active_user)):
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
added_count = 0
for name in item.names:
cursor.execute("SELECT * FROM items WHERE name = ?", (name,))
@@ -466,10 +481,10 @@ async def add_item(item: Item, current_user: UserInDB = Depends(get_current_acti
added_count += 1
except sqlite3.IntegrityError:
continue
conn.commit()
conn.close()
if added_count > 0:
return {"status": "ok", "added": added_count}
else:
@@ -479,7 +494,8 @@ async def add_item(item: Item, current_user: UserInDB = Depends(get_current_acti
@app.delete("/api/items/{item_id}")
async def delete_item(item_id: int, current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can delete items")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can delete items")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
@@ -502,7 +518,8 @@ async def trigger_notification(lang: str = 'en', current_user: User = Depends(ge
translations = get_all_translations(lang)
if not item_names:
message = translations.get("empty_list_message", "The shopping list is empty.")
message = translations.get(
"empty_list_message", "The shopping list is empty.")
else:
message = ", ".join(item_names)

View File

@@ -2,9 +2,11 @@
import xml.etree.ElementTree as ET
from typing import List, Dict
def get_standard_items(lang: str) -> List[str]:
"""
Parses the strings.xml file for the given language and returns the standard shopping list items.
Parses the strings.xml file for the given language and returns the
standard shopping list items.
"""
if lang == "de":
filepath = "translations/de/strings.xml"
@@ -22,9 +24,11 @@ def get_standard_items(lang: str) -> List[str]:
return []
def get_all_translations(lang: str) -> Dict[str, str]:
"""
Returns a dictionary with all UI and notification translations for the web app.
Returns a dictionary with all UI and notification translations
for the web app.
"""
if lang == "de":
return {
@@ -65,7 +69,7 @@ def get_all_translations(lang: str) -> Dict[str, str]:
"login_error_incorrect": "Benutzername oder Passwort inkorrekt.",
"login_error_generic": "Ein Fehler ist aufgetreten. Bitte erneut versuchen."
}
else: # Fallback to English
else: # Fallback to English
return {
"title": "Shared Shopping List",
"placeholder": "Enter item...",
@@ -104,4 +108,3 @@ def get_all_translations(lang: str) -> Dict[str, str]:
"login_error_incorrect": "Incorrect username or password.",
"login_error_generic": "An error occurred. Please try again."
}