Source code for api_traitement.api_functions

"""
#######################################################
#
# Fonctions relatives à la connexion et requêtes faites à l'api
#
#######################################################
"""

import json
import os
import re
from time import gmtime, strftime, strptime
import requests

from django.contrib.auth import authenticate
from django.utils.html import format_html

from api_traitement.common_functions import serialize, pretty_print

# from webapps.models import User
from webapps.models import LTOUser as User


from django.utils.translation import gettext as _
from configuration.conf import TIMEOUT_VALUE

########### Token ###########

[docs] def get_token(base_url, data): """ Fonction qui permet d'avoir un token Args: base_url (str) : url de base de l'API data (json): format de données json exemple ci-dessous exemple: data = { "config.login": "username", "config.password": "password", "config.databaseName": "database", "referentialLocale": "fr_FR" } Returns: token (str) :rtype: object """ url = base_url + "/init/open" response = requests.get(url, params=data, timeout=TIMEOUT_VALUE) print(response.url) token = response.json()['authenticationToken'] return token
[docs] def is_valid(base_url, token): """ Fonction booléenne qui test si le token est encore valide Args: token (str) """ api_base = base_url + '/init/information?' # Constitution du lien url pour accéder à l'API et fermer la connexion api_url = api_base + 'authenticationToken=' + token response = requests.get(api_url, timeout=TIMEOUT_VALUE) print("reponse of is valid function ", response.status_code) return response.status_code == 200
[docs] def reload_token(username, password, base_url, database): """ Fonction qui recharge un token Args: username: identifiant de connexion password: mot de passe de connexion base_url: l'url de base du profile database: la base de données pour la connexion Returns: token """ print("user database: ", database) data_user_connect = { "config.login": username, "config.password": password, "config.databaseName": database, "referentialLocale": "FR", # "referentialLocale": data_user.ref_language, } return get_token(base_url, data_user_connect)
[docs] def get_all_referential_data(token, module, base_url): """Fonction qui récupère les données de références sur le webservice Args: token module: ps ou ll base_url: url de connexion - 'https://observe.ob7.ird.fr/observeweb/api/public' Returns: dict """ url = base_url + "/referential/" + module + "?authenticationToken=" + token ac_cap = requests.get(url, timeout=TIMEOUT_VALUE) if ac_cap.status_code == 200: dicoModule = {} for val in json.loads(ac_cap.text)["content"]: vals = val.rsplit('.', 1)[1] dicoModule[vals] = [] for valin in json.loads(ac_cap.text)["content"][val]: dicoModule[vals].append(valin) print("="*20, "get_all_referential_data", "="*20) # print(dicoModule) return dicoModule else: return _("Problème de connexion pour recuperer les données")
[docs] def load_data(token, base_url, forceUpdate=False): """ Fonction qui recupere toutes les données de refences au format JSON de la base de données et stocke ces données dans un fichier en local. Elle recupere les données de references une fois par jour et elle est utilisé pour faire mise à jour des données de references sur le site Args: token (str): token recuperé base_url (str): url de base de l'API forceUpdate (bool): True ou False => utilisée dans le cas de la mise à jour des données de references forcées par l'utilisateur Returns: allData (json) """ print("_"*20, "load_data function starting", "_"*20) day = strftime("%Y-%m-%d", gmtime()) # Si les dossiers ne sont pas existant, on les créés if not os.path.exists("media/data"): os.makedirs("media/data") if not os.path.exists("media/temporary_files"): os.makedirs("media/temporary_files") files = os.listdir("media/data") def subFunction(token, day, url): ref_common = get_all_referential_data(token, "common", url) ps_logbook = get_all_referential_data(token, "ps/logbook", url) ps_common = get_all_referential_data(token, "ps/common", url) ll_common = get_all_referential_data(token, "ll/common", url) program = { 'Program': { 'seine' :ps_common["Program"], 'longline':ll_common["Program"] } } vesselActivity = { 'VesselActivity': { 'seine' :ps_common["VesselActivity"], 'longline':ll_common["VesselActivity"] } } # Suppression des éléments suivant del ps_common["Program"] del ll_common["Program"] del ps_common["VesselActivity"] del ll_common["VesselActivity"] allData = {**ref_common, **ps_logbook, **ps_common, **ll_common, **program, **vesselActivity} # allData = {**ref_common, **ps_logbook, **ps_common} ref_common = get_all_referential_data(token, "common", url) # ref_common2 ="https://observe.ob7.ird.fr/observeweb/api/public/referential/common?authenticationToken=6811592f-bf3b-4fa0-8320-58a4a58c9ab7" ps_logbook = get_all_referential_data(token, "ps/logbook", url) ps_common = get_all_referential_data(token, "ps/common", url) ll_common = get_all_referential_data(token, "ll/common", url) print("="*20, "load_data SubFunction", "="*20) # print(ref_common[5:]) # with open('allData_load.json', 'w', encoding='utf-8') as f: # json.dump(allData, f, ensure_ascii=False, indent=4) file_name = "media/data/data_" + str(day) + ".json" with open(file_name, 'w', encoding='utf-8') as f: f.write(json.dumps(allData, ensure_ascii=False, indent=4)) return allData if (0 < len(files)) and (len(files) <= 1) and (forceUpdate == False): last_date = files[0].split("_")[1].split(".")[0] last_file = files[0] formatted_date1 = strptime(day, "%Y-%m-%d") formatted_date2 = strptime(last_date, "%Y-%m-%d") # Verifier si le jour actuel est superieur au jour precedent if (formatted_date1 > formatted_date2): allData = subFunction(token, day, base_url) # Suprimer l'ancienne os.remove("media/data/" + last_file) print("="*20, "allData updated", "="*20) # print(allData[5:]) else: file_name = "media/data/" + files[0] # Opening JSON file f = open(file_name , encoding='utf-8') # returns JSON object as a dictionary allData = json.load(f) print("="*20, "allData already existing", "="*20) # print(allData) else: list_file = os.listdir("media/data") for file_name in list_file: os.remove("media/data/" + str(file_name)) allData = subFunction(token, day, base_url) print("="*20, "subFunction getting allData", "="*20) # print(allData[5:]) return allData
[docs] def get_one_from_ws(token, base_url, route, topiaid): """ Fonction qui interroge le web service (ws) pour récupérer toutes les données relatives à une route et un topiaid Args: token (str): token base_url: chemin d'accès à la connexion ('https://observe.ob7.ird.fr/observeweb/api/public') route: chemin d'accès plus précis (par ex : '/data/ll/common/Trip/') topiaid: topiaid avec des '-' à la place des '#' Returns: file.json: informations relatives au topiaid fourni """ headers = { 'authenticationToken': token, } params = { 'config.recursive' : 'true', } url = base_url + route + topiaid response = requests.get(url, headers=headers, params = params, timeout=TIMEOUT_VALUE) if response.status_code == 200 : # with open(file = "media/temporary_files/previoustrip.json", mode = "w") as outfile: # outfile.write(response.text) return response.content else: return None
[docs] def trip_for_prog_vessel(token, base_url, route, vessel_id, programme_topiaid): """ Pour un navire et un programme donnée, renvoie le topiaid du dernier trip saisi Args: token base_url: 'https://observe.ob7.ird.fr/observeweb/api/public' vessel_id: topiaid du navire (avec les '-') programme_topiaid: topiaid du programme choisi (avec les '-') Returns: trip topiaid """ # api_base = 'https://observe.ob7.ird.fr/observeweb/api/' api_trip = '?authenticationToken=' api_vessel_filter = '&filters.vessel_id=' api_programme_filter = '&filters.logbookProgram_id=' api_ordeer_filter = '&orders.endDate=DESC' api_trip_request = base_url + route + api_trip + token + api_vessel_filter + vessel_id + api_programme_filter + programme_topiaid + api_ordeer_filter # print("&"*30, api_trip_request) response = requests.get(api_trip_request, timeout=TIMEOUT_VALUE) return response.content
[docs] def send_trip(token, data, base_url, route): """ Fonction qui ajoute un trip (marée) dans la base Args: token (str): token data (json): json file (trip) que l'on envoie dans la base base_url (str): 'https://observe.ob7.ird.fr/observeweb/api/public' base de connexion à l'api route (str): '/data/ps/common/Trip' ou '/data/ll/common/Trip' Returns: le json inséré dans le temporary_files text message: logbook bien inséré, ou bien un json d'erreur """ data_json = json.dumps(data, default=serialize) headers = { "Content-Type": "application/json", 'authenticationToken': token } url = base_url + route print("Post - send data") pretty_print(data) response = requests.post(url, data=data_json, headers=headers, timeout=TIMEOUT_VALUE) # print(response.status_code, "\n") if response.status_code == 200: # return json.loads(res.text) return (_("Logbook inséré avec success"), 1) else: with open(file = "media/temporary_files/error.json", mode = "w", encoding="utf-8") as outfile: outfile.write(response.text) try: return (error_filter(response.text), 2) # return (error_filter(response.text), 6) # 6 pour utiliser le niveau d'erreur personnalisée # return json.loads(res.text), 2 except KeyError: try: err_data = json.loads(response.text) # Cas où on reçoit une erreur inattendue # httpCode 500 du serveur if isinstance(err_data, dict) and "exception" in err_data: raw_msg = err_data["exception"] raw_msg_type = err_data["exceptionType"] if 'java.lang.NumberFormatException' == raw_msg_type: # Vérifie si l'erreur mentionne une valeur inattendue if raw_msg['detailMessage']: # Ex: 'For input string: "2024-12-30T00"' bad_value_match = re.search(r'for input string:\s*"([^"]+)"', raw_msg['detailMessage'].lower()) bad_value = bad_value_match.group(1) if bad_value_match else "valeur inconnue" return (_(f"<strong>Erreur de format détectée :</strong> une valeur inattendue <i>\"{bad_value}\" </i> a été trouvée dans le fichier, <br> alors qu'un nombre était attendu. Veuillez corriger cette donnée et réessayer."), 2) # Autres cas d'erreur connus à capter ici si besoin... # Sinon, retourne le message brut avec un avertissement print( f"Erreur inconnue détectée : {raw_msg}.") return (_("Veuillez vérifier les données du fichier ou contacter un administrateur."), 3) # Si la structure n'est pas conforme du tout return (_("Format d'erreur inattendu. Vérifiez le contenu du fichier."), 3) except Exception as e: # Si json.loads échoue ou autre erreur print("Erreur interne non gérée :", str(e)) return (_("L'insertion de ce logbook n'est pas possible. Désolé, veuillez essayer un autre."), 3)
[docs] def update_trip(token, data, base_url, topiaid): """ Fonction qui met à jour un trip dans la base de données, donc supprime le trip existant pour insérer le nouveau data_json sous le même topiaid Args: token (str): token data (json): json file qu'on envoie dans la base base_url (str): 'https://observe.ob7.ird.fr/observeweb/api/public' base de connexion à l'api topiaid du trip que l'on veut update (l'ancienne version sera supprimée) Returns: """ data_json = json.dumps(data, default=serialize) headers = { "Content-Type": "application/json", 'authenticationToken': token,} url = base_url + '/data/ll/common/Trip/' + topiaid pretty_print(data) response = requests.put(url, data=data_json, headers=headers, timeout=TIMEOUT_VALUE) print("Code resultat de la requete", response.status_code) # if response.status_code == 200: # return (_("Logbook inséré avec success"), 1) # else: # with open(file = "media/temporary_files/errorupdate.json", mode = "w", encoding="utf-8") as outfile: # outfile.write(response.text) # return (_("L'insertion de cet logbook n'est pas possible. Désolé veuillez essayer un autre"), 3) if response.status_code == 200: # return json.loads(res.text) return (_("Logbook inséré avec success"), 1) else: with open(file = "media/temporary_files/errorupdate.json", mode = "w", encoding="utf-8") as outfile: outfile.write(response.text) try: return (error_filter(response.text), 2) # return (error_filter(response.text), 6) # 6 pour utiliser le niveau d'erreur personnalisée # return json.loads(res.text), 2 except KeyError: # Faire une fonction pour mieux traiter ce type d'erreur # print("Message d'erreur: ", json.loads(res.text)["exception"]["result"]["nodes"]) # A faire # print("Message d'erreur: ", json.loads(response.text)) # A faire # return (error_filter(response.text), 2) return (_("L'insertion de ce logbook n'est pas possible. Désolé veuillez essayer un autre"), 3)
[docs] def getId_Data(token, base_url, moduleName, argment, route): """ Fonction qui permet de retourner un id en fonction du module et de la route envoyé Args: token (str):token base_url (str): url de base de l'API moduleName (str): le module de la base de donnée argment (str): les arguments de la requete sur le module route (str): chemin de l'API de la requete en fonction de la structure de la base de données. exemple: moduleName = "Trip" route = "/data/ps/common/" argment = "startDate=" + ... + "&filters.endDate=" + ... + "&filters.vessel_id=" + ... OU argment = "startDate=" + ... Returns: id (str): """ headers = { "Content-Type": "application/json", 'authenticationToken': token } urls = base_url + route + moduleName + "?filters." + argment rep = requests.get(urls, headers=headers, timeout=TIMEOUT_VALUE) # print(rep.url) if rep.status_code == 200: return json.loads(rep.text)["content"][0]["topiaId"] else: return json.loads(rep.text)["message"]
[docs] def check_trip(token, content, base_url): """ Fonction qui permet de verifier si la marée a inserer existe déjà dans la base de donnée Args: token (str): token base_url (str): url de base de l'API content (json): fragment json de la donnée logbook Returns: id_ (str): topid de la marée si elle existe ms_ (bool): Utilisée pour verifier le statut de la fonction (True == id trouvé) """ start = content["startDate"].replace("T00:00:00.000Z", "") end = content["endDate"].replace("T00:00:00.000Z", "") vessel_id = content["vessel"].replace("#", "-") # print(start, end, vessel_id) id_ = "" ms_ = True try: id_ = getId_Data(token, base_url=base_url, moduleName="Trip", route="/data/ps/common/", argment="startDate=" + start + "&filters.endDate=" + end + "&filters.vessel_id=" + vessel_id) except: ms_ = False return id_, ms_
# Supprimer un trip
[docs] def del_trip(base_url, token, content): """ Fonction qui permet de verifier si la marée a inserer existe déjà dans la base de donnée Args: token (str): token content (json): fragment json de la donnée logbook Returns: (json) """ dicts = json.dumps(content) headers = { "Content-Type": "application/json", 'authenticationToken': token } id_, ms_ = check_trip(token, content, base_url) if ms_ == True: id_ = id_.replace("#", "-") url = base_url + '/data/ps/common/Trip/' + id_ print(id_) print("Supprimer") res = requests.delete(url, data=dicts, headers=headers, timeout=TIMEOUT_VALUE) print(res.status_code, "\n") if res.status_code == 200: print("Supprimer avec succes") return json.loads(res.text) else: try: return error_filter(res.text) except KeyError: print("Message d'erreur: ", json.loads(res.text))
[docs] def error_filter(response): """ Permet de simplifier l'affichage des erreurs dans le programme lors de l'insertion des données """ error = json.loads(response) text_l = [] # Liste pour stocker les textes d'erreur def error_message(nodes, text_list): if 'children' in nodes: # Appel récursif pour explorer les sous-nœuds child_text = str(nodes['datum']['text']) if child_text not in text_list: text_list.append(child_text) return error_message(nodes['children'][0], text_list) if 'messages' in nodes: temp = nodes['messages'] text = nodes['datum']['text'] # Ajout du texte d'erreur dans la liste si pas déjà présent if text not in text_list: text_list.append(text) # text_list = text_list[-1] # Expression régulière pour extraire la date et l'heure time_pattern = r"(\d{2}/\d{2}/\d{4})" # pour la date date_pattern = r"##(\d{2}:\d{2})##" # pour l'heure # Variables pour stocker la date et l'heure date = "" heure = "" # Recherche du motif dans text_list for idx, value in enumerate(text_list): match = re.search(time_pattern, value) # Recherche de la date match2 = re.search(date_pattern, value) # Recherche de l'heure if match: date = match.group(1) # Extraction de la date text_list[idx] = date # Remplacer la date dans text_list elif match2: heure = match2.group(1) # Extraction de l'heure text_list[idx] = heure # Remplacer l'heure dans text_list try: text_list.remove(heure) text_list.remove(date) except ValueError: print("heure ou date non presente dans la liste") # Génération du format HTML sous forme de tableau Tailwind if date != "" and heure != "": return f""" <div class="p-4 mb-4 bg-red-100 border-t-4 border-red-500 dark:bg-red-200 rounded-lg" role="alert"> <div class="overflow-x-auto"> <table class="w-full text-sm text-left text-red-700 border border-red-400 rounded-lg"> <thead class="text-xs uppercase bg-red-200 text-red-800"> <tr> <th scope="col" class="px-4 py-2">Date</th> <th scope="col" class="px-4 py-2">Heure</th> <th scope="col" class="px-4 py-2">Champs Erreur</th> <th scope="col" class="px-4 py-2">Message Erreur</th> </tr> </thead> <tbody> <tr class="border-t border-red-400"> <td class="px-4 py-2">{date}</td> <td class="px-4 py-2">{heure}</td> <td class="px-4 py-2">{temp[0]['fieldName']}</td> <td class="px-4 py-2">{temp[0]['message']}</td> </tr> </tbody> </table> </div> </div> """ else: return f""" <div class="p-4 mb-4 bg-red-100 border-t-4 border-red-500 dark:bg-red-200 rounded-lg" role="alert"> <div class="overflow-x-auto"> <table class="w-full text-sm text-left text-red-700 border border-red-400 rounded-lg"> <thead class="text-xs uppercase bg-red-200 text-red-800"> <tr> <th scope="col" class="px-4 py-2">Champs Erreur</th> <th scope="col" class="px-4 py-2">Message Erreur</th> </tr> </thead> <tbody> <tr class="border-t border-red-400"> <td class="px-4 py-2">{temp[0]['fieldName']}</td> <td class="px-4 py-2">{temp[0]['message']}</td> </tr> </tbody> </table> </div> </div> """ all_message = [] # Vérifie si le premier nœud contient des messages if 'messages' in error['exception']['result']['nodes'][0]: all_message.append(error_message(error['exception']['result']['nodes'][0], text_l)) # Parcours des enfants si existants try: for val in error['exception']['result']['nodes'][0].get('children', []): all_message.append(error_message(val, text_l)) except KeyError: pass print("Len : ",len(all_message)) return all_message