]> git.giorgioravera.it Git - mercedes_me_api.git/commitdiff
Added dedicated module for oauth.
authorGiorgio Ravera <giorgio.ravera@gmail.com>
Sat, 31 Oct 2020 11:00:03 +0000 (12:00 +0100)
committerGiorgio Ravera <giorgio.ravera@gmail.com>
Sat, 31 Oct 2020 11:00:03 +0000 (12:00 +0100)
config.py
const.py
mercedes_me_api.py
oauth.py [new file with mode: 0644]
query.py
resources.py

index e9b167a27d4aa459bc15b993d754a5b0dc4d1309..c5e20616a3a282b912dd6b5cbaeec51d4da6f98c 100644 (file)
--- a/config.py
+++ b/config.py
@@ -7,16 +7,10 @@ For more details about this component, please refer to the documentation at
 https://github.com/xraver/mercedes_me_api/
 """
 from configobj import ConfigObj
-import base64
-import json
 import logging
 import os
-import requests
 
-from query import (
-       GetResource,
-       GetToken
-)
+from oauth import MercedesMeOauth
 from const import *
 
 # Logger
@@ -24,25 +18,14 @@ _LOGGER = logging.getLogger(__name__)
 
 class MercedesMeConfig:
 
-    token_file = ""
-    credentials_file = ""
-    resources_file = ""
-    client_id = ""
-    client_secret = ""
-    vin = ""
-    base64 = ""
-    access_token = ""
-    refresh_token = ""
-    token_expires_in = ""
-
     ########################
     # Init
     ########################
     def __init__(self):
-        # Files
-        self.token_file = TOKEN_FILE
         self.credentials_file = CREDENTIAL_FILE
-        self.resources_file = RESOURCES_FILE
+        self.client_id = ""
+        self.client_secret = ""
+        self.vin = ""
 
     ########################
     # Read Configuration
@@ -50,7 +33,7 @@ class MercedesMeConfig:
     def ReadConfig(self):
         needToRefresh = False
 
-        # Read credentials from file
+        # Read Credentials from file
         if not os.path.isfile(self.credentials_file):
             _LOGGER.error ("Credential File " + self.credentials_file + " not found")
             return False
@@ -74,117 +57,8 @@ class MercedesMeConfig:
         if not self.vin:
             _LOGGER.error ("No "+ CONF_VEHICLE_ID + " found in the configuration")
             return False
-        # Base64
-        b64_str = self.client_id + ":" + self.client_secret
-        b64_bytes = base64.b64encode( b64_str.encode('ascii') )
-        self.base64 = b64_bytes.decode('ascii')
 
         # Read Token
-        if not os.path.isfile(self.token_file):
-            _LOGGER.error ("Token File missing - Creating a new one")
-            if (not self.CreateToken()):
-                _LOGGER.error ("Error creating token")
-                return False
-        else:
-            with open(self.token_file, 'r') as file:
-                try:
-                    token = json.load(file)
-                except ValueError:
-                    token = None
-                if self.CheckToken(token):
-                    # Save Token
-                    self.access_token = token['access_token']
-                    self.refresh_token = token['refresh_token']
-                    self.token_expires_in = token['expires_in']
-                    needToRefresh = True
-                else:
-                    _LOGGER.error ("Token File not correct - Creating a new one")
-                    if (not self.CreateToken()):
-                        _LOGGER.error ("Error creating token")
-                        return False
-
-        if (needToRefresh):
-            if (not self.RefreshToken()):
-                _LOGGER.error ("Error refreshing token")
-                return False
-
-        return True
-
-    ########################
-    # Write Token
-    ########################
-    def WriteToken(self, token):
-        with open(self.token_file, 'w') as file:
-            json.dump(token, file)
-
-    ########################
-    # Check Token
-    ########################
-    def CheckToken(self, token):
-        if token == None:
-            _LOGGER.error ("Error reading token.")
-            return False
-        if "error" in token:
-            if "error_description" in token:
-                _LOGGER.error ("Error retriving token: " + token["error_description"])
-            else:
-                _LOGGER.error ("Error retriving token: " + token["error"])
-            return False
-        if len(token) == 0:
-            _LOGGER.error ("Empty token found.")
-            return False
-        if not 'access_token' in token:
-            _LOGGER.error ("Access token not present.")
-            return False
-        if not 'refresh_token' in token:
-            _LOGGER.error ("Refresh token not present.")
-            return False
-        return True
-
-    ########################
-    # Create Token
-    ########################
-    def CreateToken(self):
-
-        auth_url = (
-            f"{URL_OAUTH_AUTH}&" +
-            f"client_id={self.client_id}&" + 
-            f"redirect_uri={REDIRECT_URL}&" + 
-            f"scope={SCOPE}"
-        )
-
-        print( "Open the browser and insert this link:\n" )
-        print(auth_url + "\n")
-        print( "Copy the code in the url:")
-        auth_code = input()
-
-        token = GetToken(self, refresh=False, auth_code=auth_code)
-
-        # Check Token
-        if not self.CheckToken(token):
-            return False
-        else:
-            # Save Token
-            self.WriteToken(token)
-            self.access_token = token['access_token']
-            self.refresh_token = token['refresh_token']
-            self.token_expires_in = token['expires_in']
-            return True
-
-    ########################
-    # Refresh Token
-    ########################
-    def RefreshToken(self):
-
-        token = GetToken(self, refresh=True)
-
-        # Check Token
-        if not self.CheckToken(token):
-            return False
-        else:
-            # Save Token
-            self.WriteToken(token)
-            self.access_token = token['access_token']
-            self.refresh_token = token['refresh_token']
-            self.token_expires_in = token['expires_in']
+        self.token = MercedesMeOauth(self.client_id, self.client_secret)
+        self.token.ReadToken()
         return True
index e6136e067c29a8f80d93a1bca722889bfa07c111..62f30344d1bfea1fc76a8096b72430e7b8866458 100644 (file)
--- a/const.py
+++ b/const.py
@@ -15,9 +15,6 @@ CREDENTIAL_FILE = ".mercedesme_credentials"
 RESOURCES_FILE = ".mercedesme_resources"
 REDIRECT_URL = "https://localhost"
 SCOPE = "mb:vehicle:mbdata:fuelstatus%20mb:vehicle:mbdata:vehiclestatus%20mb:vehicle:mbdata:vehiclelock%20offline_access"
-URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"
-URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"
-URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"
 URL_RES_PREFIX = "https://api.mercedes-benz.com/vehicledata/v2"
 
 # File Parameters
index d7019fde81568478a1844d0697d0bb354270f9cd..3a7ef69e5419e638cc9fa522fae4e4707aef532a 100644 (file)
@@ -59,13 +59,13 @@ if __name__ == "__main__":
 
     # Create Token
     if (args.token == True):
-        if not data.mercedesConfig.CreateToken():
+        if not data.mercedesConfig.token.CreateToken():
             _LOGGER.error ("Error creating token")
             exit (1)
 
     # Refresh Token
     if (args.refresh == True):
-        if not data.mercedesConfig.RefreshToken():
+        if not data.mercedesConfig.token.RefreshToken():
             _LOGGER.error ("Error refreshing token")
             exit (1)
             
diff --git a/oauth.py b/oauth.py
new file mode 100644 (file)
index 0000000..f20ae9e
--- /dev/null
+++ b/oauth.py
@@ -0,0 +1,172 @@
+"""
+Mercedes Me APIs
+
+Author: G. Ravera
+
+For more details about this component, please refer to the documentation at
+https://github.com/xraver/mercedes_me_api/
+"""
+import base64
+import json
+import logging
+import os
+from const import *
+from query import *
+
+URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"
+URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"
+URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"
+
+# Logger
+_LOGGER = logging.getLogger(__name__)
+
+class MercedesMeOauth:
+
+    token_file = ""
+    headers = ""
+    access_token = ""
+    refresh_token = ""
+    token_expires_in = ""
+
+    ########################
+    # Init
+    ########################
+    def __init__(self, client_id, client_secret):
+        self.client_id = client_id
+        self.client_secret = client_secret
+        # Token File
+        self.token_file = TOKEN_FILE
+        # Base64
+        b64_str = client_id + ":" + client_secret
+        b64_bytes = base64.b64encode( b64_str.encode('ascii') )
+        self.base64 = b64_bytes.decode('ascii')
+        # Headers
+        self.headers = {
+            "Authorization": "Basic " + self.base64,
+            "content-type": "application/x-www-form-urlencoded"
+        }
+
+    ########################
+    # Read Token
+    ########################
+    def ReadToken(self):
+
+        found = False
+        needToRefresh = False
+
+        # Read Token
+        if not os.path.isfile(self.token_file):
+            # Token File not present - Creating new one
+            _LOGGER.error ("Token File missing - Creating a new one")
+            found = False
+        else:
+            with open(self.token_file, 'r') as file:
+                try:
+                    token = json.load(file)
+                    if not self.CheckToken(token):
+                        raise ValueError
+                    else:
+                        found = True
+                except ValueError:
+                    _LOGGER.error ("Error reading token file - Creating a new one")
+                    found = False
+
+        if ( not found ):
+            # Not valid or file missing
+            if (not self.CreateToken()):
+                _LOGGER.error ("Error creating token")
+                return False
+        else:
+            # Valid: just import
+            self.access_token = token['access_token']
+            self.refresh_token = token['refresh_token']
+            self.token_expires_in = token['expires_in']
+            needToRefresh = True
+
+        if (needToRefresh):
+            if (not self.RefreshToken()):
+                _LOGGER.error ("Error refreshing token")
+                return False
+
+        return True
+
+    ########################
+    # Write Token
+    ########################
+    def WriteToken(self, token):
+        with open(self.token_file, 'w') as file:
+            json.dump(token, file)
+
+    ########################
+    # Check Token
+    ########################
+    def CheckToken(self, token):
+        if "reason" in token:
+            _LOGGER.error ("Error retriving token - " + token["reason"] + " (" + str(token["code"]) + ")")
+            return False
+        if "error" in token:
+            if "error_description" in token:
+                _LOGGER.error ("Error retriving token: " + token["error_description"])
+            else:
+                _LOGGER.error ("Error retriving token: " + token["error"])
+            return False
+        if len(token) == 0:
+            _LOGGER.error ("Empty token found.")
+            return False
+        if not 'access_token' in token:
+            _LOGGER.error ("Access token not present.")
+            return False
+        if not 'refresh_token' in token:
+            _LOGGER.error ("Refresh token not present.")
+            return False
+        return True
+
+    ########################
+    # Create Token
+    ########################
+    def CreateToken(self):
+
+        auth_url = (
+            f"{URL_OAUTH_AUTH}&" +
+            f"client_id={self.client_id}&" + 
+            f"redirect_uri={REDIRECT_URL}&" + 
+            f"scope={SCOPE}"
+        )
+
+        print( "Open the browser and insert this link:\n" )
+        print(auth_url + "\n")
+        print( "Copy the code in the url:")
+        auth_code = input()
+
+        data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL
+        token = GetToken(URL_OAUTH_TOKEN, self.headers, data)
+
+        # Check Token
+        if not self.CheckToken(token):
+            return False
+        else:
+            # Save Token
+            self.WriteToken(token)
+            self.access_token = token['access_token']
+            self.refresh_token = token['refresh_token']
+            self.token_expires_in = token['expires_in']
+            return True
+
+    ########################
+    # Refresh Token
+    ########################
+    def RefreshToken(self):
+
+        data = "grant_type=refresh_token&refresh_token=" + self.refresh_token
+        token = GetToken(URL_OAUTH_TOKEN, self.headers, data)
+
+        # Check Token
+        if not self.CheckToken(token):
+            return False
+        else:
+            # Save Token
+            self.WriteToken(token)
+            self.access_token = token['access_token']
+            self.refresh_token = token['refresh_token']
+            self.token_expires_in = token['expires_in']
+        return True
index 39799b07f3507c764445ac3e7bb06ded0c632cea..733c6bd8bcfe701fcb9bde6eec62a7cef90ef06b 100644 (file)
--- a/query.py
+++ b/query.py
@@ -17,12 +17,12 @@ _LOGGER = logging.getLogger(__name__)
 ########################
 # GetResource
 ########################
-def GetResource(resourceName, resourceURL, config):
+def GetResource(resourceURL, config):
 
     # Set Header
     headers = {
         "accept": "application/json;charset=utf-8", 
-        "authorization": "Bearer "+ config.access_token
+        "authorization": "Bearer "+ config.token.access_token
     }
 
     # Send Request
@@ -64,24 +64,30 @@ def GetResource(resourceName, resourceURL, config):
 ########################
 # GetToken
 ########################
-def GetToken(config, refresh=True, auth_code=""):
-    headers = {
-        "Authorization": "Basic " + config.base64,
-        "content-type": "application/x-www-form-urlencoded"
-    }
-
-    if (not refresh):
-        # New Token
-        data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL
-    else:
-        # Refresh
-        data = "grant_type=refresh_token&refresh_token=" + config.refresh_token
-
-    res = requests.post(URL_OAUTH_TOKEN, data = data, headers = headers)
+def GetToken(tokenURL, headers, data):
+    res = requests.post(tokenURL, data = data, headers = headers)
     try:
-        token = res.json()
+        data = res.json()
     except ValueError:
         _LOGGER.error ("Error retriving token " + str(res.status_code))
-        return None
+        data = { "reason": "No Data",
+                 "code" : res.status_code 
+        }
+
+    # Check Error
+    if not res.ok:
+        if ("reason" in data):
+            reason = data["reason"]
+        else:
+            if res.status_code == 302:
+                reason = "The request scope is invalid"
+            elif res.status_code == 400:
+                reason = "The redirect_uri differs from the registered one"
+            elif res.status_code == 401:
+                reason = "The specified client ID is invalid"
+            else:
+                reason = "Generic Error"
+        data["reason"] = reason
+        data["code"] = res.status_code
 
-    return token
+    return data
index abf2913b1987cd20b85acde525ce46c5c41c96ef..5691f0e0d3d5e471870cea2a218cd536cc75350e 100644 (file)
@@ -11,8 +11,8 @@ import json
 import os
 
 from config import MercedesMeConfig
-from query import GetResource
 from const import *
+from query import *
 
 # Logger
 _LOGGER = logging.getLogger(__name__)
@@ -66,13 +66,11 @@ class MercedesMeResource:
 
     def update(self):
         """Fetch new state data for the sensor."""
-        resName = self._name
-        resURL = URL_RES_PREFIX + self._href
-        result = GetResource(resName, resURL, self._config)
+        result = GetResource(URL_RES_PREFIX + self._href, self._config)
         if not "reason" in result:
             self._valid = True
-            self._timestamp = result[resName]["timestamp"]
-            self._state = result[resName]["value"]
+            self._timestamp = result[self._name]["timestamp"]
+            self._state = result[self._name]["value"]
 
 class MercedesMeResources:
 
@@ -82,6 +80,7 @@ class MercedesMeResources:
     def __init__(self, mercedesConfig):
 
         self.database = []
+        self.resources_file = RESOURCES_FILE
         self.mercedesConfig = mercedesConfig
 
     ########################
@@ -92,12 +91,12 @@ class MercedesMeResources:
         found = False
         resources = None
 
-        if not os.path.isfile(self.mercedesConfig.resources_file):
+        if not os.path.isfile(self.resources_file):
             # Resources File not present - Retriving new one from server
             _LOGGER.error ("Resource File missing - Creating a new one.")
             found = False
         else:
-            with open(self.mercedesConfig.resources_file, 'r') as file:
+            with open(self.resources_file, 'r') as file:
                 try:
                     resources = json.load(file)
                     if (not self.CheckResources(resources)):
@@ -147,9 +146,8 @@ class MercedesMeResources:
     # Retrive Resources List
     ########################
     def RetriveResourcesList(self):
-        resName = "resources"
-        resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/" + resName
-        resources = GetResource(resName, resURL, self.mercedesConfig)
+        resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/resources"
+        resources = GetResource(resURL, self.mercedesConfig)
         if not self.CheckResources(resources):
             _LOGGER.error ("Error retriving available resources")
             return None
@@ -175,7 +173,7 @@ class MercedesMeResources:
         for res in self.database:
             output.append( res.getJson() )
         # Write File
-        with open(self.mercedesConfig.resources_file, 'w') as file:
+        with open(self.resources_file, 'w') as file:
             json.dump(output, file)
 
     ########################
@@ -202,12 +200,10 @@ class MercedesMeResources:
     ########################
     def UpdateResourcesState(self):
         for res in self.database:
-            resName = res._name
-            resURL = URL_RES_PREFIX + res._href
-            result = GetResource(resName, resURL, self.mercedesConfig)
+            result = GetResource(URL_RES_PREFIX + res._href, self.mercedesConfig)
             if not "reason" in result:
                 res._valid = True
-                res._timestamp = result[resName]["timestamp"]
-                res._state = result[resName]["value"]
+                res._timestamp = result[res._name]["timestamp"]
+                res._state = result[res._name]["value"]
         # Write Resource File
         self.WriteResourcesFile()