微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在Python中将Pygame与并行性结合使用 “将pygame用作从同一进程衍生的不同线程中的多个不同实例的一种方式” “一种与pygame的显示安全并发并更新时钟的方法” “一种使用multiprocessing.Process的方法,它无需腌制方法的类,但仍可以访问类变量” “具有以下功能的多处理库:” 1 不需要腌制lambda函数或者可以 您需要使用序列化Python对象才能在进程之间发送它们 2 有一种方法可以告诉子流程正在使用哪个流程工作者

如何解决在Python中将Pygame与并行性结合使用 “将pygame用作从同一进程衍生的不同线程中的多个不同实例的一种方式” “一种与pygame的显示安全并发并更新时钟的方法” “一种使用multiprocessing.Process的方法,它无需腌制方法的类,但仍可以访问类变量” “具有以下功能的多处理库:” 1 不需要腌制lambda函数或者可以 您需要使用序列化Python对象才能在进程之间发送它们 2 有一种方法可以告诉子流程正在使用哪个流程工作者

我正在尝试训练神经网络来玩使用Pygame制作的SMB1游戏。为了加快速度,我想使用并行处理,以便由不同人群(以及不同的训练数据)一次玩多个游戏副本。

我的问题的根源在于Pygame本质上不是基于实例的。也就是说,它将只生成一个带有一个显示对象的窗口。因为我不能为每个进程创建多个Pygame窗口并显示对象,所以这些进程必须共享一个显示对象。这引出了我的第一个问题:是否有一种方法可以拥有多个pygame实例,如果没有,是否可以同时使用一种(性能轻量)方法绘制到显示器上?也就是每个游戏都绘制到整个窗口的不同部分。

但是,我并不是真的需要渲染所有游戏。我只关心至少渲染了一个游戏实例,以便可以监视其进度。然后,我的解决方案是为每个游戏分配一个进程ID,并且只有进程ID为0的游戏才会真正显示显示器上。并发问题解决了! 为此,我使用了multiprocessing.Process:

processes = [];
genome_batches = np.array_split(genomes,self.runconfig.parallel_processes);
for i in range(runconfig.parallel_processes):
    process = multiprocessing.Process(target=self.eval_genome_batch_Feedforward,args=(genome_batches[i],config,i));
    processes.append(process);
for process in processes:
    process.start();
for process in processes:
    process.join();

但是,这在多处理使对象腌制时引起了自己的问题: AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>' 注意:config和RunnerConfig是两件事。一个来自我正在使用的简洁库,这是传递给函数的配置,另一个是我自己的类,这是进程从其启动的类的属性

经过一些研究,似乎因为我使用的是类方法,所以多处理使该类腌制,该类包括上面的RunnerConfig,其中包含不能腌制的lambda函数。这很难解决,因为这些lambda函数是在 self.eval_genome_batch中专门使用的。这就引出了第二个问题:是否有可能使用multiprocessing.Process而不需要对外部类进行酸洗,从而不会对lambda函数进行酸洗?

最后,经过更多研究,我发现我可以使用pathos.multiprocessing而不是使用腌制的多重处理,而使用莳萝。莳萝可以腌制lambda函数。哇!

但是没有。最后有一个家伙。 Pathos.multiprocessing仅具有来自multiprocessing的.map和.map等效函数,这无法让我自己控制进程。这意味着在调用函数时,无法(afaik)告诉程序正在运行游戏的哪个进程ID,以及是否将其渲染到屏幕上。所以最后一个问题是:有没有一种方法可以使用pathos.multiprocessing.map(或者,实际上是任何库并行函数),即a)不破坏lambda函数,并且b)可以告诉被调用函数正在使用进程ID?

最后一点:我知道最简单的答案就是不渲染给Pygame。这将解决所有问题。但是,能够看到程序的进展和学习对我来说是非常有用和重要的。

因此,我列出了不同的问题,如果解决了其中的任何一个,便可以解决所有问题:

  • 一种将pygame用作在同一进程中衍生的不同线程中的多个不同实例的方法
  • 一种与pygame的显示安全并发工作的方式(并更新时钟)
  • 一种使用multiprocessing.Process的方法,它无需使方法的类腌制,但仍可以访问类变量
  • 一个多处理库,该库:
    • 不需要腌制lambda函数或者可以
    • 有一种方法可以告诉子流程正在使用哪个流程工作者

编辑:这是大多数相关代码。因为类很长,所以只包括相关的方法。如果您愿意,可以在my github

中找到源代码

game_runner_neat.py:开始并行处理的类

import neat
import baseGame
import runnerConfiguration
import os.path
import os
import visualize
import random
import numpy as np
#import concurrent.futures
import multiprocessing
from logReporting import LoggingReporter
from renderer import Renderer as RendererReporter
from videofig import videofig as vidfig
from neat.six_util import iteritems,itervalues

class GameRunner:

    #if using default version,create basic runner and specify game to run
    def __init__(self,game,runnerConfig):
        self.game = game;
        self.runconfig = runnerConfig;

    #skip some code


    #parallel version of eval_genomes_Feedforward
    def eval_genome_batch_Feedforward(self,genomes,processNum):
        for genome_id,genome in genomes:
            genome.fitness += self.eval_genome_Feedforward(genome,processNum=processNum);

    
    def eval_training_data_batch_Feedforward(self,data,processNum,lock):
        for datum in data:
            for genome_id,genome in genomes:
                genome.increment_fitness(lock,self.eval_genome_Feedforward(genome,processNum=processNum,trainingDatum=datum)); #increment_fitness allows multiple threads to change the fitness of the same genome safely

    #evaluate a population with the game as a Feedforward neural net
    def eval_genomes_Feedforward(self,config):
        for genome_id,genome in genomes:
            genome.fitness = 0; #sanity check
        if (self.runconfig.training_data is None):
            if (self.runconfig.parallel):
                processes = [];
                genome_batches = np.array_split(genomes,self.runconfig.parallel_processes);
                for i in range(runconfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_genome_batch_Feedforward,i));
                    processes.append(process);
                for process in processes:
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for genome_id,genome in genomes:
                    genome.fitness += self.eval_genome_Feedforward(genome,config)
        else:
            if (self.runconfig.parallel):
                processes = [];
                data_batches = np.array_split(self.runconfig.training_data,self.runconfig.parallel_processes);
                lock = multiprocessing.Lock();
                for i in range(self.runconfig.parallel_processes):
                    process = multiprocessing.Process(target=self.eval_training_data_batch_Feedforward,args=(genomes,data_batches[i],i,lock));
                    processes.append(process);
                    process.start();
                for process in processes:
                    process.join();
                return;
            else:
                for datum in self.runconfig.training_data:
                    for genome_id,genome in genomes:
                        genome.fitness += self.eval_genome_Feedforward(genome,trainingDatum=datum)

runnerConfiguration.py(带有lambda函数的类,已通过 init 传递给GameRunner):

class RunnerConfig:

    def __init__(self,gamefitnessFunction,gameRunningFunction,logging=False,logPath='',recurrent=False,trial_fitness_aggregation='average',custom_fitness_aggregation=None,time_step=0.05,num_trials=10,parallel=False,returnData=[],gameName='game',num_generations=300,fitness_collection_type=None):

        self.logging = logging;
        self.logPath = logPath;
        self.generations = num_generations;
        self.recurrent = recurrent;
        self.gameName = gameName;
        self.parallel = parallel;
        self.time_step = time_step;
        self.numTrials = num_trials;
        self.fitnessFromGameData = gamefitnessFunction;
        self.gameStillRunning = gameRunningFunction;
        self.fitness_collection_type = fitness_collection_type;

        self.returnData = returnData;
##        for (datum in returnData):
##            if (isinstance(datum,IOData)):
##                [returnData.append(x) for x in datum.getSplitData()];
##            else:
##                returnData.append(datum);
##        
        if (trial_fitness_aggregation == 'custom'):
            self.fitnessFromArray = custom_fitness_aggregation;

        if (trial_fitness_aggregation == 'average'):
            self.fitnessFromArray = lambda fitnesses : sum(fitnesses)/len(fitnesses);

        if (trial_fitness_aggregation == 'max'):
            self.fitnessFromArray = lambda fitnesses : max(fitnesses);

        if (trial_fitness_aggregation == 'min'):
            self.fitnessFromArray = lambda fitnesses : min(fitnesses);

gamefitnessFunction和gameRunningFunction是用于自定义训练行为的函数

当程序尝试使用RunnerConfig.parallel = True运行eval_genomes_Feedforward时,我收到以下完整错误消息:

Traceback (most recent call last):
  File "c:/Users/harrison_truscott/Documents/GitHub/AI_game_router/Neat/smb1Py_runner.py",line 94,in <module>
    winner = runner.run(config,'run_' + str(currentRun));
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 75,in run
    winner = pop.run(self.eval_genomes,self.runconfig.generations);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\neat\population.py",line 102,in run
    fitness_function(list(iteritems(self.population)),self.config)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 204,in eval_genomes
    self.eval_genomes_Feedforward(genomes,config);
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\game_runner_neat.py",line 276,in eval_genomes_Feedforward
    process.start();
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\process.py",line 121,in start
    self._popen = self._Popen(self)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\context.py",line 224,in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\context.py",line 326,in _Popen
    return Popen(process_obj)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\popen_spawn_win32.py",line 93,in __init__
    reduction.dump(process_obj,to_child)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\reduction.py",line 60,in dump
    ForkingPickler(file,protocol).dump(obj)
AttributeError: Can't pickle local object 'RunnerConfig.__init__.<locals>.<lambda>'

当第一个进程中断时,当下一个进程由于第一个进程的不完整启动而中断时,我又收到第二条错误消息:

Traceback (most recent call last):
  File "<string>",line 1,in <module>
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 116,in spawn_main
    exitcode = _main(fd,parent_sentinel)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 125,in _main
    prepare(preparation_data)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 236,in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 287,in _fixup_main_from_path
    main_content = runpy.run_path(main_path,File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\runpy.py",line 263,in run_path
    return _run_module_code(code,init_globals,run_name,line 96,in _run_module_code
    _run_code(code,mod_globals,line 86,in _run_code
    exec(code,run_globals)
  File "c:\Users\harrison_truscott\Documents\GitHub\AI_game_router\Neat\smb1Py_runner.py",line 45,in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 154,in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\harrison_truscott\AppData\Local\Programs\Python\python38\lib\multiprocessing\spawn.py",line 134,in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

顺便说一句,multiprocessing.freeze_support()是我在正在运行的主文件调用的第一个函数

解决方法

我将尝试解决主要问题。我对您的实际问题的了解非常有限,因为我不知道您的代码实际上是做什么的。

“将pygame用作从同一进程衍生的不同线程中的多个不同实例的一种方式”

这不起作用,因为pygame建立在SDL2上,该状态指出“您不应期望能够在主线程以外的任何线程上创建窗口,渲染或接收事件。 “

“一种与pygame的显示安全并发(并更新时钟)的方法”

与上述相同,显示仅在主线程中工作。

“一种使用multiprocessing.Process的方法,它无需腌制方法的类,但仍可以访问类变量”

您可以使用dill之类的方法来腌制这些方法,但是(在我看来)在进程之间完全复制python对象是错误的。我会寻求另一种解决方案。

“具有以下功能的多处理库:”

1。 不需要腌制lambda函数或者可以

您需要使用序列化Python对象才能在进程之间发送它们。

2。 有一种方法可以告诉子流程正在使用哪个流程工作者

我不明白这是什么意思。


在我看来,可以通过更好地分离数据和可视化来解决问题。培训应该不了解任何可视化,因为它不依赖于您如何显示。因此,没有任何理由分享pygame的显示内容。

完成此操作后,执行您要尝试的操作就不会有太多问题(多线程总是会导致问题)。关于泡菜问题;我将尝试避免对Python对象和函数进行腌制,而只是在线程和进程之间传递基本原语。看来您应该能够给self.fitnessFromArray分配一个简单的int,然后根据其值在线程/进程中进行min / avg / max计算。

如果要进行穿线,则主线程将负责渲染。它还会生成用于训练的线程。线程完成后,它们将返回结果(或将其放入线程安全存储中),主线程将轮询数据并显示结果。如果训练完成的工作花费的时间超过一帧,则将其划分为各个线程仅进行部分训练的部分,并且可以从下一帧继续进行。

如果您想要单独的进程,则主体是相同的。主要过程启动了几个训练过程,并通过套接字连接到它们。您可以从套接字中轮询有关程序状态的信息并显示它。基本上,它将是一种客户端-服务器体系结构(尽管在本地主机上),其中培训脚本是服务器,而主要过程是客户端。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。