implement substition

This commit is contained in:
Michi 2025-05-02 12:55:57 +02:00
parent b0d0f7ab5f
commit cd80cfd77a
3 changed files with 66 additions and 49 deletions

View file

@ -9,3 +9,7 @@ def hash_password(password: str) -> str:
def generate_new_token(length: int = 32) -> str: def generate_new_token(length: int = 32) -> str:
"""Generiert einen neuen sicheren Token.""" """Generiert einen neuen sicheren Token."""
return secrets.token_hex(length) # Erzeugt einen sicheren Token return secrets.token_hex(length) # Erzeugt einen sicheren Token
def substitute_string_reverse(s: str, alphabet: str, key: str) -> str:
reverse_map = {k: a for a, k in zip(alphabet, key)}
return "".join(reverse_map.get(ch.upper(), ch) for ch in s)

View file

@ -56,3 +56,11 @@ class SensorData(SQLModel, table=True):
class EncryptedPayload(BaseModel): class EncryptedPayload(BaseModel):
data: str data: str
class SensorDataEncryptedIn(BaseModel):
timestamp: str = Field(...)
temperature: Optional[str] = None
humidity: Optional[str] = None
pressure: Optional[str] = None
voc: Optional[str] = None
gas: Optional[str] = None

View file

@ -6,34 +6,24 @@ from fastapi import FastAPI, Depends, HTTPException, Header, Body, Query
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from sqlmodel import Session from sqlmodel import Session
from dbfunctions import List, Optional, get_db, save_sensor_data, get_client_id_by_name, validate_token_with_access, engine, save_token_to_db, get_recent_sensor_data, get_all_clients from dbfunctions import List, Optional, get_db, save_sensor_data, get_client_id_by_name, validate_token_with_access, engine, save_token_to_db, get_recent_sensor_data, get_all_clients
from models import EncryptedPayload, SensorDataIn, SensorData, MessageOnly, User, Client, ClientCreate, TokenResponse, Session as SessionModel from models import SensorDataIn, SensorData, SensorDataEncryptedIn, MessageOnly, User, Client, ClientCreate, TokenResponse, Session as SessionModel
from datetime import datetime, timedelta from datetime import datetime, timedelta
from crypto import hash_password, generate_new_token from crypto import hash_password, generate_new_token, substitute_string_reverse
from Crypto.Cipher import AES from os import getenv as env
import base64
AES_KEY = bytes([0x27, 0x9A, 0xB3, 0x11, 0xE7, 0x3D, 0xCC, 0x88,
0x55, 0x10, 0x9F, 0x4A, 0x6B, 0x1D, 0x20, 0x2F])
def decrypt_base64_aes128(encrypted_b64: str) -> str:
encrypted_bytes = base64.b64decode(encrypted_b64)
cipher = AES.new(AES_KEY, AES.MODE_ECB)
decrypted = cipher.decrypt(encrypted_bytes)
# Entferne Null-Byte-Padding (falls verwendet)
return decrypted.rstrip(b"\0").decode("utf-8")
############# SUBSTITION #############
ALPHABET = "ABCDEFGHIJKLMONPQRSTUVWXYZ012345689-:."
KEY = env("CRYPTO_SUBSTITION_KEY")
################ API ################ ################ API ################
app = FastAPI( app = FastAPI(
title="BBZW-Horizon", title="BBZW-Horizon",
description="BBZW-Horizon ist ein Tool, welches entwickelt wurde, um durch die Erfassung und Auswertung von Luftqualitätsmesswerten die Luftqualität in den Schulzimmern des BBZW Sursee zu verbessern. Bei dieser API handelt es sich um die Kommunikationsschnittstelle, zwischen den Arduinos, welche mit Sensoren die Daten erfassen und an die API senden. Diese API speichert die Daten dann in der Datenbank, damit diese durch das Frontend abgerufen und visualisiert werden können.", description="BBZW-Horizon ist ein Tool, welches entwickelt wurde, um durch die Erfassung und Auswertung von Luftqualitätsmesswerten die Luftqualität in den Schulzimmern des BBZW Sursee zu verbessern. Bei dieser API handelt es sich um die Kommunikationsschnittstelle, zwischen den Arduinos, welche mit Sensoren die Daten erfassen und an die API senden. Diese API speichert die Daten dann in der Datenbank, damit diese durch das Frontend abgerufen und visualisiert werden können.",
summary="Die BBZW-Horizon API dient als Kommunikationsschnittstelle, um Luftqualitätsmesswerte von Arduinos, die mit Sensoren ausgestattet sind, zu erfassen", summary="Die BBZW-Horizon API dient als Kommunikationsschnittstelle, um Luftqualitätsmesswerte von Arduinos, die mit Sensoren ausgestattet sind, zu erfassen",
version="0.0.6" version="0.0.7"
) )
# CORS ############### CORS ################
origins = [ origins = [
"*", "*",
] ]
@ -83,6 +73,49 @@ async def saveNewSensorData(
except Exception as error: except Exception as error:
raise HTTPException(status_code=500, detail=str(error)) raise HTTPException(status_code=500, detail=str(error))
@app.post("/sensors/push-data/secure", response_model=MessageOnly, tags=["sensors"])
async def saveNewSensorDataSecure(
client: str,
data: SensorDataEncryptedIn,
db: Session = Depends(get_db),
auth: bool = Depends(authenticate_user)
):
# Client-ID bestimmen
client_id = get_client_id_by_name(db, client)
if client_id is None:
raise HTTPException(status_code=404, detail="Client not found")
# Entschlüssle Timestamp
decrypted_timestamp_str = substitute_string_reverse(data.timestamp, ALPHABET, KEY)
try:
timestamp = datetime.fromisoformat(decrypted_timestamp_str)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid encrypted timestamp format")
# Hilfsfunktion zum Entschlüsseln Zahlenwerte
def decrypt_value(enc_val: Optional[str]) -> Optional[float]:
if enc_val is None:
return None
decrypted_str = substitute_string_reverse(enc_val, ALPHABET, KEY)
try:
return float(decrypted_str)
except ValueError:
raise HTTPException(status_code=400, detail=f"Invalid encrypted sensor value: {enc_val}")
sensor_data = SensorData(
timestamp=timestamp,
temperature=decrypt_value(data.temperature),
humidity=decrypt_value(data.humidity),
pressure=decrypt_value(data.pressure),
voc=decrypt_value(data.voc),
gas=decrypt_value(data.gas),
clientid=client_id
)
save_sensor_data(db, sensor_data)
return MessageOnly(message="Decrypted sensor data saved successfully.")
@app.post("/user/new-session", response_model=TokenResponse, tags=["auth"]) @app.post("/user/new-session", response_model=TokenResponse, tags=["auth"])
async def generate_token( async def generate_token(
username: str = Body(...), username: str = Body(...),
@ -205,31 +238,3 @@ async def create_client(
db.refresh(new_client) db.refresh(new_client)
return MessageOnly(message="Client created successfully.") return MessageOnly(message="Client created successfully.")
@app.post("/sensors/push-encrypted", response_model=MessageOnly, tags=["sensors"])
async def save_encrypted_sensor_data(
client: str,
encrypted: EncryptedPayload,
db: Session = Depends(get_db),
auth: bool = Depends(authenticate_user)
):
try:
decrypted_json = decrypt_base64_aes128(encrypted.data)
# Parse JSON zu Pydantic-Modell
data = SensorDataIn.model_validate_json(decrypted_json)
# Client-ID abrufen
client_id = get_client_id_by_name(db, client)
if client_id is None:
raise HTTPException(status_code=404, detail="Client not found")
sensor_data = SensorData(**data.dict())
sensor_data.clientid = client_id
save_sensor_data(db, sensor_data)
return MessageOnly(message="Encrypted sensor data saved successfully.")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error decoding or saving: {str(e)}")