今回はPySparkを使ったロジスティック回帰による分類予測のモデリングをやってみます。
前回のブログ記事ではPySparkによる線形重回帰による数値予測をやったので、PySparkシリーズということで分類予測をするロジスティック回帰にも挑戦します。
前回同様、ちなみにこちらの本をよく参考にさせていただきました。
使うデータはBankデータとして、その中の"y"列:定期預金したかどうかを目的変数としてモデリングをしていきたいと思います。
説明変数はデータを全部使うわけではなく、簡単のため一部としたいと思います。
PySparkのMLlibでロジスティック回帰
CSVデータを読み込む
まずはSparkを使うためにSparkSessionを立ち上げます。
1 2 3 4 5 6 7 |
from pyspark.sql import SparkSession filename = 'bank/bank-full.csv' spark = SparkSession.builder \ .master("local") \ .appName("logistic_reg_pipeline") \ .getOrCreate() data = spark.read.csv(filename, header=True, inferSchema=True, sep=';') |
spark sessionが立てられたら、CSVファイルを読み込みましょう。
CSV読み込みはread.csvで行うことができ、データにヘッダーがあるのでheaderはTrue、スキーマは勝手に推測してくれた方が楽なので、inferSchema=Trueにしておきます。
最後にCSVデータはセミコロンで区切られているので、sep=";"とします。
これでdataにデータが格納されます。
データ処理からモデリングまでのPipeline作成
使うデータが格納されたので、モデリングをするためのデータ処理をしていきます。
まずは目的変数を作る必要がありますので、現在Yes, Noになっているy列をモデリングができるように、1, 0の値に変換します。
1 2 3 |
#目的変数作成 from pyspark.sql.functions import lit, when, col data1 = data.withColumn("y1", when(col("y") == 'yes' ,lit(1.0)).otherwise(lit(0.0))) |
このあとの流れとしては以下の処理内容を順にパイプラインに登録していきます。
- 文字列カラムをインデックス化
- モデリング用にassemble
- 標準化(standard scaler)
- モデリング
文字列カラムをインデックス化
説明変数の一つで文字列になっているdefault列をインデックス化したいと思います。
1 2 3 |
#String encoding -> default from pyspark.ml.feature import OneHotEncoder, StringIndexer default_index = StringIndexer(inputCol="default", outputCol="default_index") |
モデリング用に説明変数のassemble
モデリングをするためには説明変数を一つにまとめたlistを作る必要がありましたので、それを行うためにassembleをしておきます。
1 2 3 4 |
#assemble from pyspark.ml.feature import VectorAssembler assemble = VectorAssembler(inputCols=['age','balance','duration','campaign', 'previous','default_index'] , outputCol='features') |
標準化
前回の線形重回帰をしたときは簡単のために標準化しませんでしたが、今回はその方法もやっておきたいと思います。
標準化はStandardScalerを使って簡単にできます。
インプットはassembleしたfeatures列で、標準化したあとのアウトプットはscaled_featuresとしています。
1 2 3 |
#standard scaler from pyspark.ml.feature import StandardScaler scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False) |
ロジスティック回帰によるモデリング
さて、ようやくモデリングの段階になります。
ロジスティック回帰によるモデリングも1行でできます。
1 2 3 |
#logistic regression from pyspark.ml.classification import LogisticRegression logistic_regression = LogisticRegression(featuresCol='scaled_features', labelCol='y1') |
パイプラインに登録
これでデータ処理系が完了なので、これらを順にパイプラインのステージに登録していきます。
本来はこの後のAUC計算を入れてもいいですが、ここではわかりやすくモデリングまでとしたいと思います。
1 2 3 |
#pipeline化 from pyspark.ml import Pipeline pipeline = Pipeline(stages=[default_index, assemble, scaler, logistic_regression]) |
訓練データによるモデリング
これでパイプラインへの登録ができたので、実際にデータを投入して予測をしてみましょう!
使うデータを抽出して、訓練用・テスト用に7:3で分割してみます。
ランダムシードは何でもいいので、とりあえず123とかにしておきます。
1 2 3 4 5 |
df = data1.select('age','balance','duration','campaign', 'previous','default','y', 'y1') # train test split #df = df.repartition(100, 'age') train_df, test_df = df.randomSplit([0.7,0.3], seed = 123) |
これでデータを準備できたので、pipelineをfitしてモデリングを行います。
モデルの係数確認と精度評価(AUC)
モデリングができたので、モデルの係数を確認しましょう
モデル係数はstageの中からモデリングをした3番目のを指定して、coefficient, interceptを指定すれば取得できます。
1 2 3 |
#モデリング結果の係数の確認 print("coefficient:",fit_model.stages[3].coefficients) print("intercept:",fit_model.stages[3].intercept) |
モデルを使った推論も実施しておきます。
1 2 |
#訓練データでパイプラインを実施して推論 pred_train = fit_model.transform(train_df) |
assembleした結果の列がfeaturesになっており、それを標準化したのがscaled_features、そして推論結果がprediction列に格納されています。
このpredictionを計算するための確率値がprobabilityというわけです。
それでは最後に、訓練データを使った推論結果をもとに精度評価ということでAUCを計算します。
1 2 3 4 5 |
#訓練データでAUC計算 from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator(labelCol='y1') AUC = evaluator.evaluate(pred_train) AUC |
これで訓練データを使った処理は完了です!
パイプラインを作っておくと便利でいいですね!
テストデータによる精度評価(AUC)
では、次にテストデータを使って推論をして、その結果を使ってAUCの計算をしていきましょう。
もうパイプラインができているので、そこにテストデータを突っ込むだけです。
1 2 3 4 5 6 7 8 |
#テストデータで推論 pred_test = fit_model.transform(test_df) #テストデータでAUC計算 from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator = BinaryClassificationEvaluator(labelCol='y1') AUC = evaluator.evaluate(pred_test) AUC |
これでテストデータを使った精度評価も完了です!
凝った特徴量エンジニアリングとかはしていませんが、基本的な流れはこのようにMLlibを使ってできることがわかったのではないでしょうか!