Upgrade to Pro — share decks privately, control downloads, hide ads and more …

テーブルデータの推論結果をS3に出力するpipelineをgokartを使って書く / Write a pipeline using gokart to output inference results of table data to S3

テーブルデータの推論結果をS3に出力するpipelineをgokartを使って書く / Write a pipeline using gokart to output inference results of table data to S3

■イベント :【Sansan×エムスリー】gokartで爆速開発!MLOps勉強会
https://sansan.connpass.com/event/288525/

■登壇概要
タイトル:テーブルデータの推論結果をS3に出力するpipelineをgokartを使って書く
発表者: 技術本部 研究開発部 研究員 齋藤 慎一朗

◉ 研究開発職 採用情報
募集中のポジションや関連記事など
https://media.sansan-engineering.com/

◉ Sansan Tech Blog
Sansanのものづくりを支えるメンバーのテックブログ(R&Dメンバーの連載も多数)

https://buildersbox.corp-sansan.com/

Sansan R&D

July 12, 2023
Tweet

More Decks by Sansan R&D

Other Decks in Technology

Transcript

  1. テーブルデータの推論結果をS3に出⼒する
    pipelineを を使って書く
    2023/07/10
    Sansan株式会社 技術本部 研究開発部
    齋藤

    View Slide

  2. ⾃⼰紹介
    東北大学大学院工学研究科博士前期課程修了。日系SIer
    や外資系IT企業で、電力・保険・製薬など多様な業界に
    対する、機械学習を用いたシステムの提案やPoC・開
    発・保守運用に従事。現在は請求書データや企業情報の
    分析を担当。Kaggle Expert。
    齋藤 慎⼀朗 Shinichiro Saito
    研究員
    Twitter
    @sinchir0
    Virtual Card

    View Slide

  3. ➔ pipelineの再現性が担保される
    ➔ classの書き方がメンバー間で共通化されるため、コードレビュ
    ーの負荷が下がる
    gokartを利⽤するメリット

    View Slide

  4. - 機械学習ワークフローライブラリ
    - Luigi(Spotifyが開発しているPythonによるパイプラインパッケージ)のラッパー
    - https://github.com/spotify/luigi
    - 最小単位は下記
    gokartとは
    class ExampleTaskA(gokart.TaskOnKart):
    param = luigi.Parameter()
    int_param = luigi.IntParameter(default=2)
    def run(self):
    self.dump(f'DONE {self.param}_{self.int_param}')
    https://github.com/m3dev/gokart/blob/master/examples/gokart_notebook_example.ipynb より引用

    View Slide

  5. - Taskごとの依存関係をrequiresで表現し、再現性を担保
    gokartとは
    class ExampleTaskC(gokart.TaskOnKart):
    def run(self):
    self.dump('TASKC')
    class ExampleTaskD(gokart.TaskOnKart):
    def run(self):
    self.dump('TASKD')
    class ExampleTaskB(gokart.TaskOnKart):
    param = luigi.Parameter()
    def requires(self):
    return dict(task_c=ExampleTaskC(), task_d=ExampleTaskD())
    def run(self):
    task_c = self.load('task_c')
    task_d = self.load('task_d')
    self.dump(f'DONE {self.param}_{task_c}_{task_d}')
    https://github.com/m3dev/gokart/blob/master/examples/gokart_notebook_example.ipynb より引用
    ExampleTaskC ExampleTaskD
    ExampleTaskB

    View Slide

  6. - gokart.TaskInstanceParameterを使うとtaskを変数として受け取れる
    gokartとは
    class ExampleTaskC(gokart.TaskOnKart):
    def run(self):
    self.dump('TASKC')
    class ExampleTaskD(gokart.TaskOnKart):
    def run(self):
    self.dump('TASKD')
    class ExampleTaskB(gokart.TaskOnKart):
    param = luigi.Parameter()
    task_1 = gokart.TaskInstanceParameter()
    task_2 = gokart.TaskInstanceParameter()
    def requires(self):
    return dict(task_1=self.task_1, task_2=self.task_2) # required tasks are decided from the task parameters `task_1` and `task_2`
    def run(self):
    task_1 = self.load('task_1')
    task_2 = self.load('task_2')
    self.dump(f'DONE {self.param}_{task_1}_{task_2}')
    task_b = ExampleTaskB(param='example', task_1=ExampleTaskC(), task_2=ExampleTaskD()) # Dependent tasks are defined here
    output = gokart.build(task=task_b)
    print(output)
    https://github.com/m3dev/gokart/blob/master/examples/gokart_notebook_example.ipynb より引用

    View Slide

  7. ➔ TaskInstanceParameterを用いたpipelineの構築例、その時の
    tipsを紹介します。
    本⽇の紹介内容

    View Slide

  8. アジェンダ
    - pipelineの全体像
    - requiresにpipelineを書く
    - 精度の確認用のtaskを挟む
    - staticmethodでテストをする
    - panderaによるデータのvalidation
    - 推論ファイル配置用のS3にputをする
    - pipelineが巨大になった際に、requiresを更にnestする

    View Slide

  9. repo
    - https://github.com/sinchir0/gokart_pipeline_sample

    View Slide

  10. Pipelineの全体像
    このpipelineが、SansanのR&Dが持つ最速でリリースす
    るための基盤、Circuit上にて稼働しており、定期的に自
    動実行するようになっています。
    https://speakerdeck.com/sansan_randd/about-circuit-r-and-ds-application-platform
    - よくある機械学習のワークフロー
    - inputデータは定期的に更新される想定
    - 下記パイプラインを数ヶ月に一度実行する必要がある
    データの
    ダウンロード
    前処理
    学習・推論
    (LightGBM)
    性能の確認
    推論結果配置用
    のS3にpush

    View Slide

  11. ディレクトリ構成
    ❯ tree -L 3
    .
    ├── README.md
    ├── conf
    │ ├── logging.ini
    │ └── param.ini
    ├── gokart_pipeline_sample
    │ ├── __init__.py
    │ ├── pipeline
    │ │ ├── __init__.py
    │ │ ├── __pycache__
    │ │ ├── check.py
    │ │ ├── evaluate.py
    │ │ ├── gokart_util.py
    │ │ ├── load_data.py
    │ │ ├── make_output.py
    │ │ ├── pred.py
    │ │ ├── preprocess.py
    │ │ ├── run.py
    │ │ ├── run_middle.py
    │ │ ├── schemas
    │ │ ├── split_data.py
    │ │ ├── train.py
    │ │ └── upload.py
    │ └── utils
    │ └── template.py
    ├── main.py
    ├── poetry.lock
    ├── pyproject.toml
    └── test
    ├── __init__.py
    └── unit_test
    ├── __pycache__
    └── test_preprocess.py

    View Slide

  12. requiresにpipelineを書く
    - 処理の全体像がすぐにわかる
    class RunTask(GokartTask):
    require_acc = luigi.FloatParameter(default=0.9)
    def requires(self) -> gokart.TaskInstanceParameter:
    data_task = LoadDataTask()
    data_task = PreprocessTask(data_task=data_task)
    data_task = SplitDataTask(data_task=data_task)
    train_data_task = GetDataTask(data_task=data_task, name="train")
    valid_data_task = GetDataTask(data_task=data_task, name="valid")
    test_data_task = GetDataTask(data_task=data_task, name="test")
    model_task = TrainTask(train_task=train_data_task, valid_task=valid_data_task)
    pred_task = PredTask(model_task=model_task, test_task=test_data_task)
    output_task = MakeOutputTask(pred_task=pred_task, test_task=test_data_task)
    checked_output_task = CheckAccuracyTask(
    pred_task=output_task, test_task=test_data_task, require_acc=self.require_acc
    )
    s3_upload_task = UploadS3Task(output_task=checked_output_task)
    return s3_upload_task
    def run(self):
    self.dump(self.__class__.__name__)

    View Slide

  13. - 精度の確認用のtaskを挟むことで、ドメインシフトなどによる精度低下を自動で
    検知
    精度の確認⽤のtaskを挟む
    class CheckAccuracyTask(GokartTask):
    pred_task = gokart.TaskInstanceParameter()
    test_task = gokart.TaskInstanceParameter()
    require_acc = luigi.FloatParameter()
    def requires(self) -> dict[str, gokart.TaskInstanceParameter]:
    return {"pred": self.pred_task, "test": self.test_task}
    def run(self) -> None:
    pred = self.load("pred")
    test = self.load("test")
    y_test = test["y_test"]
    acc = accuracy_score(y_test, pred["pred"])
    if acc <= self.require_acc:
    raise Exception(f"必要な精度に達していません 必要精度:{self.require_acc} 実際の精度:{acc}")
    self.dump(pred)

    View Slide

  14. - テストが必要な関数はstaticmethodやclassmethodに切り出す
    class PreprocessTask(GokartTask):
    data_task = gokart.TaskInstanceParameter()
    def requires(self) -> gokart.TaskInstanceParameter:
    return self.data_task
    @staticmethod
    def make_area(length: pd.Series, width: pd.Series) -> pd.Series:
    return length * width
    @pa.check_types
    def add_area_feature(self, data: DataFrame[IrisFeatureSchema]) ->
    DataFrame[PreprocessedSchema]:
    # 花びらの面積を新たな特徴量として追加
    data["petal_area"] = self.make_area(data["petal_length"],
    data["petal_width"])
    return data
    def run(self) -> None:
    data = self.load_data_frame()
    df = self.add_area_feature(data=data)
    self.dump(df)
    staticmethodでテストをする
    class TestMakeAreaTask:
    @pytest.mark.parametrize(
    ("length", "width", "expect"),
    [
    (
    pd.Series([2.0, 4.0, 6.0]),
    pd.Series([1.0, 2.0, 3.0]),
    pd.Series([2.0, 8.0, 18.0])
    )
    ],
    )
    def test_make_area(
    self,
    length: pd.Series,
    width: pd.Series,
    expect: pd.Series
    ) -> None:
    actual = PreprocessTask.make_area(length, width)
    assert actual.equals(expect)

    View Slide

  15. panderaによるデータのvalidation
    - inputとoutputのpandas.DataFrameのvalidationを行うために、panderaを利用
    - add_area_featureメソッドを作成し、inputとoutputの型ヒントを付与
    class PreprocessTask(GokartTask):
    data_task = gokart.TaskInstanceParameter()
    def requires(self) -> gokart.TaskInstanceParameter:
    return self.data_task
    @staticmethod
    def make_area(length: pd.Series, width: pd.Series) -> pd.Series:
    return length * width
    @pa.check_types
    def add_area_feature(self, data: DataFrame[IrisFeatureSchema]) -> DataFrame[PreprocessedSchema]:
    # 花びらの面積を新たな特徴量として追加
    data["petal_area"] = self.make_area(data["petal_length"], data["petal_width"])
    return data
    def run(self) -> None:
    data = self.load_data_frame()
    df = self.add_area_feature(data=data)
    self.dump(df)

    View Slide

  16. - 指定したデータ以外を検知した場合に、Errorを出す
    panderaによるデータのvalidation
    class PreprocessedSchema(BaseSchema):
    """
    前処理済データのスキーマ
    """
    id: Series[int] = pa.Field(nullable=False, description="id")
    sepal_length: Series[float] = pa.Field(gt=0.0, nullable=False, description="がく片の長さ(cm)")
    sepal_width: Series[float] = pa.Field(gt=0.0, nullable=False, description="がく片の幅(cm)")
    petal_length: Series[float] = pa.Field(gt=0.0, nullable=False, description="花びらの長さ(cm)")
    petal_width: Series[float] = pa.Field(gt=0.0, nullable=False, description="花びらの幅(cm)")
    petal_area: Series[float] = pa.Field(gt=0.0, nullable=False, description="花びらの面積(cm^2)")
    target: Series[int] = pa.Field(isin=(0, 1, 2), nullable=False, description="あやめの種類")

    View Slide

  17. 推論ファイル配置⽤のS3にputをする
    - Circuit基盤上では意図的にファイル出力を制限をしているため、dataframeをスト
    リームで直接putする
    class UploadS3Task(GokartTask):
    output_task = gokart.TaskInstanceParameter()
    def requires(self) -> gokart.TaskInstanceParameter:
    return self.output_task
    def put_file(self, output_data: pd.DataFrame) -> None:
    s3 = boto3.resource("s3")
    bucket_name = os.environ["BUCKET_NAME"]
    with StringIO() as csv_buffer:
    output_data.to_csv(csv_buffer, index=False)
    s3.Object(bucket_name, "output.csv").put(Body=csv_buffer.getvalue())
    def run(self) -> None:
    output = self.load_data_frame()
    self.put_file(output)
    self.dump(self.__class__.__name__)

    View Slide

  18. pipelineが巨⼤になった際に、requiresを更にnestする
    - requiresの中身が巨大になった際には、requiresのtaskを更にnestするようなtask
    を作成
    - この程度だとあまり差はないが、巨大になった場合は見通しが改善する
    class RunTask(GokartTask):
    def requires(self) -> gokart.TaskInstanceParameter:
    data_task = LoadDataTask()
    data_task = PreprocessTask(data_task=data_task)
    data_task = SplitDataTask(data_task=data_task)
    –中略--
    return s3_upload_task
    def run(self) -> None:
    self.dump(self.__class__.__name__)
    class RunNestTask(GokartTask):
    def requires(self) -> gokart.TaskInstanceParameter:
    data_task = PrepareDataTask()
    –中略--
    return s3_upload_task
    def run(self) -> None:
    self.dump(self.__class__.__name__)
    class PrepareDataTask(GokartTask):
    def requires(self) -> gokart.TaskInstanceParameter:
    data_task = LoadDataTask()
    data_task = PreprocessTask(data_task=data_task)
    data_task = SplitDataTask(data_task=data_task)
    return data_task
    def run(self) -> None:
    data = self.load()
    self.dump(data)

    View Slide

  19. View Slide