From: Giorgio Ravera Date: Sat, 31 Oct 2020 11:00:03 +0000 (+0100) Subject: Added dedicated module for oauth. X-Git-Tag: v0.3~15 X-Git-Url: http://git.giorgioravera.it/?a=commitdiff_plain;h=0f84d7b5a9814da8aac402e22c4384583fa686bb;p=mercedes_me_api.git Added dedicated module for oauth. --- diff --git a/config.py b/config.py index e9b167a..c5e2061 100644 --- a/config.py +++ b/config.py @@ -7,16 +7,10 @@ For more details about this component, please refer to the documentation at https://github.com/xraver/mercedes_me_api/ """ from configobj import ConfigObj -import base64 -import json import logging import os -import requests -from query import ( - GetResource, - GetToken -) +from oauth import MercedesMeOauth from const import * # Logger @@ -24,25 +18,14 @@ _LOGGER = logging.getLogger(__name__) class MercedesMeConfig: - token_file = "" - credentials_file = "" - resources_file = "" - client_id = "" - client_secret = "" - vin = "" - base64 = "" - access_token = "" - refresh_token = "" - token_expires_in = "" - ######################## # Init ######################## def __init__(self): - # Files - self.token_file = TOKEN_FILE self.credentials_file = CREDENTIAL_FILE - self.resources_file = RESOURCES_FILE + self.client_id = "" + self.client_secret = "" + self.vin = "" ######################## # Read Configuration @@ -50,7 +33,7 @@ class MercedesMeConfig: def ReadConfig(self): needToRefresh = False - # Read credentials from file + # Read Credentials from file if not os.path.isfile(self.credentials_file): _LOGGER.error ("Credential File " + self.credentials_file + " not found") return False @@ -74,117 +57,8 @@ class MercedesMeConfig: if not self.vin: _LOGGER.error ("No "+ CONF_VEHICLE_ID + " found in the configuration") return False - # Base64 - b64_str = self.client_id + ":" + self.client_secret - b64_bytes = base64.b64encode( b64_str.encode('ascii') ) - self.base64 = b64_bytes.decode('ascii') # Read Token - if not os.path.isfile(self.token_file): - _LOGGER.error ("Token File missing - Creating a new one") - if (not self.CreateToken()): - _LOGGER.error ("Error creating token") - return False - else: - with open(self.token_file, 'r') as file: - try: - token = json.load(file) - except ValueError: - token = None - if self.CheckToken(token): - # Save Token - self.access_token = token['access_token'] - self.refresh_token = token['refresh_token'] - self.token_expires_in = token['expires_in'] - needToRefresh = True - else: - _LOGGER.error ("Token File not correct - Creating a new one") - if (not self.CreateToken()): - _LOGGER.error ("Error creating token") - return False - - if (needToRefresh): - if (not self.RefreshToken()): - _LOGGER.error ("Error refreshing token") - return False - - return True - - ######################## - # Write Token - ######################## - def WriteToken(self, token): - with open(self.token_file, 'w') as file: - json.dump(token, file) - - ######################## - # Check Token - ######################## - def CheckToken(self, token): - if token == None: - _LOGGER.error ("Error reading token.") - return False - if "error" in token: - if "error_description" in token: - _LOGGER.error ("Error retriving token: " + token["error_description"]) - else: - _LOGGER.error ("Error retriving token: " + token["error"]) - return False - if len(token) == 0: - _LOGGER.error ("Empty token found.") - return False - if not 'access_token' in token: - _LOGGER.error ("Access token not present.") - return False - if not 'refresh_token' in token: - _LOGGER.error ("Refresh token not present.") - return False - return True - - ######################## - # Create Token - ######################## - def CreateToken(self): - - auth_url = ( - f"{URL_OAUTH_AUTH}&" + - f"client_id={self.client_id}&" + - f"redirect_uri={REDIRECT_URL}&" + - f"scope={SCOPE}" - ) - - print( "Open the browser and insert this link:\n" ) - print(auth_url + "\n") - print( "Copy the code in the url:") - auth_code = input() - - token = GetToken(self, refresh=False, auth_code=auth_code) - - # Check Token - if not self.CheckToken(token): - return False - else: - # Save Token - self.WriteToken(token) - self.access_token = token['access_token'] - self.refresh_token = token['refresh_token'] - self.token_expires_in = token['expires_in'] - return True - - ######################## - # Refresh Token - ######################## - def RefreshToken(self): - - token = GetToken(self, refresh=True) - - # Check Token - if not self.CheckToken(token): - return False - else: - # Save Token - self.WriteToken(token) - self.access_token = token['access_token'] - self.refresh_token = token['refresh_token'] - self.token_expires_in = token['expires_in'] + self.token = MercedesMeOauth(self.client_id, self.client_secret) + self.token.ReadToken() return True diff --git a/const.py b/const.py index e6136e0..62f3034 100644 --- a/const.py +++ b/const.py @@ -15,9 +15,6 @@ CREDENTIAL_FILE = ".mercedesme_credentials" RESOURCES_FILE = ".mercedesme_resources" REDIRECT_URL = "https://localhost" SCOPE = "mb:vehicle:mbdata:fuelstatus%20mb:vehicle:mbdata:vehiclestatus%20mb:vehicle:mbdata:vehiclelock%20offline_access" -URL_OAUTH_BASE = "https://id.mercedes-benz.com/as" -URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code" -URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2" URL_RES_PREFIX = "https://api.mercedes-benz.com/vehicledata/v2" # File Parameters diff --git a/mercedes_me_api.py b/mercedes_me_api.py index d7019fd..3a7ef69 100644 --- a/mercedes_me_api.py +++ b/mercedes_me_api.py @@ -59,13 +59,13 @@ if __name__ == "__main__": # Create Token if (args.token == True): - if not data.mercedesConfig.CreateToken(): + if not data.mercedesConfig.token.CreateToken(): _LOGGER.error ("Error creating token") exit (1) # Refresh Token if (args.refresh == True): - if not data.mercedesConfig.RefreshToken(): + if not data.mercedesConfig.token.RefreshToken(): _LOGGER.error ("Error refreshing token") exit (1) diff --git a/oauth.py b/oauth.py new file mode 100644 index 0000000..f20ae9e --- /dev/null +++ b/oauth.py @@ -0,0 +1,172 @@ +""" +Mercedes Me APIs + +Author: G. Ravera + +For more details about this component, please refer to the documentation at +https://github.com/xraver/mercedes_me_api/ +""" +import base64 +import json +import logging +import os +from const import * +from query import * + +URL_OAUTH_BASE = "https://id.mercedes-benz.com/as" +URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code" +URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2" + +# Logger +_LOGGER = logging.getLogger(__name__) + +class MercedesMeOauth: + + token_file = "" + headers = "" + access_token = "" + refresh_token = "" + token_expires_in = "" + + ######################## + # Init + ######################## + def __init__(self, client_id, client_secret): + self.client_id = client_id + self.client_secret = client_secret + # Token File + self.token_file = TOKEN_FILE + # Base64 + b64_str = client_id + ":" + client_secret + b64_bytes = base64.b64encode( b64_str.encode('ascii') ) + self.base64 = b64_bytes.decode('ascii') + # Headers + self.headers = { + "Authorization": "Basic " + self.base64, + "content-type": "application/x-www-form-urlencoded" + } + + ######################## + # Read Token + ######################## + def ReadToken(self): + + found = False + needToRefresh = False + + # Read Token + if not os.path.isfile(self.token_file): + # Token File not present - Creating new one + _LOGGER.error ("Token File missing - Creating a new one") + found = False + else: + with open(self.token_file, 'r') as file: + try: + token = json.load(file) + if not self.CheckToken(token): + raise ValueError + else: + found = True + except ValueError: + _LOGGER.error ("Error reading token file - Creating a new one") + found = False + + if ( not found ): + # Not valid or file missing + if (not self.CreateToken()): + _LOGGER.error ("Error creating token") + return False + else: + # Valid: just import + self.access_token = token['access_token'] + self.refresh_token = token['refresh_token'] + self.token_expires_in = token['expires_in'] + needToRefresh = True + + if (needToRefresh): + if (not self.RefreshToken()): + _LOGGER.error ("Error refreshing token") + return False + + return True + + ######################## + # Write Token + ######################## + def WriteToken(self, token): + with open(self.token_file, 'w') as file: + json.dump(token, file) + + ######################## + # Check Token + ######################## + def CheckToken(self, token): + if "reason" in token: + _LOGGER.error ("Error retriving token - " + token["reason"] + " (" + str(token["code"]) + ")") + return False + if "error" in token: + if "error_description" in token: + _LOGGER.error ("Error retriving token: " + token["error_description"]) + else: + _LOGGER.error ("Error retriving token: " + token["error"]) + return False + if len(token) == 0: + _LOGGER.error ("Empty token found.") + return False + if not 'access_token' in token: + _LOGGER.error ("Access token not present.") + return False + if not 'refresh_token' in token: + _LOGGER.error ("Refresh token not present.") + return False + return True + + ######################## + # Create Token + ######################## + def CreateToken(self): + + auth_url = ( + f"{URL_OAUTH_AUTH}&" + + f"client_id={self.client_id}&" + + f"redirect_uri={REDIRECT_URL}&" + + f"scope={SCOPE}" + ) + + print( "Open the browser and insert this link:\n" ) + print(auth_url + "\n") + print( "Copy the code in the url:") + auth_code = input() + + data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL + token = GetToken(URL_OAUTH_TOKEN, self.headers, data) + + # Check Token + if not self.CheckToken(token): + return False + else: + # Save Token + self.WriteToken(token) + self.access_token = token['access_token'] + self.refresh_token = token['refresh_token'] + self.token_expires_in = token['expires_in'] + return True + + ######################## + # Refresh Token + ######################## + def RefreshToken(self): + + data = "grant_type=refresh_token&refresh_token=" + self.refresh_token + token = GetToken(URL_OAUTH_TOKEN, self.headers, data) + + # Check Token + if not self.CheckToken(token): + return False + else: + # Save Token + self.WriteToken(token) + self.access_token = token['access_token'] + self.refresh_token = token['refresh_token'] + self.token_expires_in = token['expires_in'] + return True diff --git a/query.py b/query.py index 39799b0..733c6bd 100644 --- a/query.py +++ b/query.py @@ -17,12 +17,12 @@ _LOGGER = logging.getLogger(__name__) ######################## # GetResource ######################## -def GetResource(resourceName, resourceURL, config): +def GetResource(resourceURL, config): # Set Header headers = { "accept": "application/json;charset=utf-8", - "authorization": "Bearer "+ config.access_token + "authorization": "Bearer "+ config.token.access_token } # Send Request @@ -64,24 +64,30 @@ def GetResource(resourceName, resourceURL, config): ######################## # GetToken ######################## -def GetToken(config, refresh=True, auth_code=""): - headers = { - "Authorization": "Basic " + config.base64, - "content-type": "application/x-www-form-urlencoded" - } - - if (not refresh): - # New Token - data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL - else: - # Refresh - data = "grant_type=refresh_token&refresh_token=" + config.refresh_token - - res = requests.post(URL_OAUTH_TOKEN, data = data, headers = headers) +def GetToken(tokenURL, headers, data): + res = requests.post(tokenURL, data = data, headers = headers) try: - token = res.json() + data = res.json() except ValueError: _LOGGER.error ("Error retriving token " + str(res.status_code)) - return None + data = { "reason": "No Data", + "code" : res.status_code + } + + # Check Error + if not res.ok: + if ("reason" in data): + reason = data["reason"] + else: + if res.status_code == 302: + reason = "The request scope is invalid" + elif res.status_code == 400: + reason = "The redirect_uri differs from the registered one" + elif res.status_code == 401: + reason = "The specified client ID is invalid" + else: + reason = "Generic Error" + data["reason"] = reason + data["code"] = res.status_code - return token + return data diff --git a/resources.py b/resources.py index abf2913..5691f0e 100644 --- a/resources.py +++ b/resources.py @@ -11,8 +11,8 @@ import json import os from config import MercedesMeConfig -from query import GetResource from const import * +from query import * # Logger _LOGGER = logging.getLogger(__name__) @@ -66,13 +66,11 @@ class MercedesMeResource: def update(self): """Fetch new state data for the sensor.""" - resName = self._name - resURL = URL_RES_PREFIX + self._href - result = GetResource(resName, resURL, self._config) + result = GetResource(URL_RES_PREFIX + self._href, self._config) if not "reason" in result: self._valid = True - self._timestamp = result[resName]["timestamp"] - self._state = result[resName]["value"] + self._timestamp = result[self._name]["timestamp"] + self._state = result[self._name]["value"] class MercedesMeResources: @@ -82,6 +80,7 @@ class MercedesMeResources: def __init__(self, mercedesConfig): self.database = [] + self.resources_file = RESOURCES_FILE self.mercedesConfig = mercedesConfig ######################## @@ -92,12 +91,12 @@ class MercedesMeResources: found = False resources = None - if not os.path.isfile(self.mercedesConfig.resources_file): + if not os.path.isfile(self.resources_file): # Resources File not present - Retriving new one from server _LOGGER.error ("Resource File missing - Creating a new one.") found = False else: - with open(self.mercedesConfig.resources_file, 'r') as file: + with open(self.resources_file, 'r') as file: try: resources = json.load(file) if (not self.CheckResources(resources)): @@ -147,9 +146,8 @@ class MercedesMeResources: # Retrive Resources List ######################## def RetriveResourcesList(self): - resName = "resources" - resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/" + resName - resources = GetResource(resName, resURL, self.mercedesConfig) + resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/resources" + resources = GetResource(resURL, self.mercedesConfig) if not self.CheckResources(resources): _LOGGER.error ("Error retriving available resources") return None @@ -175,7 +173,7 @@ class MercedesMeResources: for res in self.database: output.append( res.getJson() ) # Write File - with open(self.mercedesConfig.resources_file, 'w') as file: + with open(self.resources_file, 'w') as file: json.dump(output, file) ######################## @@ -202,12 +200,10 @@ class MercedesMeResources: ######################## def UpdateResourcesState(self): for res in self.database: - resName = res._name - resURL = URL_RES_PREFIX + res._href - result = GetResource(resName, resURL, self.mercedesConfig) + result = GetResource(URL_RES_PREFIX + res._href, self.mercedesConfig) if not "reason" in result: res._valid = True - res._timestamp = result[resName]["timestamp"] - res._state = result[resName]["value"] + res._timestamp = result[res._name]["timestamp"] + res._state = result[res._name]["value"] # Write Resource File self.WriteResourcesFile()