AI と機械学習のための大規模な特徴量エンジニアリング

特徴量エンジニアリングは、機械学習のプロセスの中で最も重要なステップの 1 つであり、多くの時間を要します。データサイエンティストやアナリストは、さまざまな特徴量を組み合わせた実験を重ねてモデルを改善し、ビジネスに有益な情報を提供する BI レポートの作成を目指します。そのような状況下で、データサイエンティストが扱うデータの規模および複雑さが増大し、次のような事柄が課題となっています。

  1. 特徴量をシンプルかつ一貫性のある方法で定義すること
  2. 既存の特徴量の識別と再利用
  3. 既存の特徴量を利用した拡張
  4. 特徴量やモデルのバージョン管理
  5. 特徴量定義のライフサイクルの管理
  6. 特徴量の計算と保存の効率化
  7. 大規模テーブル(>1000 列)の効率的な計算と永続化
  8. 意思決定につながるモデルのもとになった特徴量の再現(例:監査や解釈可能性などの実証)

In this blog, we present design patterns for generating large-scale features. A reference implementation of the design patterns is provided in the attached notebooks to demonstrate how first-class design patterns can simplify the feature engineering process and facilitate efficiency across the silos in your organization. The approach can be integrated with the recently-launched Databricks Feature Store, the first of its kind co-designed with an MLOps and data platform, and can leverage the storage and MLOps capabilities of Delta Lake and MLFlow.

今回の例では、TPC-DS のトレーニングデータセットを使用して、Apache Spark™ による大規模なファーストクラス特徴量エンジニアリングのワークフローの利点を示しています。これらの複雑な変換は自己文書化されており、効率的で拡張も可能です。売上や取引などの基本的な指標を、顧客や時間などのディメンションを対象に修正・変換し、モデル化可能な特徴量を作成しました。これらの複雑な変換は自己文書化されており、効率的で拡張も可能です。ファーストクラス特徴量エンジニアリングのフレームワークは業界に特化したものではありませんが、それぞれの組織の個別の目標に合わせて容易に拡張できるものでなければなりません。このブログでは、フレームワーク内で高階関数を適用して適合させることで、拡張が可能であることを示しています。

今回の特徴量エンジニアリングのアプローチは、データの大規模化に起因する困難な課題にも対応できるように設計されています。ほぼ全てのビジネスにおいてデータが爆発的に増加しており、特徴量そのものだけでなく、その作成や管理も含めた作業の指数関数的な増加が、業界を問わず大きな課題となっています。このブログで紹介するフレームワークは既に複数の業界で検討・導入されており、その一部について以下で解説します。

今回の特徴量エンジニアリングのアプローチは、データの大規模化に起因する困難な課題にも対応できるように設計されています。ほぼ全てのビジネスにおいてデータが爆発的に増加しており、特徴量そのものだけでなく、その作成や管理も含めた作業の指数関数的な増加が、業界を問わず大きな課題となっています。このブログで紹介するフレームワークは既に複数の業界で検討・導入されており、その一部について以下で解説します。

ファーストクラス特徴量エンジニアリングのフレームワークは業界に特化したものではありませんが、それぞれの組織の個別の目標に合わせて容易に拡張できるものでなければなりません。

アーキテクチャの概要

このブログで紹介しているデザインパターンは、Feature Factory にまとめられたものに基づいています。以下の図は、典型的なワークフローです。まず、未加工のデータからベースとなる特徴量が定義され、他の特徴量の構成要素となります。例えば、特徴量 total_sales を基本特徴量として定義し、顧客ごとにグループ化して sales_value の合計を求めることが可能です。新たに生成された特徴量は、基本特徴量に対してより複雑な操作を行うための入力値として使用できます。多数の特徴量の生成、文書化、テスト、検証、永続化を、数行のコードで迅速に行うことができます。

Feature Factory をベースにした特徴量エンジニアリングの典型的なワークフロー。

Feature Store API を使用し、特徴量の定義を未加工のデータに適用して、特徴量をデータフレームとして生成することも可能です。データフレームを Feature Registry に保存することもできます。また、Delta Lake では、特徴量生成エンジンで利用する複数の最適化機能を提供しています。特徴量の定義はバージョン管理されており、トレーサビリティ、再現性、時間的な解釈可能性が必要になる場面や、監査の際に利用できます。

以下のコード例では、特徴量の定義をマテリアライズし、Feature Store に登録する方法を示しています。

  def compute_customer_features(data):
  features = StoreSales()
  fv_months = features.total_sales.multiply("i_category", ["Music", "Home", "Shoes"]).multiply("month_id", [200012, 200011, 200010])
  df = append_features(src_df, [features.collector], fv_months)
  return df
customer_features_df = compute_customer_features(src_df)


Feature Store にマテリアルズ、登録された特徴量定義の例。

シンプルな特徴量の乗算による結果

fs = feature_store.FeatureStoreClient()
fs.create_feature_table(
    name="db.customer_features",
    keys=["customer_id"],
    features_df=customer_features_df,
    description="customer feature table",


Feature Store に作成された特徴量の例。

Feature Store で生成された特徴量

例として、上記の表では、total_sales_Music_200012customer_id 46952 )が 1752.68 になっています。つまり、この顧客は、2000年 12 月時点の定義における total_sales として、1752.68 ドルに相当する楽曲を購入したことになります。

データセット

リファレンス実装では、原則として TPC-DS の3つの販売チャネル(Web、Store、Catalog)に基づいています。このブログのコード例では、date_dim で結合されたStoreSales テーブルと item テーブルから作成された特徴量を示しています。各テーブルで定義する内容は以下のとおりです。

  • Store_Sales:実店舗内での製品取引から生じた収益
  • Date_Dim:日付のディメンションを表すカレンダー型のテーブル
  • Item:販売可能な SKU

基本特徴量の定義

Spark API には、特徴量エンジニアリングに利用できるデータエンジニアリング向けの強力な機能が用意されており、ラッパーといくつかのコンテキスト定義を使って、複雑な内容を抽象化し、簡単に再利用できるようにします。Feature クラスは、統一されたインタフェースを提供し、次のコンポーネントを使って特徴量を定義します。

  • _base_col :シンプルな列型の表現のカラムやその他の特徴量。
  • _filter:条件リスト、または True/False の列型表現。True の場合は、 _base_col で定義されたロジックが特徴量として使用され、そうでない場合には _negative_value を使用して特徴量が計算される。
  • _negative_value :_filter が False を返す場合に評価される表現。
  • _agg_func :ベースカラムの集計に使用する Spark SQL 関数を定義。_agg_func が定義されていない場合、その特徴量は集約表現ではない(つまり、特徴量は 1 つのみ)。
class Feature:
    def __init__(self,
                 _name: str,
                 _base_col: Union[Column, Feature],
                 _filter=[],
                 _negative_value=None,
                 _agg_func=None):

以下は、2019年上半期の売上を集計する集約特徴量の定義方法の例です。

total_sales = Feature(_name="total_sales",
			_base_col="sales_value",
			_filter=[col("month_id").between(201901, 201906)],
			_agg_func=sum)

以下は、上と同等の表現です。

sum(when(col("month_id").between(201901, 201906), col("sales_value")).otherwise(None))

特徴量のモジュール化

特徴量エンジニアリングにおいてよくみられる問題は、データサイエンスチームが独自に特徴量を定義する一方、その特徴量定義の文書化や可視化がされておらず、他のチームと簡単に共有できないことです。その結果、取り組みやコードの重複だけでなく、最悪の場合には、同じ意図に基づいた特徴量であってもロジックや結果が異なるということが起こります。チーム間でバグの修正、改善、文書化を行うことは、非常に困難です。特徴量の定義をモジュール化することで、これらの一般的な課題を軽減することができます。

組織や部門間で共有する場合、組織内での担当領域が異なると同様の概念に対しても計算方法が違ってくるため、抽象化の程度をさらに深める必要があります。例えば、net_sales の計算は、店舗、ウェブ、通信販売の事業部門で必要ですが、入力だけでなく計算についても事業部門ごとに異なる可能性があります。net_sales をより多様な組織を対象に異なる形で導き出せるようにするには、net_sales とその共通項を common_module(例:sales_common)に昇格させ、そこにユーザーが事業ごとのルールを反映させられるようにする必要があります。ほとんどの特徴量は、他の事業部門と大きく重複することはなく、共通項となることはありません。しかし、そのような特徴量が別の事業部門では価値がないということにはなりません。異なる概念を組み合わせて扱う特徴量を生成することは可能です。ただし、共通の上位概念が存在しない場合、使用に際しては、元の概念(channel 等)のルールに従う必要があります。例えば、店舗の売上を予測する機械学習(ML)モデルでは、通信販売の特徴量から値を取得することがよくあります。catalog_sales を店舗の売上の主要指標として想定する場合、通信販売と店舗という概念上の相違を超えて特徴量を定義づけることはできますが、この場合、ユーザーが外部モジュール(例:名前空間)の構造を通じて定義ルールを理解できるようになっていなければなりませんこのような抽象化の詳細については、これ以上はこのブログ記事では取り上げません。

リファレンス実装では、Feature Family(特徴量の集まり)としてモジュールを実装しています。各特徴量には読み取り専用のプロパティが定義されており、簡単にアクセスできるようになっています。Feature Familyは、汎用性のある ImmutableDictBase クラスから拡張されており、特徴量コレクションのベースクラス、フィルター、その他のオブジェクトとして機能します。下のコード例では、フィルターの定義が特徴量から抽出され、別の Filters クラスを構成します。複数のファミリーで共有される共通の特徴量も、再利用のために別の共通 Features クラスに抽出されます。フィルターと共通の特徴量の両方が、StoreSales ファミリークラスに継承され、共通の定義に基づいて新しい特徴量が定義されます。

コード例では、チャンネルは 1 つだけですが、複数のチャンネルで同じ CommonFeatures を共有しています。特定のチャンネルから特徴量の定義を取得するには、store_channel.total_sales などのように、そのファミリークラスのプロパティにアクセスするだけです。

class CommonFeatures(ImmutableDictBase):
   def __init__(self):
       self._dct["CUSTOMER_NUMBER"] = Feature(_name="CUSTOMER_NUMBER", _base_col=f.col("CUSTOMER_ID").cast("long"))
       self._dct["trans_id"] = Feature(_name="trans_id", _base_col=f.concat("ss_ticket_number","d_date"))

   @property
   def customer(self):
       return self._dct["CUSTOMER_NUMBER"]

   @property
   def trans_id(self):
       return self._dct["trans_id"]


class Filters(ImmutableDictBase):
   def __init__(self):
       self._dct["valid_sales"] = f.col("sales_value") > 0

   @property
   def valid_sales(self):
       return self._dct["valid_sales"]


class StoreSales(CommonFeatures, Filters):
   def __init__(self):
       self._dct = dict()
       CommonFeatures.__init__(self)
       Filters.__init__(self)

       self._dct["total_trans"] = Feature(_name="total_trans",
                                          _base_col=self.trans_id,
                                          _filter=[],
                                          _negative_value=None,
                                          _agg_func=f.countDistinct)

       self._dct["total_sales"] = Feature(_name="total_sales",
                                          _base_col=f.col("sales_value"),
                                          _filter=self.valid_sales,
                                          _negative_value=0,
                                          _agg_func=f.sum)

   @property
   def total_sales(self):
       return self._dct["total_sales"]

   @property
   def total_trans(self):
       return self._dct["total_trans"]

特徴量のオペレーション

特徴量の生成については、大抵の場合、共通のパターンが見受けられます。より高次の関数を含むように特徴量をスケーリングし、冗長な部分を減らして可読性と定義の質を向上させ、より容易に再利用できるようにすることなどです。例えば、以下のようなケースがあります。

  • アナリストが、前月、前四半期、前年など、さまざまな期間における複数の製品の傾向を測定し、比較することを希望する。
  • データサイエンティストが、広告出稿のためのレコメンドシステムを開発する際に、商品カテゴリや市場セグメントごとに顧客の購買パターンを分析する。

多様なユースケースで、類似する一連の操作(フィルター等)を、類似または同等の基本特徴量の上に実装することで、より深く、強力で、具体的な特徴量を生成します。

リファレンス実装では、特徴量がFeature クラスとして定義されており、Feature クラスのメソッドとしてオペレーションが実装されています。より多くの特徴量を生成するために、基本特徴量に乗数を掛けて、個別の時間範囲、値、またはデータ列(Spark Sql Expression 等)のリストとして使用できます。例えば、総売上高の特徴量に月の範囲を掛け合わせて、月別の総売上高の特徴量ベクトルを生成できます。

total_sales * [1M, 3M, 6M] => [total_sales_1M, total_sales_3M, total_sales_6M]

この乗算は、カテゴリ値にも適用できます。次の例では、カテゴリ別の特徴量から、特徴量 total_sales を導き出す方法を示しています。

total_sales * [home, auto] => [total_sales_home, total_sales_auto]

なお、これらの処理では、さまざまな乗算の結果として得られる特徴量を組み合わせて、さらに特徴量を変換するような組合せも可能です。

total_sales_1M * [home, auto] => [total_sales_1M_home, total_sales_1M_home, total_sales_1M_home]

必要に応じて、高次のラムダを適用して、特徴量のリストに特徴量のリストを掛け合わせたリスト内包表記を利用することもできます。例えば、以下の出力変数 total_sales_1M_home は、過去 1 か月間の家庭用品の店舗総売上高を導き出したものです。データサイエンティストが、自分だけが読める何百行もの非効率なコードを使って、何日もかけてデータを処理することがありますが、このフレームワークによりその煩雑な作業が大幅に軽減されます。

total_sales_by_time = total_sales * [1M, 3M, 6M]
categorical_total_sales_by_time = total_sales_by_time * [home, auto] => 
[
total_sales_1M_home, total_sales_1M_home, total_sales_1M_home, total_sales_1M_auto, total_sales_1M_auto, total_sales_1M_auto,
total_sales_3M_home, total_sales_3M_home, total_sales_3M_home, total_sales_3M_auto, total_sales_3M_auto, total_sales_3M_auto,
total_sales_6M_home, total_sales_6M_home, total_sales_6M_home, total_sales_6M_auto, total_sales_6M_auto, total_sales_6M_auto
]

特徴量ベクトル

同じオペレーションで使用する複数の特徴量を 1 つのベクトルに格納することで、特徴量のオペレーションをさらに簡単にすることができます。特徴量ベクトルは、Feature Dictionary の特徴量名を列挙することで作成できます。

features = Features()
fv = FeatureVector.create_by_names(features, ["total_sales", "total_trans"])

特徴量ベクトルでは、単純な乗算や除算、あるいは既存のベクトルに対する統計関数によって、別の特徴量ベクトルを作り出すことが可能です。乗算、除算、統計解析などのメソッドを実装しており、既存の基本特徴量のリストから特徴量を生成するプロセスを簡略化できます。同様に、Spark の特徴量トランスフォーマーをラッピングして、スケーラーや二値化などの一般的な特徴量化手法を実行することもできます。以下は One-hot(ワンホット)エンコーディング(ダミー変数)の例です。

fv2d = fv.multiply_categories("category", ["GROCERY", "MEAT", "DAIRY"])

上記の結果、各カテゴリー(grocery、meat、dairy)ごとの total_sales と total_trans に新しい特徴量が生成されます。より動的な処理を行うには、カテゴリの値をハードコーディングするのではなく、ディメンションテーブルの列から読み取ります。なお、乗算の出力は 2 次元ベクトルです。

GROCERY MEAT DAIRY
total_sales total_sales_grocery total_sales_meat total_sales_dairy
total_trans total_trans_grocery total_trans_meat total_trans_dairy

以下は、FeatureVector の実装方法です。

class FeatureVector:

def __init__(self, features: List[Feature] = None):
if not features:
self._features = []
else:
self._features = features

def __add__(self, other):
"""
Overrides default add so that two feature vectors can be added to form a new feature vector.
e.g. fv1 = fv2 + fv3 in which fv1 contains all features from both fv2 and fv3
:param other:
:return:
"""
return FeatureVector(self._features + other._features)

@classmethod
def create_by_names(cls, feature_collection, feature_names: List[str]):
feat_list = [feature_collection[fn] for fn in feature_names]
return FeatureVector(feat_list)

def multiply(self, multiplier_col: str, multiplier_values: List[str]):
feats = FeatureVector()
for feature in self._features:
fv = feature.multiply(multiplier_col, multiplier_values)
feats += fv
return feats

def create_stats(self, base_name: str, stats=["min", "max", "avg", "stdev"]):
cols = [f.col(feat.name) for feat in self._features]
fl = []
for stat in stats:
if stat == "min":
fl.append(Feature(_name=base_name + "_min", _base_col=f.array_min(f.array(cols))))
elif stat == "max":
fl.append(Feature(_name=base_name + "_max", _base_col=f.array_max(f.array(cols))))
elif stat == "avg":
fl.append(Feature(_name=base_name + "_avg", _base_col=avg_func(f.array(cols))))
elif stat == "stdev":
fl.append(Feature(_name=base_name + "_stdev", _base_col=stdev_func(f.array(cols))))
return FeatureVector(fl)

def to_cols(self):
return [f.col(feat.name) for feat in self._features]

def to_list(self):
return self._features[:]

One-hot encoding(ダミー変数)

One-hot(ワンホット)エンコーディングは、以下のコードで説明するように、特徴量の乗算で簡単に行うことができます。特徴量のエンコーディングでは、base_col を 1negative_value を 0 と定義することで、その特徴量をカテゴリ列で乗算した際に一致したカラムを 1 に、それ以外は 0 になるようにしています。これは、one hot vector やダミー変数とも呼ばれます。

src_df = spark.createDataFrame([(1, "iphone"), (2, "samsung"), (3, "htc"), (4, "vivo")], ["user_id", "device_type"])
encode = Feature(_name="device_type_encode", _base_col=f.lit(1), _negative_value=0)
onehot_encoder = encode.multiply("device_type", ["iphone", "samsung", "htc", "vivo"])
df = append_features(src_df, ["device_type"], onehot_encoder)

AI と機械学習のための大規模な特徴量エンジニアリング

推定を使用した乗算

動的なアプローチを採用することで、わずか数行のコードで多数の特徴量を生成できます。例えば、10 個の特徴量を持つ特徴量ベクトルと、100 個の個別値を持つカテゴリ列を乗算すると、1,000 個のカラムが生成され、さらに 3 つの時間枠を乗算することで 3,000 個のカラムが生成されます。このようなケースは、時間に基づいた統計的な観測(最大値、最小値、平均値、年間トランザクションの傾向など)を行う場合に非常によく見られます。

特徴量の数が増えるほど、Spark のジョブの完了に時間がかかります。また、特徴量の範囲が重複している場合には、再計算が頻繁に行われます。例えば、月間、四半期、半年、年間のトランザクションを計算するため、同じデータセットを複数回カウントする場合、特徴量生成のプロセスを簡単なものにしつつ、パフォーマンスを制御するにはどうすればいいでしょうか。

モデルのパフォーマンスを向上させる方法の一つとして、ソーステーブルを事前に集計し、事前に集計されたデータから特徴量を計算する方法があります。Total_sales は、カテゴリごと、月ごとに事前に集計することができ、後続のロールアップはこの小規模なデータセットに基づいて行われます。この方法は、sum や count のような狭い範囲の変換には簡単に実装できますが、median や distinct count のような広い範囲の変換ではどうでしょうか。以下では、大規模なデータセットに対して誤差を低く抑えながらパフォーマンスを維持するために、推定を利用する方法を2つ紹介します。

HyperLogLog

HyperLogLog(HLL)は、データセット内の distinct count の近似値を求める際に使用される機械学習アルゴリズムです。distinct count 値の高速な算出方法を提供するだけでなく、特筆すべき点として、小さなメモリフットプリントで固定サイズのバイナリスケッチを生成できるということがあります。HLL では、複数のスケッチの union を効率的に求めることができ、スケッチのカーディナリティとして distinct count の近似値を算出します。月ごとの総トランザクションをあらかじめスケッチとして集計しておくことができ、3 か月分のスケッチの union で四半期の総トランザクションが求められます。

total_trans_quarter = cardinality(total_trans_1m_sketch U total_trans_2m_sketch U total_trans_3m_sketch)

計算は以下の図で表現できます。

HLL では、複数のスケッチの union を効率的に求めることができ、スケッチのカーディナリティとして distinct count の近似値を算出します。

MinHash

乗算結果の近似値を求めるもう一つの方法は、2 つのスケッチを交差させ、その交点のカーディナリティを計算することです。

例えば、total_trans_grocery_12m =cardinality(total_trans_12m total_trans_grocery) となります。

乗算結果の近似値を求めるもう一つの方法は、2 つのスケッチを交差させ、その交点のカーディナリティを計算することです。

交点は、包除原理により求めることができます。

|AB| = |A| + |B|-|AB|

しかし、この方法で導き出された交点には、推定による複合的な誤差が生じます。

MinHash は、2 つの集合間の Jaccard 類似度を推定する高速なアルゴリズムです。異なるハッシュ関数や順列関数を使用することで、精度とコンピューティング/ストレージリソースの兼ね合いを調整することができます。MinHash を使用することで、結合された2つの集合の distinct count の計算を線形時間に沿って実行できます。また、メモリのフットプリントも少量で固定できます。

MinHash を使った乗算の詳細については、ブログ最後にある Notebook で確認できます。

特徴量定義のガバナンス

Databricks Notebook では、特徴量定義のバージョン管理やトラッキングと、Github リポジトリでのリンクやバックアップが行えます。ジョブが自動化されており、新しい特徴量に対する機能テストや統合テストを実行できます。追加のストレステストを統合して、異なるシナリオでのパフォーマンスへの影響を判別することもできます。パフォーマンスが一定の範囲を超過した場合、その特徴量にはパフォーマンス警告フラグが付加されます。全てのテストに合格し、コードが承認されると、新しい特徴量定義を実運用できるようになります。

MLflow Tracking により、特徴量からモデルを構築する際に、コードのバージョンとソースデータをトラッキングし、ログに記録できます。.mlflow.spark.autolog()で、Spark のデータソースパス、バージョン、フォーマットのロギングが行えるように設定できます。このモデルは、トレーニングデータやコードリポジトリにある特徴量定義とリンクさせることが可能です。

実験を再現するためには、整合性を保ったデータセットを使用する必要もあります。Delta Lake の タイムトラベル 機能を使用すると、過去のデータの特定のスナップショットからクエリを実行できます。なおタイムトラベル機能は、長期に渡る永続的な時系列でのバージョン管理に使用することは想定されていません。Delta を使用する場合でも、長期間の履歴の保存には標準的なアーカイブプロセスが必要です。

特徴量の探索

特徴量の数が増えるほど、特定の特徴量定義の閲覧や検索が難しくなります。

抽象化された特徴量クラスでは、それぞれの特徴量に説明属性を追加でき、特徴量の説明にテキストマイニングアルゴリズムを適用して、特徴量を自動的に複数カテゴリにクラスタリングすることができます。

データブリックスでは、特徴量の自動クラスタリングのアプローチについて実験し、有望な結果を得ました。このアプローチでは、特徴量が適切に記述され、入力値として提供されることを前提としています。説明を TF-IDF 特徴量空間に変換し、Birch クラスタリングを適用することで、類似した説明文を同じグループに集めることができます。各グループのトピックが、特徴量グループ内における高ランクのキーワードになります。

特徴量のクラスタリングは多目的に利用できます。例えば、類似の特徴量をグループ化して、開発者がより簡単に探せるようにできます。他にも、特徴量の説明の検証に使用することなどが挙げられます。特徴量が適切に文書化されていないと、特徴量が同じグループに想定どおりにクラスタリングされません。

まとめ

このブログでは、特徴量を生成するデザインパターンを紹介し、特徴量の定義や管理を大規模に実現する方法について解説しました。この自動化された特徴量エンジニアリングを採用することで、特徴量の乗算による新たな特徴量の動的な生成や、特徴量ベクトルを使った特徴量の効率的な格納や操作が可能になります。また、推定に基づいた方法で事前に計算された特徴量を統合や交差することにより、特徴量の計算を改善できます。さらに、特徴量をスケーリングして、非常にコストのかかる特徴量の複雑な操作をシンプルかつ効率的に実装し、さまざまなバックグラウンドを持つユーザーに容易に提供することが可能です。

ワークフローの合理化、文書化、重複の最小化、特徴量セット間での一貫性を保証する Feature Factory をご自分で実装にする際に、このブログの内容が参考になれば幸いです。ぜひお試しください。

Notebook のプレビュー

Notebook を使ってみる

Databricks 無料トライアル 使ってみる

ご登録