From: Giorgio Ravera <47370115+xraver@users.noreply.github.com> Date: Mon, 21 Dec 2020 19:58:40 +0000 (+0100) Subject: Merge pull request #17 from KTibow/patch-1 X-Git-Tag: v0.9~5 X-Git-Url: http://git.giorgioravera.it/?a=commitdiff_plain;h=e3530a88b66e6516b2831c5d760df3f0e4519b6a;p=mercedes_me_api.git Merge pull request #17 from KTibow/patch-1 Add advanced all-in-one action --- e3530a88b66e6516b2831c5d760df3f0e4519b6a diff --cc config.py index 75071db,75071db..a826d36 --- a/config.py +++ b/config.py @@@ -6,16 -6,16 +6,18 @@@ Author: G. Raver For more details about this component, please refer to the documentation at https://github.com/xraver/mercedes_me_api/ """ --from configobj import ConfigObj import logging import os --from oauth import MercedesMeOauth ++from configobj import ConfigObj ++ from const import * ++from oauth import MercedesMeOauth # Logger _LOGGER = logging.getLogger(__name__) ++ class MercedesMeConfig: ######################## @@@ -36,31 -36,31 +38,31 @@@ # Read Credentials from file if not os.path.isfile(self.credentials_file): -- _LOGGER.error (f"Credential File {self.credentials_file} not found") ++ _LOGGER.error(f"Credential File {self.credentials_file} not found") return False try: f = ConfigObj(self.credentials_file) except Exception: -- _LOGGER.error (f"Wrong {self.credentials_file} file found") ++ _LOGGER.error(f"Wrong {self.credentials_file} file found") return False # Client ID self.client_id = f.get(CONF_CLIENT_ID) if not self.client_id: -- _LOGGER.error ("No {CONF_CLIENT_ID} found in the configuration") ++ _LOGGER.error("No {CONF_CLIENT_ID} found in the configuration") return False # Client Secret self.client_secret = f.get(CONF_CLIENT_SECRET) if not self.client_secret: -- _LOGGER.error ("No {CONF_CLIENT_SECRET} found in the configuration") ++ _LOGGER.error("No {CONF_CLIENT_SECRET} found in the configuration") return False # Vehicle ID self.vin = f.get(CONF_VEHICLE_ID) if not self.vin: -- _LOGGER.error ("No {CONF_VEHICLE_ID} found in the configuration") ++ _LOGGER.error("No {CONF_VEHICLE_ID} found in the configuration") return False # Read Token self.token = MercedesMeOauth(self.client_id, self.client_secret) if not self.token.ReadToken(): return False -- ++ return True diff --cc mercedes_me_api.py index a24f91d,a24f91d..4c8d0f3 --- a/mercedes_me_api.py +++ b/mercedes_me_api.py @@@ -11,12 -11,12 +11,13 @@@ import loggin import sys from config import MercedesMeConfig --from resources import MercedesMeResources from const import * ++from resources import MercedesMeResources # Logger _LOGGER = logging.getLogger(__name__) ++ class MercedesMeData: def __init__(self): # Configuration Data @@@ -24,23 -24,23 +25,47 @@@ # Resource Data self.mercedesResources = MercedesMeResources(self.mercedesConfig) ++ ######################## # Parse Input ######################## def ParseInput(): parser = argparse.ArgumentParser() -- parser.add_argument('-t', '--token', action='store_true', help="Procedure to obtatin the Access Token") -- parser.add_argument('-r', '--refresh', action='store_true', help="Procedure to refresh the Access Token") -- parser.add_argument('-s', '--status', action='store_true', help="Retrieve the Status of your Vehicle") -- parser.add_argument('-R', '--resources', action='store_true', help="Retrieve the list of available resources of your Vehicle") -- parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + VERSION) -- -- if len(sys.argv)==1: ++ parser.add_argument( ++ "-t", ++ "--token", ++ action="store_true", ++ help="Procedure to obtatin the Access Token", ++ ) ++ parser.add_argument( ++ "-r", ++ "--refresh", ++ action="store_true", ++ help="Procedure to refresh the Access Token", ++ ) ++ parser.add_argument( ++ "-s", ++ "--status", ++ action="store_true", ++ help="Retrieve the Status of your Vehicle", ++ ) ++ parser.add_argument( ++ "-R", ++ "--resources", ++ action="store_true", ++ help="Retrieve the list of available resources of your Vehicle", ++ ) ++ parser.add_argument( ++ "-v", "--version", action="version", version="%(prog)s " + VERSION ++ ) ++ ++ if len(sys.argv) == 1: parser.print_help(sys.stderr) exit(1) return parser.parse_args() ++ ######################## # Main ######################## @@@ -54,31 -54,31 +79,31 @@@ if __name__ == "__main__" # Reading Configuration if not data.mercedesConfig.ReadConfig(): -- _LOGGER.error ("Error initializing configuration") -- exit (1) ++ _LOGGER.error("Error initializing configuration") ++ exit(1) # Create Token -- if (args.token == True): ++ if args.token == True: if not data.mercedesConfig.token.CreateToken(): -- _LOGGER.error ("Error creating token") -- exit (1) ++ _LOGGER.error("Error creating token") ++ exit(1) # Refresh Token -- if (args.refresh == True): ++ if args.refresh == True: if not data.mercedesConfig.token.RefreshToken(): -- _LOGGER.error ("Error refreshing token") -- exit (1) -- ++ _LOGGER.error("Error refreshing token") ++ exit(1) ++ # Read Resources if not data.mercedesResources.ReadResources(): -- _LOGGER.error ("Error initializing resources") -- exit (1) ++ _LOGGER.error("Error initializing resources") ++ exit(1) # Print Available Resources -- if (args.resources): ++ if args.resources: data.mercedesResources.PrintAvailableResources() # Print Resources State -- if (args.status == True): ++ if args.status == True: data.mercedesResources.UpdateResourcesState() data.mercedesResources.PrintResourcesState() diff --cc oauth.py index 5863ace,5863ace..e01b755 --- a/oauth.py +++ b/oauth.py @@@ -10,6 -10,6 +10,7 @@@ import base6 import json import logging import os ++ from const import * from query import * @@@ -20,6 -20,6 +21,7 @@@ URL_OAUTH_TOKEN = f"{URL_OAUTH_BASE}/to # Logger _LOGGER = logging.getLogger(__name__) ++ class MercedesMeOauth: ######################## @@@ -40,12 -40,12 +42,12 @@@ self.token_file = TOKEN_FILE # Base64 b64_str = f"{client_id}:{client_secret}" -- b64_bytes = base64.b64encode( b64_str.encode('ascii') ) -- self.base64 = b64_bytes.decode('ascii') ++ b64_bytes = base64.b64encode(b64_str.encode("ascii")) ++ self.base64 = b64_bytes.decode("ascii") # Headers self.headers = { "Authorization": f"Basic {self.base64}", -- "content-type": "application/x-www-form-urlencoded" ++ "content-type": "application/x-www-form-urlencoded", } ######################## @@@ -59,10 -59,10 +61,10 @@@ # Read Token if not os.path.isfile(self.token_file): # Token File not present - Creating new one -- _LOGGER.error ("Token File missing - Creating a new one") ++ _LOGGER.error("Token File missing - Creating a new one") found = False else: -- with open(self.token_file, 'r') as file: ++ with open(self.token_file, "r") as file: try: token = json.load(file) if not self.CheckToken(token): @@@ -70,24 -70,24 +72,24 @@@ else: found = True except ValueError: -- _LOGGER.error ("Error reading token file - Creating a new one") ++ _LOGGER.error("Error reading token file - Creating a new one") found = False -- if ( not found ): ++ if not found: # Not valid or file missing -- if (not self.CreateToken()): -- _LOGGER.error ("Error creating token") ++ if not self.CreateToken(): ++ _LOGGER.error("Error creating token") return False else: # Valid: just import -- self.access_token = token['access_token'] -- self.refresh_token = token['refresh_token'] -- self.token_expires_in = token['expires_in'] ++ self.access_token = token["access_token"] ++ self.refresh_token = token["refresh_token"] ++ self.token_expires_in = token["expires_in"] needToRefresh = True -- if (needToRefresh): -- if (not self.RefreshToken()): -- _LOGGER.error ("Error refreshing token") ++ if needToRefresh: ++ if not self.RefreshToken(): ++ _LOGGER.error("Error refreshing token") return False return True @@@ -96,7 -96,7 +98,7 @@@ # Write Token ######################## def WriteToken(self, token): -- with open(self.token_file, 'w') as file: ++ with open(self.token_file, "w") as file: json.dump(token, file) ######################## @@@ -104,22 -104,22 +106,24 @@@ ######################## def CheckToken(self, token): if "reason" in token: -- _LOGGER.error (f"Error retrieving token - {token['reason']} ({token['code']})") ++ _LOGGER.error( ++ f"Error retrieving token - {token['reason']} ({token['code']})" ++ ) return False if "error" in token: if "error_description" in token: -- _LOGGER.error (f"Error retrieving token: {token['error_description']}") ++ _LOGGER.error(f"Error retrieving token: {token['error_description']}") else: -- _LOGGER.error (f"Error retrieving token: {token['error']}") ++ _LOGGER.error(f"Error retrieving token: {token['error']}") return False if len(token) == 0: -- _LOGGER.error ("Empty token found.") ++ _LOGGER.error("Empty token found.") return False -- if not 'access_token' in token: -- _LOGGER.error ("Access token not present.") ++ if not "access_token" in token: ++ _LOGGER.error("Access token not present.") return False -- if not 'refresh_token' in token: -- _LOGGER.error ("Refresh token not present.") ++ if not "refresh_token" in token: ++ _LOGGER.error("Refresh token not present.") return False return True @@@ -129,15 -129,15 +133,15 @@@ def CreateToken(self): auth_url = ( -- f"{URL_OAUTH_AUTH}&" + -- f"client_id={self.client_id}&" + -- f"redirect_uri={REDIRECT_URL}&" + -- f"scope={SCOPE}" ++ f"{URL_OAUTH_AUTH}&" ++ + f"client_id={self.client_id}&" ++ + f"redirect_uri={REDIRECT_URL}&" ++ + f"scope={SCOPE}" ) -- print( "Open the browser and insert this link:\n" ) ++ print("Open the browser and insert this link:\n") print(f"{auth_url}\n") -- print( "Copy the code in the url:") ++ print("Copy the code in the url:") auth_code = input() data = f"grant_type=authorization_code&code={auth_code}&redirect_uri={REDIRECT_URL}" @@@ -149,9 -149,9 +153,9 @@@ else: # Save Token self.WriteToken(token) -- self.access_token = token['access_token'] -- self.refresh_token = token['refresh_token'] -- self.token_expires_in = token['expires_in'] ++ self.access_token = token["access_token"] ++ self.refresh_token = token["refresh_token"] ++ self.token_expires_in = token["expires_in"] return True ######################## @@@ -168,7 -168,7 +172,7 @@@ else: # Save Token self.WriteToken(token) -- self.access_token = token['access_token'] -- self.refresh_token = token['refresh_token'] -- self.token_expires_in = token['expires_in'] ++ self.access_token = token["access_token"] ++ self.refresh_token = token["refresh_token"] ++ self.token_expires_in = token["expires_in"] return True diff --cc query.py index 5f547da,5f547da..0c9569a --- a/query.py +++ b/query.py @@@ -7,6 -7,6 +7,7 @@@ For more details about this component, https://github.com/xraver/mercedes_me_api/ """ import logging ++ import requests from const import * @@@ -21,8 -21,8 +22,8 @@@ def GetResource(resourceURL, config) # Set Header headers = { -- "accept": "application/json;charset=utf-8", -- "authorization": f"Bearer {config.token.access_token}" ++ "accept": "application/json;charset=utf-8", ++ "authorization": f"Bearer {config.token.access_token}", } # Send Request @@@ -30,13 -30,13 +31,11 @@@ try: data = res.json() except ValueError: -- data = { "reason": "No Data", -- "code" : res.status_code -- } ++ data = {"reason": "No Data", "code": res.status_code} # Check Error if not res.ok: -- if ("reason" in data): ++ if "reason" in data: reason = data["reason"] else: if res.status_code == 400: @@@ -50,7 -50,7 +49,9 @@@ elif res.status_code == 404: reason = "Page not found" elif res.status_code == 429: -- reason = "The service received too many requests in a given amount of time" ++ reason = ( ++ "The service received too many requests in a given amount of time" ++ ) elif res.status_code == 500: reason = "An error occurred on the server side" elif res.status_code == 503: @@@ -61,25 -61,25 +62,24 @@@ data["code"] = res.status_code return data ++ ######################## # GetToken ######################## def GetToken(tokenURL, headers, data, refresh=True): -- res = requests.post(tokenURL, data = data, headers = headers) ++ res = requests.post(tokenURL, data=data, headers=headers) try: data = res.json() except ValueError: -- _LOGGER.error (f"Error retrieving token {res.status_code}") -- data = { "reason": "No Data", -- "code" : res.status_code -- } ++ _LOGGER.error(f"Error retrieving token {res.status_code}") ++ data = {"reason": "No Data", "code": res.status_code} # Check Error if not res.ok: -- if ("reason" in data): ++ if "reason" in data: reason = data["reason"] else: -- if (refresh == False): ++ if refresh == False: # New Token Errors if res.status_code == 302: reason = "The request scope is invalid" diff --cc resources.py index 7c2d280,7c2d280..d6c3e30 --- a/resources.py +++ b/resources.py @@@ -6,10 -6,10 +6,10 @@@ Author: G. Raver For more details about this component, please refer to the documentation at https://github.com/xraver/mercedes_me_api/ """ --from datetime import datetime --import logging import json ++import logging import os ++from datetime import datetime from config import MercedesMeConfig from const import * @@@ -18,8 -18,8 +18,11 @@@ from query import # Logger _LOGGER = logging.getLogger(__name__) ++ class MercedesMeResource: -- def __init__( self, name, vin, version, href, state=None, timestamp=None, valid=False ): ++ def __init__( ++ self, name, vin, version, href, state=None, timestamp=None, valid=False ++ ): self._name = name self._version = version self._href = href @@@ -27,38 -27,38 +30,40 @@@ self._state = state self._timestamp = timestamp self._valid = valid -- if(timestamp != None): -- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000) ++ if timestamp != None: ++ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000) else: self._lastupdate = 0 def __str__(self): -- return json.dumps({ -- "name" : self._name, -- "vin" : self._vin, -- "version" : self._version, -- "href" : self._href, -- "state" : self._state, -- "timestamp" : self._timestamp, -- "valid" : self._valid, -- }) ++ return json.dumps( ++ { ++ "name": self._name, ++ "vin": self._vin, ++ "version": self._version, ++ "href": self._href, ++ "state": self._state, ++ "timestamp": self._timestamp, ++ "valid": self._valid, ++ } ++ ) def getJson(self): -- return ({ -- "name" : self._name, -- "vin" : self._vin, -- "version" : self._version, -- "href" : self._href, -- "state" : self._state, -- "timestamp" : self._timestamp, -- "valid" : self._valid, -- }) ++ return { ++ "name": self._name, ++ "vin": self._vin, ++ "version": self._version, ++ "href": self._href, ++ "state": self._state, ++ "timestamp": self._timestamp, ++ "valid": self._valid, ++ } def UpdateState(self, state, timestamp): """Update status of the resource.""" self._state = state self._timestamp = timestamp -- self._lastupdate = datetime.fromtimestamp(self._timestamp/1000) ++ self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000) self._valid = True def unique_id(self): @@@ -75,11 -75,11 +80,12 @@@ def device_state_attributes(self): """Return attributes for the sensor.""" -- return ({ -- "valid": self._valid, -- "timestamp": self._timestamp, -- "last_update": self._lastupdate, -- }) ++ return { ++ "valid": self._valid, ++ "timestamp": self._timestamp, ++ "last_update": self._lastupdate, ++ } ++ class MercedesMeResources: @@@ -102,26 -102,26 +108,26 @@@ if not os.path.isfile(self.resources_file): # Resources File not present - Retrieving new one from server -- _LOGGER.error ("Resource File missing - Creating a new one.") ++ _LOGGER.error("Resource File missing - Creating a new one.") found = False else: -- with open(self.resources_file, 'r') as file: ++ with open(self.resources_file, "r") as file: try: resources = json.load(file) -- if (not self.CheckResources(resources)): ++ if not self.CheckResources(resources): raise ValueError else: found = True except ValueError: -- _LOGGER.error ("Error reading resource file - Creating a new one.") ++ _LOGGER.error("Error reading resource file - Creating a new one.") found = False -- if ( not found ): ++ if not found: # Not valid or file missing resources = self.RetrieveResourcesList() -- if( resources == None ): ++ if resources == None: # Not found or wrong -- _LOGGER.error ("Error retrieving resource list.") ++ _LOGGER.error("Error retrieving resource list.") return False else: # import and write @@@ -138,16 -138,16 +144,20 @@@ ######################## def CheckResources(self, resources): if "reason" in resources: -- _LOGGER.error (f"Error retrieving available resources - {resources['reason']} ({resources['code']})") ++ _LOGGER.error( ++ f"Error retrieving available resources - {resources['reason']} ({resources['code']})" ++ ) return False if "error" in resources: if "error_description" in resources: -- _LOGGER.error (f"Error retrieving resources: {resources['error_description']}") ++ _LOGGER.error( ++ f"Error retrieving resources: {resources['error_description']}" ++ ) else: -- _LOGGER.error (f"Error retrieving resources: {resources['error']}") ++ _LOGGER.error(f"Error retrieving resources: {resources['error']}") return False if len(resources) == 0: -- _LOGGER.error ("Empty resources found.") ++ _LOGGER.error("Empty resources found.") return False return True @@@ -158,7 -158,7 +168,7 @@@ resURL = f"{URL_RES_PREFIX}/vehicles/{self.mercedesConfig.vin}/resources" resources = GetResource(resURL, self.mercedesConfig) if not self.CheckResources(resources): -- _LOGGER.error ("Error retrieving available resources") ++ _LOGGER.error("Error retrieving available resources") return None else: return resources @@@ -168,10 -168,10 +178,27 @@@ ######################## def ImportResourcesList(self, resources): for res in resources: -- if("state" in res): -- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"], res["state"], res["timestamp"], res["valid"]) ) ++ if "state" in res: ++ self.database.append( ++ MercedesMeResource( ++ res["name"], ++ self.mercedesConfig.vin, ++ res["version"], ++ res["href"], ++ res["state"], ++ res["timestamp"], ++ res["valid"], ++ ) ++ ) else: -- self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"]) ) ++ self.database.append( ++ MercedesMeResource( ++ res["name"], ++ self.mercedesConfig.vin, ++ res["version"], ++ res["href"], ++ ) ++ ) ######################## # Write Resources File @@@ -180,30 -180,30 +207,30 @@@ output = [] # Extract List for res in self.database: -- output.append( res.getJson() ) ++ output.append(res.getJson()) # Write File -- with open(self.resources_file, 'w') as file: ++ with open(self.resources_file, "w") as file: json.dump(output, file) ######################## # Print Available Resources ######################## def PrintAvailableResources(self): -- print (f"Found {len(self.database)} resources:") ++ print(f"Found {len(self.database)} resources:") for res in self.database: -- print (f"{res._name}: {URL_RES_PREFIX}{res._href}") ++ print(f"{res._name}: {URL_RES_PREFIX}{res._href}") ######################## # Print Resources State ######################## -- def PrintResourcesState(self, valid = True): ++ def PrintResourcesState(self, valid=True): for res in self.database: -- if((not valid) | res._valid): -- print (f"{res._name}:") -- print (f"\tvalid: {res._valid}") -- print (f"\tstate: {res._state}") -- print (f"\ttimestamp: {res._timestamp}") -- print (f"\tlast_update: {res._lastupdate}") ++ if (not valid) | res._valid: ++ print(f"{res._name}:") ++ print(f"\tvalid: {res._valid}") ++ print(f"\tstate: {res._state}") ++ print(f"\ttimestamp: {res._timestamp}") ++ print(f"\tlast_update: {res._lastupdate}") ######################## # Update Resources State @@@ -212,6 -212,6 +239,8 @@@ for res in self.database: result = GetResource(f"{URL_RES_PREFIX}{res._href}", self.mercedesConfig) if not "reason" in result: -- res.UpdateState(result[res._name]["value"], result[res._name]["timestamp"]) ++ res.UpdateState( ++ result[res._name]["value"], result[res._name]["timestamp"] ++ ) # Write Resource File self.WriteResourcesFile()