Commit 9e4844c1 authored by Manuel's avatar Manuel

restGateway: added security definition to swagger file

parent eaf5c1f0
......@@ -4,6 +4,13 @@ info:
description: This is the documentation for the RESTful API gateway.
version: "1.0.0"
securityDefinitions:
JwtBearerAuth:
type: apiKey
name: Authorization
in: header
x-apikeyInfoFunc: "services.token_service.verifyToken"
consumes:
- "application/json"
produces:
......@@ -13,6 +20,20 @@ basePath: "/api"
# Paths supported by the server application
paths:
/secret:
get:
security:
- JwtBearerAuth: []
operationId: "rest.user.secret"
tags:
- "User"
summary: "Testpage for authentication"
description: "Should only be accessible with a valid JWT token in the 'authorization' header"
responses:
'200':
description: "OK"
'401':
description: "No or an invalid token was provided"
/tokens/{token}:
post:
operationId: "rest.user.verify"
......@@ -155,7 +176,7 @@ definitions:
properties:
token:
type: string
example: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.Et9HFtf9R3GEMA0IICOfFMVXY7kkTX1wr4qCyhIf58U"
example: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6InVzZXJuYW1lQGRvbWFpbi5jb20iLCJjcmVhdGVkX2F0IjoiMjAyMC0wNy0xNSAxNTo0Mzo0OC43MjQ4MjciLCJ2YWxpZF91bnRpbCI6IjIwMjAtMDctMTYgMTU6NDM6NDguNzI0ODI3In0.aR2Xe3pXj_MBS9UJKqhiq4u9M6Bv41ILPaKpA8BVzIY"
TokenRequest:
type: "object"
required:
......
# global imports (dont't worry, red is normal)
from db.entities.user import User
from services.user_service import UserService
from services.login_wrapper import login_required
from services.token_service import TokenService
from flask import request, Response
import bcrypt
import jwt
from datetime import datetime, timedelta
SIGNING_KEY = "yteNrMy6142WKwp8fKfrHkS5nlFpxtHgOXJh1ZPsOrV_gTcsO9eMY7aB7HUzRbTRO9dmZhCl3FdPtuvMe3K8aBA_wc2MmHRo8IkUIGmvUJGsAxKFClN_6oNW5fEvoeVKiL1krA-qjWbR_em-WksePgPoTsySW7QbKdi4f7cwuyK2_JZ2fQj9hDKlfJ2GzMXkKiWcfyCTr30yC6BviAFeRDD_Bpvg6znsrXr53Tq66hnwDwQ6QU7aHVu-bERblKZTYuvkSxsov6yRMEVWQoiuBITsQtIOcgSWK4Dy3BjSbqoIcKw3WG-s3wx1lTen19QbEu8vJC64e0iGeGDWT6vbtg"
TOKEN_VALIDITY_IN_DAYS = 1
def generate_token(user: User) -> str:
'''
creates a JWT token for a user which has the following fields:
- username
- created_at
- valid_until
'''
created_at = datetime.now()
valid_until = created_at + timedelta(days=1)
return jwt.encode(
{
'username': user.username,
'created_at': str(created_at),
'valid_until': str(valid_until),
},
SIGNING_KEY, algorithm='HS256'
).decode("utf-8")
def secret():
return "Pineapple does not belong to pizza!"
def verify(token):
'''
......@@ -40,30 +22,11 @@ def verify(token):
'''
try:
payload = jwt.decode(token, SIGNING_KEY, algorithms=['HS256'])
except:
return Response(status = 401, response=f'Invalid JWT token')
TokenService.verify("Bearer "+token)
# check if all needed fields are in the payload
if not "username" in payload or not "created_at" in payload or not "valid_until" in payload:
return Response(status = 401, response=f'Invalid JWT token')
try:
UserService.get_by_username(payload["username"])
return Response(status = 200)
except ValueError as e:
# return 400 if the user does not exist
return Response(status = 400, response=str(e))
# check if token has already expired
token_created_at = datetime.strptime(payload["created_at"], '%Y-%m-%d %H:%M:%S.%f')
valid_until = datetime.strptime(payload["valid_until"], '%Y-%m-%d %H:%M:%S.%f')
now = datetime.now()
if now <= token_created_at or now >= valid_until:
return Response(status = 401, response=f'Token expired')
# everthing is fine
return Response(status = 200)
return Response(status=401, response=str(e))
def authenticate():
'''
......@@ -75,7 +38,7 @@ def authenticate():
try:
user = UserService.get_by_credentials(username, data["password"])
return {"token": generate_token(user)}
return {"token": TokenService.generate_token(user)}
except ValueError as e:
# return 400 if the user does not exist or the password is wrong
return Response(status = 400, response=str(e))
......
from functools import wraps
from flask import g, request, redirect, url_for
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth = request.authorization
if auth == None:
return redirect(url_for('/api.rest_user_forbidden', next=request.url))
return f(*args, **kwargs)
return decorated_function
# global imports (dont't worry, red is normal)
from db.entities.user import User
from services.user_service import UserService
import jwt
from datetime import datetime, timedelta
from typing import Dict
SIGNING_KEY = "yteNrMy6142WKwp8fKfrHkS5nlFpxtHgOXJh1ZPsOrV_gTcsO9eMY7aB7HUzRbTRO9dmZhCl3FdPtuvMe3K8aBA_wc2MmHRo8IkUIGmvUJGsAxKFClN_6oNW5fEvoeVKiL1krA-qjWbR_em-WksePgPoTsySW7QbKdi4f7cwuyK2_JZ2fQj9hDKlfJ2GzMXkKiWcfyCTr30yC6BviAFeRDD_Bpvg6znsrXr53Tq66hnwDwQ6QU7aHVu-bERblKZTYuvkSxsov6yRMEVWQoiuBITsQtIOcgSWK4Dy3BjSbqoIcKw3WG-s3wx1lTen19QbEu8vJC64e0iGeGDWT6vbtg"
TOKEN_VALIDITY_IN_DAYS = 1
# needed by connexion, do not remove!
def verifyToken(token, **kwargs) -> Dict:
try:
user = TokenService.verify(token)
except ValueError as e:
print(f'ERROR DURING TOKEN VALIDATION: {str(e)}')
return None
return {"sub": user.username}
class TokenService:
@staticmethod
def generate_token(user: User) -> str:
'''
creates a JWT token for a user which has the following fields:
- username
- created_at
- valid_until
'''
created_at = datetime.now()
valid_until = created_at + timedelta(days=1)
return jwt.encode(
{
'username': user.username,
'created_at': str(created_at),
'valid_until': str(valid_until),
},
SIGNING_KEY, algorithm='HS256'
).decode("utf-8")
@staticmethod
def verify(token : str, **kwargs) -> User:
'''
verifies the validity of a JWT token. Raises a ValueError if one of the tests failes
performs the following tests (int this order):
- is the JWT token parsable? (it has not been damaged + the signature is valid)
- does the payload contain all necessary fields?
- does the user specified by the payload exist?
- is the expiration/creation date sound?
@params:
token - Required : JWT token from authorization header, must start with "Bearer "
'''
if not token.startswith("Bearer "):
raise ValueError('Invalid JWT token (must be a Bearer string)')
token = token[7:]
try:
payload = jwt.decode(token, SIGNING_KEY, algorithms=['HS256'])
except:
raise ValueError('Invalid JWT token (decoding failed)')
# check if all needed fields are in the payload
if not "username" in payload or not "created_at" in payload or not "valid_until" in payload:
return 'Invalid JWT token (missing fields)'
user = UserService.get_by_username(payload["username"])
# check if token has already expired
token_created_at = datetime.strptime(payload["created_at"], '%Y-%m-%d %H:%M:%S.%f')
valid_until = datetime.strptime(payload["valid_until"], '%Y-%m-%d %H:%M:%S.%f')
now = datetime.now()
if now <= token_created_at or now >= valid_until:
raise ValueError('Invalid JWT token (token expired)')
return user
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment