今回は、Streamlitのコンポーネントであるbarfiを「使ってみた」です。
実際にbarfiを使って、データの前処理を行う環境を作ってみようと思います。
コードとブロックを比較しながら、barfiの理解を深めます。
「そもそもbarfiって何よ」という方、以前作成した記事に説明がありますので、そちらを見ていただけたらと思います。
barfiで作ってみた!
データの前処理には、以下の要素を含むものとして作成を進めます。
- 読み込むデータの選択
- 正規化/標準化
- NULLを含む行の削除
- データの結合
- 縦方向
- 横方向
- 加工後のデータの保存
まずは環境構築
barfiはpip install barfi
で利用可能になります。環境構築後に実行しておいてください。
環境は、WindowsのローカルでAnacondaを用いて構築しました。
バージョンは以下の通り。
- python:3.11.7
- numpy:1.26.3
- pandas:2.1.4
- streamlit:1.29.0
また、ファイル構成は以下の通りです。
┣ app.py(メインファイル、後述)
┣ data
┣ test1.csv(疑似データ)
┗ test2.csv(疑似データ)
┗ output
barfiでフローの環境を作る
コーディングはpyファイルで行います。このファイルをStreamlitで実行することでフロー図の作成に加えて実行まで可能になります。今回はapp.pyというファイルを作成します。
最初に必要なライブラリをインポートしておきます。
import pandas as pd
import streamlit as st
import glob
from barfi import st_barfi, barfi_schemas, Block
ブロックの作成
作成するブロックは先述した①読み込むデータの選択、②正規化/標準化、③NULLを含む行の削除、④データの結合、⑤データの保存の5つです。
①読み込むデータの選択
データの読み込みは最初のブロックになるはずですので、インプットはなしにしました。ただ、どのデータを読み込むかの設定は必要なため、オプションから設定しています。data配下のcsvファイルからプルダウンで選択できるようにしました。アウトプットは読み込んだデータフレームなので1つです。
関数内では、オプションで指定されたファイル情報を受け取り、pandasで読み込み、アウトプットにセットするという単純なものです。
コードは以下の通り。
#入出力
load = Block(name='load')
load.add_option(name='select data to load', type='select', items=glob.glob('data/*.csv'))
load.add_output(name='Output 1')
#関数
def load_func(self):
path = self.get_option(name='select data to load')
df = pd.read_csv(path)
self.set_interface(name='Output 1', value=df)
load.add_compute(load_func)
ブロックはこのような形になります。存在するデータがドロップボックスで読み込み可能になっています。コードを変更すれば、この辺は柔軟に対応可能ですね。
②正規化/標準化
データフレーム1つを入力、オプションとして正規化か標準化の選択、正規化/標準化後のデータフレームを出力するブロックです。
関数内では、インプットとスケーリングタイプを受け取り、それを基にインプットを変換し、アウトプットにセットしています。
#入出力
scaling = Block(name='scaling')
scaling.add_option(name='select scaling type', type='select', items=['Normalization','Standardization'])
scaling.add_input(name='Input 1')
scaling.add_output(name='Output 1')
#関数
def scaling_func(self):
df = self.get_interface(name='Input 1')
scaling_type = self.get_option(name='select scaling type')
if scaling_type == 'Normalization':
df=(df - df.min()) / (df.max() - df.min())
elif scaling_type == 'Standardization':
df=(df - df.mean()) / df.std()
self.set_interface(name='Output 1', value=df)
scaling.add_compute(scaling_func)
ブロックはこんな感じ。
先ほどと比べて、インプットが増えていることが分かりますね。
③NULLを含む行の削除
こちらは、やることが一意ですのでオプションの設定は行わず、インプットとアウトプットのみ設定しました。(オプションとして、drop対象をallかanyか設定させてもいいですね)
関数も、受け取ったインプットにdropna
を適用したものをアウトプットにセットするシンプルなものです。
#入出力
drop_NULL = Block(name='drop NULL')
drop_NULL.add_input(name='Input 1')
drop_NULL.add_output(name='Output 1')
#関数
def drop_NULL_func(self):
df = self.get_interface(name='Input 1')
self.set_interface(name='Output 1', value=df.dropna())
drop_NULL.add_compute(drop_NULL_func)
ブロックのほうは、先の2つで存在したプルダウンがなくなり、入出力のみになっています。
④データの結合
結合は入力が複数あることが前提の行為と言えるでしょう。そのため、今回は入力を2つに設定しています。また、結合方向(縦or横)をオプションで選択、結合後のデータが出力なので1つの出力としました。
関数については、特筆すべきことはないです。
#入出力
concat = Block(name='concat')
concat.add_option(name='select concat type', type='select', items=['vertical','horizontal'])
concat.add_input(name='Input 1')
concat.add_input(name='Input 2')
concat.add_output(name='Output 1')
#関数
def concat_func(self):
df_1 = self.get_interface(name='Input 1')
df_2 = self.get_interface(name='Input 2')
concat_type = self.get_option(name='select concat type')
concat_type = (1 if concat_type=='horizontal' else 0)
df = pd.concat([df_1,df_2], axis=concat_type)
df.reset_index(inplace=True, drop=True)
self.set_interface(name='Output 1', value=df)
concat.add_compute(concat_func)
ブロックを見ていただけると、入力が複数設定されていることが分かります。
⑤データの保存
データの保存は最後のブロックになるはずなので、アウトプットは設定しません。保存したいデータを受け取るインプットに加えて、ファイル名を設定するために指示文と記述式のオプションを設定しました。
関数内での記述式の読み込みですが、プルダウンの時と同様に行うことができます。
#入力
save = Block(name='save')
save.add_input(name='Input 1')
save.add_option(name='explain', type='display', value='enter a file name without extension')
save.add_option(name='file name', type='input', items='output')
#関数
def save_func(self):
df = self.get_interface(name='Input 1')
file_name=self.get_option(name='file name')
df.to_csv(f"output/{file_name}.csv", index=False)
save.add_compute(save_func)
ブロックのほうは、アウトプットがなくなり、指示文と記述式の入力枠が追加されていますね。
フロー図作成エリア
前回も説明した通り、フロー図を作成するためのコードになります。
ほぼほぼ前回と同じです。
load_schema = st.selectbox('Select a saved schema:', barfi_schemas())
compute_engine = st.checkbox('Activate barfi compute engine', value=True)
barfi_result = st_barfi(base_blocks=[load, scaling, drop_NULL, concat, save],
compute_engine=compute_engine,
load_schema=load_schema)
実際に動かしてみた
terminalで以下を実行するとStreamlitが立ちあがます。
立ち上がらない場合は、実行後にブラウザのアドレスバーにlocalhost:{ポート番号}
と打ち込んでください。そこでフローの操作ができます。
streamlit run --server.port {ポート番号} app.py
こんなフローを組んでみました。
①2つの疑似データをそれぞれ読み込み
②2つのデータを結合
③NULLを含む行の削除
④標準化
⑤output.csvとして保存
枠内左上に「Menu」と「Execute」があります。
Menuは、フロー図の保存や保存したフロー図の読み込みができます。
Executeは、作成したフロー図の実行ができます。実際に押下すると、設定したすべての処理が行われたファイルとしてoutput/output.csvが作成されました!
ここが惜しい
- 色
作者のこだわりを感じる黒を基調とした配色ですが、ブロックごとに色を付けることができれば一目で処理の内容が分かると思うのです。 - ブロックサイズ
余白が大きく、もっとコンパクトにしたいなと思うことがありました。 - 表示範囲
ズーム機能がありますが、かなり見づらくなります。
画面表示の範囲が限られていることもありますが、ブロックが増えても見やすいようにしたいなと感じました。
最後に
Streamlitのコンポーネントbarfiを実際に使ってみましたが、Pythonを用いて簡単に作れて驚きでした。積み立てていくとすごいものが作れるかもしれないです。。。
後編では、ブロックの色付けをしてみたいと思います。
今回はここまで!
コード全文
app.py
import pandas as pd
import streamlit as st
import glob
from barfi import st_barfi, barfi_schemas, Block
load = Block(name='load')
load.add_option(name='select data to load', type='select', items=glob.glob('data/*.csv'))
load.add_output(name='Output 1')
def load_func(self):
path = self.get_option(name='select data to load')
df = pd.read_csv(path)
self.set_interface(name='Output 1', value=df)
load.add_compute(load_func)
scaling = Block(name='scaling')
scaling.add_option(name='select scaling type', type='select', items=['Normalization','Standardization'])
scaling.add_input(name='Input 1')
scaling.add_output(name='Output 1')
def scaling_func(self):
df = self.get_interface(name='Input 1')
scaling_type = self.get_option(name='select scaling type')
if scaling_type == 'Normalization':
df=(df - df.min()) / (df.max() - df.min())
elif scaling_type == 'Standardization':
df=(df - df.mean()) / df.std()
self.set_interface(name='Output 1', value=df)
scaling.add_compute(scaling_func)
drop_NULL = Block(name='drop NULL')
drop_NULL.add_input(name='Input 1')
drop_NULL.add_output(name='Output 1')
def drop_NULL_func(self):
df = self.get_interface(name='Input 1')
self.set_interface(name='Output 1', value=df.dropna())
drop_NULL.add_compute(drop_NULL_func)
concat = Block(name='concat')
concat.add_option(name='select concat type', type='select', items=['vertical','horizontal'])
concat.add_input(name='Input 1')
concat.add_input(name='Input 2')
concat.add_output(name='Output 1')
def concat_func(self):
df_1 = self.get_interface(name='Input 1')
df_2 = self.get_interface(name='Input 2')
concat_type = self.get_option(name='select concat type')
concat_type = (1 if concat_type=='horizontal' else 0)
df = pd.concat([df_1,df_2], axis=concat_type)
df.reset_index(inplace=True, drop=True)
self.set_interface(name='Output 1', value=df)
concat.add_compute(concat_func)
save = Block(name='save')
save.add_input(name='Input 1')
save.add_option(name='explain', type='display', value='enter a file name without extension')
save.add_option(name='file name', type='input', value='output')
def save_func(self):
df = self.get_interface(name='Input 1')
file_name=self.get_option(name='file name')
df.to_csv(f"output/{file_name}.csv", index=False)
save.add_compute(save_func)
load_schema = st.selectbox('Select a saved schema:', barfi_schemas())
compute_engine = st.checkbox('Activate barfi compute engine', value=True)
barfi_result = st_barfi(base_blocks=[load, scaling, drop_NULL, concat, save],
compute_engine=compute_engine,
load_schema=load_schema)