by citorva » 25 Nov 2020, 14:53
Je vais essayer de la retrouver
D'ailleurs, j'ai modifié (énormément) mon script de bench en le rendant multi-processus:
Update du 29 novembre : J'ai corrigé les erreurs qui le rendait incompatible avec Windows. Le script bench_core.py et bench.py doivent être remplacé
bench_core.py, la base du système de test. Il fournit une interface entre l'interface et les tests et effectue les calculs:
- Code: Select all
import threading
import multiprocessing
from multiprocessing.managers import BaseManager
import time
import imp, sys
PARAMETRE_ECHEC_EXEPTION = 0
PARAMETRE_ECHEC_PUIT = 1
PARAMETRE_ECHEC_LEVIATHAN = 2
PARAMETRE_ECHEC_ENERGIE = 3
PARAMETRE_ECHEC_NON_REPONSE = 4
PARAMETRE_TOTAL_REUSSITE = 5
class BenchProxy:
verrou = multiprocessing.Semaphore()
verrou_maj = multiprocessing.Semaphore()
m_arret_demande = False
def __init__(self, seed, threads):
self.seed = seed
self.retour_simulation = [None] * threads
for i in range(threads):
self.retour_simulation[i] = {
"graines": [None] * 5,
"temps": [None] * 5,
"compteur": [0] * 6,
"coups_total": 0,
"compteur_coups": 0
}
def verrouiller_maj(self):
self.verrou_maj.acquire()
def deverouiller_maj(self):
self.verrou_maj.release()
def ajout_compteur(self, identifiant, evenement, graine, temps, coups=0):
"""
Ajoute au compteur un événement défini par evenement avec la graine données
:argument evenent: l'identifiant de l'événement
:argument graine: la graine pour laquelle l'événement s'est déroulé
:argument coups: le nombre de coups en cas de victoire
"""
self.retour_simulation[identifiant]["compteur"][evenement] += 1
# PARAMETRE_TOTAL_REUSSITE = 5
if evenement == 5:
self.retour_simulation[identifiant]["coups_total"] += coups
self.retour_simulation[identifiant]["compteur_coups"] += 1
else:
self.retour_simulation[identifiant]["graines"][evenement] = graine
self.retour_simulation[identifiant]["temps"][evenement] = temps
def valeurs_generales(self):
"""
Récupère les valeurs additionnées à partir des données du banc de test
"""
donnees = self.retour_simulation.copy()
graines = [None] * 5
temps = [None] * 5
compteur = [0] * 6
coups_totaux, nombre_victoires = 0, 0
for i in range(len(donnees)):
for j in range(5):
if donnees[i]["graines"][j] != None and donnees[i]["temps"][j] != None:
if graines[j] == None or temps[j] == None or temps[j] < donnees[i]["temps"][j]:
graines[j] = donnees[i]["graines"][j]
temps[j] = donnees[i]["temps"][j]
for j in range(6):
compteur[j] += donnees[i]["compteur"][j]
coups_totaux += donnees[i]["coups_total"]
nombre_victoires += donnees[i]["compteur_coups"]
coups_moyens = 0
if nombre_victoires:
coups_moyens = coups_totaux / nombre_victoires
return compteur, graines, coups_moyens
def nouvelle_graine(self):
"""
Génère et retourne une nouvelle graine.
:return: la nouvelle graine généré
"""
ret = 0
self.verrou.acquire()
self.seed = (self.seed * 214013 + 2531011) % 4294967296
ret = self.seed
self.verrou.release()
return ret
def demande_arret(self):
self.m_arret_demande = True
def arret_demande(self):
return self.m_arret_demande
class Bench:
"""
Classe générale du banc de test. Initialise les variables ainsi que fils d'exécution
"""
# nb_exec
# Compteur des événements
compteur = [0] * 6
total_ech = 0
total_compteur = 0
trajet_moyen = 0
# Graines des événements
graines = [None] * 5
arrete = False
def __init__(self, nb_fil_exec, graine, nombre, module, duree_max, web_dim, web_density, pit_density, bats_density):
"""
Initialise le banc de test
:argument nb_fil_exec: le nombre de fils d'exécution à utiliser
:argument graine: la graine du test, est utilisé par la suite afin d'identifier les tests ayant échoué
:argument nombre: le nombre de tests à effectuer
:argument module: le nom du module à charger
:argument duree_max: la durée maximale pour l'exécution d'un test. Au delà de cette durée, ce test sera annulé
:argument web_dim: la taille de la zone de jeu
:argument web_density: la densité de corniches voisine à chaque corniche
:argument pit_density: la densité de puits dans une partie
:argument bats_density: la densité de chauve-souris dans une partie
"""
fil_exec = [None] * nb_fil_exec
nombre_restant = nombre
nombre_thread = nombre_restant // nb_fil_exec
BaseManager.register('BenchProxy', BenchProxy)
self.mgr = BaseManager()
self.mgr.start()
self.proxy = self.mgr.BenchProxy(graine, nb_fil_exec)
with open(module, 'r') as f:
donnees_module = f.read()
for i in range(nb_fil_exec):
fil_exec[i] = BenchmarkThread(
i, self.proxy, nombre_thread if i < nb_fil_exec - 1 else nombre_restant,
donnees_module, duree_max, web_dim, web_density, pit_density, bats_density
)
nombre_restant -= nombre_thread
self.fil_exec = fil_exec
self.nb_exec = nombre
self.graine = multiprocessing.Value('i', graine)
def demarre(self):
"""
Démarre les tests
"""
for i in range(len(self.fil_exec)):
self.fil_exec[i].start()
def mise_a_jour_donnees(self):
"""
Met à jour les valeurs du test général à partir des valeurs données par les différents fils d'exécution
"""
self.compteur, self.graines, self.trajet_moyen = self.proxy.valeurs_generales()
self.total_compteur = 0
self.total_ech = 0
for i in range(len(self.compteur)):
self.total_compteur += self.compteur[i]
if i != len(self.compteur):
self.total_ech += self.compteur[i]
def test_en_cours(self):
"""
Dit si les tests sont terminés ou pas
:return: True si les tests sont en cours sinon False
"""
for i in self.fil_exec:
if i.is_alive():
return True
return False
def arret(self):
"""
Arrête les tests en cours et patiente jusqu'à l'arrêt définitif de ces derniers
"""
if not self.arrete:
self.proxy.demande_arret()
for i in self.fil_exec:
if i.is_alive():
i.terminate()
self.mgr.shutdown()
self.arrete = True
class BenchmarkThread(multiprocessing.Process):
"""
Fil d'exécution unique du banc de test
"""
attente_mise_a_jour = False
def __init__(self, identifiant, proxy, nombre, module, duree_max, web_dim, web_density, pit_density, bats_density):
"""
Initialise le sous processus de benchmark
:param identifiant: L'identifiant unique du sous processus afin de lui donner l'accès aux ressources compris
entre 0 et N-1 avec N le nombre de sous processus.
:param proxy: L'objet BenchProxy spécifiant la mémoire partagé entre les différents objets partagés.
:param nombre: Le nombre de tests à effectuer dans cette fonction
:param module: Le module à tester
:param duree_max: La durée maximale d'un test unitaire avant son annulation
:param web_dim: La dimension de la zone de jeu
:param web_density: La densité moyenne de corniches voisine à une corniche.
:param pit_density: La densité moyenne de puits dans la zone de jeu.
:param bats_density: La densité moyenne de chauve-souris dans la zone de jeu.
"""
super().__init__()
self.proxy = proxy
self.nb_exec = nombre
self.module = module
self.id = identifiant
# Paramètres d'une partie
self.duree_max = duree_max
self.web_dim = web_dim
self.web_density = web_density
self.pit_density = pit_density
self.bats_density = bats_density
def run(self):
compteur = 0
module = BenchmarkThread.charge_ia(self.module, "ia")
while compteur < self.nb_exec and not self.proxy.arret_demande():
graine = self.proxy.nouvelle_graine()
test = threading.Thread(target=BenchmarkThread.unitary_loop, args=[self, module, graine])
test.start()
action_annulee = False
temps_depart = time.time()
while ((test.is_alive() and not self.proxy.arret_demande() and not action_annulee)
or self.attente_mise_a_jour):
if time.time() - temps_depart > self.duree_max:
action_annulee = True
if action_annulee or self.proxy.arret_demande():
if action_annulee:
# PARAMETRE_ECHEC_NON_REPONSE = 4
self.proxy.ajout_compteur(self.id, 4, graine, time.time())
self.attente_mise_a_jour = False
del test
compteur += 1
def unitary_loop(self, module, graine):
(victoire, type_defaite, _, nombre_coups) = module.explore(
graine,
self.web_dim,
self.web_density,
self.pit_density,
self.bats_density
)
self.attente_mise_a_jour = True
if graine == 1789028489:
print(victoire, type_defaite, _, nombre_coups)
if victoire:
# PARAMETRE_TOTAL_REUSSITE = 5
self.proxy.ajout_compteur(self.id, 5, 0, 0, nombre_coups)
else:
if type_defaite & 1:
# PARAMETRE_ECHEC_PUIT = 1
self.proxy.ajout_compteur(self.id, 1, graine, time.time())
elif type_defaite & 2:
# PARAMETRE_ECHEC_LEVIATHAN = 2
self.proxy.ajout_compteur(self.id, 2, graine, time.time())
elif type_defaite & 4:
# PARAMETRE_ECHEC_EXEPTION = 0
self.proxy.ajout_compteur(self.id, 0, graine, time.time())
elif type_defaite & 8:
# PARAMETRE_ECHEC_ENERGIE = 3
self.proxy.ajout_compteur(self.id, 3, graine, time.time())
self.attente_mise_a_jour = False
@staticmethod
def charge_ia(donnees, nom):
"""
Génère un module de l'IA en fonction des données et du nom donné
:argument donnees: le code source de l'IA écrit en python
:argument nom: le nom du module. Ce dernier doit être unique afin d'éviter tout conflit entre les fils
d'exécution
:return: le module généré
"""
ret = imp.new_module(nom)
exec(donnees, ret.__dict__)
sys.modules[nom] = ret
return ret
web.py le code (un peu modifié) du système de jeu fourni par Critor
- Code: Select all
from math import pi, cos, sin, floor
import sys
rnd_seed = 0xc0ffee
web_dim = 36
web_density = .05
pits_density = .1
bats_density = .15
def initconst(seed, dim, w_d, p_d, b_d):
global rnd_seed, web_dim, web_density, pits_density, bats_density
rnd_seed = seed
web_dim = dim
web_density = w_d
pits_density = p_d
bats_density = b_d
# Implémentation de fonctions personnalisé de random
def rnd():
global rnd_seed
rnd_max = 0x7fff
rnd_seed = (rnd_seed * 214013 + 2531011) % 4294967296
return ((rnd_seed // (2*rnd_max + 1)) & rnd_max)
def random():
return rnd() / 0x7fff
def randint(a,b):
return rnd() % (b-a+1) + a
def choice(l):
return l[randint(0, len(l)-1)]
# Fin de l'implémentation
screen_h = 240
m_p, m_l, m_k, m_b, m_d, m_a, m_m = 1, 4, 16, 64, 256, 1024, 4096
def insertinto(l1, l2):
for v in l1:
if v not in l2:
l2.append(v)
return l2
def removefrom(l1, l2):
for v in l1:
try:
l2.remove(v)
except:
pass
return l2
def connectPlatforms(s1, s2):
global web
web[s1][s2], web[s2][s1] = 1, 1
def get_reachable_platforms_from_platforms(l, safe):
lv = []
for s in l:
for i in range(dimweb):
if web[s][i]:
if i not in lv and (not(safe) or not (platforms[i] & m_p)):
lv.append(i)
return lv
def cango(s1, s2, safe):
lvo1, lvi1, lvo2, lvi2, t_inter, k = [], [s1], [], [s2], 0, 0
while not (t_inter) and len(lvi1) and len(lvi2):
lvo1, lvo2 = insertinto(lvo1, lvi1), insertinto(lvo2, lvi2)
for v in lvo1:
if v in lvo2:
return k
lvi1, lvi2 = get_reachable_platforms_from_platforms(lvo1, safe), get_reachable_platforms_from_platforms(lvo2, safe)
lvi1, lvi2 = removefrom(lvo1, lvi1), removefrom(lvo2, lvi2)
k += 1
return 0
def my_bitor(a, b):
return ~(~a & ~b)
def init_web(d, p_p, p_b):
global web, platforms, screen_h
l0 = list(range(dimweb))
l0.remove(0)
web, platforms, conn, dconn, i_k = [], [0 for k in range(dimweb)], [0], list(range(1, dimweb)), choice(l0)
for j in range(dimweb):
web.append([0 for k in range(dimweb)])
while len(dconn):
s = dconn[randint(0, len(dconn) - 1)]
connectPlatforms(conn[randint(0, len(conn) - 1)], s)
dconn.remove(s)
conn.append(s)
for j in range(dimweb-1):
for i in range(j + 1, dimweb):
if floor(d + random()):
connectPlatforms(i, j)
i_d = choice(l0)
platforms[i_d] = my_bitor(platforms[i_d], m_d)
l1 = list(l0)
for v in get_reachable_platforms_from_platforms([0], 0):
l1.remove(v)
if not(len(l1)):
l1 = l0
l2 = list(l1)
for v in get_reachable_platforms_from_platforms(get_reachable_platforms_from_platforms([0], 0), 0):
try:
l2.remove(v)
except:
pass
if not(len(l2)):
l2 = l1
i_l = choice(l2)
platforms[i_l] = my_bitor(platforms[i_l], m_l)
platforms[i_k] = my_bitor(platforms[i_k], m_k)
for i in l1:
if i != i_k and i != i_d and floor(p_p*dimweb/len(l1) + random()):
if cango(0, i_k, 1) and cango(0, i_d, 1):
platforms[i] = my_bitor(platforms[i], m_p)
if floor(p_b*dimweb/len(l1) + random()):
platforms[i] = my_bitor(platforms[i], m_b)
def parcourir_selon(ia):
global dimweb, platforms, web_dim, web_density, pits_density, bats_density
dimweb = web_dim
maxcoups = dimweb**2 * 2
init_web(web_density, pits_density, bats_density)
s0, s1, s2, s3, s4, s5, s6, s7 = 0, 0, m_a, 0, 1, -1, 0, 0
pfs0, pfs5 = platforms[s0], 0
while s4 > 0 and (not (s2 & (2 * m_k)) or not (pfs0 & m_d)):
if s5 < 0:
s5 = 0
else:
try:
k, k2 = ia(s0, voisines, dimweb, s1, s2)
if pfs5 & (2 * m_b):
while s0 == s5:
s0 = randint(0, dimweb - 1)
pfs0, pfs5 = my_bitor(platforms[s0], m_b), pfs5 & ~(3 * m_b) & ~m_m
else:
if k2:
if s2 & m_a:
v = platforms[k]
if v & m_l:
v, s2 = v & ~m_l, my_bitor(s2, 2 * m_l)
platforms[k] = my_bitor(v, 2 * m_l)
s2 = s2 & ~m_a
s2 = my_bitor(s2, 2 * m_a)
else:
if k in voisines:
s0 = k
if pfs5 & m_b:
pfs5 = my_bitor(pfs5, 2 * m_b)
pfs0, pfs5 = platforms[s0], pfs5 & ~m_m
s3 += 1
if s3 >= maxcoups:
s4 = 0
if pfs0 & m_k:
pfs0 = pfs0 & ~m_k
s2 = my_bitor(s2, 2 * m_k)
if pfs0 & my_bitor(m_p, m_l):
s4 = 0
pfs0 = my_bitor(pfs0, 2 * m_m)
platforms[s5] = pfs5
except Exception as t_excpt:
s4 = -1
print(t_excpt)
pfs0 = my_bitor(pfs0, m_m)
s1, voisines = pfs0, get_reachable_platforms_from_platforms([s0], 0)
platforms[s0] = pfs0
for v in voisines:
t = my_bitor(m_p, m_l)
t = platforms[v] & my_bitor(t, m_k)
s1 = my_bitor(s1, t)
for v in get_reachable_platforms_from_platforms(voisines, 0):
t = platforms[v] & m_l
s1 = my_bitor(s1, t)
s5, s6, s7, pfs5 = s0, s1, s2, pfs0
r = s4 > 0 and s3 < maxcoups
s1 = 0
if not r:
if pfs0 & m_l:
s1 |= 2
elif pfs0 & m_p:
s1 |= 1
elif s3 >= maxcoups:
s1 |= 8
elif s4 < 0:
s1 |= 4
return r, s1, s2, s3
bench.py l'interface du système de test. Peut être remplacé en communiquant avec bench_core.py:
- Code: Select all
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import bench_core
if __name__ == "__main__":
import pygame
import time
import math
import sys
import argparse
import multiprocessing
import threading
#multiprocessing.set_start_method('spawn', True)
# Couleurs du programme. Peut être modifié à tout moment
couleur_txt = (0xc0, 0xc0, 0xc0) # Couleur gris clair pour le texte commun
couleur_vic = (0x28, 0xa7, 0x45) # Couleur verte pour la gauge et le texte associé
couleur_arp = (0x18, 0x18, 0x18) # Couleur de l'arrière plan général de l'application (Gris foncé)
couleur_gar = (0x21, 0x21, 0x21) # Couleur de l'arrière plan de la gauge en mode barre de chargement (Nuance de l'AP)
couleurs_echec = [
(0xf5, 0xe8, 0x00), # Couleur jaune pour signaler une exeption (un plantage de l'algorithme)
(0xff, 0x80, 0x3c), # Couleur orange pour signaler un puit (Le personnage prend une corniche avec un puit)
(0xf7, 0x40, 0x3b), # Couleur rouge pour signaler Léviathan (Le personnage se fait manger par ce dernier)
(0x7f, 0x7f, 0x7f), # Couleur grise pour signaler une manque d'énergie (càd le personnage tourne en rond)
(0xff, 0x00, 0x00) # Couleur rouge vif pour signaler une non réponse (L'algorithme prennds trop de temps)
]
# Modèles de texte
texte_modeles = [
"%0.00f%% à cause d'une exeption (%d, %d%% des échecs)%s",
"%0.00f%% tombé dans un puit (%d, %d%% des échecs)%s",
"%0.00f%% mangé par leviathan (%d, %d%% des échecs)%s",
"%0.00f%% par manque d'énergie (%d, %d%% des échecs)%s",
"%0.00f%% ne répondant pas (%d, %d%% des échecs)%s"
]
# Constantes de mise en page (Metriques)
metrique_mm = 8 # Marges de l'application (entre les bords de la fenêtre et le contenu ainsi que entre les éléments)
metrique_hg = 24 # Hauteur de la gauge en pixels
metrique_pt = 25 # Taille du texte de titre en points
metrique_pp = 12 # Taille du texte général en points
# Variables de benchmark (NE PAS MODIFIER)
# Variable de control de l'IHM
affichage_absolu = False
arret_demande = False
# Système de comptage du temps
heure_depart = 0
heure_fin = 0
# Initialisation de pygame (NE PAS MODIFIER)
pygame.font.init()
pygame.display.init()
# Initialisation des éléments graphiques (NE PAS MODIFIER)
ecran = None
police_titre = pygame.font.Font(pygame.font.get_default_font(), metrique_pt)
police = pygame.font.Font(pygame.font.get_default_font(), metrique_pp)
def cree_jauge(surface, donnees, couleur, rect):
"""
Dessine une gauge en fonctions des données et couleurs fournis dans une boite défini par rect.
:param surface: La surface où dessiner la gauge
:param donnees: Les données de la gauge dans un tableau de taille N
:param couleur: Les couleurs associés aux données de la gauge dans un tableau de taille N
:param rect: La boite où dessiner la gauge (coordonnées + taille)
:return: None
"""
total_donnees = 0
nombre_donnees = len(donnees)
taille_elements = [0] * nombre_donnees
largeur_donnees = 0
for i in donnees:
total_donnees += i
for i in range(nombre_donnees - 1):
t = int(rect.width * donnees[i] / total_donnees)
taille_elements[i] = t
largeur_donnees += t
taille_elements[-1] = rect.width - largeur_donnees
largeur_donnees = 0
for i in range(nombre_donnees):
surface.fill(couleur[i], (rect.x + largeur_donnees, rect.y, taille_elements[i], rect.height))
largeur_donnees += taille_elements[i]
def rendu_temps(temps):
"""
Affiche l'ordre de grandeur du temps restant
:param temps: Le temps restant en secondes
:return: Un texte donnant son ordre de grandeur en jour/heures/minutes
"""
minutes = temps // 60 % 60
heures = temps // 3600 % 24
jours = temps // 86400
if jours != 0:
return "~%d jour%s" % (jours, "s" if jours != 1 else "")
if heures != 0:
return "~%d heure%s" % (heures, "s" if heures != 1 else "")
if minutes != 0:
return "~%d minute%s" % (minutes, "s" if minutes != 1 else "")
return "<1 minute"
def format_duree(duree):
"""
Formate une durée en ensemble jours/heure/minutes/secondes
Cette durée formaté n'affiche pas les ordres de grandeurs nuls
:param duree: La durée à formater
:return: Le texte de la durée formaté sour le format <j>j <hh>h <mm>min <ss>s
"""
duree = int(math.floor(duree))
return "{}{:02d}s".format(
"{}{:02d}min".format(
"{}{:02d}h".format(
"{}j".format(duree // 86400) if duree // 86400 != 0 else "",
duree // 3600 % 24
) if duree // 3600 != 0 else "",
duree // 60 % 60
) if duree // 60 != 0 else "",
duree % 60
)
def afficher_graine(graine):
"""
Formate un texte avec la graine donnée ou ne donner rien si cette dernière est None
:param graine: La graine à afficher
:return: Un texte sous la forme ". Graine aléatoire: <graine>" si seed différent de None sinon ""
"""
if graine is None:
return ""
else:
return ". Graine aléatoire: %d" % graine
# TODO: Nettoyer et documenter cette fonction
def affichage_donnees():
# temps_restant = math.ceil(args.max_duration - temps_exec_unitaire)
duree = time.time() - heure_depart
total_tst = bench.total_compteur
temps_exec = duree * (args.number / total_tst - 1)
score = (1000 * (bench.compteur[bench_core.PARAMETRE_TOTAL_REUSSITE] - 2 * bench.compteur[bench_core.PARAMETRE_ECHEC_NON_REPONSE] - bench.compteur[bench_core.PARAMETRE_ECHEC_EXEPTION] // 2) - bench.trajet_moyen) * args.web_dim / total_tst
largeur = 512
hauteur = metrique_mm
texte_compteur = police_titre.render(
"Simulation %d sur %d (%0.00f%%)" % (total_tst, args.number, 100. * float(total_tst) / float(args.number)),
True, couleur_txt)
largeur = max(largeur, texte_compteur.get_width())
hauteur += texte_compteur.get_height() + metrique_mm
texte_score = police.render("Score: %d" % score, True, couleur_txt)
largeur = max(largeur, texte_score.get_width())
hauteur += texte_score.get_height() + metrique_mm
texte_victoire = police.render(
"%0.00f%% de victoires (%d). Trajet moyen: %d" % (
100 * bench.compteur[bench_core.PARAMETRE_TOTAL_REUSSITE] / total_tst, bench.compteur[bench_core.PARAMETRE_TOTAL_REUSSITE], bench.trajet_moyen),
True, couleur_vic)
largeur = max(largeur, texte_victoire.get_width())
hauteur += texte_victoire.get_height() + metrique_mm
# texte_temps_annulation = None
texte_temps_restant = None
if total_tst != args.number:
texte_temps_restant = police.render(
"Temps restant: %s. Écoulé %s" % (rendu_temps(math.ceil(temps_exec)), format_duree(duree)), True,
couleur_txt)
# texte_temps_annulation = police.render("Temps restant avant annulation: %d seconde%s" % (
# temps_restant if temps_restant > 0 else 0, "s" if temps_restant > 1 else ""), True,
# couleur_txt if temps_restant > 5 else couleur_lvt)
else:
texte_temps_restant = police.render("Tests effectués en %s" % (format_duree(heure_fin - heure_depart)), True,
couleur_vic)
# texte_temps_annulation = police.render("", True, couleur_txt)
# largeur = max(largeur, texte_temps_annulation.get_width())
# hauteur += texte_temps_annulation.get_height() + metrique_mm
largeur = max(largeur, texte_temps_restant.get_width())
hauteur += texte_temps_restant.get_height() + metrique_mm
texte_echec = []
valeur_gauge = [bench.compteur[bench_core.PARAMETRE_TOTAL_REUSSITE]]
couleur_gauge = [couleur_vic]
for i in range(5):
if bench.compteur[i] != 0:
texte_echec.append(
police.render(
texte_modeles[i] % (
100 * bench.compteur[i] / total_tst,
bench.compteur[i],
100 * bench.compteur[i] / bench.total_ech,
afficher_graine(bench.graines[i])
),
True, couleurs_echec[i]
)
)
valeur_gauge.append(bench.compteur[i])
couleur_gauge.append(couleurs_echec[i])
if affichage_absolu:
valeur_gauge.append(args.number - total_tst)
couleur_gauge.append(couleur_gar)
for i in texte_echec:
hauteur += i.get_height() + metrique_mm
largeur = max(largeur, i.get_width())
hauteur += metrique_hg + metrique_mm
largeur += 2 * metrique_mm
surface = pygame.Surface((largeur, hauteur))
surface.fill(couleur_arp)
y = metrique_mm
surface.blit(texte_compteur, (
largeur / 2 - texte_compteur.get_width() / 2, y, texte_compteur.get_width(), texte_compteur.get_height()))
y += texte_compteur.get_height() + metrique_mm
surface.blit(texte_score,
(largeur / 2 - texte_score.get_width() / 2, y, texte_score.get_width(), texte_score.get_height()))
y += texte_score.get_height() + metrique_mm
cree_jauge(surface, valeur_gauge, couleur_gauge,
pygame.Rect(metrique_mm, y, largeur - 2 * metrique_mm, metrique_hg))
y += metrique_hg + metrique_mm
surface.blit(texte_temps_restant, (
largeur / 2 - texte_temps_restant.get_width() / 2, y, texte_temps_restant.get_width(),
texte_temps_restant.get_height()))
y += texte_temps_restant.get_height() + metrique_mm
surface.blit(texte_victoire, (metrique_mm, y, texte_victoire.get_width(), texte_victoire.get_height()))
y += texte_victoire.get_height() + metrique_mm
for i in texte_echec:
surface.blit(i, (metrique_mm, y, i.get_width(), i.get_height()))
y += i.get_height() + metrique_mm
# surface.blit(texte_temps_annulation, (
# largeur / 2 - texte_temps_annulation.get_width() / 2, y, texte_temps_annulation.get_width(),
# texte_temps_annulation.get_height()))
return surface
def fonction_affichage():
"""
Routine d'affichage. Cette fonction tourne dans un thread indépendant
:return: None
"""
global arret_demande, affichage_absolu, ecran, heure_fin
temps_mise_a_jour = 0
duree_mise_a_jour = 1/args.update_frequency
debut_clic = False
while not arret_demande:
if time.time() - temps_mise_a_jour >= duree_mise_a_jour:
bench.mise_a_jour_donnees()
if bench.total_compteur != 0:
if bench.total_compteur < args.number:
heure_fin = time.time()
surface = affichage_donnees()
if ecran is None or surface.get_width() != ecran.get_width() or surface.get_height() != ecran.get_height():
ecran = pygame.display.set_mode((surface.get_width(), surface.get_height()))
ecran.blit(surface, (0, 0, ecran.get_width(), ecran.get_height()))
pygame.display.flip()
temps_mise_a_jour = time.time()
if ecran is not None:
for event in pygame.event.get():
if event.type == pygame.QUIT:
bench.arret()
arret_demande = True
elif event.type == pygame.MOUSEBUTTONDOWN:
debut_clic = True
elif event.type == pygame.MOUSEBUTTONUP and debut_clic:
affichage_absolu = not affichage_absolu
debut_clic = False
# Objet gérant le benchmark de l'IA
bench = None
# Parsing des options d'exécution
parser = argparse.ArgumentParser(
description="Effectue de nombreux tests dans le but de vérifier le comportement de l'IA pour le défi python "
"du Leviathan dans des cas aléatoires. Voir "
"https://tiplanet.org/forum/viewtopic.php?f=49&t=24387&p=257174#p257172 pour plus d'informations "
"sur le défi."
)
# Argument pour l'intelligence artificielle
parser.add_argument("ia", help="Fichier de l'IA à tester")
parser.add_argument('-n', "--number", default=100000, type=int, help="Nombre de tests à effectuer")
parser.add_argument('-s', "--seed", default=0xc0ffee, type=int, help="Graine aléatoire du benchmark")
parser.add_argument('-w', "--web-dim", default=36, type=int, help="Nombre de corniches")
parser.add_argument("-d", "--web-density", default=0.05, type=float,
help="Densité moyenne de voisine à chaque corniche")
parser.add_argument("-b", "--bats-density", default=0.15, type=float, help="Densité de chauve souris par parties")
parser.add_argument("-p", "--pit-density", default=0.1, type=float, help="Densité de puit par parties")
parser.add_argument("-m", "--max-duration", default=20, type=float, help="Durée maximum d'une partie en seconde")
parser.add_argument("-t", "--threads", default=1, type=int, help="Nombre de fils d'exécution pour les tests")
parser.add_argument("-f", "--update-frequency", default=24, type=int, help="Fréquence de rafraichssement de l'interface")
args = parser.parse_args(sys.argv[1:])
err = False
err_text = "\n"
if args.web_density >= 1 or args.web_density <= 0:
err_text += "La densité de corniche voisine doit être comprise entre 0 et 1, non inclu\n"
err = True
if args.bats_density >= 1 or args.bats_density <= 0:
err_text += "La densité de chauve souris doit être comprise entre 0 et 1, non inclu\n"
err = True
if args.pit_density >= 1 or args.pit_density <= 0:
err_text += "La densité de puit doit être comprise entre 0 et 1, non inclu\n"
err = True
if args.max_duration <= 0:
err_text += "La durée maximum d'une partie doit être strictement supérieure à 0\n"
err = True
if args.threads <= 0:
err_text += "Le nombre de fils d'exécution doit être supérieur à 0\n"
err = True
if args.web_dim <= 3:
err_text += "Un nombre raisonnable de corniche doit être fourni pour le bon fonctionnement de l'algorithme\n"
err = True
if args.number <= 0:
err_text += "Il faut au minimum un test pour pouvoir avoir des données exploitables\n"
err = True
if args.update_frequency <= 0:
err_text += "La fréquence de rafraichissement de l'interface doit être strictement positive"
err = True
if args.update_frequency > 60:
print("Alerte: La fréquence de rafraichissement choisi est très élevée. Cela pourra impacter négativement la vitesse du test")
if args.threads >= multiprocessing.cpu_count():
print("Alerte: Le nombre de fils d'exécution demandé est supérieur au nombre de processeurs disponibles. Cela risque d'impacter les performance totales de votre ordinateur")
bench = bench_core.Bench(
args.threads,
args.seed,
args.number,
args.ia,
args.max_duration,
args.web_dim,
args.web_density,
args.pit_density,
args.bats_density
)
if err:
parser.print_usage()
print(err_text)
quit()
del parser
# Programme principal: Crée les fils d'exécution et fait tourner l'algorithme
fil_exec_interface_utilisateur = threading.Thread(target=fonction_affichage)
heure_depart = time.time()
fil_exec_interface_utilisateur.start()
# Lance les boucles de test
bench.demarre()
fil_exec_interface_utilisateur.join()
bench.arret()
pygame.quit()
if bench.total_compteur != 0:
total_tst = bench.total_compteur
total_vic = bench.compteur[bench_core.PARAMETRE_TOTAL_REUSSITE]
total_ech = total_tst - total_vic
total_lvt = bench.compteur[bench_core.PARAMETRE_ECHEC_LEVIATHAN]
total_pit = bench.compteur[bench_core.PARAMETRE_ECHEC_PUIT]
total_nrj = bench.compteur[bench_core.PARAMETRE_ECHEC_ENERGIE]
total_exc = bench.compteur[bench_core.PARAMETRE_ECHEC_EXEPTION]
total_nrp = bench.compteur[bench_core.PARAMETRE_ECHEC_NON_REPONSE]
graine_lvt = bench.graines[bench_core.PARAMETRE_ECHEC_LEVIATHAN]
graine_pit = bench.graines[bench_core.PARAMETRE_ECHEC_PUIT]
graine_nrj = bench.graines[bench_core.PARAMETRE_ECHEC_ENERGIE]
graine_exc = bench.graines[bench_core.PARAMETRE_ECHEC_EXEPTION]
graine_nrp = bench.graines[bench_core.PARAMETRE_ECHEC_NON_REPONSE]
score = (1000 * (total_tst - 2 * total_nrp - total_exc // 2) - bench.trajet_moyen) * args.web_dim / bench.total_compteur
print(
"Statistiques finales:\n\tNombre total test: %d\n\n"
"Score final: %d\n"
"%d succès (%0.00f%%) avec un trajet moyen de %d\n"
"%d échecs (%0.00f%%) avec comme détails:\n"
"\t%d dues à un léviathan (%0.00f%%)%s\n"
"\t%d dues à un puit (%0.00f%%)%s\n"
"\t%d dues à un manque d'énergie (%0.00f%%)%s\n"
"\t%d dues à une exeption (%0.00f%%)%s\n"
"\t%d dues à un temps de réponse trop élevé (%0.00f%%)%s\n"
"" % (
total_tst,
score,
total_vic, 100 * total_vic / bench.total_compteur, bench.trajet_moyen,
total_ech, 100 * total_ech / bench.total_compteur,
total_lvt, 100 * total_lvt / bench.total_ech, afficher_graine(graine_lvt),
total_pit, 100 * total_pit / bench.total_ech, afficher_graine(graine_pit),
total_nrj, 100 * total_nrj / bench.total_ech, afficher_graine(graine_nrj),
total_exc, 100 * total_exc / bench.total_ech, afficher_graine(graine_exc),
total_nrp, 100 * total_nrp / bench.total_ech, afficher_graine(graine_nrp)
)
)
web_test.py, l'ia à tester. Il doit être adapté afin de fonctionner:
- Code: Select all
from web import *
def ia(corniche, voisines, taille, capteurs, evenements):
if evenements & (2 * m_b):
return None, 0
return voisines[randint(0, len(voisines) - 1)], 0
def explore(seed, dim, web_d, pits_d, bats_d):
# Initialise le module web avec les paramètres du programme
initconst(seed, dim, web_d, pits_d, bats_d)
# Ici seront remis à zero les variables globales
return parcourir_selon(ia)
Last edited by citorva on 29 Nov 2020, 17:30, edited 4 times in total.