PyQGIS – Inputting Raster and Processing Zonal Statistics with Custom Script

pyqgisqgis-processingrasterzonal statistics

I'm writing a custom processing script, based on the template, using QGIS 3.22 running on Windows 11. The script runs well as console script, but I want to make it into a custom processing script. What I have attached runs as a processing script without error messages but doesn't complete the zonalstatistics calculations. Input a raster with one band and vector with building centroids, which is buffered. The output of native:zonalstatisticsfb is 'NULL' for all values, and when I use native:zonalhistogram there are no fields added. The raster is a geotif and the vector is a shapefile, I resaved and reprojected them, and they cover the same area (I couldn't work out how to attach data, I can dropbox for anybody interested).

I think the problem is how I set the input for the raster layer. I have been trying ideas for days.

Could someone point to an example of how either zonalstatistics or zonalhistogram is used in a custom processing script please, and how to correctly use a raster as input.

Thanks

The code

import os
import ogr

from PyQt5.QtCore import QCoreApplication, QVariant
from qgis.core import (QgsProcessing, QgsFeatureRequest,
                       QgsFeatureSink, QgsProcessingParameterString,
                       QgsProcessingException,
                       QgsProcessingAlgorithm,
                       QgsProcessingParameterFeatureSource,
                       QgsProcessingParameterFeatureSink,
                       QgsProcessingParameterNumber,
                       QgsProcessingParameterRasterLayer,
                       QgsProcessingParameterVectorLayer,
                       QgsProcessingParameterRasterDestination,
                       QgsRasterLayer, QgsProcessingContext,
                       QgsFields, QgsField, QgsFeature,
                       QgsVectorLayer, QgsProject, QgsProperty)
import processing


class ExampleProcessingAlgorithm(QgsProcessingAlgorithm):
    """
    This is an example algorithm that takes a vector layer and
    creates a new identical one.

    It is meant to be used as an example of how to create your own
    algorithms and explain methods and variables used to do it. An
    algorithm like this will be available in all elements, and there
    is not need for additional work.

    All Processing algorithms should extend the QgsProcessingAlgorithm
    class.
    """

    # Constants used to refer to parameters and outputs. They will be
    # used when calling the algorithm from another algorithm, or when
    # calling from the QGIS console.

    CENTROIDS = 'CENTROIDS'
    INPUT_RASTER_LAND_COVER = 'INPUT_RASTER_LAND_COVER'
    VEG_BUFFER_DIST = 'VEG_BUFFER_DIST'
    VUN_OUTPUT = 'VUN_OUTPUT'

    def tr(self, string):
        """
        Returns a translatable string with the self.tr() function.
        """
        return QCoreApplication.translate('Processing', string)

    def createInstance(self):
        return ExampleProcessingAlgorithm()

    def name(self):
        """
        Returns the algorithm name, used for identifying the algorithm. This
        string should be fixed for the algorithm, and must not be localised.
        The name should be unique within each provider. Names should contain
        lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return 'myscript'

    def displayName(self):
        """
        Returns the translated algorithm name, which should be used for any
        user-visible display of the algorithm name.
        """
        return self.tr('My Script')

    def group(self):
        """
        Returns the name of the group this algorithm belongs to. This string
        should be localised.
        """
        return self.tr('Example scripts')

    def groupId(self):
        """
        Returns the unique ID of the group this algorithm belongs to. This
        string should be fixed for the algorithm, and must not be localised.
        The group id should be unique within each provider. Group id should
        contain lowercase alphanumeric characters only and no spaces or other
        formatting characters.
        """
        return 'examplescripts'

    def shortHelpString(self):
        """
        Returns a localised short helper string for the algorithm. This string
        should provide a basic description about what the algorithm does and the
        parameters and outputs associated with it..
        """
        return self.tr("Example algorithm short description")

    def initAlgorithm(self, config=None):
        """
        Here we define the inputs and output of the algorithm, along
        with some other properties.
        """

        # We add the input vector features source. It can have any kind of
        # geometry.
        self.addParameter(
            QgsProcessingParameterRasterLayer(
                self.INPUT_RASTER_LAND_COVER,
                self.tr("Input a raster with land cover as distinct bands"),
                [QgsProcessing.TypeRaster]
            )
        )
        
        self.addParameter(
            QgsProcessingParameterFeatureSource(
                self.CENTROIDS,
                self.tr('Building centroids'),
                [QgsProcessing.TypeVectorAnyGeometry]
            )
        )
        
        self.addParameter(
            QgsProcessingParameterNumber(
                self.VEG_BUFFER_DIST,
                self.tr('Buffer distance around each property to assess vegetation cover (meters)')
            )
        )
        
        # We add a feature sink in which to store our processed features (this
        # usually takes the form of a newly created vector layer when the
        # algorithm is run in QGIS).
        
        self.addParameter(
            QgsProcessingParameterFeatureSink(
                self.VUN_OUTPUT,
                self.tr('Vunrability Output')
            )
        )

    def processAlgorithm(self, parameters, context, feedback):
        """
        Here is where the processing itself takes place.
        """
        #each source layer is named, it is a two step process
        landcover = self.parameterAsRasterLayer(
            parameters,
            self.INPUT_RASTER_LAND_COVER,
            context
        )
        #if the input is invalid an error is raised
        if landcover is None:
            raise QgsProcessingException(self.invalidSourceError(parameters, self.INPUT_RASTER_LAND_COVER))
        
        #same as above, but with a vector not raster
        centroids = self.parameterAsVectorLayer(
            parameters,
            self.CENTROIDS,
            context
        )
        
        if centroids is None:
            raise QgsProcessingException(self.invalidSourceError(parameters, self.BUILDING_CENTROIDS))
        ######
        
        buffer_distance = self.addParameter(
            QgsProcessingParameterNumber(
                self.VEG_BUFFER_DIST,
                self.tr('Buffer distance around each property to assess vegetation cover (meters)')
            )
        )

        buffer_result = processing.run("native:buffer", {
            'INPUT': centroids,
            'DISTANCE': buffer_distance,
            'SEGMENTS': 5,
            'END_CAP_STYLE': 0,
            'JOIN_STYLE': 0,
            'MITER_LIMIT': 2,
            'DISSOLVE': False,
            'OUTPUT': 'memory:'
        }, context=context, feedback=feedback)['OUTPUT']
        
        zonal_statistics_result = processing.run("native:zonalstatisticsfb", {
        'INPUT': buffer_result,
        'INPUT_RASTER': landcover,
        'RASTER_BAND':1,
        'COLUMN_PREFIX':'_',
        'STATISTICS':[0,1,2],
        'OUTPUT':'memory:'
        },
        context=context, feedback=feedback)['OUTPUT']
        
        join_1 = processing.run("native:joinattributestable", {
        'INPUT':buffer_result,
        'FIELD':'OBJECTID',
        'INPUT_2': zonal_statistics_result,
        'FIELD_2':'OBJECTID',
        'FIELDS_TO_COPY':[],
        'METHOD':1,
        'DISCARD_NONMATCHING':False,
        'PREFIX':'',
        'OUTPUT':'memory:'
        }, context=context, feedback=feedback)['OUTPUT']
        
        #drop geometry of the multipolygons, so fields can be attached to point layer
        dropped_geoms = processing.run("native:dropgeometries", {
        'INPUT': join_1,
        'OUTPUT':'memory:'
        },
        context=context, feedback=feedback)['OUTPUT']
        #attach fields without geometry to point layer
        join_2 = processing.run("native:joinattributestable", {
        'INPUT':centroids,
        'FIELD':'OBJECTID',
        'INPUT_2': dropped_geoms,
        'FIELD_2':'OBJECTID',
        'FIELDS_TO_COPY':['_count'],
        'METHOD':1,
        'DISCARD_NONMATCHING':False,
        'PREFIX':'',
        'OUTPUT':'memory:'
        }, context=context, feedback=feedback)['OUTPUT']
        
        join_2.commitChanges()
            
        (sink, dest_id) = self.parameterAsSink(parameters, self.VUN_OUTPUT, context,
                                               join_2.fields(), centroids.wkbType(), centroids.sourceCrs() )
        if sink is None:
            raise QgsProcessingException(self.invalidSinkError(parameters, self.VUN_OUTPUT))
            
        total = 100.0 / join_2.featureCount() if join_2.featureCount() else 0
        features = join_2.getFeatures()

        for current, feature in enumerate(features):
            # Stop the algorithm if cancel button has been clicked
            if feedback.isCanceled():
                break

            feedback.setProgress(int(current * total))

            sink.addFeature(feature)

        results = {}
        results[self.VUN_OUTPUT] = dest_id
        return results

'''

Best Answer

In addition to not setting is_child_algorithm=True, you are not retrieving your buffer distance parameter correctly. I have also added a few other improvements and simplifications- you do not need to manually add output features to feature sink, you can just use a QgsProcessingParameterVectorDestination and set it as the 'OUTPUT' for your final algorithm. You can also use QgsProcessingMultiStepFeedback so you get a nicely updating progress bar for the overall algorithm.

Try this modified script:

from PyQt5.QtCore import QCoreApplication, QVariant
from qgis.core import (QgsProcessing,
                       QgsProcessingException,
                       QgsProcessingAlgorithm,
                       QgsProcessingParameterFeatureSource,
                       QgsProcessingParameterVectorDestination,
                       QgsProcessingParameterNumber,
                       QgsProcessingParameterRasterLayer,
                       QgsProcessingMultiStepFeedback)
import processing


class ExampleProcessingAlgorithm(QgsProcessingAlgorithm):

    CENTROIDS = 'CENTROIDS'
    INPUT_RASTER_LAND_COVER = 'INPUT_RASTER_LAND_COVER'
    VEG_BUFFER_DIST = 'VEG_BUFFER_DIST'
    VUN_OUTPUT = 'VUN_OUTPUT'

    def tr(self, string):
        return QCoreApplication.translate('Processing', string)

    def createInstance(self):
        return ExampleProcessingAlgorithm()

    def name(self):
        return 'myscript'

    def displayName(self):
        return self.tr('My Script')

    def group(self):
        return self.tr('Example scripts')

    def groupId(self):
        return 'examplescripts'

    def shortHelpString(self):
        return self.tr("Example algorithm short description")

    def initAlgorithm(self, config=None):
        self.addParameter(
            QgsProcessingParameterRasterLayer(
                self.INPUT_RASTER_LAND_COVER,
                self.tr("Input a raster with land cover as distinct bands"),
                [QgsProcessing.TypeRaster]
            )
        )
        
        self.addParameter(
            QgsProcessingParameterFeatureSource(
                self.CENTROIDS,
                self.tr('Building centroids'),
                [QgsProcessing.TypeVectorPoint]
            )
        )
        
        self.addParameter(
            QgsProcessingParameterNumber(
                self.VEG_BUFFER_DIST,
                self.tr('Buffer distance around each property to assess vegetation cover (meters)')
            )
        )
        
        self.addParameter(
            QgsProcessingParameterVectorDestination(
                self.VUN_OUTPUT,
                self.tr('Vulnerability Output')
            )
        )

    def processAlgorithm(self, parameters, context, model_feedback):
        #each source layer is named, it is a two step process
        landcover = self.parameterAsRasterLayer(
            parameters,
            self.INPUT_RASTER_LAND_COVER,
            context
        )
        #if the input is invalid an error is raised
        if landcover is None:
            raise QgsProcessingException(self.invalidSourceError(parameters, self.INPUT_RASTER_LAND_COVER))
        
        #same as above, but with a vector not raster
        centroids = self.parameterAsVectorLayer(
            parameters,
            self.CENTROIDS,
            context
        )
        
        if centroids is None:
            raise QgsProcessingException(self.invalidSourceError(parameters, self.BUILDING_CENTROIDS))

        buffer_distance = self.parameterAsInt(parameters,
                                            self.VEG_BUFFER_DIST,
                                            context)
                                            
        steps = 5
        feedback = QgsProcessingMultiStepFeedback(steps, model_feedback)
        step = 1

        results = {}
        outputs = {}
        
        feedback.setCurrentStep(step)
        step+=1
        
        outputs['buffer_result'] = processing.run("native:buffer", {
            'INPUT': centroids,
            'DISTANCE': buffer_distance,
            'SEGMENTS': 5,
            'END_CAP_STYLE': 0,
            'JOIN_STYLE': 0,
            'MITER_LIMIT': 2,
            'DISSOLVE': False,
            'OUTPUT': 'TEMPORARY_OUTPUT'
        }, context=context, feedback=feedback, is_child_algorithm=True)
        results['buffer_result'] = outputs['buffer_result']['OUTPUT']
        
        feedback.setCurrentStep(step)
        step+=1
        outputs['zonal_statistics_result'] = processing.run("native:zonalstatisticsfb", {
        'INPUT': results['buffer_result'],
        'INPUT_RASTER': landcover,
        'RASTER_BAND':1,
        'COLUMN_PREFIX':'_',
        'STATISTICS':[0,1,2],
        'OUTPUT':'TEMPORARY_OUTPUT'
        },
        context=context, feedback=feedback, is_child_algorithm=True)
        results['zonal_statistics_result'] = outputs['zonal_statistics_result']['OUTPUT']
        
        feedback.setCurrentStep(step)
        step+=1
        outputs['join_1'] = processing.run("native:joinattributestable", {
        'INPUT':results['buffer_result'],
        'FIELD':'OBJECTID',
        'INPUT_2': results['zonal_statistics_result'],
        'FIELD_2':'OBJECTID',
        'FIELDS_TO_COPY':[],
        'METHOD':1,
        'DISCARD_NONMATCHING':False,
        'PREFIX':'',
        'OUTPUT':'TEMPORARY_OUTPUT'
        }, context=context, feedback=feedback, is_child_algorithm=True)
        results['join_1'] = outputs['join_1']['OUTPUT']
        
        feedback.setCurrentStep(step)
        step+=1
        #drop geometry of the multipolygons, so fields can be attached to point layer
        outputs['dropped_geoms'] = processing.run("native:dropgeometries", {
        'INPUT': results['join_1'],
        'OUTPUT':'TEMPORARY_OUTPUT'
        },
        context=context, feedback=feedback, is_child_algorithm=True)
        results['dropped_geoms'] = outputs['dropped_geoms']['OUTPUT']
        
        feedback.setCurrentStep(step)
        step+=1
        #attach fields without geometry to point layer
        outputs['join_2'] = processing.run("native:joinattributestable", {
        'INPUT':centroids,
        'FIELD':'OBJECTID',
        'INPUT_2': results['dropped_geoms'],
        'FIELD_2':'OBJECTID',
        'FIELDS_TO_COPY':['_count'],
        'METHOD':1,
        'DISCARD_NONMATCHING':False,
        'PREFIX':'',
        'OUTPUT':parameters[self.VUN_OUTPUT]
        }, context=context, feedback=feedback, is_child_algorithm=True)
        results['join_2'] = outputs['join_2']['OUTPUT']

        return results