The Home Assistant Custom Component is a component to be added in Home Assistant in order to integrate sensors of a Mercedes Benz car.
This component is still in development.
### Open Points
-- Fix OAUTH2 Authentication & Get the First Token
+- Complete OAUTH2 Authentication & Get the First Token
+- Get state after starts -> now it waits <scan_interval> seconds.
+- Config Flow for automatic configuration
- Log Management
- Bugfix & Software optimizations
import logging
from .config import MercedesMeConfig
+from .oauth import MercedesMeOauth
from .resources import MercedesMeResources
from .const import *
_LOGGER.debug ("End Update")
########################
- # Update State
+ # Update Token
########################
def UpdateToken(self, event_time):
_LOGGER.debug ("Start Refresh Token")
- self.mercedesConfig.RefreshToken()
+ self.mercedesConfig.token.RefreshToken()
#hass.helpers.dispatcher.dispatcher_send(UPDATE_SIGNAL)
_LOGGER.debug ("End Refresh Token")
hass.helpers.event.track_time_interval(data.UpdateState, timedelta(seconds=data.mercedesConfig.scan_interval))
# Create Task to Update Token
- hass.helpers.event.track_time_interval(data.UpdateToken, timedelta(seconds=data.mercedesConfig.token_expires_in))
+ hass.helpers.event.track_time_interval(data.UpdateToken, timedelta(seconds=data.mercedesConfig.token.token_expires_in))
return True
\ No newline at end of file
For more details about this component, please refer to the documentation at
https://github.com/xraver/mercedes_me_api/
"""
-import base64
-import json
import logging
-import os
-import requests
import voluptuous as vol
from homeassistant.const import (
from homeassistant.helpers import discovery, config_validation as cv
-from .query import (
- GetResource,
- GetToken
-)
+from .oauth import MercedesMeOauth
from .const import *
# Logger
class MercedesMeConfig:
- token_file = ""
- credentials_file = ""
- resources_file = ""
- client_id = ""
- client_secret = ""
- vin = ""
- scan_interval = ""
- base64 = ""
- access_token = ""
- refresh_token = ""
- token_expires_in = ""
-
########################
# Init
########################
# Home Assistant structures
self.hass = hass
self.config = config
- # Files
- self.token_file = self.hass.config.path(TOKEN_FILE)
- self.resources_file = self.hass.config.path(RESOURCES_FILE)
+ self.client_id = ""
+ self.client_secret = ""
+ self.vin = ""
+ self.token = ""
########################
# Read Configuration
self.vin = config[CONF_VEHICLE_ID]
# Scan Interval
self.scan_interval = config[CONF_SCAN_INTERVAL]
- # Base64
- b64_str = self.client_id + ":" + self.client_secret
- b64_bytes = base64.b64encode( b64_str.encode('ascii') )
- self.base64 = b64_bytes.decode('ascii')
-
# Read Token
- if not os.path.isfile(self.token_file):
- _LOGGER.error ("Token File missing - Creating a new one")
- #if (not self.CreateToken()): GRGR -> to be fixed
- if (True):
- _LOGGER.error ("Error creating token")
- return False
- else:
- with open(self.token_file, 'r') as file:
- try:
- token = json.load(file)
- except ValueError:
- token = None
- if self.CheckToken(token):
- # Save Token
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
- needToRefresh = True
- else:
- _LOGGER.error ("Token File not correct - Creating a new one")
- #if (not self.CreateToken()):
- if (True): # GRGR -> to be fixed
- _LOGGER.error ("Error creating token")
- return False
-
- if (needToRefresh):
- if (not self.RefreshToken()):
- _LOGGER.error ("Error refreshing token")
- return False
-
- return True
-
- ########################
- # Write Token
- ########################
- def WriteToken(self, token):
- with open(self.token_file, 'w') as file:
- json.dump(token, file)
-
- ########################
- # Check Token
- ########################
- def CheckToken(self, token):
- if token == None:
- _LOGGER.error ("Error reading token.")
- return False
- if "error" in token:
- if "error_description" in token:
- _LOGGER.error ("Error retriving token: " + token["error_description"])
- else:
- _LOGGER.error ("Error retriving token: " + token["error"])
- return False
- if len(token) == 0:
- _LOGGER.error ("Empty token found.")
- return False
- if not 'access_token' in token:
- _LOGGER.error ("Access token not present.")
- return False
- if not 'refresh_token' in token:
- _LOGGER.error ("Refresh token not present.")
- return False
- return True
-
- ########################
- # Create Token
- ########################
- def CreateToken(self):
-
- auth_url = (
- f"{URL_OAUTH_AUTH}&" +
- f"client_id={self.client_id}&" +
- f"redirect_uri={REDIRECT_URL}&" +
- f"scope={SCOPE}"
- )
-
- print( "Open the browser and insert this link:\n" )
- print(auth_url + "\n")
- print( "Copy the code in the url:")
- auth_code = input()
-
- token = GetToken(self, refresh=False, auth_code=auth_code)
-
- # Check Token
- if not self.CheckToken(token):
- return False
- else:
- # Save Token
- self.WriteToken(token)
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
- return True
-
- ########################
- # Refresh Token
- ########################
- def RefreshToken(self):
-
- token = GetToken(self, refresh=True)
-
- # Check Token
- if not self.CheckToken(token):
+ self.token = MercedesMeOauth(self.hass, self.client_id, self.client_secret)
+ if not self.token.ReadToken():
return False
- else:
- # Save Token
- self.WriteToken(token)
- self.access_token = token['access_token']
- self.refresh_token = token['refresh_token']
- self.token_expires_in = token['expires_in']
+
return True
RESOURCES_FILE = ".mercedesme_resources"
REDIRECT_URL = "https://localhost"
SCOPE = "mb:vehicle:mbdata:fuelstatus%20mb:vehicle:mbdata:vehiclestatus%20mb:vehicle:mbdata:vehiclelock%20offline_access"
-URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"
-URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"
-URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"
URL_RES_PREFIX = "https://api.mercedes-benz.com/vehicledata/v2"
#UPDATE_SIGNAL = "mercedesmeapi_update"
--- /dev/null
+"""\r
+Mercedes Me APIs\r
+\r
+Author: G. Ravera\r
+\r
+For more details about this component, please refer to the documentation at\r
+https://github.com/xraver/mercedes_me_api/\r
+"""\r
+import base64\r
+import json\r
+import logging\r
+import os\r
+from .const import *\r
+from .query import *\r
+\r
+URL_OAUTH_BASE = "https://id.mercedes-benz.com/as"\r
+URL_OAUTH_AUTH = URL_OAUTH_BASE + "/authorization.oauth2?response_type=code"\r
+URL_OAUTH_TOKEN = URL_OAUTH_BASE + "/token.oauth2"\r
+\r
+# Logger\r
+_LOGGER = logging.getLogger(__name__)\r
+\r
+class MercedesMeOauth:\r
+\r
+ ########################\r
+ # Init\r
+ ########################\r
+ def __init__(self, hass, client_id, client_secret):\r
+ # Access Token\r
+ self.access_token = ""\r
+ # Refresh Token\r
+ self.refresh_token = ""\r
+ # Expiration Time\r
+ self.token_expires_in = ""\r
+ # Client ID\r
+ self.client_id = client_id\r
+ # Client Secret\r
+ self.client_secret = client_secret\r
+ # Token File\r
+ self.token_file = hass.config.path(TOKEN_FILE)\r
+ # Base64\r
+ b64_str = client_id + ":" + client_secret\r
+ b64_bytes = base64.b64encode( b64_str.encode('ascii') )\r
+ self.base64 = b64_bytes.decode('ascii')\r
+ # Headers\r
+ self.headers = {\r
+ "Authorization": "Basic " + self.base64,\r
+ "content-type": "application/x-www-form-urlencoded"\r
+ }\r
+\r
+ ########################\r
+ # Read Token\r
+ ########################\r
+ def ReadToken(self):\r
+\r
+ found = False\r
+ needToRefresh = False\r
+\r
+ # Read Token\r
+ if not os.path.isfile(self.token_file):\r
+ # Token File not present - Creating new one\r
+ _LOGGER.error ("Token File missing - Creating a new one")\r
+ found = False\r
+ else:\r
+ with open(self.token_file, 'r') as file:\r
+ try:\r
+ token = json.load(file)\r
+ if not self.CheckToken(token):\r
+ raise ValueError\r
+ else:\r
+ found = True\r
+ except ValueError:\r
+ _LOGGER.error ("Error reading token file - Creating a new one")\r
+ found = False\r
+\r
+ if ( not found ):\r
+ # Not valid or file missing\r
+ #if (not self.CreateToken()): GRGR -> to be fixed\r
+ if (True):\r
+ _LOGGER.error ("Error creating token")\r
+ return False\r
+ else:\r
+ # Valid: just import\r
+ self.access_token = token['access_token']\r
+ self.refresh_token = token['refresh_token']\r
+ self.token_expires_in = token['expires_in']\r
+ needToRefresh = True\r
+\r
+ if (needToRefresh):\r
+ if (not self.RefreshToken()):\r
+ _LOGGER.error ("Error refreshing token")\r
+ return False\r
+\r
+ return True\r
+\r
+ ########################\r
+ # Write Token\r
+ ########################\r
+ def WriteToken(self, token):\r
+ with open(self.token_file, 'w') as file:\r
+ json.dump(token, file)\r
+\r
+ ########################\r
+ # Check Token\r
+ ########################\r
+ def CheckToken(self, token):\r
+ if "reason" in token:\r
+ _LOGGER.error ("Error retriving token - " + token["reason"] + " (" + str(token["code"]) + ")")\r
+ return False\r
+ if "error" in token:\r
+ if "error_description" in token:\r
+ _LOGGER.error ("Error retriving token: " + token["error_description"])\r
+ else:\r
+ _LOGGER.error ("Error retriving token: " + token["error"])\r
+ return False\r
+ if len(token) == 0:\r
+ _LOGGER.error ("Empty token found.")\r
+ return False\r
+ if not 'access_token' in token:\r
+ _LOGGER.error ("Access token not present.")\r
+ return False\r
+ if not 'refresh_token' in token:\r
+ _LOGGER.error ("Refresh token not present.")\r
+ return False\r
+ return True\r
+\r
+ ########################\r
+ # Create Token\r
+ ########################\r
+ def CreateToken(self):\r
+\r
+ auth_url = (\r
+ f"{URL_OAUTH_AUTH}&" +\r
+ f"client_id={self.client_id}&" + \r
+ f"redirect_uri={REDIRECT_URL}&" + \r
+ f"scope={SCOPE}"\r
+ )\r
+\r
+ print( "Open the browser and insert this link:\n" )\r
+ print(auth_url + "\n")\r
+ print( "Copy the code in the url:")\r
+ auth_code = input()\r
+\r
+ data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL\r
+ token = GetToken(URL_OAUTH_TOKEN, self.headers, data)\r
+\r
+ # Check Token\r
+ if not self.CheckToken(token):\r
+ return False\r
+ else:\r
+ # Save Token\r
+ self.WriteToken(token)\r
+ self.access_token = token['access_token']\r
+ self.refresh_token = token['refresh_token']\r
+ self.token_expires_in = token['expires_in']\r
+ return True\r
+\r
+ ########################\r
+ # Refresh Token\r
+ ########################\r
+ def RefreshToken(self):\r
+\r
+ data = "grant_type=refresh_token&refresh_token=" + self.refresh_token\r
+ token = GetToken(URL_OAUTH_TOKEN, self.headers, data)\r
+\r
+ # Check Token\r
+ if not self.CheckToken(token):\r
+ return False\r
+ else:\r
+ # Save Token\r
+ self.WriteToken(token)\r
+ self.access_token = token['access_token']\r
+ self.refresh_token = token['refresh_token']\r
+ self.token_expires_in = token['expires_in']\r
+ return True\r
########################
# GetResource
########################
-def GetResource(resourceName, resourceURL, config):
+def GetResource(resourceURL, config):
# Set Header
headers = {
"accept": "application/json;charset=utf-8",
- "authorization": "Bearer "+ config.access_token
+ "authorization": "Bearer "+ config.token.access_token
}
# Send Request
########################
# GetToken
########################
-def GetToken(config, refresh=True, auth_code=""):
- headers = {
- "Authorization": "Basic " + config.base64,
- "content-type": "application/x-www-form-urlencoded"
- }
-
- if (not refresh):
- # New Token
- data = "grant_type=authorization_code&code=" + auth_code + "&redirect_uri=" + REDIRECT_URL
- else:
- # Refresh
- data = "grant_type=refresh_token&refresh_token=" + config.refresh_token
-
- res = requests.post(URL_OAUTH_TOKEN, data = data, headers = headers)
+def GetToken(tokenURL, headers, data):
+ res = requests.post(tokenURL, data = data, headers = headers)
try:
- token = res.json()
+ data = res.json()
except ValueError:
_LOGGER.error ("Error retriving token " + str(res.status_code))
- return None
+ data = { "reason": "No Data",
+ "code" : res.status_code
+ }
+
+ # Check Error
+ if not res.ok:
+ if ("reason" in data):
+ reason = data["reason"]
+ else:
+ if res.status_code == 302:
+ reason = "The request scope is invalid"
+ elif res.status_code == 400:
+ reason = "The redirect_uri differs from the registered one"
+ elif res.status_code == 401:
+ reason = "The specified client ID is invalid"
+ else:
+ reason = "Generic Error"
+ data["reason"] = reason
+ data["code"] = res.status_code
- return token
+ return data
from homeassistant.helpers.entity import Entity
from .config import MercedesMeConfig
-from .query import GetResource
from .const import *
+from .query import *
# Logger
_LOGGER = logging.getLogger(__name__)
self._version = version
self._href = href
self._vin = vin
- self._vin = "mercedes" # GRGR -> test
self._state = state
self._timestamp = timestamp
self._valid = valid
self.database = []
self.mercedesConfig = mercedesConfig
+ self.resources_file = mercedesConfig.hass.config.path(RESOURCES_FILE)
########################
# Read Resources
found = False
resources = None
- if not os.path.isfile(self.mercedesConfig.resources_file):
+ if not os.path.isfile(self.resources_file):
# Resources File not present - Retriving new one from server
_LOGGER.error ("Resource File missing - Creating a new one.")
found = False
else:
- with open(self.mercedesConfig.resources_file, 'r') as file:
+ with open(self.resources_file, 'r') as file:
try:
resources = json.load(file)
if (not self.CheckResources(resources)):
# Retrive Resources List
########################
def RetriveResourcesList(self):
- resName = "resources"
- resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/" + resName
- resources = GetResource(resName, resURL, self.mercedesConfig)
+ resURL = URL_RES_PREFIX + "/vehicles/" + self.mercedesConfig.vin + "/resources"
+ resources = GetResource(resURL, self.mercedesConfig)
if not self.CheckResources(resources):
_LOGGER.error ("Error retriving available resources")
return None
for res in self.database:
output.append( res.getJson() )
# Write File
- with open(self.mercedesConfig.resources_file, 'w') as file:
+ with open(self.resources_file, 'w') as file:
json.dump(output, file)
########################
########################
def UpdateResourcesState(self):
for res in self.database:
- resName = res._name
- resURL = URL_RES_PREFIX + res._href
- result = GetResource(resName, resURL, self.mercedesConfig)
+ result = GetResource(URL_RES_PREFIX + res._href, self.mercedesConfig)
if not "reason" in result:
res._valid = True
- res._timestamp = result[resName]["timestamp"]
- res._state = result[resName]["value"]
+ res._timestamp = result[res._name]["timestamp"]
+ res._state = result[res._name]["value"]
# Write Resource File
self.WriteResourcesFile()
def __init__(self, mercedesConfig):
self.database = []
- self.resources_file = RESOURCES_FILE
self.mercedesConfig = mercedesConfig
+ self.resources_file = RESOURCES_FILE
########################
# Read Resources