Is there no "inverse_transform" method for a scaler like MinMaxScaler in Spark?

When training a model such as linear regression, we may normalize the train/test dataset with a scaler such as MinMaxScaler.

After we obtain the trained model and use it to make predictions, we want to scale the predictions back to the original representation.

In Python there is an "inverse_transform" method for this. For example:

from sklearn.preprocessing import MinMaxScaler

scalerModel.inverse_transform



from sklearn.preprocessing import MinMaxScaler



data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]



scaler = MinMaxScaler()

MinMaxScaler(copy=True, feature_range=(0, 1))

print(data)



dataScaled = scaler.fit(data).transform(data)

print(dataScaled)



scaler.inverse_transform(dataScaled)




Is there a similar method in Spark?

I have searched a lot but could not find an answer. Can anyone give me some advice?

Thanks a lot!
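For reference, here is a minimal PySpark sketch of the situation described above (not part of the original question; the example data and the spark session are assumed for illustration). It shows that the fitted MinMaxScalerModel keeps the per-feature statistics a manual inverse would need, but exposes no inverse_transform method:

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [(Vectors.dense([-1.0, 2.0]),),
     (Vectors.dense([-0.5, 6.0]),),
     (Vectors.dense([0.0, 10.0]),),
     (Vectors.dense([1.0, 18.0]),)],
    ["features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
model = scaler.fit(df)
model.transform(df).show(truncate=False)

# Statistics a manual inverse would need:
print(model.originalMin, model.originalMax)  # per-feature min/max seen during fit
print(scaler.getMin(), scaler.getMax())      # target range, (0.0, 1.0) by default
# ...but there is no model.inverse_transform(...), hence this question.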


At our company, to solve the same problem for StandardScaler, we extended spark.ml with (among other things) the following:

package org.apache.spark.ml



import org.apache.spark.ml.linalg.DenseVector

import org.apache.spark.ml.util.Identifiable



package object feature {



  implicit class RichStandardScalerModel(model: StandardScalerModel) {



    private def invertedStdDev(sigma: Double): Double = 1 / sigma



    private def invertedMean(mu: Double, sigma: Double): Double = -mu / sigma



    def inverse(newOutputCol: String): StandardScalerModel = {

      val sigma: linalg.Vector = model.std

      val mu: linalg.Vector = model.mean

      val newSigma: linalg.Vector = new DenseVector(sigma.toArray.map(invertedStdDev))

      val newMu: linalg.Vector = new DenseVector(mu.toArray.zip(sigma.toArray).map { case (m, s) => invertedMean(m, s) })

      val inverted: StandardScalerModel = new StandardScalerModel(Identifiable.randomUID("stdScal"), newSigma, newMu)

        .setInputCol(model.getOutputCol)

        .setOutputCol(newOutputCol)



      inverted

        .set(inverted.withMean, model.getWithMean)

        .set(inverted.withStd, model.getWithStd)

    }

  }



}


It should be fairly easy to modify it, or to do something similar for your specific case.

Keep in mind that due to the JVM's double implementation you normally lose some precision in these operations, so you will not recover the exact original values from before the transformation (e.g. you will probably get something like 1.9999999999999998 instead of 2.0).
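As an illustration of "doing something similar for your specific case", here is a PySpark sketch (my own assumption, not code from this answer): undo a fitted StandardScalerModel column using the model's mean and std vectors inside a UDF. The example data and column names are made up.

import numpy as np
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

df = spark.createDataFrame(
    [(Vectors.dense([1.0, 10.0]),),
     (Vectors.dense([2.0, 20.0]),),
     (Vectors.dense([3.0, 30.0]),)],
    ["features"])

model = StandardScaler(inputCol="features", outputCol="scaled",
                       withMean=True, withStd=True).fit(df)
mean, std = model.mean.toArray(), model.std.toArray()

# scaled = (x - mean) / std, therefore x = scaled * std + mean
invert = udf(lambda v: Vectors.dense(np.array(v) * std + mean), VectorUDT())

model.transform(df).withColumn("inverted", invert("scaled")).show(truncate=False)

As noted above, the round-tripped values may differ from the originals in the last decimal places.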


Maybe I'm too late to the party, but I recently ran into exactly the same problem and couldn't find any viable solution.

Presuming that the author of this question does not have to inverse the MinMax values of whole vectors, but instead only needs to inverse one column.

The min and max values of that column, as well as the min/max parameters of the scaler, are also known.

Math behind MinMaxScaler, according to the scikit-learn website:

X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))

X_scaled = X_std * (max - min) + min


"逆向工程" MinMaxScaler 公式

X_scaled = (X - Xmin) / (Xmax - Xmin) * (max - min) + min

X = (max * Xmin - min * Xmax - Xmin * X_scaled + Xmax * X_scaled) / (max - min)
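As a quick sanity check with the numbers used in the implementation below (Xmin = 2, Xmax = 18, min = 0, max = 1) and X_scaled = 0.25:

X = (1 * 2 - 0 * 18 - 2 * 0.25 + 18 * 0.25) / (1 - 0) = (2 - 0.5 + 4.5) / 1 = 6.0

which recovers the original y value of 6.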


Implementation

from sklearn.preprocessing import MinMaxScaler

import pandas



data = [[-1, 2], [-0.5, 6], [0, 10], [1, 18]]



scaler = MinMaxScaler(copy=True, feature_range=(0, 1))

print(data)

dataScaled = scaler.fit(data).transform(data)



data_sp = spark.createDataFrame(pandas.DataFrame(data, columns=["x","y"]).join(pandas.DataFrame(dataScaled, columns=["x_scaled","y_scaled"])))

data_sp.show()

print("Inversing column: y_scaled")

Xmax = data_sp.select("y").rdd.max()[0]

Xmin = data_sp.select("y").rdd.min()[0]

_max = scaler.feature_range[1]

_min = scaler.feature_range[0]



print("Xmax =", Xmax,"Xmin =", Xmin,"max =", _max,"min =", _min)

data_sp.withColumn(colName="y_scaled_inversed", col=(_max * Xmin - _min * Xmax - Xmin * data_sp.y_scaled + Xmax * data_sp.y_scaled)/(_max - _min)).show()
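A possible variant (not part of the original answer): the column minimum and maximum can also be obtained with DataFrame aggregations instead of going through the RDD, for example:

from pyspark.sql import functions as F

row = data_sp.agg(F.min("y").alias("Xmin"), F.max("y").alias("Xmax")).first()
Xmin, Xmax = row["Xmin"], row["Xmax"]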


Output

[[-1, 2], [-0.5, 6], [0, 10], [1, 18]]
+----+---+--------+--------+
|   x|  y|x_scaled|y_scaled|
+----+---+--------+--------+
|-1.0|  2|     0.0|     0.0|
|-0.5|  6|    0.25|    0.25|
| 0.0| 10|     0.5|     0.5|
| 1.0| 18|     1.0|     1.0|
+----+---+--------+--------+

Inversing column: y_scaled
Xmax = 18 Xmin = 2 max = 1 min = 0
+----+---+--------+--------+-----------------+
|   x|  y|x_scaled|y_scaled|y_scaled_inversed|
+----+---+--------+--------+-----------------+
|-1.0|  2|     0.0|     0.0|              2.0|
|-0.5|  6|    0.25|    0.25|              6.0|
| 0.0| 10|     0.5|     0.5|             10.0|
| 1.0| 18|     1.0|     1.0|             18.0|
+----+---+--------+--------+-----------------+


There is no direct solution here.

Since passing an array to a UDF only works when the array is a column (lit(array) won't do the trick), I am using the following workaround.

In a nutshell, it turns the inverted scales array into a string, passes it to the UDF, and solves the math there.

You can use that scales array (string) in the inverse function (also attached here) to obtain the inverted values.

Code:

from pyspark.ml.feature import VectorAssembler, QuantileDiscretizer
from pyspark.ml.linalg import SparseVector, DenseVector, Vectors, VectorUDT
from pyspark.sql.functions import col, lit, udf
import numpy as np



df = spark.createDataFrame([

  (0, 1, 0.5, -1),

  (1, 2, 1.0, 1),

  (2, 4, 10.0, 2)

], ["id", 'x1', 'x2', 'x3'])



df.show()



def Normalize(df):



  scales = df.describe()

  scales = scales.filter("summary = 'mean' or summary = 'stddev'")

  scales = scales.select(["summary"] + [col(c).cast("double") for c in scales.columns[1:]])



  assembler = VectorAssembler(

    inputCols=scales.columns[1:],

    outputCol="X_scales")



  df_scales = assembler.transform(scales)



  x_mean = df_scales.filter("summary = 'mean'").select('X_scales')

  x_std = df_scales.filter("summary = 'stddev'").select('X_scales')



  ks_std_lit = lit('|'.join([str(s) for s in list(x_std.collect()[0].X_scales)]))

  ks_mean_lit = lit('|'.join([str(s) for s in list(x_mean.collect()[0].X_scales)]))



  assembler = VectorAssembler(

    inputCols=df.columns[0:4],

    outputCol="features")



  df_features = assembler.transform(df)

  df_features = df_features.withColumn('Scaled', exec_norm_udf(df_features.features, ks_mean_lit, ks_std_lit))



  return df_features, ks_mean_lit, ks_std_lit



def exec_norm(vector, x_mean, x_std):

  x_mean = [float(s) for s in x_mean.split('|')]

  x_std = [float(s) for s in x_std.split('|')]



  res = (np.array(vector) - np.array(x_mean)) / np.array(x_std)

  res = list(res)



  return Vectors.dense(res)





exec_norm_udf = udf(exec_norm, VectorUDT())





def scaler_invert(vector, x_mean, x_std):

  x_mean = [float(s) for s in x_mean.split('|')]

  x_std = [float(s) for s in x_std.split('|')]



  res = (np.array(vector) * np.array(x_std)) + np.array(x_mean)

  res = list(res)



  return Vectors.dense(res)





scaler_invert_udf = udf(scaler_invert, VectorUDT())





df, scaler_mean, scaler_std = Normalize(df)

df.withColumn('inverted', scaler_invert_udf(df.Scaled, scaler_mean, scaler_std)).show(truncate=False)
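A possible alternative to the string round-trip, sketched under the assumption that the collected statistics are available as plain Python lists (the names x_mean_list and x_std_list below are made up, with placeholder values): build constant array columns with F.array of literals, so the UDF receives ordinary lists instead of a delimited string.

import numpy as np
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

def scaler_invert_arr(vector, x_mean, x_std):
  # x_mean and x_std arrive here as plain Python lists of floats
  return Vectors.dense(np.array(vector) * np.array(x_std) + np.array(x_mean))

scaler_invert_arr_udf = F.udf(scaler_invert_arr, VectorUDT())

# Placeholder values only; in practice use the per-column statistics collected
# in Normalize(), kept as Python floats instead of being joined with '|'.
x_mean_list = [0.0, 0.0, 0.0, 0.0]
x_std_list = [1.0, 1.0, 1.0, 1.0]

mean_col = F.array(*[F.lit(float(v)) for v in x_mean_list])
std_col = F.array(*[F.lit(float(v)) for v in x_std_list])

df.withColumn('inverted', scaler_invert_arr_udf(df.Scaled, mean_col, std_col)).show(truncate=False)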
