微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python多处理和共享变量

我不是python的专家,但我已经设法写下了一个多处理代码,它在我的PC中使用了我所有的cpus和内核.我的代码加载了一个非常大的数组,大约1.6 GB,我需要在每个进程中更新数组.幸运的是,更新包括在图像中添加一些人造恒星,每个过程都有一组不同的图像位置,可以添加人造恒星.

图像太大,每次调用一个进程时我都无法创建一个新图像.我的解决方案是在共享内存中创建一个变量,我节省了大量内存.由于某种原因,它适用于90%的图像,但有些区域是我的代码在我之前发送到流程的某些位置添加随机数.它与我创建共享变量的方式有关吗?在我的代码执行过程中,这些进程是否相互干扰?

奇怪的是,当使用单个cpu和单核时,图像是100%完美的,并且图像中没有添加随机数.你建议我在多个进程之间共享一个大型数组吗?这是我的代码的相关部分.请在定义变量im_data时读取行.

import warnings
warnings.filterwarnings("ignore")

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing,Queue
import ctypes

class Worker(multiprocessing.Process):


def __init__(self,work_queue,result_queue):

    # base class initialization
    multiprocessing.Process.__init__(self)

    # job management stuff
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.kill_received = False

def run(self):
    while not self.kill_received:

        # get a task
        try:
            i_range,psf_file = self.work_queue.get_Nowait()
        except Queue.Empty:
            break

        # the actual processing
        print "Adding artificial stars - index range=",i_range

        radius=16
        x_c,y_c=( (psf_size[1]-1)/2,(psf_size[2]-1)/2 )
        x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
        distance = np.sqrt(x**2 + y**2)

        for i in range(i_range[0],i_range[1]):
            psf_xy=np.zeros(psf_size[1:3],dtype=float)
            j=0
            for i_order in range(psf_order+1):
                j_order=0
                while (i_order+j_order < psf_order+1):
                    psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
                    j_order+=1
                    j+=1


            psf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(psf_xy)
            psf_xy *= psf_factor

            npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_lanczos4)
            npsf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
            npsf_xy *= npsf_factor

            im_rangex=[max(mock_x[i]-npsf_size[1]/2,0),min(mock_x[i]-npsf_size[1]/2+npsf_size[1],im_size[1])]
            im_rangey=[max(mock_y[i]-npsf_size[0]/2,min(mock_y[i]-npsf_size[0]/2+npsf_size[0],im_size[0])]
            npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
            npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]

            im_data[im_rangey[0]:im_rangey[1],im_rangex[0]:im_rangex[1]] = 10.


        self.result_queue.put(id)

if __name__ == "__main__":

  n_cpu=2
  n_core=6
  n_processes=n_cpu*n_core*1
  input_mock_file=sys.argv[1]

  print "Reading file ",im_file[i]
  hdu=pyfits.open(im_file[i])
  data=hdu[0].data
  im_size=data.shape

  im_data_base = multiprocessing.Array(ctypes.c_float,im_size[0]*im_size[1])
  im_data = np.ctypeslib.as_array(im_data_base.get_obj())
  im_data = im_data.reshape(im_size[0],im_size[1])
  im_data[:] = data
  data=0
  assert im_data.base.base is im_data_base.get_obj()

  # run
  # load up work queue
  tic=time.time()
  j_step=np.int(np.ceil( mock_n*1./n_processes ))
  j_range=range(0,mock_n,j_step)
  j_range.append(mock_n)


  work_queue = multiprocessing.Queue()
  for j in range(np.size(j_range)-1):
    if work_queue.full():
      print "Oh no! Queue is full after only %d iterations" % j
    work_queue.put( (j_range[j:j+2],psf_file[i]) )

  # create a queue to pass to workers to store the results
  result_queue = multiprocessing.Queue()

  # spawn workers
  for j in range(n_processes):
    worker = Worker(work_queue,result_queue)
    worker.start()

  # collect the results off the queue
  while not work_queue.empty():
    result_queue.get()

  print "Writing file ",mock_im_file[i]
  hdu[0].data=im_data
  hdu.writeto(mock_im_file[i])
  print "%f s for parallel computation." % (time.time() - tic)
最佳答案
我认为问题(正如你在你的问题中建议的那样)来自于你从多个线程编写相同数组的事实.

im_data_base = multiprocessing.Array(ctypes.c_float,im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0],im_size[1])
im_data[:] = data

虽然我很确定你可以用“进程安全”方式写入im_data_base(python使用隐式锁来同步对数组的访问),但我不确定你是否可以以过程安全的方式写入im_data .

因此我会(尽管我不确定我会解决你的问题)建议你创建一个围绕im_data的显式锁

# disable python implicit lock,we are going to use our own
im_data_base = multiprocessing.Array(ctypes.c_float,im_size[0]*im_size[1],lock=False)
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0],im_size[1])
im_data[:] = data
# Create our own lock
im_data_lock = Lock()

然后在进程中,每次需要修改im_data时获取

self.im_data_lock.acquire()
im_data[im_rangey[0]:im_rangey[1],im_rangex[0]:im_rangex[1]] = 10
self.im_data_lock.release()

为了简洁起见,我省略了将锁传递给进程的构造函数并将其存储为成员字段(self.im_data_lock)的代码.您还应该将im_data数组传递给进程的构造函数,并将其存储为成员字段.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


我最近重新拾起了计算机视觉,借助Python的opencv还有face_recognition库写了个简单的图像识别demo,额外定制了一些内容,原本想打包成exe然后发给朋友,不过在这当中遇到了许多小问题,都解决了,记录一下踩过的坑。 1、Pyinstaller打包过程当中出现warning,跟d
说到Pooling,相信学习过CNN的朋友们都不会感到陌生。Pooling在中文当中的意思是“池化”,在神经网络当中非常常见,通常用的比较多的一种是Max Pooling,具体操作如下图: 结合图像理解,相信你也会大概明白其中的本意。不过Pooling并不是只可以选取2x2的窗口大小,即便是3x3,
记得大一学Python的时候,有一个题目是判断一个数是否是复数。当时觉得比较复杂不好写,就琢磨了一个偷懒的好办法,用异常处理的手段便可以大大程度帮助你简短代码(偷懒)。以下是判断整数和复数的两段小代码: 相信看到这里,你也有所顿悟,能拓展出更多有意思的方法~
文章目录 3 直方图Histogramplot1. 基本直方图的绘制 Basic histogram2. 数据分布与密度信息显示 Control rug and density on seaborn histogram3. 带箱形图的直方图 Histogram with a boxplot on t
文章目录 5 小提琴图Violinplot1. 基础小提琴图绘制 Basic violinplot2. 小提琴图样式自定义 Custom seaborn violinplot3. 小提琴图颜色自定义 Control color of seaborn violinplot4. 分组小提琴图 Group
文章目录 4 核密度图Densityplot1. 基础核密度图绘制 Basic density plot2. 核密度图的区间控制 Control bandwidth of density plot3. 多个变量的核密度图绘制 Density plot of several variables4. 边
首先 import tensorflow as tf tf.argmax(tenso,n)函数会返回tensor中参数指定的维度中的最大值的索引或者向量。当tensor为矩阵返回向量,tensor为向量返回索引号。其中n表示具体参数的维度。 以实际例子为说明: import tensorflow a
seaborn学习笔记章节 seaborn是一个基于matplotlib的Python数据可视化库。seaborn是matplotlib的高级封装,可以绘制有吸引力且信息丰富的统计图形。相对于matplotlib,seaborn语法更简洁,两者关系类似于numpy和pandas之间的关系,seabo
Python ConfigParser教程显示了如何使用ConfigParser在Python中使用配置文件。 文章目录 1 介绍1.1 Python ConfigParser读取文件1.2 Python ConfigParser中的节1.3 Python ConfigParser从字符串中读取数据
1. 处理Excel 电子表格笔记(第12章)(代码下载) 本文主要介绍openpyxl 的2.5.12版处理excel电子表格,原书是2.1.4 版,OpenPyXL 团队会经常发布新版本。不过不用担心,新版本应该在相当长的时间内向后兼容。如果你有新版本,想看看它提供了什么新功能,可以查看Open