Commit 64aa5210 authored by Manuel's avatar Manuel

finished verification method for microservices

parent 3e752fb6
......@@ -27,6 +27,7 @@ def api_root():
# SSL configuration
try:
# should be ../../../modules/certificate local
certificate_path = os.environ['ARTICONF_CERTIFICATE_PATH']
except KeyError:
certificate_path = '/srv/articonf/'
......@@ -34,4 +35,4 @@ context = (os.path.normpath(f'{certificate_path}/articonf1.crt'), os.path.normpa
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True, ssl_context=context)
app.run(host='0.0.0.0', port=5000, ssl_context=context)
# global import
from security import security_util
import logging
def verifyTokenRegular(token, required_scopes):
LOGGER = logging.getLogger(__name__)
from typing import List
def _verify(token:str, roles:List[str]=[]):
try:
token_info = security_util.decodeToken(token)
return token_info
token_info = security_util.decodeToken(token, roles=roles)
return token_info
except Exception as e:
LOGGER.error("ERROR DURING TOKEN VALIDATION: "+str(e))
print("ERROR DURING TOKEN VALIDATION: "+str(e))
return None
def verifyTokenRegular(token, required_scopes):
return _verify(token)
def verifyTokenAdmin(token, required_scopes):
# TODO call restGateway to verify the token
return {}
return _verify(token, roles=["a"])
\ No newline at end of file
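Review bot: The `except Exception` handler in `_verify` writes `str(e)` to the log or stdout, and `decodeToken` places the bearer token in the request URL, so a connection error from `requests`, whose message normally includes the URL, would likely copy the token into the logs. Logging a fixed message plus the exception type avoids that, e.g. `LOGGER.error("token validation failed: %s", type(e).__name__)`. `verifyTokenAdmin` passes the bare literal `roles=["a"]` and both handlers ignore `required_scopes`; a named constant for the admin role and a comment stating that scopes are deliberately unused would make the intent reviewable. `roles:List[str]=[]` is a mutable default argument; `roles=None` with `roles or []` at the call is the safer spelling, and the same applies to `decodeToken`.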
......@@ -3,7 +3,7 @@ import network_constants
import requests
import json
from typing import Dict
from typing import Dict, List
class TokenStash:
'''
......@@ -33,30 +33,30 @@ class TokenStash:
return None
def decodeToken(token: str) -> Dict:
def decodeToken(token: str, roles:List[str]=[]) -> Dict:
'''
verifies the passed token on the user-microservice and returns a dictionary with the
subject entry if the verification was successful, an error is raised otherwise
@params:
token - Required : JWT token from authorization header, must start with "Bearer "
roles - Optional : User must have at least one of these roles
'''
cached_username = TokenStash.is_token_cached(token)
if cached_username != None:
print("Re-using cached token!")
return {"sub": cached_username}
if not token.startswith("Bearer "):
raise ValueError('Invalid JWT token (must be a Bearer string)')
token = token[7:]
cached_data = TokenStash.is_token_cached(token)
if cached_data != None:
# Re-Use cached token
return cached_data
url = f'https://{network_constants.REST_GATEWAY_HOSTNAME}:{network_constants.REST_GATEWAY_REST_PORT}/api/tokens/{token}'
response = requests.post(
url,
verify=False,
headers={'User-Agent': 'Chrome', 'Accept': 'application/json', 'Content-Type': 'application/json'},
# proxies = { "http":"http://proxy.uni-klu.ac.at:3128/", "https":"http://proxy.uni-klu.ac.at:3128/" }
)
......@@ -64,18 +64,15 @@ def decodeToken(token: str) -> Dict:
raise ValueError(
f"Validation of token failed ({response.status_code})!")
# TODO replace with token information
data = json.dumps(response.text)
print("Verification Response (raw): "+response.text)
print("Verification Response: "+data)
print(type(data))
data = json.loads(response.text)
if not "username" in data or not "role" in data:
raise ValueError(
f"Validation of token failed (missing field in verification response)!")
if len(roles) > 0 and data["role"] not in roles:
raise ValueError(
f"Validation of token failed (wrong role)!")
TokenStash.add(token, data["username"], data["role"])
return {"sub": data["username"], "role": data["role"]}
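Review bot: On a cache hit `decodeToken` returns `cached_data` before the `roles` comparison further down, so the new role requirement is enforced only the first time a token is seen. A token cached through `verifyTokenRegular` (no roles) would presumably then be accepted by `verifyTokenAdmin` as well; the body of `TokenStash` is outside this view, so confirm what `is_token_cached` returns. The role test should run on both paths, for example by applying `if len(roles) > 0 and data["role"] not in roles:` to the cached entry before returning it. Separately, the gateway call that supplies `role` still uses `verify=False` and carries the token in the URL path, so the response being trusted is not authenticated by TLS and the token can end up in access logs. Pointing `verify=` at the CA bundle and sending the token in the request body or a header would address both.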