import json\r
import logging\r
import os\r
+\r
from .const import *\r
from .query import *\r
\r
# Logger\r
_LOGGER = logging.getLogger(__name__)\r
\r
+\r
class MercedesMeOauth:\r
\r
########################\r
# Read Token\r
if not os.path.isfile(self.token_file):\r
# Token File not present - Creating new one\r
- _LOGGER.error ("Token File missing - Creating a new one")\r
+ _LOGGER.error("Token File missing - Creating a new one")\r
found = False\r
else:\r
- with open(self.token_file, 'r') as file:\r
+ with open(self.token_file, "r") as file:\r
try:\r
token = json.load(file)\r
if not self.CheckToken(token):\r
else:\r
found = True\r
except ValueError:\r
- _LOGGER.error ("Error reading token file - Creating a new one")\r
+ _LOGGER.error("Error reading token file - Creating a new one")\r
found = False\r
\r
- if ( not found ):\r
+ if not found:\r
# Not valid or file missing\r
#if (not self.CreateToken()): GRGR -> to be fixed\r
if (True):\r
- _LOGGER.error ("Error creating token")\r
+ _LOGGER.error("Error creating token")\r
return False\r
else:\r
# Valid: just import\r
- self.access_token = token['access_token']\r
- self.refresh_token = token['refresh_token']\r
- self.token_expires_in = token['expires_in']\r
+ self.access_token = token["access_token"]\r
+ self.refresh_token = token["refresh_token"]\r
+ self.token_expires_in = token["expires_in"]\r
needToRefresh = True\r
\r
- if (needToRefresh):\r
- if (not self.RefreshToken()):\r
- _LOGGER.error ("Error refreshing token")\r
+ if needToRefresh:\r
+ if not self.RefreshToken():\r
+ _LOGGER.error("Error refreshing token")\r
return False\r
\r
return True\r
# Write Token\r
########################\r
def WriteToken(self, token):\r
- with open(self.token_file, 'w') as file:\r
+ with open(self.token_file, "w") as file:\r
json.dump(token, file)\r
\r
########################\r
########################\r
def CheckToken(self, token):\r
if "reason" in token:\r
- _LOGGER.error (f"Error retrieving token - {token['reason']} ({token['code']})")\r
+ _LOGGER.error(\r
+ f"Error retrieving token - {token['reason']} ({token['code']})"\r
+ )\r
return False\r
if "error" in token:\r
if "error_description" in token:\r
- _LOGGER.error (f"Error retrieving token: {token['error_description']}")\r
+ _LOGGER.error(f"Error retrieving token: {token['error_description']}")\r
else:\r
- _LOGGER.error (f"Error retrieving token: {token['error']}")\r
+ _LOGGER.error(f"Error retrieving token: {token['error']}")\r
return False\r
if len(token) == 0:\r
- _LOGGER.error ("Empty token found.")\r
+ _LOGGER.error("Empty token found.")\r
return False\r
- if not 'access_token' in token:\r
- _LOGGER.error ("Access token not present.")\r
+ if not "access_token" in token:\r
+ _LOGGER.error("Access token not present.")\r
return False\r
- if not 'refresh_token' in token:\r
- _LOGGER.error ("Refresh token not present.")\r
+ if not "refresh_token" in token:\r
+ _LOGGER.error("Refresh token not present.")\r
return False\r
return True\r
\r
def CreateToken(self):\r
\r
auth_url = (\r
- f"{URL_OAUTH_AUTH}&" +\r
- f"client_id={self.client_id}&" + \r
- f"redirect_uri={REDIRECT_URL}&" + \r
- f"scope={SCOPE}"\r
+ f"{URL_OAUTH_AUTH}&"\r
+ + f"client_id={self.client_id}&"\r
+ + f"redirect_uri={REDIRECT_URL}&"\r
+ + f"scope={SCOPE}"\r
)\r
\r
- print( "Open the browser and insert this link:\n" )\r
+ print("Open the browser and insert this link:\n")\r
print(f"{auth_url}\n")\r
- print( "Copy the code in the url:")\r
+ print("Copy the code in the url:")\r
auth_code = input()\r
\r
data = f"grant_type=authorization_code&code={auth_code}&redirect_uri={REDIRECT_URL}"\r
else:\r
# Save Token\r
self.WriteToken(token)\r
- self.access_token = token['access_token']\r
- self.refresh_token = token['refresh_token']\r
- self.token_expires_in = token['expires_in']\r
+ self.access_token = token["access_token"]\r
+ self.refresh_token = token["refresh_token"]\r
+ self.token_expires_in = token["expires_in"]\r
return True\r
\r
########################\r
else:\r
# Save Token\r
self.WriteToken(token)\r
- self.access_token = token['access_token']\r
- self.refresh_token = token['refresh_token']\r
- self.token_expires_in = token['expires_in']\r
+ self.access_token = token["access_token"]\r
+ self.refresh_token = token["refresh_token"]\r
+ self.token_expires_in = token["expires_in"]\r
return True\r
For more details about this component, please refer to the documentation at
https://github.com/xraver/mercedes_me_api/
"""
-from datetime import datetime
-import logging
import json
+import logging
import os
-
+from datetime import datetime
from homeassistant.helpers.entity import Entity
from .config import MercedesMeConfig
# Logger
_LOGGER = logging.getLogger(__name__)
+
class MercedesMeResource (Entity):
- def __init__( self, name, vin, version, href, state=None, timestamp=None, valid=False ):
+ def __init__(
+ self, name, vin, version, href, state=None, timestamp=None, valid=False
+ ):
self._name = name
self._version = version
self._href = href
self._state = state
self._timestamp = timestamp
self._valid = valid
- if(timestamp != None):
- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
+ if timestamp != None:
+ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
else:
self._lastupdate = 0
def __str__(self):
- return json.dumps({
- "name" : self._name,
- "vin" : self._vin,
- "version" : self._version,
- "href" : self._href,
- "state" : self._state,
- "timestamp" : self._timestamp,
- "valid" : self._valid,
- })
+ return json.dumps(
+ {
+ "name": self._name,
+ "vin": self._vin,
+ "version": self._version,
+ "href": self._href,
+ "state": self._state,
+ "timestamp": self._timestamp,
+ "valid": self._valid,
+ }
+ )
def getJson(self):
- return ({
- "name" : self._name,
- "vin" : self._vin,
- "version" : self._version,
- "href" : self._href,
- "state" : self._state,
- "timestamp" : self._timestamp,
- "valid" : self._valid,
- })
+ return {
+ "name": self._name,
+ "vin": self._vin,
+ "version": self._version,
+ "href": self._href,
+ "state": self._state,
+ "timestamp": self._timestamp,
+ "valid": self._valid,
+ }
def UpdateState(self, state, timestamp):
"""Update status of the resource."""
self._state = state
self._timestamp = timestamp
- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
+ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
self._valid = True
@property
@property
def device_state_attributes(self):
"""Return attributes for the sensor."""
- return ({
- "valid": self._valid,
- "timestamp": self._timestamp,
- "last_update": self._lastupdate,
- })
+ return {
+ "valid": self._valid,
+ "timestamp": self._timestamp,
+ "last_update": self._lastupdate,
+ }
+
class MercedesMeResources:
if not os.path.isfile(self.resources_file):
# Resources File not present - Retrieving new one from server
- _LOGGER.error ("Resource File missing - Creating a new one.")
+ _LOGGER.error("Resource File missing - Creating a new one.")
found = False
else:
- with open(self.resources_file, 'r') as file:
+ with open(self.resources_file, "r") as file:
try:
resources = json.load(file)
- if (not self.CheckResources(resources)):
+ if not self.CheckResources(resources):
raise ValueError
else:
found = True
except ValueError:
- _LOGGER.error ("Error reading resource file - Creating a new one.")
+ _LOGGER.error("Error reading resource file - Creating a new one.")
found = False
- if ( not found ):
+ if not found:
# Not valid or file missing
resources = self.RetrieveResourcesList()
- if( resources == None ):
+ if resources == None:
# Not found or wrong
- _LOGGER.error ("Error retrieving resource list.")
+ _LOGGER.error("Error retrieving resource list.")
return False
else:
# import and write
########################
def CheckResources(self, resources):
if "reason" in resources:
- _LOGGER.error (f"Error retrieving available resources - {resources['reason']} ({resources['code']})")
+ _LOGGER.error(
+ f"Error retrieving available resources - {resources['reason']} ({resources['code']})"
+ )
return False
if "error" in resources:
if "error_description" in resources:
- _LOGGER.error (f"Error retrieving resources: {resources['error_description']}")
+ _LOGGER.error(
+ f"Error retrieving resources: {resources['error_description']}"
+ )
else:
- _LOGGER.error (f"Error retrieving resources: {resources['error']}")
+ _LOGGER.error(f"Error retrieving resources: {resources['error']}")
return False
if len(resources) == 0:
- _LOGGER.error ("Empty resources found.")
+ _LOGGER.error("Empty resources found.")
return False
return True
resURL = f"{URL_RES_PREFIX}/vehicles/{self.mercedesConfig.vin}/resources"
resources = GetResource(resURL, self.mercedesConfig)
if not self.CheckResources(resources):
- _LOGGER.error ("Error retrieving available resources")
+ _LOGGER.error("Error retrieving available resources")
return None
else:
return resources
########################
def ImportResourcesList(self, resources):
for res in resources:
- if("state" in res):
- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"], res["state"], res["timestamp"], res["valid"]) )
+ if "state" in res:
+ self.database.append(
+ MercedesMeResource(
+ res["name"],
+ self.mercedesConfig.vin,
+ res["version"],
+ res["href"],
+ res["state"],
+ res["timestamp"],
+ res["valid"],
+ )
+ )
else:
- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"]) )
+ self.database.append(
+ MercedesMeResource(
+ res["name"],
+ self.mercedesConfig.vin,
+ res["version"],
+ res["href"],
+ )
+ )
########################
# Write Resources File
output = []
# Extract List
for res in self.database:
- output.append( res.getJson() )
+ output.append(res.getJson())
# Write File
- with open(self.resources_file, 'w') as file:
+ with open(self.resources_file, "w") as file:
json.dump(output, file)
########################
# Print Available Resources
########################
def PrintAvailableResources(self):
- print (f"Found {len(self.database)} resources:")
+ print(f"Found {len(self.database)} resources:")
for res in self.database:
- print (f"{res._name}: {URL_RES_PREFIX}{res._href}")
+ print(f"{res._name}: {URL_RES_PREFIX}{res._href}")
########################
# Print Resources State
########################
- def PrintResourcesState(self, valid = True):
+ def PrintResourcesState(self, valid=True):
for res in self.database:
- if((not valid) | res._valid):
- print (f"{res._name}:")
- print (f"\tvalid: {res._valid}")
- print (f"\tstate: {res._state}")
- print (f"\ttimestamp: {res._timestamp}")
- print (f"\tlast_update: {res._lastupdate}")
+ if (not valid) | res._valid:
+ print(f"{res._name}:")
+ print(f"\tvalid: {res._valid}")
+ print(f"\tstate: {res._state}")
+ print(f"\ttimestamp: {res._timestamp}")
+ print(f"\tlast_update: {res._lastupdate}")
########################
# Update Resources State
for res in self.database:
result = GetResource(f"{URL_RES_PREFIX}{res._href}", self.mercedesConfig)
if not "reason" in result:
- res.UpdateState(result[res._name]["value"], result[res._name]["timestamp"])
+ res.UpdateState(
+ result[res._name]["value"], result[res._name]["timestamp"]
+ )
# Write Resource File
self.WriteResourcesFile()