[GIS] How to write to a single table via Python multiprocessing

Tags: arcpy, parallel-processing, python

A comment in the post "Can multiprocessing with arcpy be run in a script tool?" got me thinking, as I often need to do exactly this:

Just beware of deadlocking situations (two Insert cursors in the same
table for instance)

My question is, how can you write to a single table when using multiprocessing?

Here's an example script that iterates through the sample City layer and uses multiprocessing to copy each city's values to an output table via an InsertCursor (this simplifies the more complicated scenario I have in mind).

# Testing how to write to an output fGDB table from multiple processes
import os, sys, arcpy, multiprocessing
from multiprocessing import Process, Queue

def Worker(input, output):
    # Pull tasks off the input queue until the 'STOP' sentinel arrives
    # (the output queue, done_queue, is unused in this example)
    for inputs in iter(input.get, 'STOP'):
        doProcess(inputs)

def doProcess(inputs):
    outTblName = inputs[0]
    city = inputs[1]
    pop = inputs[2]
    # Every call opens its own InsertCursor on the same shared table
    with arcpy.da.InsertCursor(os.path.join(arcpy.env.scratchGDB, outTblName), ["NAME", "POPULATION"]) as iCursor:
        try:
            iCursor.insertRow([city, pop])
        except:
            # Retry recursively on failure (this is where rows still get lost)
            print("Problem inserting " + city + " : " + str(pop) + " : trying again")
            doProcess(inputs)

if __name__ == '__main__':
    NUMBER_OF_PROCESSES = 8
    task_queue = Queue()
    done_queue = Queue()

    inFC = r"C:\Program Files (x86)\ArcGIS\Desktop10.2\TemplateData\TemplateData.gdb\World\City"
    outTblName = "testTable"

    #Create the empty table
    outTable = os.path.join(arcpy.env.scratchGDB, outTblName)
    if arcpy.Exists(outTable):
        arcpy.Delete_management(outTable)
    arcpy.CreateTable_management(arcpy.env.scratchGDB, outTblName)
    arcpy.AddField_management(outTable, "Name", "TEXT")
    arcpy.AddField_management(outTable, "Population", "DOUBLE")

    #Iterate through the cities. Send each one to the multiprocessor
    with arcpy.da.SearchCursor(inFC, ["NAME", "POPULATION"]) as sCursor:
        for city in sCursor:
            cityName = city[0]
            pop = city[1]
            task_queue.put([outTblName, cityName, pop])

    # Start the worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=Worker, args=(task_queue, done_queue)).start()

    # One 'STOP' sentinel per worker so that each one shuts down
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

As expected, it runs into problems when multiple processes try to access the output table simultaneously: even with doProcess calling itself recursively after an error is detected, the output table ends up with fewer rows than the input table.

One idea is for each process to write to its own table and to append them all at the end, as sketched below. Are there any best-practice suggestions?
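For illustration, here is a minimal sketch of that per-process-table idea (hypothetical and untested; the worker payloads are made up, and the final combine uses Merge_management). One caveat: creating tables concurrently inside a single file GDB can hit the same schema-locking problem, so in practice each worker may need its own scratch workspace.

import os, multiprocessing, arcpy

def worker(args):
    workerId, rows = args                        # rows: list of (name, population)
    tblName = "testTable_%d" % workerId          # a private table per worker
    arcpy.CreateTable_management(arcpy.env.scratchGDB, tblName)
    tblPath = os.path.join(arcpy.env.scratchGDB, tblName)
    arcpy.AddField_management(tblPath, "Name", "TEXT")
    arcpy.AddField_management(tblPath, "Population", "DOUBLE")
    # No contention: only this worker ever writes to its own table
    with arcpy.da.InsertCursor(tblPath, ["NAME", "POPULATION"]) as cur:
        for row in rows:
            cur.insertRow(row)
    return tblPath

if __name__ == "__main__":
    # In the real script these chunks would come from the SearchCursor
    chunks = [(0, [("CityA", 1000.0)]), (1, [("CityB", 2000.0)])]
    pool = multiprocessing.Pool(len(chunks))
    partTables = pool.map(worker, chunks)        # each worker returns its table's path
    pool.close()
    pool.join()
    # Single-process merge into the final output table
    arcpy.Merge_management(partTables, os.path.join(arcpy.env.scratchGDB, "testTable"))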

Best Answer

I'd never tried multiprocessing, so I decided to give it a go. This script:

import os, sys, arcpy, multiprocessing
from arcpy import env
env.overwriteOutput = True
scratchGDB = r'd:\rubbish\TEST.gdb'

def function(inputs):
    print("got arg %s" % inputs)
    outTblName = inputs[0]
    city = inputs[1]
    pop = inputs[2]
    # Each worker process opens its own InsertCursor on the shared table
    with arcpy.da.InsertCursor(os.path.join(scratchGDB, outTblName), ["NAME", "POPULATION"]) as iCursor:
        try:
            iCursor.insertRow([city, pop])
        except:
            print("Problem inserting " + city + " : " + str(pop))

if __name__ == "__main__":
    number_of_cpus = 5
    outTblName = "testTable"
    outTable = os.path.join(scratchGDB, outTblName)
    if arcpy.Exists(outTable):
        arcpy.Delete_management(outTable)
    arcpy.CreateTable_management(scratchGDB, outTblName)
    arcpy.AddField_management(outTable, "Name", "TEXT")
    arcpy.AddField_management(outTable, "Population", "DOUBLE")

    # Build one small task per worker: table name, a letter, and a number
    bList = []
    for i in range(number_of_cpus):
        bList.append([outTblName, chr(65 + i), i * i])
    pool = multiprocessing.Pool(number_of_cpus)
    # map blocks until every worker has returned
    for i in pool.map(function, bList):
        print("Writing")
    pool.close()
    pool.join()
    rows = arcpy.da.TableToNumPyArray(os.path.join(scratchGDB, outTblName), ["NAME", "POPULATION"])
    print(rows)

Gave me this output:

[screenshot of the printed NAME/POPULATION rows]

It works as expected.
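That said, the safest general pattern (my suggestion, not something tested above) is to avoid concurrent InsertCursors entirely: have the pool workers compute and return their rows, and let the parent process do all the inserting through a single cursor. A minimal sketch, assuming the testTable created above already exists:

import os, multiprocessing, arcpy

def compute(task):
    # Hypothetical worker: does the expensive per-city work and returns a
    # plain tuple, touching no shared dataset at all
    city, pop = task
    return (city, pop * 2)                  # stand-in for real processing

if __name__ == "__main__":
    tasks = [("A", 1.0), ("B", 2.0), ("C", 3.0)]
    pool = multiprocessing.Pool(4)
    results = pool.map(compute, tasks)      # workers only return data
    pool.close()
    pool.join()
    # One process, one cursor: no contention on the table
    outTable = os.path.join(r'd:\rubbish\TEST.gdb', "testTable")
    with arcpy.da.InsertCursor(outTable, ["NAME", "POPULATION"]) as cur:
        for row in results:
            cur.insertRow(row)

This keeps every write single-process while still parallelising the computation, which is usually where the time goes.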