Apache Spark™ 3.0 のデータ型:日付とタイムスタンプ

Apache Spark は、構造化データと非構造化データの処理に使用される非常に一般的なツールです。構造化データの処理に関しては、整数、LONG、DOUBLE、STRING といった多くの基本的なデータ型をサポートしています。Spark は、開発者が理解するのが難しいことが多い DATETIMESTAMP などの複雑なデータ型もサポートしています。このブログでは、日付型とタイムスタンプ型について深く掘り下げ、その動作と一般的な問題を回避する方法を解説します。主に、次の 4 つの部分をカバーしています。

  1. 日付型と関連する暦法の定義と Spark 3.0 から適用された暦法の変更について
  2. タイムスタンプ型の定義とタイムゾーンとの関係(タイムゾーンオフセットの解消に関する詳細と、Spark 3.0 で使用される Java 8 の新しい Time API の若干の動作変更についても説明します。)
  3. Spark で日付値とタイムスタンプ値を作成するための共通 API
  4. Spark ドライバで日付とタイムスタンプのオブジェクトを収集する際のよくある落とし穴とベストプラクティス

日付と暦法

日付型(DATE)の定義は非常にシンプルで、YEAR = 2012、MONTH = 12、DAY = 31 というように年、月、日のフィールドの組み合わせです。ただし、年、月、日の各フィールド値には制約があり、日付値は現実世界で有効な日付になります。例えば、MONTH 値は 1~12 でなければならず、DAY 値は 1~28/29/30/31(年と月によって異なる)である必要があります。

これらの制約は、使用可能な多くの暦法のいづれかによって定義されます。太陰暦のような特定の地域でのみ使用されるものや、ユリウス暦のように歴史の中でのみ使用されているものもあります。現在用いられているグレゴリオ暦 は事実上の国際標準であり、世界中のほぼ全ての場所で民間目的で使用されています。グレゴリオ暦は 1582 年に導入され、1582 年以前の日付にも適用されています。この 1582 年以前の日付に適用した暦法は、先発グレゴリオ暦と呼ばれています。

Spark 3.0 以降では、pandas、R、Apache Arrow などの他のデータシステムで既に用いられている先発グレゴリオ暦を使用しています。Spark 3.0 以前では、1582 年より前の日付ではユリウス暦、1582 年より後の日付にはグレゴリオ暦というように、ユリウス暦とグレゴリオ暦の組み合わせを使用していました。これは、レガシー java.sql.Date API から継承され、Java 8 では先発グレゴリオ暦を使用する java.time.LocalDate に置き換えられました。

日付型はタイムゾーンを考慮しないことは注目すべき点です。

タイムスタンプとタイムゾーン

タイムスタンプ型(TIMESTAMP)は、日付型を新しいフィールドを用いて拡張します。フィールド値には、時、分、秒(小数部分を保有)と、グローバル(セッションスコープ)タイムゾーンがあります。これにより、地球上の絶対的な時刻を定義します。例えば、YEAR = 2012、MONTH = 12、DAY = 31、HOUR = 23、MINUTE = 59、SECOND = 59.123456 と、セッションタイムゾーンが UTC+01:00 というように表します。タイムスタンプ値を Parquet のような非テキストのデータソースに書き出す場合、その値は UTC のタイムスタンプなどのタイムゾーン情報を持たない単なる瞬間です。異なるセッションタイムゾーンでタイムスタンプ値を読み書きすると、時、分、秒フィールド値が異なることがありますが、実際には同じ絶対的な時刻です。

時、分、秒の各フィールドの標準範囲は、時間の場合は 0~23、分と秒の場合は 0~59 です。Spark では、最大マイクロ秒の精度で秒の小数部分をサポートします。小数部分の有効範囲は、0 から 999,999 マイクロ秒です。

どんな絶対的な時刻でも、タイムゾーンに応じたさまざまな値をウォールクロック(実時間)で観察できます。

例えば、ウォールクロックのセットは、さまざまな時刻を表すことができます。Apache Spark 3.0 のタイムゾーンオフセット機能を使用すると、ローカルタイムスタンプを時刻に明確に結び付けます。

反対に、ウォールクロックの値は、多くの異なる時刻を表すことも可能です。タイムゾーンのオフセットを使用すると、ローカルタイムスタンプを時刻に明確に結び付けます。通常、タイムゾーンのオフセットは、グリニッジ標準時(GMT)または UTC+0協定世界時)からの時間単位のオフセットとして定義されます。このようなタイムゾーン情報の表現はあいまいさを排除しますが、エンドユーザーにとっては不便です。ユーザーは、America/Los_AngelesEurope/Paris などの地域で示すことを好みます。

ゾーンのオフセットを用いた追加レベルの抽象化は、作業を容易にしますが、新たな問題をもたらします。例えば、タイムゾーン名をオフセットにマッピングするための特別なタイムゾーンデータベースを維持する必要があります。Spark は JVM(Java 仮想マシン)上で実行されるため、マッピングを Java 標準ライブラリに委任し、Java 標準ライブラリは インターネット番号割当機関のタイムゾーンデータベース(IANA TZDB) からデータをロードします。さらに、Java の標準ライブラリのマッピングメカニズムには、Spark の動作に影響を与える微妙な差異があります。以下では、Spark の動作に影響を与える差異のいくつかをご紹介します。

Java 8 以降、JDK(Java 開発キット)が日時の操作とタイムゾーンのオフセットを解消するための新しい API を公開しており、Spark 3.0 でこの新しい API に移行しました。タイムゾーン名からオフセットへのマッピングのソースが IANA TZDB であることは同じですが、Java 8 以降と Java 7 では実装方法が異なります。

例として、 America/Los_Angeles のタイムゾーンで、1883 年以前のタイムスタンプ 1883-11-10 00:00:00 を見てみましょう。この年は他の年よりも際立っています。なぜなら、1883年11月18日に北米の全ての鉄道で新しい標準時制に切り替わり、それ以降、時刻表を管理するようになりました。Java 7 の日時 API を使用すると、ローカルタイムスタンプのタイムゾーンのオフセットを -08:00 として取得できます。

scala> java.time.ZoneId.systemDefault
res0: java.time.ZoneId = America/Los_Angeles
scala> java.sql.Timestamp.valueOf("1883-11-10 00:00:00").getTimezoneOffset / 60.0
res1: Double = 8.0

しかし、Java 8 の API 関数は、異なる結果を返します。

scala> java.time.ZoneId.of("America/Los_Angeles")
.getRules.getOffset(java.time.LocalDateTime.parse("1883-11-10T00:00:00"))
res2: java.time.ZoneOffset = -07:52:58

1883年11月18日以前において、時刻は地域の問題でした。ほとんどの都市や町では、何らかの形でその地域の太陽時を使用しており、教会の尖塔や宝石商の窓といったよく知られた時計によって維持されていました。そのため、このような奇妙なタイムゾーンのオフセットが生じます。

この例は、Java 8 の関数がより正確で、IANA TZDB からの履歴データを考慮に入れていることを示しています。Java 8 の日時 API に切り替えたあと、Spark 3.0 は自動的に改善の恩恵を受け、タイムゾーンのオフセットを解消する方法がさらに正確になりました。

前述したように、Spark 3.0 では、日付型が先発グレゴリオ暦に切り替わりました。タイムスタンプ型についても同じことが言えます。ISO SQL:2016 標準では、タイムスタンプの有効範囲が 0001-01-01 00:00:00 から 9999-12-31 23:59:59.999999 であると宣言されています。Spark 3.0 は標準に完全に準拠しており、この範囲の全てのタイムスタンプをサポートします。Spark 2.4 以前と比較すると、次のサブ範囲に注意する必要があります。

  1. 0001-01-01 00:00:00..1582-10-03 23:59:59.999999
    Spark 2.4 ではユリウス暦が使用され、標準に準拠していません。Spark 3.0 ではこの問題が修正され、年、月、日といったタイムスタンプの内部操作に先発グレゴリオ暦が適用されます。暦法が異なるため、Spark 2.4 に存在する一部の日付は、Spark 3.0 には存在しません。例えば、グレゴリオ暦において 1000 年はうるう年ではないため、1000-02-29 は無効な日付だと言えます。また、Spark 2.4 では、このタイムスタンプの範囲において、タイムゾーン名がゾーンのオフセットに誤って解消されてしまいます。
  2. 1582-10-04 00:00:00..1582-10-14 23:59:59.999999
    これは、Spark 3.0 においてローカルタイムスタンプの有効な範囲であり、このようなタイムスタンプが存在しなかった Spark 2.4 とは異なります。
  3. 1582-10-15 00:00:00..1899-12-31 23:59:59.999999
    Spark 3.0 は、IANA TZDB からの履歴データを使用してタイムゾーンのオフセットを正しく解消します。Spark 3.0 と比較すると、上記の例で示したように、Spark 2.4 ではタイムゾーン名からのゾーンのオフセットが正しく解消されない場合があります。
  4. 1900-01-01 00:00:00..2036-12-31 23:59:59.999999
    Spark 3.0 と Spark 2.4 は、どちらも ANSI SQL 標準に準拠しており、月の特定の日を取得するといった日時の操作においてグレゴリオ暦を使用します。
  5. 2037-01-01 00:00:00..9999-12-31 23:59:59.999999
    Spark 2.4 では、JDK のバグ(#8073446)が原因で、タイムゾーンのオフセット、特に夏時間のオフセットが正しく解消されないことがあります。Spark 3.0 には、この問題はありません。

タイムゾーン名をオフセットにマッピングするもう 1 つの側面として、夏時間(DST:Daylight Saving Time)または別の標準タイムゾーンのオフセットへの切り替えが原因で発生する可能性のあるローカルタイムスタンプの重複です。例えば、3 November 2019, 02:00:00 に時計の針を1 時間戻して 01:00:00 にします。America/Los_Angeles におけるローカルタイムスタンプ Spark 3.0 でタイムゾーン名をオフセットにマッピングする場合、夏時間に切り替えると、ローカルタイムスタンプが重複する可能性があります。可能であれば、タイムスタンプを作成するときに正確なタイムゾーンのオフセットを指定することをお勧めします。2019-11-03 01:30:00 は、2019-11-03 01:30:00 UTC-08:00 または 2019-11-03 01:30:00 UTC-07:00 にマッピングされます。オフセットを指定せず、タイムゾーン名(例えば '2019-11-03 01:30:00 America/Los_Angeles')だけを設定した場合、Spark 3.0 は以前のオフセット(通常は "夏" に対応)を取得します。この動作は、"冬" のオフセットを取る Spark 2.4 とは異なります。時間を前に進める場合の有効なオフセットはありません。通常 1 時間の夏時間への変更は、Spark では "夏" の時間に対応する次の有効なタイムスタンプに移動します。

上記の例からわかるように、タイムゾーン名からオフセットへのマッピングはあいまいであり、1 対 1 ではありません。可能な場合には、タイムスタンプを作成する際に、TIMESTAMP '2019-11-03 01:30:00 UTC-07:00' のように正確なタイムゾーンのオフセットを指定することをお勧めします。

次に、ANSI SQL 標準について見てみましょう。ANSI SQL 標準では、次の 2 つのタイムスタンプ型を定義します。

  1. TIMESTAMP WITHOUT TIME ZONE または TIMESTAMP:年、月、日、時、分、秒でのローカルタイムスタンプ。これらのタイムスタンプ型は、どのタイムゾーンにも結び付けられず、実際にはウォールクロックのタイムスタンプです。
  2. TIMESTAMP WITH TIME ZONE:年、月、日、時、分、秒、TIMEZONE_HOUR、TIMEZONE_MINUTE でのゾーンタイムスタンプ。タイムスタンプは、 UTC タイムゾーンの時刻 + 各値に関連付けられたタイムゾーンのオフセット(時間と分単位)を表します。

TIMESTAMP WITH TIME ZONE のタイムゾーンオフセットは、タイムスタンプが表す物理的な時点には影響しません。これは、他のタイムスタンプのコンポーネントによって指定された UTC の時刻によって完全に表されるためです。代わりに、タイムゾーンのオフセットは、表示用タイムスタンプ値の初期設定の動作、日付/時刻コンポーネントの抽出(例:EXTRACT)、およびタイムスタンプへの月の追加というようなタイムゾーンの把握を必要とするその他の操作にのみ影響します。

Spark SQL は、タイムスタンプ型を TIMESTAMP WITH SESSION TIME ZONEとして定義します。フィールド(YEARMONTHDAYHOURMINUTESECONDSESSION TZ)の組み合わせであり、YEAR から SECOND フィールドは、UTC タイムゾーンにおける時刻を識別し、SESSION TZ は、SQL 設定 spark.sql.session.timeZone から取得されます。セッションタイムゾーンは、次のように設定できます。

  • ゾーンのオフセット '(+|-)HH:mm':この形式により、物理的な時点を明確に定義できます。
  • 地域 ID 'area/city' 形式のタイムゾーン名(例:'America/Los_Angeles'):この形式のタイムゾーン情報には、ローカルタイムスタンプの重複などの上述したいくつかの問題があります。ただし、各 UTC の時刻は、任意の地域 ID の 1 つのタイムゾーンのオフセットに明確に関連付けられているため、地域 ID ベースのタイムゾーンを持つ各タイムスタンプは、ゾーンのオフセットを持つタイムスタンプに明確に変換できます。

初期設定では、セッションタイムゾーンは、Java 仮想マシンの既定のタイムゾーンに設定されます。

Spark の TIMESTAMP WITH SESSION TIME ZONE は、以下とは異なります。

  1. TIMESTAMP WITHOUT TIME ZONE:このタイプの値は複数の物理的な時刻にマッピングできますが、TIMESTAMP WITH SESSION TIME ZONE 値は、絶対的な物理的時刻になります。SQL 型は、全てのセッションで UTC+0 などの 1 つの固定タイムゾーンのオフセットを使用してエミュレートできます。その場合、UTC のタイムスタンプをローカルタイムスタンプとみなすことができます。
  2. TIMESTAMP WITH TIME ZONE:SQL 標準によると、この型の列値は異なるタイムゾーンのオフセットを持つ可能性があります。これは、Spark SQL ではサポートされていません。

グローバル(セッションスコープ)タイムゾーンに関連付けられているタイムスタンプは、Spark SQL によって新たに発明されたものではないことに注意してください。Oracle などの RDBMS(リレーショナルデータベース管理システム)では、タイムスタンプにも同様の型(TIMESTAMP WITH LOCAL TIME ZONE)を提供します。

日付とタイムスタンプの作成

Spark SQL には、日付とタイムスタンプの値を作成するためのメソッドがいくつか用意されています。

  1. パラメータのない既定のコンストラクタ:CURRENT_TIMESTAMP()CURRENT_DATE()
  2. 他のプリミティブな Spark SQL 型:INTLONGSTRING
  3. 外部型:Python の DATETIME や Java クラスの java.time.LocalDate/Instant
  4. CSV、JSON、Avro、Parquet、ORC などのデータソースからの逆シリアル化

Spark 3.0 で導入された関数 MAKE_DATE は、YEARMONTHDAY の 3 つのパラメータから DATE 値を生成します。全ての入力パラメータは、可能な限り暗黙的に INT 型に変換されます。この関数は、変換された日付が先発グレゴリオ暦で有効な日付であるかどうかをチェックし、有効でない場合は NULL を返します。例として、PySpark では次のようになります。

>>> spark.createDataFrame([(2020, 6, 26), (1000, 2, 29), (-44, 1, 1)],
... ['Y', 'M', 'D']).createTempView('YMD')
>>> df = sql('select make_date(Y, M, D) as date from YMD')
>>> df.printSchema()
root
 |-- date: date (nullable = true)

DataFrame コンテンツを出力するには、show() アクションを呼び出します。これにより、エグゼキュータで日付を文字列に変換し、その文字列をドライバに転送してコンソールに出力します。

>>> df.show()
+-----------+
|       date|
+-----------+
| 2020-06-26|
|       null|
|-0044-01-01|
+-----------+	

同様に、MAKE_TIMESTAMP 関数を介してタイムスタンプ値を作成できます。MAKE_DATE と同様に、日付フィールドに対して同じ検証を実行し、さらに、時間フィールドとして HOUR (0-23)MINUTE (0-59)SECOND (0-60) を受け入れます。秒(SECOND)は、DECIMAL 型(精度 = 8、少数点桁数 = 6)で、マイクロ秒の精度まで小数部分を渡すことができます。例として、PySpark では次のようになります。

>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30.123456),
... (1582, 10, 10, 0, 1, 2.0001), (2019, 2, 29, 9, 29, 1.0)],
... ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND'])
>>> df.show()
+----+-----+---+----+------+---------+
|YEAR|MONTH|DAY|HOUR|MINUTE|   SECOND|
+----+-----+---+----+------+---------+
|2020|    6| 28|  10|    31|30.123456|
|1582|   10| 10|   0|     1|   2.0001|
|2019|    2| 29|   9|    29|      1.0|
+----+-----+---+----+------+---------+

>>> ts = df.selectExpr("make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND) as MAKE_TIMESTAMP")
>>> ts.printSchema()
root
 |-- MAKE_TIMESTAMP: timestamp (nullable = true)                

日付で行ったように、ts DataFrame の内容を show() アクションを使用して出力してみましょう。同様に、show() はタイムスタンプを文字列に変換しますが、SQL 設定 spark.sql.session.timeZone で定義されたセッションタイムゾーンが考慮されるようになりました。次の例で確認してみましょう。

>>> ts.show(truncate=False)
+--------------------------+
|MAKE_TIMESTAMP            |
+--------------------------+
|2020-06-28 10:31:30.123456|
|1582-10-10 00:01:02.0001  |
|null                      |
+--------------------------+

2019 年はうるう年ではないためこの日付は無効ということになり、Spark は最後のタイムスタンプを作成できません。

上記の例では、タイムゾーン情報が提供されていないことに気付かれたかもしれません。その場合、Spark は SQL 設定 spark.sql.session.timeZone からタイムゾーンを取得し、関数呼び出しに適用します。MAKE_TIMESTAMP の最後のパラメータとして渡すことで、別のタイムゾーンを選択することも可能です。PySpark の例を次に示します。

>>> df = spark.createDataFrame([(2020, 6, 28, 10, 31, 30, 'UTC'),
...     (1582, 10, 10, 0, 1, 2, 'America/Los_Angeles'),
...     (2019, 2, 28, 9, 29, 1, 'Europe/Moscow')],
...     ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'TZ'])
>>> df = df.selectExpr('make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as MAKE_TIMESTAMP')
>>> df = df.selectExpr("date_format(MAKE_TIMESTAMP, 'yyyy-MM-dd HH:mm:SS VV') AS TIMESTAMP_STRING")
>>> df.show(truncate=False)
+---------------------------------+
|TIMESTAMP_STRING                 |
+---------------------------------+
|2020-06-28 13:31:00 Europe/Moscow|
|1582-10-10 10:24:00 Europe/Moscow|
|2019-02-28 09:29:00 Europe/Moscow|
+---------------------------------+

この例が示すように、Spark は指定されたタイムゾーンを考慮に入れますが、全てのローカルタイムスタンプをセッションタイムゾーンに調整します。MAKE_TIMESTAMP 関数に渡された元のタイムゾーンは、失われます。TIMESTAMP WITH SESSION TIME ZONE 型は、全ての値が 1 つのタイムゾーンに属していると仮定し、値ごとにタイムゾーンを保存しないためです。TIMESTAMP WITH SESSION TIME ZONE の定義によると、Spark はローカルタイムスタンプを UTC タイムゾーンに保存し、日時フィールドの抽出やタイムスタンプを文字列へ変換する際には、セッションタイムゾーンを使用します。

また、タイムスタンプは、キャストを介して LONG 型から作成できます。LONG 列にエポック 1970-01-01 00:00:00Z からの秒数が含まれている場合は、Spark SQL の TIMESTAMP にキャストできます。

spark-sql> select CAST(-123456789 AS TIMESTAMP);
1966-02-02 05:26:51

残念ながら、このアプローチでは秒の小数部分を指定できません。将来的に、Spark SQL においてエポックからの秒、ミリ秒、およびマイクロ秒からタイムスタンプを作成する特別な関数(timestamp_seconds()timestamp_millis()timestamp_micros())が提供される予定です。

もう 1 つの方法では、STRING 型の値から日付とタイムスタンプを作成します。特別なキーワードを使用してリテラルを作成できます。

spark-sql> select timestamp '2020-06-28 22:17:33.123456 Europe/Amsterdam', date '2020-07-01';
2020-06-28 23:17:33.123456	2020-07-01

または、キャストを介して列内の全ての値に適用できます。

spark-sql> select cast('2020-06-28 22:17:33.123456 Europe/Amsterdam' as timestamp), cast('2020-07-01' as date);
2020-06-28 23:17:33.123456	2020-07-01

入力されたタイムスタンプの文字列は、指定されたタイムゾーンのローカルタイムスタンプとして解釈されるか、入力した文字列でタイムゾーンが省略されている場合は、セッションタイムゾーンで解釈されます。異常なパターンを持つ文字列は、to_timestamp() 関数を使用してタイムスタンプに変換できます。サポートされているパターンについては、「書式設定と解析のための Datetime パターン」をご参照ください。

spark-sql> select to_timestamp('28/6/2020 22.17.33', 'dd/M/yyyy HH.mm.ss');
2020-06-28 22:17:33

パターンを指定しない場合、関数は CAST と同様に動作します。

ユーザビリティのために、Spark SQL は、文字列を受け取り、タイムスタンプと日付を返す上述した全てのメソッドにおいて、特別な文字列値を認識します。

  • epoch:日付 '1970-01-01'、または、タイムスタンプ ‘1970-01-01 00:00:00Z’ のエイリアスです。
  • now:セッションタイムゾーンにおける現在のタイムスタンプ、または、日付です。1 つのクエリ内では、常に同じ結果が生成されます。
  • todayTIMESTAMP 型の現在の日付の始め、または、DATE 型の現在の日付です。
  • tomorrow:タイムスタンプの場合は翌日の始め、または、DATE 型の場合は翌日です。
  • yesterday:現在の日付の前日、または、TIMESTAMP 型の始めです。

例をいくつかご紹介します。

spark-sql> select timestamp 'yesterday', timestamp 'today', timestamp 'now', timestamp 'tomorrow';
2020-06-27 00:00:00	2020-06-28 00:00:00	2020-06-28 23:07:07.18	2020-06-29 00:00:00
spark-sql> select date 'yesterday', date 'today', date 'now', date 'tomorrow';
2020-06-27	2020-06-28	2020-06-28	2020-06-29

Spark の優れた機能の 1 つは、ドライバ側の既存の外部オブジェクトのコレクションから Datasets を作成し、対応する型の列を作成することです。Spark は、外部型のインスタンスを意味的に同等の内部表現に変換します。例えば、PySpark では、Python コレクションから DATE 列と TIMESTAMP 列を含む Dataset を作成できます。

>>> import datetime
>>> df = spark.createDataFrame([(datetime.datetime(2020, 7, 1, 0, 0, 0),
...     datetime.date(2020, 7, 1))], ['timestamp', 'date'])
>>> df.show()
+-------------------+----------+
|          timestamp|      date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+

PySpark は、システムタイムゾーンを使用して、Python の日時オブジェクトをドライバ側の内部 Spark SQL 表現に変換しますが、Spark のセッションタイムゾーン設定 spark.sql.session.timeZone とは異なる場合があります。内部値には、元のタイムゾーンに関する情報は含まれていません。並列化された日付値とタイムスタンプ値に対する今後の操作では、TIMESTAMP WITH SESSION TIME ZONE 型定義に従って Spark SQL のセッションタイムゾーンのみが考慮されます。

上記で Python コレクションについて示したのと同様の方法で、Spark は次の型を Java/Scala API の外部日時型として認識します。

  • java.sql.Date および java.time.LocalDate:Spark SQL の日付型の外部型として認識
  • java.sql.Timestamp および java.time.Instant:タイムスタンプ型として認識

java.tsql.* 型と java.time.* 型には、違いがあります。java.time.LocalDate java.time.Instant は Java 8 で追加され、その型は先発グレゴリオ暦(Spark 3.0 から使用される暦法と同じ)に基づいています。java.sql.Datejava.sql.Timestamp は、Spark 3.0 以前に使用されていた別の暦法(ユリウス暦と 1582-10-15 以降はグレゴリオ暦を組み合わせたハイブリッド暦)に基づいています。暦法が異なるため、Spark は内部の Spark SQL 表現への変換中に追加の操作を実行し、入力された日付とタイムスタンプをある暦法から別の暦法へリベース(再配置)する必要があります。リベース操作は、1900 年以降の最新のタイムスタンプには多少のオーバーヘッドが発生し、古いタイムスタンプではさらに重要になる可能性があります。

以下の例は、Scala コレクションからタイムスタンプを作成する方法を示しています。最初の例では、文字列から java.sql.Timestamp オブジェクトを作成します。valueOf メソッドは、入力された文字列を既定の JVM タイムゾーンのローカルタイムスタンプとして解釈し、Spark のセッションタイムゾーンとは異なる場合があります。特定のタイムゾーンの java.sql.Timestampjava.sql.Date インスタンスの作成が必要な場合は、java.text.SimpleDateFormat(およびその setTimeZone メソッド)、または java.util.Calendar を確認することをお勧めします。

scala> Seq(java.sql.Timestamp.valueOf("2020-06-29 22:41:30"), new java.sql.Timestamp(0)).toDF("ts").show(false)
+-------------------+
|ts                 |
+-------------------+
|2020-06-29 22:41:30|
|1970-01-01 03:00:00|
+-------------------+
scala> Seq(java.time.Instant.ofEpochSecond(-12219261484L), java.time.Instant.EPOCH).toDF("ts").show
+-------------------+
|                 ts|
+-------------------+
|1582-10-15 11:12:13|
|1970-01-01 03:00:00|
+-------------------+

同様に、java.sql.Date または java.LocalDate のコレクションから日付列を作成できます。java.LocalDate インスタンスの並列化は、Spark のセッションタイムゾーン、または JVM の既定タイムゾーンのいずれからも完全に独立していますが、java.sql.Date インスタンスの並列化について同じことを言うことはできません。若干の違いがあります。

  1. java.sql.Date インスタンスは、ドライバ既定の JVM タイムゾーンにおけるローカル日付を表します。
  2. Spark SQL 値への正しい変換を行うには、ドライバとエグゼキュータにおける既定の JVM タイムゾーンが同じである必要があります。
scala> Seq(java.time.LocalDate.of(2020, 2, 29), java.time.LocalDate.now).toDF("date").show
+----------+
|      date|
+----------+
|2020-02-29|
|2020-06-29|
+----------+

暦法とタイムゾーンに関連する問題を回避するために、タイムスタンプや日付の Java/Scala コレクションの並列化において、Java 8 の java.LocalDate/Instant 型を外部型として推奨します。

日付とタイムスタンプの収集

並列化のリバース操作では、エグゼキュータから日付とタイムスタンプを収集しドライバに戻し、外部型のコレクションを返します。上記の例では、 collect() アクションを介して DataFrameをドライバに戻すことができます。

>>> df.collect()
[Row(timestamp=datetime.datetime(2020, 7, 1, 0, 0), date=datetime.date(2020, 7, 1))]

Spark は、日付列とタイムスタンプ列の内部値を UTC タイムゾーンの時刻としてエグゼキュータからドライバに転送し、Spark SQL のセッションタイムゾーンを使用せずに、ドライバのシステムタイムゾーンで Python の日時オブジェクトへの変換を実行します。collect() は、前のセクションで説明した show() アクションとは異なります。show() は、タイムスタンプを文字列に変換するときにセッションタイムゾーンを使用し、ドライバで結果の文字列を収集します。

Java および Scala API では、Spark は初期設定で次の変換を実行します。

  • Spark SQL の DATE 値は、java.sql.Date のインスタンスに変換されます。
  • タイムスタンプは、java.sql.Timestamp のインスタンスに変換されます。

どちらの変換も、ドライバ既定の JVM タイムゾーンで実行されます。このように、Date.getDay()getHour()、および Spark SQL の DAY 関数や HOUR 関数を介して取得できるのと同じ日時フィールドを持つには、ドライバ既定の JVM タイムゾーンとエグゼキュータのセッションタイムゾーンが同じである必要があります。

java.sql.Date/Timestamp から日付とタイムスタンプを作成するのと同様に、Spark 3.0 は先発グレゴリオ暦からハイブリッド暦(ユリウス暦 + グレゴリオ暦)へのリベースを実行します。この操作は、最新の日付(1582 年以降)とタイムスタンプ(1900 年以降)ではほぼありませんがが、古い日付とタイムスタンプには若干のオーバーヘッドが発生する可能性があります。

このような暦法関連の問題を回避し、Spark に Java 8 以降に追加された java.time 型を返すように依頼できます。SQL 設定 spark.sql.datetime.java8API.enabled を true に設定すると、Dataset.collect() アクションが返されます。

  • java.time.LocalDate:Spark SQL の DATE
  • java.time.Instant:Spark SQL の TIMESTAMP

現在は、Java 8 型と Spark SQL 3.0 は、どちらも先発グレゴリオ暦に基づいているため、変換で暦法関連の問題に悩まされることはありません。collect() アクションは、既定の JVM タイムゾーンに依存しません。タイムスタンプの変換は、タイムゾーンに全く依存しません。日付の変換に関しては、SQL 設定 spark.sql.session.timeZone のセッションタイムゾーンを使用します。例として、既定の JVM タイムゾーンを Europe/Moscow に、セッションタイムゾーンを America/Los_Angeles に設定し、DATE 列と TIMESTAMP 列を持つデータセットを見てみましょう。

scala> java.util.TimeZone.getDefault
res1: java.util.TimeZone = sun.util.calendar.ZoneInfo[id="Europe/Moscow",...]

scala> spark.conf.get("spark.sql.session.timeZone")
res2: String = America/Los_Angeles

scala> df.show
+-------------------+----------+
|          timestamp|      date|
+-------------------+----------+
|2020-07-01 00:00:00|2020-07-01|
+-------------------+----------+

show() アクションは、America/Los_Angeles のセッションタイムにタイムスタンプを出力しますが、データセットを収集すると、java.sql.Timestamp に変換され、toString メソッドにより Europe/Moscow が出力されます。

scala> df.collect()
res16: Array[org.apache.spark.sql.Row] = Array([2020-07-01 10:00:00.0,2020-07-01])

scala> df.collect()(0).getAs[java.sql.Timestamp](0).toString
res18: java.sql.Timestamp = 2020-07-01 10:00:00.0

実際には、ローカルタイムスタンプ 2020-07-01 00:00:00 は、UTC で 2020-07-01T07:00:00Z です。Java 8 API を有効にしてデータセットを収集すると、次のことがわかります。

scala> df.collect()
res27: Array[org.apache.spark.sql.Row] = Array([2020-07-01T07:00:00Z,2020-07-01])

java.time.Instant オブジェクトは、グローバル JVM タイムゾーンとは無関係に、追って任意のローカルタイムスタンプに変換できます。これは、java.sql.Timestamp に対する java.time.Instant の利点の 1 つです。前者は、グローバル JVM 設定を変更する必要があり、同じ JVM 上の他のタイムスタンプに影響を与えます。したがって、アプリケーションが異なるタイムゾーンの日付またはタイムスタンプを処理している場合、Java/Scala Dataset.collect() API を介してドライバにデータを収集する際にアプリケーションが互いに衝突ないようにするには、SQL 設定 spark.sql.datetime.java8API.enabled を使用して Java 8 API に切り替えることをお勧めします。

まとめ

今回のブログでは、Spark SQL の DATE 型と TIMESTAMP 型について解説しました。また、他のプリミティブな Spark SQL 型と外部 Java 型から日付列とタイムスタンプ列を作成する方法と、日付列とタイムスタンプ列を外部 Java 型としてドライバに収集する方法を示しました。Spark 3.0 以降、ユリウス暦とグレゴリオ暦を組み合わせたハイブリッド暦から、先発グレゴリオ暦に切り替わりました。(詳細は、SPARK-26651 をご参照ください。)これにより、Spark は上述したような多くの問題を排除できるようになりました。以前のバージョンとの下位互換性を保つため、Spark は、collect のようなアクションからハイブリッド暦(java.sql.Datejava.sql.Timestamp)のタイムスタンプと日付を返します。Java または Scala の collect アクションを使用する際の暦法とタイムゾーンの解消の問題の回避には、Java 8 API を SQL 設定 spark.sql.datetime.java8API.enabled で有効にできます。Databricks Runtime 7.0 の一部として、Databricks の無料トライアル でお試しいただけます。

Sark学習ブック(O’Reilly)

無料の第2版には、pandas UDFのPython型ヒント、新しい日付/時刻の実装など、Spark 3.0のアップデートが含まれています。

Databricks 無料トライアル 使ってみる

ご登録

Data Brew の vidcast で、ぜひ Data + AI の進化を探求ください
動画をみる