#!/usr/bin/env python3 # # $Id: python3-oauth2.py 2174 2023-08-01 13:55:34Z omaury $ # # Usage: # ./python3-oauth2.py -h __author__ = "Olivier Maury" __revision__ = "$Rev: 2174 $" import json import logging import os import requests import ssl import sys import time from urllib import parse from urllib.error import HTTPError from urllib.request import Request, urlopen from optparse import OptionGroup, OptionParser def arg_options(): """ Arguments from command line interface. """ usage = "%prog [options]" parser = OptionParser(usage=usage, version=__revision__) group_auth = OptionGroup(parser, "Authentication", u"User authentication.") group_auth.add_option("-i", "--client-id", type="string", dest="client_id", metavar="CLIENT_ID", help=u"Id of OAuth2 client application") group_auth.add_option("-t", "--client-secret", type="string", dest="client_secret", metavar="CLIENT_SECRET", help=u"Secret of OAuth2 client application for client_credentials flow") group_auth.add_option("-u", "--url", type="string", dest="url", metavar="URL", help=u"Base URL of CLIMATIK. Default: \"%default\"", default="https://agroclim.inrae.fr/climatik/") group_auth.add_option("-f", "--flow", type="string", dest="flow", metavar="FLOW", help=u"OAuth2 flow: client_credentials or device_code. Default: \"%default\"", default="client_credentials") parser.add_option_group(group_auth) (options, args) = parser.parse_args() return options class ClimatikClient(object): """ Simple OAuth2 example for CLIMATIK in client_crediental mode. """ def __init__(self, base_url, client_id=None, client_secret=None, ignore_ssl=False, logger=None): self._logger = logger or logging.getLogger(__name__) self._base_url = base_url self._client_id = client_id self._client_secret = client_secret self._resource_endpoint = base_url + "rs/" # preproduction has invalid SSL certificate if ignore_ssl : self._urlopenargs = {"context": ssl._create_unverified_context()} else: self._urlopenargs = {} self._access_token = None self._refresh_token = None def access_token(self, flow): """ Get tokens using the specified flow. """ self._logger.debug('OAuth2 flow: %s' % flow) if flow == 'client_credentials': self.client_credentials() elif flow == 'device_code': self.device_code() def client_credentials(self): """ Get tokens using client_credentials flow. """ data = { 'client_id': self._client_id, 'client_secret': self._client_secret, 'grant_type': 'client_credentials' } response = self.post('rs/oauth/token', data) self._access_token = response['access_token'] self._expires_at = response['expires_at'] self._refresh_token = response['refresh_token'] return response def deauthorize(self): return self.post('rs/oauth/deauthorize') def device_code(self): """ Get tokens using device_code flow. """ data = { 'client_id': self._client_id, 'scope': 'data:read' } response = self.post('rs/oauth/device/code', data) self._logger.debug('device_code: %s' % response['device_code']) self._logger.debug('user_code: %s' % response['user_code']) self._logger.debug('verification_uri: %s' % response['verification_uri']) self._logger.debug('verification_uri_complete: %s' % response['verification_uri_complete']) self._logger.debug('expires_in: %s' % response['expires_in']) self._logger.debug('interval: %s' % response['interval']) self._logger.info('Dear user, to activate this script to access CLIMATIK, go to %s and confirm the code "%s".' % (response['verification_uri_complete'], response['user_code'])) os.system('firefox "%s"' % response['verification_uri_complete']) device_code = response['device_code'] expires_in = int(response['expires_in']) interval = int(response['interval']) i = 0 max_calls = expires_in / interval while i < max_calls: i += 1 self._logger.debug('Try #%d/%d' % (i, max_calls)) data = { 'client_id': self._client_id, 'device_code': device_code, 'grant_type': 'device_code' } response = self.post('rs/oauth/token', data) if 'access_token' in response: self._access_token = response['access_token'] self._expires_at = response['expires_at'] self._refresh_token = response['refresh_token'] self._logger.info('Tokens are retrieved') break self._logger.debug('error: %s' % response['error']) self._logger.debug('error_description: %s' % response['error_description']) self._logger.debug('sleeping for %d seconds' % interval) time.sleep(interval) self._logger.debug('sleeping done') def get(self, resource, parameters=None): if parameters: query = '?' + parse.urlencode(parameters) else: query = "" req = Request(self._base_url + resource + query) req.add_header('Content-Type', 'application/json; charset=utf-8') return self._request(req) def post(self, resource, values=None): if values is None: values = {} self._logger.debug("POST %s %s" % (resource, values)) data = parse.urlencode(values).encode('utf-8') req = Request(self._base_url + resource, data) req.add_header('Content-Type', 'application/x-www-form-urlencoded; charset=utf-8') if self._access_token is not None: req.add_header('Authorization', 'Bearer %s' % self._access_token) return self._request(req) def _request(self, req): if self._access_token is not None: req.add_header('Authorization', 'Bearer ' + self._access_token) try: with urlopen(req, **self._urlopenargs) as response: encoding = response.info().get_content_charset('utf-8') self._logger.debug("url: %s" % response.url) self._logger.debug("headers: %s" % response.headers) self._logger.debug("reason: %s" % response.reason) self._logger.debug("status: %d" % response.status) self._logger.debug("encoding: %s" % encoding) data = response.read() self._logger.debug("data: %s" % data) return json.loads(data.decode(encoding)) except HTTPError as e: self._logger.error("error: %s" % e) self._logger.error("url: %s" % e.url) self._logger.error("headers: %s" % e.headers) self._logger.error("reason: %s" % e.reason) self._logger.error("status: %d" % e.status) data = e.read() self._logger.error("data: %s" % data) return None def refresh_token(self): data = { 'client_id': self._client_id, 'code': self._refresh_token, 'grant_type': 'refresh_token' } response = self.post('rs/oauth/token', data) self._access_token = response['access_token'] self._expires_at = response['expires_at'] self._refresh_token = response['refresh_token'] return response def series(self): return self.get('rs/series') def stations(self): return self.get('rs/stations') def variables(self, parameters=None): return self.get('rs/variables', parameters) def main(): """ Launcher. """ options = arg_options() if options.client_id is None: print("client_id missing!") sys.exit(1) if options.flow != 'device_code' and options.client_secret is None: print("client_secret missing!") sys.exit(1) # logging.basicConfig(level=logging.DEBUG) client = ClimatikClient( base_url = options.url, client_id = options.client_id, client_secret = options.client_secret, ignore_ssl=True) #print(client.get("openapi/openapi.json")) #print("%d séries disponibles" % client.series()['count']) #print("%d stations disponibles" % client.stations()['count']) #print("%d variables disponibles" % client.variables()['count']) #print("%d variables brutes" % client.variables({'type': 'RAW'})['count']) print(client.access_token(options.flow)) print(client.get("rs/data?station=84007004×cale=DAILY&variable=TM&variable=ETPP&variable=U8&start=2018-01-01&end=2018-01-31")) #print(client.get("rs/data?station=84007004×cale=DECADAL&variable=TM_MD&variable=TN_MD&start=2018-01-01&end=2018-01-31")) print(client.refresh_token()) print(client.deauthorize()) if __name__ == "__main__": main()