Delta Lake でのスキーマ(schema)DB の適用・展開とは

データブリックスの Notebook シリーズを試す

Delta Lakeのロゴ

データは常に進化し、蓄積されていきます。私たち人間の日々の経験と似ているかもしれません。私たちは、自身の周りの世界の変化についていくために、常に新しいデータを取り込み、認識し、ときにはその中から新たな概念や解釈を得ます。このような認識モデルは、まさにテーブルのスキーマそのものです。どちらも、新しく得る情報の分類と処理のしかたを決める役割を持っています。

データベースにおけるスキーマとは :そもそも「スキーマ(schema)」とは、日本人にとっても馴染みのある「スキーム(scheme)」という言葉の派生語です。計画や図などの意味を持ち、データベース関連だけでなく、哲学や心理学で使われている言葉でもあります。この記事で説明するデータベーススキーマ(DBスキーマ)とは、簡単に言えばデータベースの構造や整理の仕方のことです。細かな定義は、データベースの種類や会社によって異なりますので、今回は Databricks の次世代型データレイク・データウェアハウスである、Delta Lake における DB スキーマの考え方について解説をしていきます。

それでは DB スキーマの説明に入りましょう。ビジネスの問題や要件が時間の経過と共に変化するのと同じように、データの構造も変化します。Delta Lake では、データの変化に応じた新たな概念を容易に取り入れることができます。ユーザーは、シンプルなセマンティクスを使ってテーブルのスキーマを制御します。Delta Lake では、操作ミスや不要なデータの混入によるテーブルの汚染を防ぐスキーマの適用や、有用なデータ列を新たに適切な場所に自動追加できるスキーマの展開など、テーブルのスキーマ(schema)管理ツールが利用できます。このブログでは、これらのツールの使用方法について詳しく見ていきます。

テーブルのスキーマの意味/定義を理解する

Apache Spark™ では、すべての DataFrame にスキーマが定められます。データベース(DB)スキーマとは、データ型、列、メタデータなどのデータ形式を定義した構造を指します。Delta Lake では、テーブルのスキーマ(schema)はトランザクションログ内に JSON 形式で保存されます。

スキーマ適用とは

スキーマ適用の意味とは、スキーマ検証とも呼ばれる Delta Lake の予防手段のことです。テーブルへの書き込み時に、スキーマに一致しないものを拒否することでデータの品質を確保します。予約客だけが入れるレストランで受付係が予約の有無を確認するように、データ列がテーブルに挿入される際に、定められたリストを確認して該当のないものは拒否します。

スキーマ適用の動作

Delta Lake でのスキーマ検証は writeの実行時に行われます。つまり、テーブルへの新規書き込みの際に、書き込み先テーブルのスキーマとの整合性が必ず確認されます。スキーマに一致しない場合、トランザクションが完全にキャンセルされ、データの書き込みは一切行われません。また、例外がスローされ、不一致のあった場合はユーザーに通知されます。

Delta Lake では、テーブルに書き込まれる DataFrame について、次のルールに基づいて整合性の判別を行います。

  • 書き込み先テーブルのスキーマに一致しない列を含んでいない:追加されるデータが、書き込み先テーブルの全ての列に一致しない場合は拒否されません。そのような列には単に null 値が割り当てられます。
  • 書き込み先テーブルの列データ型に一致しない列データ型を含んでいない:たとえば、書き込み先テーブルの列に StringTypeのデータがあり、DataFrame の対応する列のデータが IntegerType である場合は、スキーマ適用によって例外がスローされ、書き込み操作の実行が拒否されます。
  • 大文字と小文字で列名を区別した列を含んでいない:同じテーブルに「Foo」列と「foo」列を含めることはできません。Spark では、大文字と小文字を区別するモードと区別しないモード(デフォルト)を使用できます。一方、Delta Lake では、スキーマの保存時に大文字と小文字の違いが維持されるものの、区別はされません。列情報を Parquet 形式で保存、または同形式に変換する際には、大文字と小文字が区別されます。この制限は、データブリックス社内でデータの破損や損失の問題を経験したことから、そのような恐れを未然に防ぐために導入されたものです。

次に、Delta Lake のテーブルに、スキーマと一致しない列を書き込もうとした場合にどうなるかを示します。下のコードを確認してください。

# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Show original DataFrame's schema
original_loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
"""
 
# Show new DataFrame's schema
loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
"""
 
# Attempt to append new DataFrame (with new column) to existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

""" Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

"""

Delta Lake のスキーマ適用により、新しい列が自動で追加されることなく、書き込みの実行が停止されました。不一致のあった列が分かるように、スタックトレースに両方の列が出力され、比較できるようになっています。

スキーマ適用の効果

スキーマ適用は非常に厳格なチェックが行えるツールです。そのため、データセットを完全に変換して実運用や利活用が行える品質の高いデータが必要な場合に適しています。例えば、以下のような用途のデータ入力テーブルに使用できます。

  • 機械学習アルゴリズム
  • BI ダッシュボード
  • データ分析と視覚化ツール
  • 高度な構造化、強い型付け、セマンティックスキーマを必要とする実運用システム

これらの用途にデータを対応させるため、テーブルに構造を徐々に追加していく「マルチホップ」アーキテクチャが多く採用されています。詳細については、ブログ「Delta Lakeを使用した機械学習の実運用化(Productionizing Machine Learning with Delta Lake)」をご覧ください。

スキーマ適用(schema)はパイプラインのどの段階でも行えますが、注意が必要な場合もあります。例えば、新規データに 1 列追加するのを失念したため、ストリーミングデータのテーブルへの書き込みが失敗するケースなどです。

スキーマ適用の必要性:データの希薄化の防止

「スキーマの不一致」は煩わしいものです。特に Delta Lake を使い始めた頃には、予期せぬエラーでワークフローが立ち行かなくなり、混乱することもあるかも知れません。そうなってくると、いっその事、無条件にDataFrameを書き込めるようにして、必要なスキーマ変更を都度行えばいいのではないかという気もしてきます。

ただ、これは賢明な方法ではありません。スキーマに関しては「転ばぬ先の杖」、問題の事前予防が正解です。スキーマ適用を行わない場合、ある時点でデータ型の整合性の問題が顕在化するでしょう。一見したところは同質のデータソースであっても、エッジケースの混入、列やマッピングの破綻など、気づきにくい問題が生じている可能性があります。コードの運用が始まってから問題が明らかになっても、その時点では原因箇所の追求が難しくなってしまいます。ここはやはり、スキーマ適用を使用してデータ投入の段階で問題を解決しておくべきです。

スキーマ適用を使用しておけば、自身で明示的に変えない限り、テーブルスキーマに変更のないことが保証されます。また、新しい列が頻繁に追加されることで、元々のデータの有用性や明瞭性が損なわれる「データの希薄化」を防ぐこともできます。意図的に厳格なルールを定め、高い品質を前提とすることで、テーブルの状態を真正に保つことができるのです。そして、それこそがスキーマ適用の狙いです。

もし、熟慮のうえで新しい列の追加が必要だと判断される場合には、コードを1行変更するだけで簡単に行える方法があります。それがスキーマ展開です。

スキーマ展開とは

スキーマ展開とは、テーブルの既存スキーマの変更を容易に行い、時間の経過につれて発生する変更についてもカバーする機能です。一般的な用途として、データの追加や上書きの操作時に使用することが想定されており、スキーマに新しい列が含まれている場合に自動的に機能します。

スキーマ展開の動作

スキーマ適用においては、スキーマが一致しない場合に列の追加が拒否されました。スキーマ展開を使用すれば、そのような場合でも新しい列の追加が行えます。スキーマ適用は、 .option('mergeSchema','true') を、Sparkの .write コマンドまたは.writeStreamコマンドに追加することで有効になります。

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

次の Spark SQL ステートメントを実行すれば、プロットを表示させて、書き込みできたことが確認できます。

# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

スキーマの適用と展開を正常に使用した、米国の州ごとのローンの数を示す棒グラフ

このオプションは、spark.databricks.delta.schema.autoMerge= True を Spark の設定に追加して、Spark セッション全体に設定することもできます。ただし、意図的でないスキーマの不一致についても、スキーマ適用からの警告が上がらないようになるため、注意して使用してください。

mergeSchema オプションをクエリに含めることで、DataFrameに書き込み先テーブルと一致しない列があっても、書き込みトランザクションの一部としてスキーマの最後に列が追加されます。ネストされたフィールドを追加することもでき、それらのフィールドは各構造体の列の最後に追加されます。

このオプションを使用することで、既存の列を参照するモデルに影響を及ぼすことなく、運用中のテーブルに新しい列を追加できます。例えば、データサイエンティストやエンジニアが、新しい追跡指標や直近の売上データの列を追加したい場合などが想定されます。

テーブルへの追加や上書きの際に、次のようなスキーマ変更が生じる場合は、スキーマ展開を使用できます。

  • 新しい列の追加(最も一般的なシナリオ)
  • NullType からあらゆる型へのデータ型変更。もしくは、ByteType、ShortType、IntegerType へのアップキャスト

スキーマ展開が使用できない変更に対しては、スキーマとデータを上書きするために、.option("overwriteSchema","true") を追加する必要があります。たとえば、「Foo」列のデータ型が integer で、新しいスキーマでは文字列型になる場合は、Parquet 形式の全てのデータファイルを再度書き込まなければなりません。次のような変更が該当します。

  • 列のドロップ
  • 既存の列のデータ型変更
  • 大文字と小文字で区別された列名の変更(例:「Foo」と「foo」)

新たにリリースされる Spark3.0 では、明示的な DDL(ALTER TABLE を使用)が完全にサポートされます。ユーザーがテーブルスキーマに対して次のアクションを実行できるようになります。

  • 列の追加
  • 列のコメントの変更
  • トランザクションログの保持期間の設定など、テーブルの動作を定義するテーブルプロパティの設定

スキーマ展開の効果

データベース(DB)のスキーマ展開は、DataFrame に意図せずに不適切な列を追加するのとは逆で、意図的にテーブルのスキーマを変更したい場合に使用するものです。正しい列名やデータ型が自動的に追加され、それらを明示的に宣言する必要もないため、スキーマの変更を簡単に行えます。

まとめ

スキーマ適用では、テーブルとの整合性が取れない列の追加やその他のスキーマ変更を拒否することで高い基準を設定して維持します。これにより、アナリストやエンジニアがデータの整合性に依拠して合理的な考察を行い、適切に判断できるようになります。

一方、スキーマ展開を使用することで、意図したスキーマ変更を自動で行えるようになり、スキーマ適用を補完することができます。そして最終的には、列の追加が簡単に行えるようになるはずです。

スキーマ適用を陰とすれば、スキーマ展開は陽です。この2つのツールを組み合わせて使用することで、スムーズに不要なデータを排除し、適切な状態に調整できるようになります。

このブログは、Mukul Murthy と Pranav Anand の協力により執筆したものです。

オープンソースの Delta Lake についてさらに詳しく
さらに詳しい情報は、Delta Lake オンラインハブからアクセスできます。最新コードのダウンロード、Delta Lake コミュニティへの参加も可能です。ぜひご覧ください。

関連リンク

このシリーズの記事:
Delta Lake 詳細編 #1: トランザクションログの展開
Delta Lake 詳細編 #2: スキーマの適用と展開
Delta Lake 詳細編 #3: DML の内部(更新,、削除、マージ)

Delta Lake を使用した機械学習の実運用化
データレイクとは

Databricks 無料トライアル 使ってみる

ご登録