
[P] Importing Pyspark PipelineModel with custom transformers into Scala

I recently created a PipelineModel with a few custom transformers to generate features that can't be built with the native Spark transformers. Here's an example of one of my transformers:

from pyspark.ml import Transformer
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf

class newLabelMap(Transformer, HasInputCol, HasOutputCol,
                  DefaultParamsReadable, DefaultParamsWritable):
    inputCol = Param(Params._dummy(), "inputCol", "The input column", TypeConverters.toString)
    outputCol = Param(Params._dummy(), "outputCol", "The output column", TypeConverters.toString)

    def __init__(self, inputCol="", outputCol=""):
        super(newLabelMap, self).__init__()
        self._setDefault(inputCol="")
        self._setDefault(outputCol="")
        self._set(inputCol=inputCol)
        self._set(outputCol=outputCol)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def setInputCol(self, inputCol):
        self._set(inputCol=inputCol)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def setOutputCol(self, outputCol):
        self._set(outputCol=outputCol)

    def _transform(self, dataset):
        @udf("string")
        def findLabel(labelVal):
            # Collapse the old label set into a smaller set of new labels.
            new_label_dict = {
                'oldLabel0': 'newLabel0',
                'oldLabel1': 'newLabel1',
                'oldLabel2': 'newLabel1',
                'oldLabel3': 'newLabel1',
                'oldLabel4': 'newLabel2',
                'oldLabel5': 'newLabel2',
                'oldLabel6': 'newLabel2',
                'oldLabel7': 'newLabel3',
                'oldLabel8': 'newLabel3',
                'oldLabel9': 'newLabel4',
                'oldLabel10': 'newLabel4',
            }
            try:
                return new_label_dict[labelVal]
            except KeyError:
                # Anything not in the map falls through to a catch-all label.
                return 'other'

        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, findLabel(in_col))
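For context, a minimal standalone check of the transformer looks something like this (the SparkSession named spark and the toy DataFrame are illustrative assumptions, not my actual data):

    # Exercise the transformer on its own; assumes an active SparkSession `spark`.
    df = spark.createDataFrame([("oldLabel0",), ("oldLabel5",), ("unseen",)], ["label"])
    mapper = newLabelMap(inputCol="label", outputCol="newLabel")
    mapper.transform(df).show()  # "unseen" falls through to 'other'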

The transformer works fine in the Pipeline: I can save the model, load it back into a PySpark session, and transform with it. The issue comes when I try to import it into a Scala environment. When I try to load the model there, I get this error output:
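Concretely, the round trip that succeeds on the Python side is something like the sketch below (the save path and the DataFrame df are assumptions for illustration):

    # Sketch of the PySpark save/load round trip that works.
    from pyspark.ml import Pipeline, PipelineModel

    pipeline = Pipeline(stages=[newLabelMap(inputCol="label", outputCol="newLabel")])
    model = pipeline.fit(df)  # df is an assumed input DataFrame
    model.write().overwrite().save("/tmp/label_pipeline")

    reloaded = PipelineModel.load("/tmp/label_pipeline")  # loads fine in PySpark
    reloaded.transform(df).show()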

Name: java.lang.IllegalArgumentException
Message: requirement failed: Error loading metadata: Expected class name org.apache.spark.ml.PipelineModel but found class name pyspark.ml.pipeline.PipelineModel
StackTrace:
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.DefaultParamsReader$.parseMetadata(ReadWrite.scala:638)
  at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:616)
  at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:267)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:348)
  at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:342)
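The class name the Scala reader complains about sits in the saved model's metadata JSON. A quick way to see it (hypothetical check, assuming the model was saved to a local path like the one above) is:

    # The "class" field here is what the Scala DefaultParamsReader validates.
    # Assumes a local-filesystem save path; the part file name may differ.
    import json

    with open("/tmp/label_pipeline/metadata/part-00000") as f:
        meta = json.loads(f.read())
    print(meta["class"])  # "pyspark.ml.pipeline.PipelineModel" when a Python-only stage is present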

If I remove the custom transformer, the model loads just fine in Scala, so I'm curious: how can custom transformers written in PySpark be ported inside a PipelineModel to a Scala environment? Do I need to amend my code in some way? Any help is greatly appreciated 🙂

submitted by /u/Octosaurus