]> git.giorgioravera.it Git - mercedes_me_api.git/commitdiff
Merge pull request #17 from KTibow/patch-1
authorGiorgio Ravera <47370115+xraver@users.noreply.github.com>
Mon, 21 Dec 2020 19:58:40 +0000 (20:58 +0100)
committerGitHub Actions <41898282+github-actions[bot]@users.noreply.github.com>
Mon, 21 Dec 2020 20:20:09 +0000 (20:20 +0000)
Add advanced all-in-one action

1  2 
config.py
mercedes_me_api.py
oauth.py
query.py
resources.py

diff --cc config.py
index 75071dbd3873e3f2d26a0cb832c0ef902566c9b8,75071dbd3873e3f2d26a0cb832c0ef902566c9b8..a826d36f667498bb6c21eb943973882b56352098
+++ b/config.py
@@@ -6,16 -6,16 +6,18 @@@ Author: G. Raver
  For more details about this component, please refer to the documentation at
  https://github.com/xraver/mercedes_me_api/
  """
--from configobj import ConfigObj
  import logging
  import os
  
--from oauth import MercedesMeOauth
++from configobj import ConfigObj
++
  from const import *
++from oauth import MercedesMeOauth
  
  # Logger
  _LOGGER = logging.getLogger(__name__)
  
++
  class MercedesMeConfig:
  
      ########################
  
          # Read Credentials from file
          if not os.path.isfile(self.credentials_file):
--            _LOGGER.error (f"Credential File {self.credentials_file} not found")
++            _LOGGER.error(f"Credential File {self.credentials_file} not found")
              return False
          try:
              f = ConfigObj(self.credentials_file)
          except Exception:
--            _LOGGER.error (f"Wrong {self.credentials_file} file found")
++            _LOGGER.error(f"Wrong {self.credentials_file} file found")
              return False
          # Client ID
          self.client_id = f.get(CONF_CLIENT_ID)
          if not self.client_id:
--            _LOGGER.error ("No {CONF_CLIENT_ID} found in the configuration")
++            _LOGGER.error("No {CONF_CLIENT_ID} found in the configuration")
              return False
          # Client Secret
          self.client_secret = f.get(CONF_CLIENT_SECRET)
          if not self.client_secret:
--            _LOGGER.error ("No {CONF_CLIENT_SECRET} found in the configuration")
++            _LOGGER.error("No {CONF_CLIENT_SECRET} found in the configuration")
              return False
          # Vehicle ID
          self.vin = f.get(CONF_VEHICLE_ID)
          if not self.vin:
--            _LOGGER.error ("No {CONF_VEHICLE_ID} found in the configuration")
++            _LOGGER.error("No {CONF_VEHICLE_ID} found in the configuration")
              return False
          # Read Token
          self.token = MercedesMeOauth(self.client_id, self.client_secret)
          if not self.token.ReadToken():
              return False
--        
++
          return True
index a24f91d3c60e80280495817bfcd14abef0fcae7a,a24f91d3c60e80280495817bfcd14abef0fcae7a..4c8d0f38a19df3f9ce9d0f710de0677be4c48d3b
@@@ -11,12 -11,12 +11,13 @@@ import loggin
  import sys
  
  from config import MercedesMeConfig
--from resources import MercedesMeResources
  from const import *
++from resources import MercedesMeResources
  
  # Logger
  _LOGGER = logging.getLogger(__name__)
  
++
  class MercedesMeData:
      def __init__(self):
          # Configuration Data
          # Resource Data
          self.mercedesResources = MercedesMeResources(self.mercedesConfig)
  
++
  ########################
  # Parse Input
  ########################
  def ParseInput():
      parser = argparse.ArgumentParser()
--    parser.add_argument('-t', '--token', action='store_true', help="Procedure to obtatin the Access Token")
--    parser.add_argument('-r', '--refresh', action='store_true', help="Procedure to refresh the Access Token")
--    parser.add_argument('-s', '--status', action='store_true', help="Retrieve the Status of your Vehicle")
--    parser.add_argument('-R', '--resources', action='store_true', help="Retrieve the list of available resources of your Vehicle")
--    parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + VERSION)
--
--    if len(sys.argv)==1:
++    parser.add_argument(
++        "-t",
++        "--token",
++        action="store_true",
++        help="Procedure to obtatin the Access Token",
++    )
++    parser.add_argument(
++        "-r",
++        "--refresh",
++        action="store_true",
++        help="Procedure to refresh the Access Token",
++    )
++    parser.add_argument(
++        "-s",
++        "--status",
++        action="store_true",
++        help="Retrieve the Status of your Vehicle",
++    )
++    parser.add_argument(
++        "-R",
++        "--resources",
++        action="store_true",
++        help="Retrieve the list of available resources of your Vehicle",
++    )
++    parser.add_argument(
++        "-v", "--version", action="version", version="%(prog)s " + VERSION
++    )
++
++    if len(sys.argv) == 1:
          parser.print_help(sys.stderr)
          exit(1)
  
      return parser.parse_args()
  
++
  ########################
  # Main
  ########################
@@@ -54,31 -54,31 +79,31 @@@ if __name__ == "__main__"
  
      # Reading Configuration
      if not data.mercedesConfig.ReadConfig():
--        _LOGGER.error ("Error initializing configuration")
--        exit (1)
++        _LOGGER.error("Error initializing configuration")
++        exit(1)
  
      # Create Token
--    if (args.token == True):
++    if args.token == True:
          if not data.mercedesConfig.token.CreateToken():
--            _LOGGER.error ("Error creating token")
--            exit (1)
++            _LOGGER.error("Error creating token")
++            exit(1)
  
      # Refresh Token
--    if (args.refresh == True):
++    if args.refresh == True:
          if not data.mercedesConfig.token.RefreshToken():
--            _LOGGER.error ("Error refreshing token")
--            exit (1)
--            
++            _LOGGER.error("Error refreshing token")
++            exit(1)
++
      # Read Resources
      if not data.mercedesResources.ReadResources():
--        _LOGGER.error ("Error initializing resources")
--        exit (1)
++        _LOGGER.error("Error initializing resources")
++        exit(1)
  
      # Print Available Resources
--    if (args.resources):
++    if args.resources:
          data.mercedesResources.PrintAvailableResources()
  
      # Print Resources State
--    if (args.status == True):
++    if args.status == True:
          data.mercedesResources.UpdateResourcesState()
          data.mercedesResources.PrintResourcesState()
diff --cc oauth.py
index 5863acedea16026d94d1410ee39a9204b6a1d1d0,5863acedea16026d94d1410ee39a9204b6a1d1d0..e01b75579773dbc87be007a60e8a6cb09039f8fe
+++ b/oauth.py
@@@ -10,6 -10,6 +10,7 @@@ import base6
  import json
  import logging
  import os
++
  from const import *
  from query import *
  
@@@ -20,6 -20,6 +21,7 @@@ URL_OAUTH_TOKEN = f"{URL_OAUTH_BASE}/to
  # Logger
  _LOGGER = logging.getLogger(__name__)
  
++
  class MercedesMeOauth:
  
      ########################
          self.token_file = TOKEN_FILE
          # Base64
          b64_str = f"{client_id}:{client_secret}"
--        b64_bytes = base64.b64encode( b64_str.encode('ascii') )
--        self.base64 = b64_bytes.decode('ascii')
++        b64_bytes = base64.b64encode(b64_str.encode("ascii"))
++        self.base64 = b64_bytes.decode("ascii")
          # Headers
          self.headers = {
              "Authorization": f"Basic {self.base64}",
--            "content-type": "application/x-www-form-urlencoded"
++            "content-type": "application/x-www-form-urlencoded",
          }
  
      ########################
          # Read Token
          if not os.path.isfile(self.token_file):
              # Token File not present - Creating new one
--            _LOGGER.error ("Token File missing - Creating a new one")
++            _LOGGER.error("Token File missing - Creating a new one")
              found = False
          else:
--            with open(self.token_file, 'r') as file:
++            with open(self.token_file, "r") as file:
                  try:
                      token = json.load(file)
                      if not self.CheckToken(token):
                      else:
                          found = True
                  except ValueError:
--                    _LOGGER.error ("Error reading token file - Creating a new one")
++                    _LOGGER.error("Error reading token file - Creating a new one")
                      found = False
  
--        if ( not found ):
++        if not found:
              # Not valid or file missing
--            if (not self.CreateToken()):
--                _LOGGER.error ("Error creating token")
++            if not self.CreateToken():
++                _LOGGER.error("Error creating token")
                  return False
          else:
              # Valid: just import
--            self.access_token = token['access_token']
--            self.refresh_token = token['refresh_token']
--            self.token_expires_in = token['expires_in']
++            self.access_token = token["access_token"]
++            self.refresh_token = token["refresh_token"]
++            self.token_expires_in = token["expires_in"]
              needToRefresh = True
  
--        if (needToRefresh):
--            if (not self.RefreshToken()):
--                _LOGGER.error ("Error refreshing token")
++        if needToRefresh:
++            if not self.RefreshToken():
++                _LOGGER.error("Error refreshing token")
                  return False
  
          return True
@@@ -96,7 -96,7 +98,7 @@@
      # Write Token
      ########################
      def WriteToken(self, token):
--        with open(self.token_file, 'w') as file:
++        with open(self.token_file, "w") as file:
              json.dump(token, file)
  
      ########################
      ########################
      def CheckToken(self, token):
          if "reason" in token:
--            _LOGGER.error (f"Error retrieving token - {token['reason']} ({token['code']})")
++            _LOGGER.error(
++                f"Error retrieving token - {token['reason']} ({token['code']})"
++            )
              return False
          if "error" in token:
              if "error_description" in token:
--                _LOGGER.error (f"Error retrieving token: {token['error_description']}")
++                _LOGGER.error(f"Error retrieving token: {token['error_description']}")
              else:
--                _LOGGER.error (f"Error retrieving token: {token['error']}")
++                _LOGGER.error(f"Error retrieving token: {token['error']}")
              return False
          if len(token) == 0:
--            _LOGGER.error ("Empty token found.")
++            _LOGGER.error("Empty token found.")
              return False
--        if not 'access_token' in token:
--            _LOGGER.error ("Access token not present.")
++        if not "access_token" in token:
++            _LOGGER.error("Access token not present.")
              return False
--        if not 'refresh_token' in token:
--            _LOGGER.error ("Refresh token not present.")
++        if not "refresh_token" in token:
++            _LOGGER.error("Refresh token not present.")
              return False
          return True
  
      def CreateToken(self):
  
          auth_url = (
--            f"{URL_OAUTH_AUTH}&" +
--            f"client_id={self.client_id}&" + 
--            f"redirect_uri={REDIRECT_URL}&" + 
--            f"scope={SCOPE}"
++            f"{URL_OAUTH_AUTH}&"
++            + f"client_id={self.client_id}&"
++            + f"redirect_uri={REDIRECT_URL}&"
++            f"scope={SCOPE}"
          )
  
--        print( "Open the browser and insert this link:\n" )
++        print("Open the browser and insert this link:\n")
          print(f"{auth_url}\n")
--        print( "Copy the code in the url:")
++        print("Copy the code in the url:")
          auth_code = input()
  
          data = f"grant_type=authorization_code&code={auth_code}&redirect_uri={REDIRECT_URL}"
          else:
              # Save Token
              self.WriteToken(token)
--            self.access_token = token['access_token']
--            self.refresh_token = token['refresh_token']
--            self.token_expires_in = token['expires_in']
++            self.access_token = token["access_token"]
++            self.refresh_token = token["refresh_token"]
++            self.token_expires_in = token["expires_in"]
              return True
  
      ########################
          else:
              # Save Token
              self.WriteToken(token)
--            self.access_token = token['access_token']
--            self.refresh_token = token['refresh_token']
--            self.token_expires_in = token['expires_in']
++            self.access_token = token["access_token"]
++            self.refresh_token = token["refresh_token"]
++            self.token_expires_in = token["expires_in"]
          return True
diff --cc query.py
index 5f547da3c7a6831e2c13187c5a6e134e3db5aacc,5f547da3c7a6831e2c13187c5a6e134e3db5aacc..0c9569adcad3c3d48f6e4f61115d226d75a18926
+++ b/query.py
@@@ -7,6 -7,6 +7,7 @@@ For more details about this component, 
  https://github.com/xraver/mercedes_me_api/
  """
  import logging
++
  import requests
  
  from const import *
@@@ -21,8 -21,8 +22,8 @@@ def GetResource(resourceURL, config)
  
      # Set Header
      headers = {
--        "accept": "application/json;charset=utf-8", 
--        "authorization": f"Bearer {config.token.access_token}"
++        "accept": "application/json;charset=utf-8",
++        "authorization": f"Bearer {config.token.access_token}",
      }
  
      # Send Request
      try:
          data = res.json()
      except ValueError:
--        data = { "reason": "No Data",
--                 "code" : res.status_code 
--        }
++        data = {"reason": "No Data", "code": res.status_code}
  
      # Check Error
      if not res.ok:
--        if ("reason" in data):
++        if "reason" in data:
              reason = data["reason"]
          else:
              if res.status_code == 400:
@@@ -50,7 -50,7 +49,9 @@@
              elif res.status_code == 404:
                  reason = "Page not found"
              elif res.status_code == 429:
--                reason = "The service received too many requests in a given amount of time"
++                reason = (
++                    "The service received too many requests in a given amount of time"
++                )
              elif res.status_code == 500:
                  reason = "An error occurred on the server side"
              elif res.status_code == 503:
          data["code"] = res.status_code
      return data
  
++
  ########################
  # GetToken
  ########################
  def GetToken(tokenURL, headers, data, refresh=True):
--    res = requests.post(tokenURL, data = data, headers = headers)
++    res = requests.post(tokenURL, data=data, headers=headers)
      try:
          data = res.json()
      except ValueError:
--        _LOGGER.error (f"Error retrieving token {res.status_code}")
--        data = { "reason": "No Data",
--                 "code" : res.status_code 
--        }
++        _LOGGER.error(f"Error retrieving token {res.status_code}")
++        data = {"reason": "No Data", "code": res.status_code}
  
      # Check Error
      if not res.ok:
--        if ("reason" in data):
++        if "reason" in data:
              reason = data["reason"]
          else:
--            if (refresh == False):
++            if refresh == False:
                  # New Token Errors
                  if res.status_code == 302:
                      reason = "The request scope is invalid"
diff --cc resources.py
index 7c2d28055975a7b364f5fd124f1a97e44108ed53,7c2d28055975a7b364f5fd124f1a97e44108ed53..d6c3e306fc3fd09cf7a8cf220fe1035287f4bc6e
@@@ -6,10 -6,10 +6,10 @@@ Author: G. Raver
  For more details about this component, please refer to the documentation at
  https://github.com/xraver/mercedes_me_api/
  """
--from datetime import datetime
--import logging
  import json
++import logging
  import os
++from datetime import datetime
  
  from config import MercedesMeConfig
  from const import *
@@@ -18,8 -18,8 +18,11 @@@ from query import 
  # Logger
  _LOGGER = logging.getLogger(__name__)
  
++
  class MercedesMeResource:
--    def __init__( self, name, vin, version, href, state=None, timestamp=None, valid=False ):
++    def __init__(
++        self, name, vin, version, href, state=None, timestamp=None, valid=False
++    ):
          self._name = name
          self._version = version
          self._href = href
          self._state = state
          self._timestamp = timestamp
          self._valid = valid
--        if(timestamp != None):
--            self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
++        if timestamp != None:
++            self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
          else:
              self._lastupdate = 0
  
      def __str__(self):
--        return json.dumps({ 
--            "name" : self._name,
--            "vin" : self._vin,
--            "version" : self._version,
--            "href" : self._href,
--            "state" : self._state,
--            "timestamp" : self._timestamp,
--            "valid" : self._valid,
--            })
++        return json.dumps(
++            {
++                "name": self._name,
++                "vin": self._vin,
++                "version": self._version,
++                "href": self._href,
++                "state": self._state,
++                "timestamp": self._timestamp,
++                "valid": self._valid,
++            }
++        )
  
      def getJson(self):
--        return ({ 
--            "name" : self._name,
--            "vin" : self._vin,
--            "version" : self._version,
--            "href" : self._href,
--            "state" : self._state,
--            "timestamp" : self._timestamp,
--            "valid" : self._valid,
--            })
++        return {
++            "name": self._name,
++            "vin": self._vin,
++            "version": self._version,
++            "href": self._href,
++            "state": self._state,
++            "timestamp": self._timestamp,
++            "valid": self._valid,
++        }
  
      def UpdateState(self, state, timestamp):
          """Update status of the resource."""
          self._state = state
          self._timestamp = timestamp
--        self._lastupdate = datetime.fromtimestamp(self._timestamp/1000)
++        self._lastupdate = datetime.fromtimestamp(self._timestamp / 1000)
          self._valid = True
  
      def unique_id(self):
  
      def device_state_attributes(self):
          """Return attributes for the sensor."""
--        return ({
--                "valid": self._valid,
--                "timestamp": self._timestamp,
--                              "last_update": self._lastupdate,
--                })
++        return {
++            "valid": self._valid,
++            "timestamp": self._timestamp,
++            "last_update": self._lastupdate,
++        }
++
  
  class MercedesMeResources:
  
  
          if not os.path.isfile(self.resources_file):
              # Resources File not present - Retrieving new one from server
--            _LOGGER.error ("Resource File missing - Creating a new one.")
++            _LOGGER.error("Resource File missing - Creating a new one.")
              found = False
          else:
--            with open(self.resources_file, 'r') as file:
++            with open(self.resources_file, "r") as file:
                  try:
                      resources = json.load(file)
--                    if (not self.CheckResources(resources)):
++                    if not self.CheckResources(resources):
                          raise ValueError
                      else:
                          found = True
                  except ValueError:
--                    _LOGGER.error ("Error reading resource file - Creating a new one.")
++                    _LOGGER.error("Error reading resource file - Creating a new one.")
                      found = False
  
--        if ( not found ):
++        if not found:
              # Not valid or file missing
              resources = self.RetrieveResourcesList()
--            if( resources == None ):
++            if resources == None:
                  # Not found or wrong
--                _LOGGER.error ("Error retrieving resource list.")
++                _LOGGER.error("Error retrieving resource list.")
                  return False
              else:
                  # import and write
      ########################
      def CheckResources(self, resources):
          if "reason" in resources:
--            _LOGGER.error (f"Error retrieving available resources - {resources['reason']} ({resources['code']})")
++            _LOGGER.error(
++                f"Error retrieving available resources - {resources['reason']} ({resources['code']})"
++            )
              return False
          if "error" in resources:
              if "error_description" in resources:
--                _LOGGER.error (f"Error retrieving resources: {resources['error_description']}")
++                _LOGGER.error(
++                    f"Error retrieving resources: {resources['error_description']}"
++                )
              else:
--                _LOGGER.error (f"Error retrieving resources: {resources['error']}")
++                _LOGGER.error(f"Error retrieving resources: {resources['error']}")
              return False
          if len(resources) == 0:
--            _LOGGER.error ("Empty resources found.")
++            _LOGGER.error("Empty resources found.")
              return False
          return True
  
          resURL = f"{URL_RES_PREFIX}/vehicles/{self.mercedesConfig.vin}/resources"
          resources = GetResource(resURL, self.mercedesConfig)
          if not self.CheckResources(resources):
--            _LOGGER.error ("Error retrieving available resources")
++            _LOGGER.error("Error retrieving available resources")
              return None
          else:
              return resources
      ########################
      def ImportResourcesList(self, resources):
          for res in resources:
--            if("state" in res):
--                self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"], res["state"], res["timestamp"], res["valid"]) )
++            if "state" in res:
++                self.database.append(
++                    MercedesMeResource(
++                        res["name"],
++                        self.mercedesConfig.vin,
++                        res["version"],
++                        res["href"],
++                        res["state"],
++                        res["timestamp"],
++                        res["valid"],
++                    )
++                )
              else:
--                self.database.append( MercedesMeResource (res["name"], self.mercedesConfig.vin, res["version"], res["href"]) )
++                self.database.append(
++                    MercedesMeResource(
++                        res["name"],
++                        self.mercedesConfig.vin,
++                        res["version"],
++                        res["href"],
++                    )
++                )
  
      ########################
      # Write Resources File
          output = []
          # Extract List
          for res in self.database:
--            output.append( res.getJson() )
++            output.append(res.getJson())
          # Write File
--        with open(self.resources_file, 'w') as file:
++        with open(self.resources_file, "w") as file:
              json.dump(output, file)
  
      ########################
      # Print Available Resources
      ########################
      def PrintAvailableResources(self):
--        print (f"Found {len(self.database)} resources:")
++        print(f"Found {len(self.database)} resources:")
          for res in self.database:
--            print (f"{res._name}: {URL_RES_PREFIX}{res._href}")
++            print(f"{res._name}: {URL_RES_PREFIX}{res._href}")
  
      ########################
      # Print Resources State
      ########################
--    def PrintResourcesState(self, valid = True):
++    def PrintResourcesState(self, valid=True):
          for res in self.database:
--            if((not valid) | res._valid):
--                print (f"{res._name}:")
--                print (f"\tvalid: {res._valid}")
--                print (f"\tstate: {res._state}")
--                print (f"\ttimestamp: {res._timestamp}")
--                print (f"\tlast_update: {res._lastupdate}")
++            if (not valid) | res._valid:
++                print(f"{res._name}:")
++                print(f"\tvalid: {res._valid}")
++                print(f"\tstate: {res._state}")
++                print(f"\ttimestamp: {res._timestamp}")
++                print(f"\tlast_update: {res._lastupdate}")
  
      ########################
      # Update Resources State
          for res in self.database:
              result = GetResource(f"{URL_RES_PREFIX}{res._href}", self.mercedesConfig)
              if not "reason" in result:
--                res.UpdateState(result[res._name]["value"], result[res._name]["timestamp"])
++                res.UpdateState(
++                    result[res._name]["value"], result[res._name]["timestamp"]
++                )
          # Write Resource File
          self.WriteResourcesFile()