[GIS] How to write output from subprocess to disk (using multiprocessing)

Tags: arcpy, parallel-processing, python, subprocess

This code was developed from another question I posted about how to kill an arcpy tool if it takes longer than a specified time. See How to cancel tool execution in python. Using the multiprocessing module to run the function in a subprocess seems to work; the only problem is that I can't see how to write the output to disk. The function specifies a path to write the data to, and when I run the function without the subprocess, I can see the files on disk. But when the subprocess is included, it doesn't look like the files are being written out to disk. Can anyone help?

import os, arcpy, arcgisscripting, time, sys
# Geoprocessor object created once when the module is loaded;
# ConvertCADtoGDB below calls ImportCAD_conversion on it.
gp = arcgisscripting.create()

def ConvertCADtoGDB(in_dgn,out_gdb):
    """Import the CAD file in_dgn into the geodatabase out_gdb.

    Runs the ImportCAD tool with the 'Explode_Complex' option and then
    verifies that the output exists.

    Raises RuntimeError if out_gdb does not exist after the tool has run.
    """
    gp.ImportCAD_conversion(in_dgn, out_gdb, '', 'Explode_Complex')
    if arcpy.Exists(out_gdb):
        return
    raise RuntimeError('%s does not exist' % out_gdb)

def main(srch_dir,gdb_dir,timeout):
    """Convert every .dgn file found under srch_dir into a geodatabase in gdb_dir.

    Each conversion runs in its own child process so that it can be
    abandoned when it takes longer than ``timeout`` seconds.

    srch_dir -- directory tree walked recursively for '.dgn' files
    gdb_dir  -- directory in which '<name>.gdb' outputs are created
    timeout  -- seconds to wait for one conversion before terminating it
    """
    # Imported here because the script's top-level import line does not
    # bring multiprocessing into scope.
    import multiprocessing

    for dirpath, dirnames, files in os.walk(srch_dir):
        for f in files:
            f_name, f_ext = os.path.splitext(f)
            if f_ext != '.dgn':
                continue
            in_dgn = os.path.join(dirpath, f)
            out_gdb = os.path.join(gdb_dir, f_name + '.gdb')
            if os.path.isdir(out_gdb):
                continue  # output geodatabase folder already exists
            print('creating %s' % out_gdb)

            # Create a separate process to run the tool in
            p = multiprocessing.Process(target=ConvertCADtoGDB,
                                        args=(in_dgn, out_gdb))
            p.start()
            p.join(timeout)  # Wait for process to complete
            if p.is_alive():
                # Still running after the timeout: kill it and reap it.
                p.terminate()
                p.join()
                print('Conversion of %s terminated' % in_dgn)
            elif p.exitcode != 0:
                # The child ended on its own but not cleanly (e.g. an
                # uncaught exception in ConvertCADtoGDB), so the output
                # cannot be assumed to exist.
                print('Conversion of %s failed (exit code %s)'
                      % (in_dgn, p.exitcode))
            else:
                print('Conversion of %s successful' % in_dgn)

if __name__ == '__main__':
    # Script entry point: hand the search folder, the output folder and
    # the per-file time limit to main().
    timeout = 5  # Seconds
    srch_dir, gdb_dir = r'C:\Temp\dgns', r'C:\Temp\gdbs'
    main(srch_dir, gdb_dir, timeout)

UPDATE:

I tested Luke's code, and the message I receive tells me the process was successful, but I still get nothing written out to disk:

import os, arcpy, arcgisscripting, time, sys
from multiprocessing import Process
from multiprocessing.queues import SimpleQueue
# Folder that is searched for CAD files.
copy_dir = r'D:\test3'
# Output folder: a 'CAD_Export_to_GDB' sub-folder of copy_dir.
gdb_dir = '\\'.join((copy_dir, 'CAD_Export_to_GDB'))

def ConvertCADtoGDB(msgs,in_dgn,out_gdb):
    """Import a CAD file into a geodatabase and report the outcome on msgs.

    Used as the target of the child process started by main().

    msgs    -- queue-like object with a put() method; receives the
               geoprocessing messages and, on failure, the exception
    in_dgn  -- path of the CAD file handed to the ImportCAD tool
    out_gdb -- path of the output geodatabase

    Exceptions derived from Exception are not propagated; they are put on
    msgs (after any available geoprocessing messages) for the caller.
    """
    # Bound before the try block so the handler can tell whether the
    # geoprocessor was created; otherwise a failure in create() would
    # raise a second error inside the handler and hide the real one.
    gp = None
    try:
        gp = arcgisscripting.create()
        gp.ImportCAD_conversion(in_dgn, out_gdb, '', 'Explode_Complex')
        if not arcpy.Exists(out_gdb):
            raise RuntimeError('%s does not exist' % out_gdb)
        msgs.put(gp.GetMessages())
    except Exception as err:
        if gp is not None:
            msgs.put(gp.GetMessages())
        msgs.put(err)

def _convert_with_timeout(in_dgn, out_gdb, timeout):
    """Run ConvertCADtoGDB in a child process, giving up after timeout seconds.

    Messages the child puts on the queue are printed as they arrive and
    the outcome ('terminated', 'unsuccessful: ...' or 'successful') is
    printed at the end. A timeout of None waits indefinitely.
    """
    # Create a separate process to run the tool in
    # and a Queue to pass messages back
    m = SimpleQueue()
    p = Process(target=ConvertCADtoGDB, args=(m, in_dgn, out_gdb))
    p.start()

    err = None
    deadline = None if timeout is None else time.time() + timeout
    while True:
        # Sample liveness BEFORE draining: if the child had already exited
        # here, everything it wrote is in the pipe and the drain below
        # collects all of it.
        finished = not p.is_alive()
        while not m.empty():  # Check if messages in queue
            msg = m.get()
            if isinstance(msg, Exception):
                err = msg
            else:
                print(msg)
        if finished:
            break
        if deadline is None:
            wait = 0.1
        else:
            wait = min(0.1, deadline - time.time())
            if wait <= 0:
                break
        # Short joins rather than one join(timeout): a child blocked while
        # writing a large message into a full pipe can only finish once
        # the parent has read from the queue.
        p.join(wait)

    if not finished:
        # Still running after the timeout: kill it and reap it.
        p.terminate()
        p.join()
        print('terminated')
    elif err is not None:
        print('unsuccessful: %s' % err)
    elif p.exitcode != 0:
        # The child died without reporting anything (e.g. a hard crash).
        print('unsuccessful: exit code %s' % p.exitcode)
    else:
        print('successful')


def main(srch_dir,gdb_dir,timeout):
    """Convert every .dgn file found under srch_dir into a geodatabase in gdb_dir.

    srch_dir -- directory tree walked recursively for '.dgn' files
    gdb_dir  -- directory in which '<name>.gdb' outputs are created
    timeout  -- seconds allowed for one conversion before it is terminated
    """
    for dirpath, dirnames, files in os.walk(srch_dir):
        for f in files:
            f_name, f_ext = os.path.splitext(f)
            if f_ext != '.dgn':
                continue
            in_dgn = os.path.join(dirpath, f)
            out_gdb = os.path.join(gdb_dir, f_name + '.gdb')
            if os.path.isdir(out_gdb):
                continue  # output geodatabase folder already exists
            print('creating %s' % out_gdb)
            _convert_with_timeout(in_dgn, out_gdb, timeout)

if __name__ == '__main__':
    # Pass the module-level copy_dir / gdb_dir straight through. gdb_dir
    # must not be rebound here: doing so sends the geodatabases to a
    # different folder than the CAD_Export_to_GDB folder under copy_dir.
    timeout=60 #Seconds
    main(copy_dir,gdb_dir,timeout)

Best Answer

You're probably getting some sort of exception being raised. Perhaps use a Queue to pass messages back to the parent process.

Tested working code:

import os, arcpy, arcgisscripting, time, sys
from multiprocessing import Process
from multiprocessing.queues import SimpleQueue

def ConvertCADtoGDB(msgs,in_dgn,out_gdb):
    """Import a CAD file into a geodatabase and report the outcome on msgs.

    Used as the target of the child process started by main().

    msgs    -- queue-like object with a put() method; receives the
               geoprocessing messages and, on failure, the exception
    in_dgn  -- path of the CAD file handed to the ImportCAD tool
    out_gdb -- path of the output geodatabase

    Exceptions derived from Exception are not propagated; they are put on
    msgs (after any available geoprocessing messages) for the caller.
    """
    # Bound before the try block so the handler can tell whether the
    # geoprocessor was created; otherwise a failure in create() would
    # raise a second error inside the handler and hide the real one.
    gp = None
    try:
        gp = arcgisscripting.create()
        gp.ImportCAD_conversion(in_dgn, out_gdb, '', 'Explode_Complex')
        if not arcpy.Exists(out_gdb):
            raise RuntimeError('%s does not exist' % out_gdb)
        msgs.put(gp.GetMessages())
    except Exception as err:
        if gp is not None:
            msgs.put(gp.GetMessages())
        msgs.put(err)

def _convert_with_timeout(in_dgn, out_gdb, timeout):
    """Run ConvertCADtoGDB in a child process, giving up after timeout seconds.

    Messages the child puts on the queue are printed as they arrive and
    the outcome ('terminated', 'unsuccessful: ...' or 'successful') is
    printed at the end. A timeout of None waits indefinitely.
    """
    # Create a separate process to run the tool in
    # and a Queue to pass messages back
    m = SimpleQueue()
    p = Process(target=ConvertCADtoGDB, args=(m, in_dgn, out_gdb))
    p.start()

    err = None
    deadline = None if timeout is None else time.time() + timeout
    while True:
        # Sample liveness BEFORE draining: if the child had already exited
        # here, everything it wrote is in the pipe and the drain below
        # collects all of it.
        finished = not p.is_alive()
        while not m.empty():  # Check if messages in queue
            msg = m.get()
            if isinstance(msg, Exception):
                err = msg
            else:
                print(msg)
        if finished:
            break
        if deadline is None:
            wait = 0.1
        else:
            wait = min(0.1, deadline - time.time())
            if wait <= 0:
                break
        # Short joins rather than one join(timeout): a child blocked while
        # writing a large message into a full pipe can only finish once
        # the parent has read from the queue.
        p.join(wait)

    if not finished:
        # Still running after the timeout: kill it and reap it.
        p.terminate()
        p.join()
        print('terminated')
    elif err is not None:
        print('unsuccessful: %s' % err)
    elif p.exitcode != 0:
        # The child died without reporting anything (e.g. a hard crash).
        print('unsuccessful: exit code %s' % p.exitcode)
    else:
        print('successful')


def main(srch_dir,gdb_dir,timeout):
    """Convert every .dgn file found under srch_dir into a geodatabase in gdb_dir.

    srch_dir -- directory tree walked recursively for '.dgn' files
    gdb_dir  -- directory in which '<name>.gdb' outputs are created
    timeout  -- seconds allowed for one conversion before it is terminated
    """
    for dirpath, dirnames, files in os.walk(srch_dir):
        for f in files:
            f_name, f_ext = os.path.splitext(f)
            if f_ext != '.dgn':
                continue
            in_dgn = os.path.join(dirpath, f)
            out_gdb = os.path.join(gdb_dir, f_name + '.gdb')
            if os.path.isdir(out_gdb):
                continue  # output geodatabase folder already exists
            print('creating %s' % out_gdb)
            _convert_with_timeout(in_dgn, out_gdb, timeout)

if __name__ == '__main__':
    # Script entry point: hand the search folder, the output folder and
    # the per-file time limit to main().
    timeout = 60  # Seconds
    srch_dir, gdb_dir = r'C:\Temp\dgns', r'C:\Temp\gdbs'
    main(srch_dir, gdb_dir, timeout)

The output is:

creating C:\Temp\gdbs\smalltest.gdb
Executing: ImportCAD C:\Temp\dgns\smalltest.dgn C:\Temp\gdbs\smalltest.gdb # Explode_Complex
Start Time: Thu Jul 24 09:37:42 2014
...Importing layers from C:\Temp\dgns\smalltest.dgn
...Importing entities from C:\Temp\dgns\smalltest.dgn
......4 entities imported from C:\Temp\dgns\smalltest.dgn
...Importing and consolidating extended data to separate table
Succeeded at Thu Jul 24 09:37:46 2014 (Elapsed Time: 4.00 seconds)
successful
Related Question