https://github.com/xraver/mercedes_me_api/
"""
from configobj import ConfigObj
-import base64
-import json
import logging
import os
-import requests
-from query import (
- GetResource,
- GetToken
-)
+from oauth import MercedesMeOauth
from const import *
# Logger
class MercedesMeConfig:
- token_file = ""
- credentials_file = ""
- resources_file = ""
- client_id = ""
- client_secret = ""
- vin = ""
- base64 = ""
- access_token = ""
- refresh_token = ""
- token_expires_in = ""
-
########################
# Init
########################
def __init__(self):
- # Files
- self.token_file = TOKEN_FILE
self.credentials_file = CREDENTIAL_FILE
- self.resources_file = RESOURCES_FILE
+ self.client_id = ""
+ self.client_secret = ""
+ self.vin = ""
########################
# Read Configuration
def ReadConfig(self):
needToRefresh = False
- # Read credentials from file
+ # Read Credentials from file
if not os.path.isfile(self.credentials_file):
_LOGGER.error ("Credential File " + self.credentials_file + " not found")
return False
if not self.vin:
_LOGGER.error ("No "+ CONF_VEHICLE_ID + " found in the configuration")
return False
- # Base64
- b64_str = self.client_id + ":" + self.client_secret
- b64_bytes = base64.b64encode( b64_str.encode('ascii') )
- self.base64 = b64_bytes.decode('ascii')
# Read Token
- if not os.path.isfile(self.token_file):
- _LOGGER.error ("Token File missing - Creating a new one")
- if (not self.CreateToken()):
- _LOGGER.error ("Error creating token")
- return False
- else:
- with open(self.token_file, 'r') as file:
- try:
- token = json.load(file)
- except ValueError:
- token = None
- if self.CheckToken(token):
- # Save Token
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
- needToRefresh = True
- else:
- _LOGGER.error ("Token File not correct - Creating a new one")
- if (not self.CreateToken()):
- _LOGGER.error ("Error creating token")
- return False
-
- if (needToRefresh):
- if (not self.RefreshToken()):
- _LOGGER.error ("Error refreshing token")
- return False
-
- return True
-
- ########################
- # Write Token
- ########################
- def WriteToken(self, token):
- with open(self.token_file, 'w') as file:
- json.dump(token, file)
-
- ########################
- # Check Token
- ########################
- def CheckToken(self, token):
- if token == None:
- _LOGGER.error ("Error reading token.")
- return False
- if "error" in token:
- if "error_description" in token:
- _LOGGER.error ("Error retriving token: " + token["error_description"])
- else:
- _LOGGER.error ("Error retriving token: " + token["error"])
- return False
- if len(token) == 0:
- _LOGGER.error ("Empty token found.")
- return False
- if not 'access_token' in token:
- _LOGGER.error ("Access token not present.")
- return False
- if not 'refresh_token' in token:
- _LOGGER.error ("Refresh token not present.")
- return False
- return True
-
- ########################
- # Create Token
- ########################
- def CreateToken(self):
-
- auth_url = (
- f"{URL_OAUTH_AUTH}&" +
- f"client_id={self.client_id}&" +
- f"redirect_uri={REDIRECT_URL}&" +
- f"scope={SCOPE}"
- )
-
- print( "Open the browser and insert this link:\n" )
- print(auth_url + "\n")
- print( "Copy the code in the url:")
- auth_code = input()
-
- token = GetToken(self, refresh=False, auth_code=auth_code)
-
- # Check Token
- if not self.CheckToken(token):
- return False
- else:
- # Save Token
- self.WriteToken(token)
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
- return True
-
- ########################
- # Refresh Token
- ########################
- def RefreshToken(self):
-
- token = GetToken(self, refresh=True)
-
- # Check Token
- if not self.CheckToken(token):
- return False
- else:
- # Save Token
- self.WriteToken(token)
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
+ self.token = MercedesMeOauth(self.client_id, self.client_secret)
+ self.token.ReadToken()
return True
RESOURCES_FILE = ".mercedesme_resources"
REDIRECT_URL = "https://localhost"
SCOPE = "mb:vehicle:mbdata:fuelstatus%20mb:vehicle:mbdata:vehiclestatus%20mb:vehicle:mbdata:vehiclelock%20offline_access"
-URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"
-URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"
-URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"
URL_RES_PREFIX = "https://api.mercedes-benz.com/vehicledata/v2"
# File Parameters
# Create Token
if (args.token == True):
- if not data.mercedesConfig.CreateToken():
+ if not data.mercedesConfig.token.CreateToken():
_LOGGER.error ("Error creating token")
exit (1)
# Refresh Token
if (args.refresh == True):
- if not data.mercedesConfig.RefreshToken():
+ if not data.mercedesConfig.token.RefreshToken():
_LOGGER.error ("Error refreshing token")
exit (1)
--- /dev/null
+"""
+Mercedes Me APIs
+
+Author: G. Ravera
+
+For more details about this component, please refer to the documentation at
+https://github.com/xraver/mercedes_me_api/
+"""
+import base64
+import json
+import logging
+import os
+from const import *
+from query import *
+
+URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"
+URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"
+URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"
+
+# Logger
+_LOGGER = logging.getLogger(__name__)
+
+class MercedesMeOauth:
+
+ token_file = ""
+ headers = ""
+ access_token = ""
+ refresh_token = ""
+ token_expires_in = ""
+
+ ########################
+ # Init
+ ########################
+ def __init__(self, client_id, client_secret):
+ self.client_id = client_id
+ self.client_secret = client_secret
+ # Token File
+ self.token_file = TOKEN_FILE
+ # Base64
+ b64_str = client_id + ":" + client_secret
+ b64_bytes = base64.b64encode( b64_str.encode('ascii') )
+ self.base64 = b64_bytes.decode('ascii')
+ # Headers
+ self.headers = {
+ "Authorization": "Basic " + self.base64,
+ "content-type": "application/x-www-form-urlencoded"
+ }
+
+ ########################
+ # Read Token
+ ########################
+ def ReadToken(self):
+
+ found = False
+ needToRefresh = False
+
+ # Read Token
+ if not os.path.isfile(self.token_file):
+ # Token File not present - Creating new one
+ _LOGGER.error ("Token File missing - Creating a new one")
+ found = False
+ else:
+ with open(self.token_file, 'r') as file:
+ try:
+ token = json.load(file)
+ if not self.CheckToken(token):
+ raise ValueError
+ else:
+ found = True
+ except ValueError:
+ _LOGGER.error ("Error reading token file - Creating a new one")
+ found = False
+
+ if ( not found ):
+ # Not valid or file missing
+ if (not self.CreateToken()):
+ _LOGGER.error ("Error creating token")
+ return False
+ else:
+ # Valid: just import
+ self.access_token = token['access_token']
+ self.refresh_token = token['refresh_token']
+ self.token_expires_in = token['expires_in']
+ needToRefresh = True
+
+ if (needToRefresh):
+ if (not self.RefreshToken()):
+ _LOGGER.error ("Error refreshing token")
+ return False
+
+ return True
+
+ ########################
+ # Write Token
+ ########################
+ def WriteToken(self, token):
+ with open(self.token_file, 'w') as file:
+ json.dump(token, file)
+
+ ########################
+ # Check Token
+ ########################
+ def CheckToken(self, token):
+ if "reason" in token:
+ _LOGGER.error ("Error retriving token - " + token["reason"] + " (" + str(token["code"]) + ")")
+ return False
+ if "error" in token:
+ if "error_description" in token:
+ _LOGGER.error ("Error retriving token: " + token["error_description"])
+ else:
+ _LOGGER.error ("Error retriving token: " + token["error"])
+ return False
+ if len(token) == 0:
+ _LOGGER.error ("Empty token found.")
+ return False
+ if not 'access_token' in token:
+ _LOGGER.error ("Access token not present.")
+ return False
+ if not 'refresh_token' in token:
+ _LOGGER.error ("Refresh token not present.")
+ return False
+ return True
+
+ ########################
+ # Create Token
+ ########################
+ def CreateToken(self):
+
+ auth_url = (
+ f"{URL_OAUTH_AUTH}&" +
+ f"client_id={self.client_id}&" +
+ f"redirect_uri={REDIRECT_URL}&" +
+ f"scope={SCOPE}"
+ )
+
+ print( "Open the browser and insert this link:\n" )
+ print(auth_url + "\n")
+ print( "Copy the code in the url:")
+ auth_code = input()
+
+ data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL
+ token = GetToken(URL_OAUTH_TOKEN, self.headers, data)
+
+ # Check Token
+ if not self.CheckToken(token):
+ return False
+ else:
+ # Save Token
+ self.WriteToken(token)
+ self.access_token = token['access_token']
+ self.refresh_token = token['refresh_token']
+ self.token_expires_in = token['expires_in']
+ return True
+
+ ########################
+ # Refresh Token
+ ########################
+ def RefreshToken(self):
+
+ data = "grant_type=refresh_token&refresh_token=" + self.refresh_token
+ token = GetToken(URL_OAUTH_TOKEN, self.headers, data)
+
+ # Check Token
+ if not self.CheckToken(token):
+ return False
+ else:
+ # Save Token
+ self.WriteToken(token)
+ self.access_token = token['access_token']
+ self.refresh_token = token['refresh_token']
+ self.token_expires_in = token['expires_in']
+ return True
########################
# GetResource
########################
-def GetResource(resourceName, resourceURL, config):
+def GetResource(resourceURL, config):
# Set Header
headers = {
"accept": "application/json;charset=utf-8",
- "authorization": "Bearer "+ config.access_token
+ "authorization": "Bearer "+ config.token.access_token
}
# Send Request
########################
# GetToken
########################
-def GetToken(config, refresh=True, auth_code=""):
- headers = {
- "Authorization": "Basic " + config.base64,
- "content-type": "application/x-www-form-urlencoded"
- }
-
- if (not refresh):
- # New Token
- data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL
- else:
- # Refresh
- data = "grant_type=refresh_token&refresh_token=" + config.refresh_token
-
- res = requests.post(URL_OAUTH_TOKEN, data = data, headers = headers)
+def GetToken(tokenURL, headers, data):
+ res = requests.post(tokenURL, data = data, headers = headers)
try:
- token = res.json()
+ data = res.json()
except ValueError:
_LOGGER.error ("Error retriving token " + str(res.status_code))
- return None
+ data = { "reason": "No Data",
+ "code" : res.status_code
+ }
+
+ # Check Error
+ if not res.ok:
+ if ("reason" in data):
+ reason = data["reason"]
+ else:
+ if res.status_code == 302:
+ reason = "The request scope is invalid"
+ elif res.status_code == 400:
+ reason = "The redirect_uri differs from the registered one"
+ elif res.status_code == 401:
+ reason = "The specified client ID is invalid"
+ else:
+ reason = "Generic Error"
+ data["reason"] = reason
+ data["code"] = res.status_code
- return token
+ return data
import os
from config import MercedesMeConfig
-from query import GetResource
from const import *
+from query import *
# Logger
_LOGGER = logging.getLogger(__name__)
def update(self):
"""Fetch new state data for the sensor."""
- resName = self._name
- resURL = URL_RES_PREFIX + self._href
- result = GetResource(resName, resURL, self._config)
+ result = GetResource(URL_RES_PREFIX + self._href, self._config)
if not "reason" in result:
self._valid = True
- self._timestamp = result[resName]["timestamp"]
- self._state = result[resName]["value"]
+ self._timestamp = result[self._name]["timestamp"]
+ self._state = result[self._name]["value"]
class MercedesMeResources:
def __init__(self, mercedesConfig):
self.database = []
+ self.resources_file = RESOURCES_FILE
self.mercedesConfig = mercedesConfig
########################
found = False
resources = None
- if not os.path.isfile(self.mercedesConfig.resources_file):
+ if not os.path.isfile(self.resources_file):
# Resources File not present - Retriving new one from server
_LOGGER.error ("Resource File missing - Creating a new one.")
found = False
else:
- with open(self.mercedesConfig.resources_file, 'r') as file:
+ with open(self.resources_file, 'r') as file:
try:
resources = json.load(file)
if (not self.CheckResources(resources)):
# Retrive Resources List
########################
def RetriveResourcesList(self):
- resName = "resources"
- resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/" + resName
- resources = GetResource(resName, resURL, self.mercedesConfig)
+ resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/resources"
+ resources = GetResource(resURL, self.mercedesConfig)
if not self.CheckResources(resources):
_LOGGER.error ("Error retriving available resources")
return None
for res in self.database:
output.append( res.getJson() )
# Write File
- with open(self.mercedesConfig.resources_file, 'w') as file:
+ with open(self.resources_file, 'w') as file:
json.dump(output, file)
########################
########################
def UpdateResourcesState(self):
for res in self.database:
- resName = res._name
- resURL = URL_RES_PREFIX + res._href
- result = GetResource(resName, resURL, self.mercedesConfig)
+ result = GetResource(URL_RES_PREFIX + res._href, self.mercedesConfig)
if not "reason" in result:
res._valid = True
- res._timestamp = result[resName]["timestamp"]
- res._state = result[resName]["value"]
+ res._timestamp = result[res._name]["timestamp"]
+ res._state = result[res._name]["value"]
# Write Resource File
self.WriteResourcesFile()