[P] Importing Pyspark PipelineModel with custom transformers into Scala

I recently created a PipelineModel with a few custom transformers to generate features that the native Spark transformers can't produce. Here's an example of one of my transformers:

```python
from import Transformer
from import HasInputCol, HasOutputCol
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf


class newLabelMap(
    Transformer,
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):
    inputCol = Param(Params._dummy(), "inputCol", "The input column",
                     TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "The output column",
                      TypeConverters.toString)

    def __init__(self, inputCol="", outputCol=""):
        super(newLabelMap, self).__init__()
        self._setDefault(inputCol="", outputCol="")
        self._set(inputCol=inputCol, outputCol=outputCol)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def setInputCol(self, inputCol):
        self._set(inputCol=inputCol)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, outputCol):
        self._set(outputCol=outputCol)

    def _transform(self, dataset):
        @udf("string")
        def findLabel(labelVal):
            new_label_dict = {
                'oldLabel0': 'newLabel0',
                'oldLabel1': 'newLabel1',
                'oldLabel2': 'newLabel1',
                'oldLabel3': 'newLabel1',
                'oldLabel4': 'newLabel2',
                'oldLabel5': 'newLabel2',
                'oldLabel6': 'newLabel2',
                'oldLabel7': 'newLabel3',
                'oldLabel8': 'newLabel3',
                'oldLabel9': 'newLabel4',
                'oldLabel10': 'newLabel4',
            }
            try:
                return new_label_dict[labelVal]
            except KeyError:
                return 'other'

        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, findLabel(in_col))
```
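As an aside, the lookup-with-fallback inside the UDF can be written more compactly with `dict.get`. A minimal standalone sketch of the same mapping logic (plain Python, no Spark required; the labels shown are a subset of the ones above):

```python
# A subset of the mapping table used in the transformer above.
new_label_dict = {
    'oldLabel0': 'newLabel0',
    'oldLabel1': 'newLabel1',
    'oldLabel4': 'newLabel2',
}

def find_label(label_val):
    # dict.get with a default replaces the try/except KeyError pattern.
    return new_label_dict.get(label_val, 'other')

print(find_label('oldLabel1'))    # newLabel1
print(find_label('unseenLabel'))  # other
```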

The transformer works fine in the Pipeline: I can save it, load it back into a PySpark session, and transform. The issue comes when I try to import it into a Scala environment. When I try to load the model, I receive this error output:

```
Name: java.lang.IllegalArgumentException
Message: requirement failed: Error loading metadata: Expected class name but found class name
StackTrace:
  at scala.Predef$.require(Predef.scala:224)
  at$.parseMetadata(ReadWrite.scala:638)
  at$.loadMetadata(ReadWrite.scala:616)
  at$SharedReadWrite$.load(Pipeline.scala:267)
  at$PipelineModelReader.load(Pipeline.scala:348)
  at$PipelineModelReader.load(Pipeline.scala:342)
```
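The loader is comparing the class name recorded in the saved stage's metadata against a JVM class it can resolve. A Python-only transformer persisted via `DefaultParamsWritable` records its Python class path under the `"class"` key, which the Scala side has no JVM class for. A quick way to see what was recorded is to read the stage's `metadata/part-00000` file; the record below is a hypothetical example of its contents, not actual output from my model:

```python
import json

# Hypothetical metadata record, as written by PySpark's DefaultParamsWriter
# to <model_path>/stages/<n>_<uid>/metadata/part-00000.
metadata_line = (
    '{"class": "__main__.newLabelMap", "timestamp": 1234567890, '
    '"sparkVersion": "2.4.0", "uid": "newLabelMap_abc123", "paramMap": {}}'
)

metadata = json.loads(metadata_line)
# The Scala loader cannot resolve a Python module path to a JVM class,
# so loadMetadata fails for this stage.
print(metadata["class"])  # __main__.newLabelMap
```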

If I remove the custom transformer, the model loads just fine in Scala. So I'm wondering: how can I write custom transformers in PySpark so that a PipelineModel containing them can be ported to a Scala environment? Do I need to adapt my code in any way? Any help is greatly appreciated 🙂

submitted by /u/Octosaurus