PyQGIS Select by Expression – Using in Processing Script Context

Tags: pyqgis, qgis-processing, select-by-expression

Is there a way to select features and use them in subsequent algorithms within a processing script context?

I want to iteratively select (with selectByExpression) a subset of line features in a layer based on their geometry lengths, and then use the selected lines in the native:snapgeometries processing algorithm.

I can do so in the python console with the following code:

# Run from the QGIS Python console, where the Qgs* classes and `processing`
# are already available.
point_layer = QgsProject.instance().mapLayersByName('points')[0]
line_layer = QgsProject.instance().mapLayersByName('lines')[0]
longest_length = max(line.geometry().length() for line in line_layer.getFeatures())

length = 0
# `<=` so that a line whose length falls exactly on a bin boundary
# (e.g. the longest line being exactly 20 units) still gets its own bin.
while length <= longest_length:
    # Select the lines whose length lies in the half-open bin [length, length + 10).
    line_layer.selectByExpression(
        '$length >= {} and $length < {}'.format(length, length + 10))
    thresh = (length + 5) / 2
    params = {
        # Second argument True restricts the input to the selected features.
        'INPUT': QgsProcessingFeatureSourceDefinition(line_layer.id(), True),
        'REFERENCE_LAYER': point_layer,
        'TOLERANCE': thresh,
        'BEHAVIOR': 3,
        'OUTPUT': 'TEMPORARY_OUTPUT',
    }
    result = processing.run("native:snapgeometries", params)
    QgsProject.instance().addMapLayer(result['OUTPUT'])
    length += 10

When I attempt to replicate it in a processing script context with:

...
source_line = self.parameterAsSource(parameters, self.IN_LINES, context)
...
source_line.selectByExpression('$length >= {} and $length <= {}'.format(length,length+10))
...

I get this error:

'QgsProcessingFeatureSource' object has no attribute 'selectByExpression'

Best Answer

I'm not sure if these example scripts will work exactly the way you want but you are welcome to try them.

The scripts are based on the template found here.

One option is to use the materialize() method of QgsFeatureSource which returns a QgsVectorLayer. Example below:

from qgis.PyQt.QtCore import QCoreApplication, QVariant
from qgis.core import (QgsProcessing,
                        QgsProcessingAlgorithm,
                        QgsProcessingParameterFeatureSource,
                        QgsFeatureRequest,
                        QgsExpression,
                        QgsProject,
                        QgsProcessingUtils)
from qgis import processing
                    
                    
class ExAlgo(QgsProcessingAlgorithm):
    """Snap line features to a point layer, one 10-unit length bin at a time.

    The line source is split into bins [0, 10), [10, 20), ... by geometry
    length. Each bin is materialized as its own layer and passed to
    ``native:snapgeometries`` with a tolerance of ``(bin_start + 5) / 2``.
    Every snapped layer is added to the current project in
    ``postProcessAlgorithm``.
    """

    LINES = 'LINES'
    POINTS = 'POINTS'

    def __init__(self):
        super().__init__()
        # Per-instance list: a class-level list would be shared by every
        # instance of the algorithm and could leak layers between runs.
        self.final_layers = []

    def name(self):
        return "exalgo"

    def tr(self, text):
        return QCoreApplication.translate("exalgo", text)

    def displayName(self):
        return self.tr("Example script")

    def group(self):
        return self.tr("Examples")

    def groupId(self):
        return "examples"

    def shortHelpString(self):
        return self.tr("Example script with vector layer inputs")

    def helpUrl(self):
        return "https://qgis.org"

    def createInstance(self):
        return type(self)()

    def initAlgorithm(self, config=None):
        self.addParameter(QgsProcessingParameterFeatureSource(
            self.LINES,
            self.tr("Input line layer"),
            [QgsProcessing.TypeVectorLine]))

        self.addParameter(QgsProcessingParameterFeatureSource(
            self.POINTS,
            self.tr("Input point layer"),
            [QgsProcessing.TypeVectorPoint]))

    def processAlgorithm(self, parameters, context, feedback):
        """Run ``native:snapgeometries`` once per 10-unit length bin.

        Returns a dict whose ``'OUTPUT'`` entry is the output of the last
        snapping run, or ``None`` when no run took place (empty input or
        cancellation before the first bin).
        """
        source_lines = self.parameterAsSource(parameters, self.LINES, context)
        source_points = self.parameterAsSource(parameters, self.POINTS, context)
        point_layer = source_points.materialize(QgsFeatureRequest())

        lengths = [line.geometry().length() for line in source_lines.getFeatures()]
        if not lengths:
            # max() on an empty sequence would raise ValueError.
            feedback.reportError(
                self.tr("The input line layer has no features; nothing to snap."))
            return {'OUTPUT': None}
        longest_length = max(lengths)

        self.final_layers.clear()
        result = None
        bin_start = 0
        # `<=` so that a line whose length falls exactly on a bin boundary
        # (including the longest line) is still processed.
        while bin_start <= longest_length:
            if feedback.isCanceled():
                break
            # Only the features inside the half-open bin [bin_start, bin_start + 10).
            line_layer = source_lines.materialize(
                QgsFeatureRequest(
                    QgsExpression("$length >= {} and $length < {}".format(
                        bin_start, bin_start + 10))))
            thresh = (bin_start + 5) / 2
            params = {
                'INPUT': line_layer,
                'REFERENCE_LAYER': point_layer,
                'TOLERANCE': thresh,
                'BEHAVIOR': 3,
                'OUTPUT': 'TEMPORARY_OUTPUT',
            }
            result = processing.run("native:snapgeometries",
                                    params,
                                    is_child_algorithm=True,
                                    context=context,
                                    feedback=feedback)
            self.final_layers.append(
                QgsProcessingUtils.mapLayerFromString(result['OUTPUT'], context))
            bin_start += 10

        return {'OUTPUT': result['OUTPUT'] if result is not None else None}

    def postProcessAlgorithm(self, context, feedback):
        """Name each snapped layer and add it to the current project."""
        for index, layer in enumerate(self.final_layers):
            layer.setName(f'lines_snapped_{index+1}')
            QgsProject.instance().addMapLayer(layer)
        self.final_layers.clear()
        return {}

Or another option is to use QgsProcessingParameterVectorLayer instead of QgsProcessingParameterFeatureSource and work directly with QgsVectorLayer objects.

Example:

from qgis.PyQt.QtCore import QCoreApplication, QVariant
from qgis.core import (QgsProcessing,
                        QgsProcessingAlgorithm,
                        QgsProcessingParameterVectorLayer,
                        QgsFeatureRequest,
                        QgsExpression,
                        QgsProject,
                        QgsProcessingUtils)
from qgis import processing
                    
                    
class ExAlgo(QgsProcessingAlgorithm):
    """Snap selected line features to a point layer, one length bin at a time.

    The line layer is processed in bins [0, 10), [10, 20), ... by geometry
    length. For each bin the matching lines are selected with
    ``selectByExpression`` and only that selection is passed to
    ``native:snapgeometries`` with a tolerance of ``(bin_start + 5) / 2``.
    Every snapped layer is added to the current project in
    ``postProcessAlgorithm``.
    """

    LINES = 'LINES'
    POINTS = 'POINTS'

    def __init__(self):
        super().__init__()
        # Per-instance list: a class-level list would be shared by every
        # instance of the algorithm and could leak layers between runs.
        self.final_layers = []

    def flags(self):
        # The algorithm changes the selection of a live layer, so it is
        # flagged as non-threaded.
        return super().flags() | QgsProcessingAlgorithm.FlagNoThreading

    def name(self):
        return "exalgo"

    def tr(self, text):
        return QCoreApplication.translate("exalgo", text)

    def displayName(self):
        return self.tr("Example script")

    def group(self):
        return self.tr("Examples")

    def groupId(self):
        return "examples"

    def shortHelpString(self):
        return self.tr("Example script with vector layer inputs")

    def helpUrl(self):
        return "https://qgis.org"

    def createInstance(self):
        return type(self)()

    def initAlgorithm(self, config=None):
        self.addParameter(QgsProcessingParameterVectorLayer(
            self.LINES,
            self.tr("Input line layer"),
            [QgsProcessing.TypeVectorLine]))

        self.addParameter(QgsProcessingParameterVectorLayer(
            self.POINTS,
            self.tr("Input point layer"),
            [QgsProcessing.TypeVectorPoint]))

    def processAlgorithm(self, parameters, context, feedback):
        """Run ``native:snapgeometries`` on the selection of each length bin.

        Returns a dict whose ``'OUTPUT'`` entry is the output of the last
        snapping run, or ``None`` when no run took place (empty input or
        cancellation before the first bin).
        """
        # Local import keeps this block self-contained; the name is not in
        # the script's top-level import list.
        from qgis.core import QgsProcessingFeatureSourceDefinition

        line_layer = self.parameterAsVectorLayer(parameters, self.LINES, context)
        point_layer = self.parameterAsVectorLayer(parameters, self.POINTS, context)

        lengths = [line.geometry().length() for line in line_layer.getFeatures()]
        if not lengths:
            # max() on an empty sequence would raise ValueError.
            feedback.reportError(
                self.tr("The input line layer has no features; nothing to snap."))
            return {'OUTPUT': None}
        longest_length = max(lengths)

        self.final_layers.clear()
        result = None
        # Remember the user's selection so it can be restored afterwards.
        previous_selection = line_layer.selectedFeatureIds()
        try:
            bin_start = 0
            # `<=` so that a line whose length falls exactly on a bin
            # boundary (including the longest line) is still processed.
            while bin_start <= longest_length:
                if feedback.isCanceled():
                    break
                line_layer.selectByExpression(
                    "$length >= {} and $length < {}".format(bin_start, bin_start + 10))
                thresh = (bin_start + 5) / 2
                params = {
                    # Passing the bare layer would ignore the selection and
                    # snap every line; the True flag limits the input to
                    # the selected features.
                    'INPUT': QgsProcessingFeatureSourceDefinition(line_layer.id(), True),
                    'REFERENCE_LAYER': point_layer,
                    'TOLERANCE': thresh,
                    'BEHAVIOR': 3,
                    'OUTPUT': 'TEMPORARY_OUTPUT',
                }
                result = processing.run("native:snapgeometries",
                                        params,
                                        is_child_algorithm=True,
                                        context=context,
                                        feedback=feedback)
                self.final_layers.append(
                    QgsProcessingUtils.mapLayerFromString(result['OUTPUT'], context))
                bin_start += 10
        finally:
            line_layer.selectByIds(previous_selection)

        return {'OUTPUT': result['OUTPUT'] if result is not None else None}

    def postProcessAlgorithm(self, context, feedback):
        """Name each snapped layer and add it to the current project."""
        for index, layer in enumerate(self.final_layers):
            layer.setName(f'lines_snapped_{index+1}')
            QgsProject.instance().addMapLayer(layer)
        self.final_layers.clear()
        return {}
Related Question