本文介绍了使用Python进行大规模并行DB更新(PostGIS/PostgreSQL)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要更新空间数据库中的每条记录,在该数据库中,我的点数据集覆盖了多边形数据集.对于每个点要素,我想分配一个关键点以使其与其所在的多边形要素相关联.因此,如果我的点纽约市"位于美国多边形内,并且对于美国多边形"GID = 1",我将为我的点纽约市分配"gid_fkey = 1".

I need to update every record in a spatial database in which I have a data set of points that overlay data set of polygons. For each point feature I want to assign a key to relate it to the polygon feature that it lies within. So if my point 'New York City' lies within polygon USA and for the USA polygon 'GID = 1' I will assign 'gid_fkey = 1' for my point New York City.

为此,我创建了以下查询.

To do this I have created the following query.

procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

目前,我正在从另一个查询中获取cityID信息,该查询只选择gid_fkey为NULL的所有cityID.本质上,我只需要遍历这些并运行前面显示的查询.理论上,由于查询仅依赖于另一个表中的静态信息,因此所有这些过程都可以立即运行.我已经实现了下面的线程处理过程,但似乎无法迁移到多处理程序

At present I am getting the cityID info from another query that just selects all cityID where gid_fkey is NULL. Essentially I just need to loop through these and run the query shown earlier. As the query only relies on static information in the other table in theory all of these processes can be run at once. I have implemented the threading procedure below but I can't seem to make the migration to multiprocessing

import psycopg2, pprint, threading, time, Queue

queue = Queue.Queue()
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()

getGID = 'SELECT cityID FROM city'
pyCursor1.execute(getGID)
gidList = pyCursor1.fetchall()

class threadClass(threading.Thread):

def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

def run(self):

        while True:
            gid = self.queue.get()

            procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

            pyCursor2 = pyConn.cursor()
            pyCursor2.execute(procQuery)

            print gid[0]
            print 'Done'

def main():

    for i in range(4):
        t = threadClass(queue)
        t.setDaemon(True)
        t.start()

        for gid in gidList:
            queue.put(gid)

    queue.join()

main()

我什至不确定多线程是否是最佳选择,但绝对比单步执行更快.

I'm not even sure if the multithreading is optimal but it is definitely faster than going through one by one.

我将使用的计算机具有四个内核(四核)和一个最小的Linux操作系统,如果没有区别,则没有GUI,PostgreSQL,PostGIS和Python.

The machine I will be using has four cores (Quad Core) and a minimal Linux OS with no GUI, PostgreSQL, PostGIS and Python if that makes a difference.

要启用此非常简单的多处理任务,我需要更改什么?

What do I need to change to get this painfully easy multiprocessing task enabled?

推荐答案

好的,这是我自己帖子的答案.干得好= D

Okay this is an answer to my own post. Well done me =D

从单核线程到四核多处理,我的系统速度提高了约150%.

Produces about a 150% increase in speed on my system going from a single core thread to quad core multiprocessing.

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue

def run(self):
    proc_name = self.name
    while True:
        next_task = self.task_queue.get()
        if next_task is None:
            print 'Tasks Complete'
            self.task_queue.task_done()
            break
        answer = next_task()
        self.task_queue.task_done()
        self.result_queue.put(answer)
    return


class Task(object):
def __init__(self, a):
    self.a = a

def __call__(self):
    pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConn.set_isolation_level(0)
    pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

    pyCursor1.execute(procQuery)
    print 'What is self?'
    print self.a

    return self.a

def __str__(self):
    return 'ARC'
def run(self):
    print 'IN'

if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()

num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
    w.start()

pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()

pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]

pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()

cityIdList = []

for x in cityIdListTuple:
    cityIdList.append(x[0])


for i in xrange(num_jobs):
    tasks.put(Task(cityIdList[i - 1]))

for i in xrange(num_consumers):
    tasks.put(None)

while num_jobs:
    result = results.get()
    print result
    num_jobs -= 1

现在我还有一个问题已发布在这里:

Now I have another question which I have posted here:

创建数据库连接并维护多个进程(多处理)

希望我们可以摆脱一些负担,并进一步加快这个婴儿的成长速度.

Hopefully we can get rid of some overhead and speed this baby up even more.

这篇关于使用Python进行大规模并行DB更新(PostGIS/PostgreSQL)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 15:28