For more details about this component, please refer to the documentation at
https://github.com/xraver/mercedes_me_api/
"""
--from configobj import ConfigObj
import logging
import os
--from oauth import MercedesMeOauth
++from configobj import ConfigObj
++
from const import *
++from oauth import MercedesMeOauth
# Logger
_LOGGER = logging.getLogger(__name__)
++
class MercedesMeConfig:
########################
# Read Credentials from file
if not os.path.isfile(self.credentials_file):
-- _LOGGER.error (f"Credential File {self.credentials_file} not found")
++ _LOGGER.error(f"Credential File {self.credentials_file} not found")
return False
try:
f = ConfigObj(self.credentials_file)
except Exception:
-- _LOGGER.error (f"Wrong {self.credentials_file} file found")
++ _LOGGER.error(f"Wrong {self.credentials_file} file found")
return False
# Client ID
self.client_id = f.get(CONF_CLIENT_ID)
if not self.client_id:
-- _LOGGER.error ("No {CONF_CLIENT_ID} found in the configuration")
++ _LOGGER.error("No {CONF_CLIENT_ID} found in the configuration")
return False
# Client Secret
self.client_secret = f.get(CONF_CLIENT_SECRET)
if not self.client_secret:
-- _LOGGER.error ("No {CONF_CLIENT_SECRET} found in the configuration")
++ _LOGGER.error("No {CONF_CLIENT_SECRET} found in the configuration")
return False
# Vehicle ID
self.vin = f.get(CONF_VEHICLE_ID)
if not self.vin:
-- _LOGGER.error ("No {CONF_VEHICLE_ID} found in the configuration")
++ _LOGGER.error("No {CONF_VEHICLE_ID} found in the configuration")
return False
# Read Token
self.token = MercedesMeOauth(self.client_id, self.client_secret)
if not self.token.ReadToken():
return False
--
++
return True
import sys
from config import MercedesMeConfig
--from resources import MercedesMeResources
from const import *
++from resources import MercedesMeResources
# Logger
_LOGGER = logging.getLogger(__name__)
++
class MercedesMeData:
def __init__(self):
# Configuration Data
# Resource Data
self.mercedesResources = MercedesMeResources(self.mercedesConfig)
++
########################
# Parse Input
########################
def ParseInput():
parser = argparse.ArgumentParser()
-- parser.add_argument('-t', '--token', action='store_true', help="Procedure to obtatin the Access Token")
-- parser.add_argument('-r', '--refresh', action='store_true', help="Procedure to refresh the Access Token")
-- parser.add_argument('-s', '--status', action='store_true', help="Retrieve the Status of your Vehicle")
-- parser.add_argument('-R', '--resources', action='store_true', help="Retrieve the list of available resources of your Vehicle")
-- parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + VERSION)
--
-- if len(sys.argv)==1:
++ parser.add_argument(
++ "-t",
++ "--token",
++ action="store_true",
++ help="Procedure to obtatin the Access Token",
++ )
++ parser.add_argument(
++ "-r",
++ "--refresh",
++ action="store_true",
++ help="Procedure to refresh the Access Token",
++ )
++ parser.add_argument(
++ "-s",
++ "--status",
++ action="store_true",
++ help="Retrieve the Status of your Vehicle",
++ )
++ parser.add_argument(
++ "-R",
++ "--resources",
++ action="store_true",
++ help="Retrieve the list of available resources of your Vehicle",
++ )
++ parser.add_argument(
++ "-v", "--version", action="version", version="%(prog)s " + VERSION
++ )
++
++ if len(sys.argv) == 1:
parser.print_help(sys.stderr)
exit(1)
return parser.parse_args()
++
########################
# Main
########################
# Reading Configuration
if not data.mercedesConfig.ReadConfig():
-- _LOGGER.error ("Error initializing configuration")
-- exit (1)
++ _LOGGER.error("Error initializing configuration")
++ exit(1)
# Create Token
-- if (args.token == True):
++ if args.token == True:
if not data.mercedesConfig.token.CreateToken():
-- _LOGGER.error ("Error creating token")
-- exit (1)
++ _LOGGER.error("Error creating token")
++ exit(1)
# Refresh Token
-- if (args.refresh == True):
++ if args.refresh == True:
if not data.mercedesConfig.token.RefreshToken():
-- _LOGGER.error ("Error refreshing token")
-- exit (1)
--
++ _LOGGER.error("Error refreshing token")
++ exit(1)
++
# Read Resources
if not data.mercedesResources.ReadResources():
-- _LOGGER.error ("Error initializing resources")
-- exit (1)
++ _LOGGER.error("Error initializing resources")
++ exit(1)
# Print Available Resources
-- if (args.resources):
++ if args.resources:
data.mercedesResources.PrintAvailableResources()
# Print Resources State
-- if (args.status == True):
++ if args.status == True:
data.mercedesResources.UpdateResourcesState()
data.mercedesResources.PrintResourcesState()
import json
import logging
import os
++
from const import *
from query import *
# Logger
_LOGGER = logging.getLogger(__name__)
++
class MercedesMeOauth:
########################
self.token_file = TOKEN_FILE
# Base64
b64_str = f"{client_id}:{client_secret}"
-- b64_bytes = base64.b64encode( b64_str.encode('ascii') )
-- self.base64 = b64_bytes.decode('ascii')
++ b64_bytes = base64.b64encode(b64_str.encode("ascii"))
++ self.base64 = b64_bytes.decode("ascii")
# Headers
self.headers = {
"Authorization": f"Basic {self.base64}",
-- "content-type": "application/x-www-form-urlencoded"
++ "content-type": "application/x-www-form-urlencoded",
}
########################
# Read Token
if not os.path.isfile(self.token_file):
# Token File not present - Creating new one
-- _LOGGER.error ("Token File missing - Creating a new one")
++ _LOGGER.error("Token File missing - Creating a new one")
found = False
else:
-- with open(self.token_file, 'r') as file:
++ with open(self.token_file, "r") as file:
try:
token = json.load(file)
if not self.CheckToken(token):
else:
found = True
except ValueError:
-- _LOGGER.error ("Error reading token file - Creating a new one")
++ _LOGGER.error("Error reading token file - Creating a new one")
found = False
-- if ( not found ):
++ if not found:
# Not valid or file missing
-- if (not self.CreateToken()):
-- _LOGGER.error ("Error creating token")
++ if not self.CreateToken():
++ _LOGGER.error("Error creating token")
return False
else:
# Valid: just import
-- self.access_token = token['access_token']
-- self.refresh_token = token['refresh_token']
-- self.token_expires_in = token['expires_in']
++ self.access_token = token["access_token"]
++ self.refresh_token = token["refresh_token"]
++ self.token_expires_in = token["expires_in"]
needToRefresh = True
-- if (needToRefresh):
-- if (not self.RefreshToken()):
-- _LOGGER.error ("Error refreshing token")
++ if needToRefresh:
++ if not self.RefreshToken():
++ _LOGGER.error("Error refreshing token")
return False
return True
# Write Token
########################
def WriteToken(self, token):
-- with open(self.token_file, 'w') as file:
++ with open(self.token_file, "w") as file:
json.dump(token, file)
########################
########################
def CheckToken(self, token):
if "reason" in token:
-- _LOGGER.error (f"Error retrieving token - {token['reason']} ({token['code']})")
++ _LOGGER.error(
++ f"Error retrieving token - {token['reason']} ({token['code']})"
++ )
return False
if "error" in token:
if "error_description" in token:
-- _LOGGER.error (f"Error retrieving token: {token['error_description']}")
++ _LOGGER.error(f"Error retrieving token: {token['error_description']}")
else:
-- _LOGGER.error (f"Error retrieving token: {token['error']}")
++ _LOGGER.error(f"Error retrieving token: {token['error']}")
return False
if len(token) == 0:
-- _LOGGER.error ("Empty token found.")
++ _LOGGER.error("Empty token found.")
return False
-- if not 'access_token' in token:
-- _LOGGER.error ("Access token not present.")
++ if not "access_token" in token:
++ _LOGGER.error("Access token not present.")
return False
-- if not 'refresh_token' in token:
-- _LOGGER.error ("Refresh token not present.")
++ if not "refresh_token" in token:
++ _LOGGER.error("Refresh token not present.")
return False
return True
def CreateToken(self):
auth_url = (
-- f"{URL_OAUTH_AUTH}&" +
-- f"client_id={self.client_id}&" +
-- f"redirect_uri={REDIRECT_URL}&" +
-- f"scope={SCOPE}"
++ f"{URL_OAUTH_AUTH}&"
++ + f"client_id={self.client_id}&"
++ + f"redirect_uri={REDIRECT_URL}&"
++ + f"scope={SCOPE}"
)
-- print( "Open the browser and insert this link:\n" )
++ print("Open the browser and insert this link:\n")
print(f"{auth_url}\n")
-- print( "Copy the code in the url:")
++ print("Copy the code in the url:")
auth_code = input()
data = f"grant_type=authorization_code&code={auth_code}&redirect_uri={REDIRECT_URL}"
else:
# Save Token
self.WriteToken(token)
-- self.access_token = token['access_token']
-- self.refresh_token = token['refresh_token']
-- self.token_expires_in = token['expires_in']
++ self.access_token = token["access_token"]
++ self.refresh_token = token["refresh_token"]
++ self.token_expires_in = token["expires_in"]
return True
########################
else:
# Save Token
self.WriteToken(token)
-- self.access_token = token['access_token']
-- self.refresh_token = token['refresh_token']
-- self.token_expires_in = token['expires_in']
++ self.access_token = token["access_token"]
++ self.refresh_token = token["refresh_token"]
++ self.token_expires_in = token["expires_in"]
return True
https://github.com/xraver/mercedes_me_api/
"""
import logging
++
import requests
from const import *
# Set Header
headers = {
-- "accept": "application/json;charset=utf-8",
-- "authorization": f"Bearer {config.token.access_token}"
++ "accept": "application/json;charset=utf-8",
++ "authorization": f"Bearer {config.token.access_token}",
}
# Send Request
try:
data = res.json()
except ValueError:
-- data = { "reason": "No Data",
-- "code" : res.status_code
-- }
++ data = {"reason": "No Data", "code": res.status_code}
# Check Error
if not res.ok:
-- if ("reason" in data):
++ if "reason" in data:
reason = data["reason"]
else:
if res.status_code == 400:
elif res.status_code == 404:
reason = "Page not found"
elif res.status_code == 429:
-- reason = "The service received too many requests in a given amount of time"
++ reason = (
++ "The service received too many requests in a given amount of time"
++ )
elif res.status_code == 500:
reason = "An error occurred on the server side"
elif res.status_code == 503:
data["code"] = res.status_code
return data
++
########################
# GetToken
########################
def GetToken(tokenURL, headers, data, refresh=True):
-- res = requests.post(tokenURL, data = data, headers = headers)
++ res = requests.post(tokenURL, data=data, headers=headers)
try:
data = res.json()
except ValueError:
-- _LOGGER.error (f"Error retrieving token {res.status_code}")
-- data = { "reason": "No Data",
-- "code" : res.status_code
-- }
++ _LOGGER.error(f"Error retrieving token {res.status_code}")
++ data = {"reason": "No Data", "code": res.status_code}
# Check Error
if not res.ok:
-- if ("reason" in data):
++ if "reason" in data:
reason = data["reason"]
else:
-- if (refresh == False):
++ if refresh == False:
# New Token Errors
if res.status_code == 302:
reason = "The request scope is invalid"
For more details about this component, please refer to the documentation at
https://github.com/xraver/mercedes_me_api/
"""
--from datetime import datetime
--import logging
import json
++import logging
import os
++from datetime import datetime
from config import MercedesMeConfig
from const import *
# Logger
_LOGGER = logging.getLogger(__name__)
++
class MercedesMeResource:
-- def __init__( self, name, vin, version, href, state=None, timestamp=None, valid=False ):
++ def __init__(
++ self, name, vin, version, href, state=None, timestamp=None, valid=False
++ ):
self._name = name
self._version = version
self._href = href
self._state = state
self._timestamp = timestamp
self._valid = valid
-- if(timestamp != None):
-- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
++ if timestamp != None:
++ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
else:
self._lastupdate = 0
def __str__(self):
-- return json.dumps({
-- "name" : self._name,
-- "vin" : self._vin,
-- "version" : self._version,
-- "href" : self._href,
-- "state" : self._state,
-- "timestamp" : self._timestamp,
-- "valid" : self._valid,
-- })
++ return json.dumps(
++ {
++ "name": self._name,
++ "vin": self._vin,
++ "version": self._version,
++ "href": self._href,
++ "state": self._state,
++ "timestamp": self._timestamp,
++ "valid": self._valid,
++ }
++ )
def getJson(self):
-- return ({
-- "name" : self._name,
-- "vin" : self._vin,
-- "version" : self._version,
-- "href" : self._href,
-- "state" : self._state,
-- "timestamp" : self._timestamp,
-- "valid" : self._valid,
-- })
++ return {
++ "name": self._name,
++ "vin": self._vin,
++ "version": self._version,
++ "href": self._href,
++ "state": self._state,
++ "timestamp": self._timestamp,
++ "valid": self._valid,
++ }
def UpdateState(self, state, timestamp):
"""Update status of the resource."""
self._state = state
self._timestamp = timestamp
-- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
++ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
self._valid = True
def unique_id(self):
def device_state_attributes(self):
"""Return attributes for the sensor."""
-- return ({
-- "valid": self._valid,
-- "timestamp": self._timestamp,
-- "last_update": self._lastupdate,
-- })
++ return {
++ "valid": self._valid,
++ "timestamp": self._timestamp,
++ "last_update": self._lastupdate,
++ }
++
class MercedesMeResources:
if not os.path.isfile(self.resources_file):
# Resources File not present - Retrieving new one from server
-- _LOGGER.error ("Resource File missing - Creating a new one.")
++ _LOGGER.error("Resource File missing - Creating a new one.")
found = False
else:
-- with open(self.resources_file, 'r') as file:
++ with open(self.resources_file, "r") as file:
try:
resources = json.load(file)
-- if (not self.CheckResources(resources)):
++ if not self.CheckResources(resources):
raise ValueError
else:
found = True
except ValueError:
-- _LOGGER.error ("Error reading resource file - Creating a new one.")
++ _LOGGER.error("Error reading resource file - Creating a new one.")
found = False
-- if ( not found ):
++ if not found:
# Not valid or file missing
resources = self.RetrieveResourcesList()
-- if( resources == None ):
++ if resources == None:
# Not found or wrong
-- _LOGGER.error ("Error retrieving resource list.")
++ _LOGGER.error("Error retrieving resource list.")
return False
else:
# import and write
########################
def CheckResources(self, resources):
if "reason" in resources:
-- _LOGGER.error (f"Error retrieving available resources - {resources['reason']} ({resources['code']})")
++ _LOGGER.error(
++ f"Error retrieving available resources - {resources['reason']} ({resources['code']})"
++ )
return False
if "error" in resources:
if "error_description" in resources:
-- _LOGGER.error (f"Error retrieving resources: {resources['error_description']}")
++ _LOGGER.error(
++ f"Error retrieving resources: {resources['error_description']}"
++ )
else:
-- _LOGGER.error (f"Error retrieving resources: {resources['error']}")
++ _LOGGER.error(f"Error retrieving resources: {resources['error']}")
return False
if len(resources) == 0:
-- _LOGGER.error ("Empty resources found.")
++ _LOGGER.error("Empty resources found.")
return False
return True
resURL = f"{URL_RES_PREFIX}/vehicles/{self.mercedesConfig.vin}/resources"
resources = GetResource(resURL, self.mercedesConfig)
if not self.CheckResources(resources):
-- _LOGGER.error ("Error retrieving available resources")
++ _LOGGER.error("Error retrieving available resources")
return None
else:
return resources
########################
def ImportResourcesList(self, resources):
for res in resources:
-- if("state" in res):
-- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"], res["state"], res["timestamp"], res["valid"]) )
++ if "state" in res:
++ self.database.append(
++ MercedesMeResource(
++ res["name"],
++ self.mercedesConfig.vin,
++ res["version"],
++ res["href"],
++ res["state"],
++ res["timestamp"],
++ res["valid"],
++ )
++ )
else:
-- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"]) )
++ self.database.append(
++ MercedesMeResource(
++ res["name"],
++ self.mercedesConfig.vin,
++ res["version"],
++ res["href"],
++ )
++ )
########################
# Write Resources File
output = []
# Extract List
for res in self.database:
-- output.append( res.getJson() )
++ output.append(res.getJson())
# Write File
-- with open(self.resources_file, 'w') as file:
++ with open(self.resources_file, "w") as file:
json.dump(output, file)
########################
# Print Available Resources
########################
def PrintAvailableResources(self):
-- print (f"Found {len(self.database)} resources:")
++ print(f"Found {len(self.database)} resources:")
for res in self.database:
-- print (f"{res._name}: {URL_RES_PREFIX}{res._href}")
++ print(f"{res._name}: {URL_RES_PREFIX}{res._href}")
########################
# Print Resources State
########################
-- def PrintResourcesState(self, valid = True):
++ def PrintResourcesState(self, valid=True):
for res in self.database:
-- if((not valid) | res._valid):
-- print (f"{res._name}:")
-- print (f"\tvalid: {res._valid}")
-- print (f"\tstate: {res._state}")
-- print (f"\ttimestamp: {res._timestamp}")
-- print (f"\tlast_update: {res._lastupdate}")
++ if (not valid) | res._valid:
++ print(f"{res._name}:")
++ print(f"\tvalid: {res._valid}")
++ print(f"\tstate: {res._state}")
++ print(f"\ttimestamp: {res._timestamp}")
++ print(f"\tlast_update: {res._lastupdate}")
########################
# Update Resources State
for res in self.database:
result = GetResource(f"{URL_RES_PREFIX}{res._href}", self.mercedesConfig)
if not "reason" in result:
-- res.UpdateState(result[res._name]["value"], result[res._name]["timestamp"])
++ res.UpdateState(
++ result[res._name]["value"], result[res._name]["timestamp"]
++ )
# Write Resource File
self.WriteResourcesFile()