メインコンテンツへジャンプ

Python ユーザー定義テーブル関数(UDTFs)の紹介

Pythonのユーザー定義テーブル関数(UDTFs)とは何か?
アリソン・ワン
ダニエル�・テネドリオ
上新 卓也
アラン・フォルティング
Share this post

Apache Spark™ 3.5とDatabricks Runtime 14.0は、エキサイティングな機能をもたらした:Pythonのユーザー定義テーブル関数(UDTFs)です。 このブログでは、UDTFとは何か、なぜUDTFは強力なのか、そしてどのようにUDTFを使うことができるのかについて説明する。

Pythonのユーザー定義テーブル関数(UDTF)とは?

Pythonのユーザー定義テーブル関数(UDTF)は、出力として単一のスカラー結果値の代わりにテーブルを返す新しい種類の関数です。 一度登録されると、SQLクエリのFROM句に登場させることができる。

各Python UDTFは0個以上の引数を受け入れ、各引数は整数や文字列のような定数スカラー値である。 関数本体は、これらの引数の値を調べて、どのデータを返すべきかを決定することができる。

PythonのUDTFを使うべき理由

要するに、複数の行や列を生成する関数が必要で、Pythonの豊富なエコシステムを活用したいのであれば、Python UDTFが適しているということです。

Python UDTFとPython UDFの比較

SparkのPython UDFは、それぞれ0個以上のスカラー値を入力として受け入れ、単一の値を出力として返すように設計されているが、UDTFはより柔軟性がある。 これらは複数の行や列を返すことができ、UDFの機能を拡張する。

Python UDTFとSQL UDTFの比較

SQLのUDTFは効率的で汎用性が高いが、Pythonはより豊富なライブラリとツールを提供している。 高度な技術を必要とする変換や計算(統計関数や機械学習の推論など)には、Pythonが際立っている。

Python UDTFの作成方法

基本的なPythonのUDTFを見てみよう:

from pyspark.sql.functions import udtf

@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)

上のコードでは、入力として2つの整数を受け取り、出力として2つの列(元の数とその2乗)を生成する単純なUDTFを作成した。

UDTFを実装する最初のステップは、クラスを定義することである。

class SquareNumbers:

次に、UDTFのevalメソッドを実装する必要がある。 これは計算を行い行を返すメソッドで、関数の入力引数を定義する。

def eval(self, start: int, end: int):
    for num in range(start, end + 1):
        yield (num, num * num)

Python UDTFでは、結果が適切に処理されるように、戻り値の型がタプルか オブジェクトである必要が あります 。

最後に、クラスを UDTF としてマークするには、@udtfデコレーターを使用して UDTF の戻り値の型を定義します。 戻り値の型は、ブロック書式を持つStructTypeか、Sparkでブロック書式を持つStructTypeを表すDDL文字列でなければならないことに注意してください。

@udtf(returnType="num: int, squared: int")

Python UDTFの使い方

Pythonにて

クラス名を使ってUDTFを直接呼び出すことができる。

from pyspark.sql.functions import lit

SquareNumbers(lit(1), lit(3)).show()

+---+-------+
|num|squared|
+---+-------+
|  1|      1|
|  2|      4|
|  3|      9|
+---+-------+

SQLの場合

まず、Python UDTFを登録する:

spark.udtf.register("square_numbers", SquareNumbers)

そして、SQLでクエリのFROM句の中でテーブル値関数として使うことができる:

spark.sql("SELECT * FROM square_numbers(1, 3)").show()

+---+-------+
|num|squared|
+---+-------+
|  1|      1|
|  2|      4|
|  3|      9|
+---+-------+

Arrowに最適化されたPython UDTF

Apache Arrowは、JavaとPythonのプロセス間で効率的なデータ転送を可能にする、インメモリーのカラム型データフォーマットである。 UDTFが多くの行を出力する場合、パフォーマンスを大幅に向上させることができる。 矢印の最適化はuseArrow=Trueで有効にできます。

from pyspark.sql.functions import lit, udtf

@udtf(returnType="num: int, squared: int", useArrow=True)
class SquareNumbers:
    ...

LangChainを使った実際の使用例

上記の例は基本的なものだと感じるかもしれない。 PythonのUDTFとLangChainを統合する楽しい例で、さらに深く掘り下げてみよう。

from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from pyspark.sql.functions import lit, udtf

@udtf(returnType="keyword: string")
class KeywordsGenerator:
    """
    Generate a list of comma separated keywords about a topic using an LLM.
    Output only the keywords.
    """
    def __init__(self):
        llm = OpenAI(model_name="gpt-4", openai_api_key=<your-key>)
        prompt = PromptTemplate(
            input_variables=["topic"],
            template="generate a couple of comma separated keywords about {topic}. Output only the keywords."
        )
        self.chain = LLMChain(llm=llm, prompt=prompt)

    def eval(self, topic: str):
        response = self.chain.run(topic)
        keywords = [keyword.strip() for keyword in response.split(",")]
        for keyword in keywords:
            yield (keyword, )

これでUDTFを呼び出すことができる:

KeywordsGenerator(lit("apache spark")).show(truncate=False)

+-------------------+
|keyword            |
+-------------------+
|Big Data           |
|Data Processing    |
|In-memory Computing|
|Real-Time Analysis |
|Machine Learning   |
|Graph Processing   |
|Scalability        |
|Fault Tolerance    |
|RDD                |
|Datasets           |
|DataFrames         |
|Spark Streaming    |
|Spark SQL          |
|MLlib              |
+-------------------+

Python UDTFを今すぐ始めよう

複雑なデータ変換を行うにせよ、データセットを充実させるにせよ、単にデータを分析する新しい方法を探るにせよ、Python UDTFはあなたのツールキットに加える価値のあるものです。 このノートブックを試してみて ください。

今後の課題

この機能はPython UDTFプラットフォームの始まりに過ぎない。 Apache Sparkでは現在、さらに多くの機能が開発中で、将来のリリースで利用可能になる予定だ。 例えば、「サポート」が可能になる:

  • UDTF呼び出しが、各呼び出しに提供された特定の引数(提供された入力引数のタイプやリテラルスカラー引数の値を含む)に応答して、その出力スキーマを動的に計算することができる多相性解析。
  • TABLE キーワードを使用した SQL FROM 節で、入力リレーション全体を UDTF 呼び出しに渡す。 これは、カタログテーブルの直接参照だけでなく、任意のテーブルのサブクエリでも動作します。 evalメソッドでUDTFクラスの同じインスタンスが消費する入力テーブルのサブセットを定義するために、各クエリで入力テーブルのカスタムパーティショニングを指定することが可能になります。
  • 任意のUDTF呼び出しに対して、クエリーのスケジューリング時に一度だけ任意の初期化を実行し、その状態を将来のすべてのクラス・インスタンスに伝搬して、将来消費する。 これは、最初の静的な"analyze" メソッドによって返されるUDTF出力テーブルスキーマは、同じクエリに対する将来のすべての__init__呼び出しによって消費されることを意味します。
  • 他にも興味深い機能がたくさんあります!ぜひいろいろお試しください!
Databricks 無料トライアル

関連記事

Apache Spark™ 3.5のご紹介

翻訳:Junichi Maruyama. - Original Blog Link 本日、Databricks Runtime 14.0の一部として、Databricks上でApache Spark™ 3.5が利用可能になったことを発表いたします。Spark 3.5のリリースに多大な貢献をしていただいたApache Sparkコミュニティに深く感謝いたします。 Sparkをこれまで以上にアクセスしやすく、多用途で効率的なものにするという我々のミッションに沿った今回のアップデートには、以下のような新機能と改良が盛り込まれています: The English SDK for Apache Spark enables users to...
エンジニアリングのブログ一覧へ