[P] Importing a PySpark PipelineModel with custom transformers into Scala
I recently created a PipelineModel with a few custom transformers to generate features not doable with the native Spark transformers. Here’s an example of one of my transformers:
```python
from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf


class newLabelMap(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    # Explicit Param declarations (these shadow the HasInputCol/HasOutputCol mixins)
    inputCol = Param(Params._dummy(), "inputCol", "The input column", TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "The output column", TypeConverters.toString)

    def __init__(self, inputCol="", outputCol=""):
        super(newLabelMap, self).__init__()
        self._setDefault(inputCol="")
        self._setDefault(outputCol="")
        self._set(inputCol=inputCol)
        self._set(outputCol=outputCol)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def setInputCol(self, inputCol):
        self._set(inputCol=inputCol)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, outputCol):
        self._set(outputCol=outputCol)

    def _transform(self, dataset):
        @udf("string")
        def findLabel(labelVal):
            # Collapse the original labels into a smaller set of new labels
            new_label_dict = {
                'oldLabel0': 'newLabel0',
                'oldLabel1': 'newLabel1',
                'oldLabel2': 'newLabel1',
                'oldLabel3': 'newLabel1',
                'oldLabel4': 'newLabel2',
                'oldLabel5': 'newLabel2',
                'oldLabel6': 'newLabel2',
                'oldLabel7': 'newLabel3',
                'oldLabel8': 'newLabel3',
                'oldLabel9': 'newLabel4',
                'oldLabel10': 'newLabel4',
            }
            try:
                return new_label_dict[labelVal]
            except KeyError:  # fall back for labels not in the mapping
                return 'other'

        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, findLabel(in_col))
```
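For context, here's roughly how I'm using it end to end (the DataFrame, column names, and save path below are illustrative, not my real ones):

```python
from pyspark.ml import Pipeline, PipelineModel

# Illustrative data: one label that's in the mapping, one that should fall back to 'other'
df = spark.createDataFrame([("oldLabel0",), ("unseen",)], ["label"])

pipeline = Pipeline(stages=[newLabelMap(inputCol="label", outputCol="newLabel")])
model = pipeline.fit(df)
model.write().overwrite().save("/tmp/label_pipeline")

# Round-trips fine within PySpark
reloaded = PipelineModel.load("/tmp/label_pipeline")
reloaded.transform(df).show()
```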
The transformer works fine in the Pipeline: I can save it, load it back into a PySpark session, and transform with it. The issue comes when I try to import it into a Scala environment. When I try to load the model, I receive this error output:
```
Name: java.lang.IllegalArgumentException
Message: requirement failed: Error loading metadata: Expected class name
  org.apache.spark.ml.PipelineModel but found class name pyspark.ml.pipeline.PipelineModel
StackTrace:
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.DefaultParamsReader$.parseMetadata(ReadWrite.scala:638)
  at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:616)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:267)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:348)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:342)
```
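The message lines up with what the save actually wrote to disk: the pipeline's top-level metadata records the Python class name rather than the JVM one. A rough sketch of how I checked, using the illustrative path from above:

```python
import json

# Spark ML persistence writes a one-line JSON record under <path>/metadata
meta_line = spark.read.text("/tmp/label_pipeline/metadata").collect()[0][0]
print(json.loads(meta_line)["class"])
# -> pyspark.ml.pipeline.PipelineModel (the class name the Scala reader rejects)
```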
If I remove the custom transformer, the model loads just fine in Scala. So how can I write custom transformers in PySpark such that a PipelineModel containing them can be ported to a Scala environment? Do I need to amend my code in some way? Any help is greatly appreciated 🙂