일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 머신러닝 파이프라인
- COFIBA
- AWS
- chatGPT
- 클라우드
- HTTP
- aws자격증
- docker
- 클라우드자격증
- 추천시스템
- MSCS
- BERT
- MAB
- transformer
- 중국플랫폼
- 머신러닝
- 미국석사
- RecSys
- 네트워크
- nlp
- 메타버스
- MLOps
- TFX
- Collaborative Filtering Bandit
- 자연어처리
- 언어모델
- BERT이해
- llm
- 플랫폼
- BANDiT
- Today
- Total
Julie의 Tech 블로그
ML Ops, 머신러닝 파이프라인 설계 - (3) Data Ingestion 본문
이번 글부터는 머신러닝 파이프라인 단계별로 좀 더 상세하게 살펴볼 것이다.
일단 가장 첫 단추라고 할 수 있는 데이터 Ingestion 부터 알아보도록 하자.
TFX 는 데이터를 파일 혹은 어느 서비스 형태로 파이프라인에 삽입할 수 있도록 Component를 제공한다.
그 중 하나가 TFRecord , 즉 사이즈가 큰 데이터를 스트리밍하는 용도로 최적화된 경량화된 format 이다.
TFRecord 파일은 여러개의 tf.Example 레코드로 구성되어 있는데, 각 레코드는 하나 이상의 feature 로 구성되어있다.
feature는 데이터에서 컬럼에 대응하는 단위로 생각하면 된다. 이 레코드들이 바이너리 형식의 TFRecord로 저장되게 된다.
따라서 큰 데이터를 write하거나 다운로드할 때 최적화되어있고, tf.Example은 TFX 내 다른 component와도 호환이 된다는 점에서 장점이 있다.
import tensorflow as tf :
with tf.io.TFRecordWriter("test.tfrecord") as w:
w.write(b"First record")
w.write(b"Second record")
for record in tf.data.TFRecordDataset("test.tfrecord"):
print(record)
//tf.Tensor(b'First record', shape=(), dtype=string)
//tf.Tensor(b'Second record', shape=(), dtpye=string)
ExampleGen은 데이터셋에 대한 ingest, split, convert 기능을 제공하는 component이다.
만약 로컬 파일시스템에서 데이터를 ingest 한다고 할 경우, 아래와 같이 가능하다.
유의해야하는 점은 데이터를 읽어오라고 지정할 경로 내에는 데이터만 있어야한다는 것이다.
다른 메타 데이터 파일들이 있을 경우 정상적으로 읽어들이지 않는다.
우선 데이터를 읽어올 경로를 지정하고, CsvExampleGen이라는 component에 데이터를 넣어 context 로 실행시켜준다.
import os
from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input
basedir = os.getcwd()
datadir = os.path.join(os.pardir, "data")
examples = external_input(os.path.join(base_dir, data_dir))
example_Gen = CsvExampleGen(input=examples)
context.run(example_gen)
만약 앞서 배운 interactive pipline으로 생성하였다면, 아래와 같이 시각적으로 CsvExampleGen 결과를 살펴볼 수 있다.
만약 csv와 같이 일반적으로 정형화된 포맷의 데이터가 아닐 경우 변환하여 ingestion을 해주어야한다.
이럴 때는 데이터를 TFRecord 데이터 형식으로 변환해주고, 해당 파일들을 ImportExampleGen component를 사용하여 ingest해주면 된다.
위 코드에서 import해오는 것과, instance만 변경해주면 된다.
...
from tfx.components import ImportExampleGen
...
example_gen = ImportExampleGen(input=examples)
...
만약 parquet과 같이 새로운 타입의 파일을 데이터로 사용하기 위해서는, 새로운 Component를 구성할 필요 없이 Executor를 건드리면 된다.
Executor는 우리가 앞서 component의 내부 구성 아키텍쳐에서 보았던 개념인데, worker라고 보면 된다.
from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.utils.dsl_utils import external_input
examples = external_input(parquet_dir_path)
example_Gen = FileBasedExampleGen(input=examples, executor_class=parquet_executor.Executor)
import 하는 ExampleGen은 일반적인 파일 ExampleGen으로 지정하고, executor을 parquet 맞춤 executor로 지정하는 방식이다.
Avro 파일 포맷으로 데이터를 가져올 때도 마찬가지이다. avro-specific executor를 지정해주면 된다.
이외에 custom executor를 만들어 위와 같이 적용하여 데이터를 ingestion할 수 있다.
때로는 데이터를 변환하여 TFRecord에 맞게 바꾼 뒤 ImportExampleGen component에 ingest 해주는게 더 간단할 수 있다.
TFRecord에 맞게 데이터를 변환하기 위해서는 데이터셋 내에 존재하는 레코드 하나마다 tf.Example 구조로 만들어주어야한다.
tf.Example은 간단하지만 굉장히 flexible한 데이터 구조이며, key-value 매핑 포맷이다.
아래는 tf.Example의 예시이다.
Record 1:
tf.Example
tf.Features
'column A' : tf.train.Feature
'column B' : tf.train.Feature
'column C' : tf.train.Feature
데이터셋을 TFRecord로 변환하는 코드는 조금 길어서 아래 링크로 대체하겠다.
해당 링크를 확인하면 컬럼별로 하나씩 변환 함수를 지정하고, 레코드별로 변환한 뒤 TFRecord 포맷으로 파일이 최종적으로 반환되는 것을 확인할 수 있다.
이외에도 AWS S3와 같은 클라우드 저장소 서비스에서 데이터를 받아 ingest하는 것도 가능한데,
일반적으로 credentials 를 필요로하고 AWS와 같은 경우 access key와 access secret을 요구한다.
DB에 직접적으로 접근하여 테이블을 데이터로 받아오는 것도 가능하다. (BigQueryExampleGen, PrestoExampleGen)
from tfx.components import BigQueryExampleGen
query = """
SELECT * FROM `<project_id>.<db>.<table>`
"""
example_gen = BigQueryExampleGen(query=query)
Data Preparation
train, valid, test 데이터 셋으로 split하면서 가져올 수도 있다. 아래 코드를 참고해보면 hash_buckets 값을 지정함으로써 그 비율을 정한다.
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input
base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")
output = example_gen_pb2.Output(
split_config=example_gen_pb2.SplitConfig(splits=[
example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2)
]))
examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, output_config=output)
context.run(example_gen)
기존 코드와 다른 점은 output_config를 지정한다는 것이다. output_config를 지정하지 않으면 default로 2:1 (train:test) 로 나누게 된다.
만약 이미 폴더별로 train, test, eval 셋이 나누어져있을 경우 경로 패턴을 인식할 수 있도록 정규식을 지정함으로써 나눠서 읽어들일 수도 있다.
머신러닝 파이프라인을 사용하는 이유 중 하나는 신규 데이터가 인입되었을 때 새로 학습할 수 있도록 자동화하고자 하는 목적도 있다.
이 때 우리는 ExampleGen의 span 기능을 사용하는데, span이란 데이터의 스냅샷과 같은 것이다.
span은 말 그대로 확장하는 식으로 과거 데이터를 포함하여 저장할 수 있다.
예를 들어 1분전 데이터와 그간 업데이트된 데이터를 모두 합쳐 보유하는 것이다.
아래 코드를 수행하게 되면 ExampleGen이 알아서 가장 최종 span을 (=최신) 가져올 수 있도록 되어있다.
from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2
from tfx.utils.dsl_utils import external_input
base_dir = os.getcwd()
data_dir = os.path.join(os.pardir, "data")
input = example_gen_pb2.Input(splits=[
example_gen_pb2.Input.Split(pattern='export-{SPAN}/*')
])
examples = external_input(os.path.join(base_dir, data_dir))
example_gen = CsvExampleGen(input=examples, input_config=input)
context.run(example_gen)
데이터 버전 관리 기능도 필요로 할 수 있는데, 데이터 버전 관리를 통해 데이터에 대한 메타 정보를 트래킹해나갈 수 있기 때문이다.
하지만 아직까지 TFX ExampleGen은 데이터 버전 관리 기능을 제공해주지 않고 있다.
따라서 데이터 버전 관리를 하기 위해서는 third party 툴을 이용해야된다. (Data Version Control(DVC), Pachyderm 등)
하지만 위 툴들 모두 TFX ML MetadataStore에 정보를 저장해주진 않는다.
이렇게 데이터를 Ingestion하는 코드를 여러 가지로 살펴보았다.
이제는 정리하는 차원에서, 각 데이터 유형별로 일반적으로 접근하는 방법에 대해 써내려갈 것이다.
보통 일반적으로 정형화된 데이터는 PrestoExampleGen이나 BigQueryExampleGen과 같이 DB에 직접적으로 접근해서 읽어들이거나,
CsvExampleGen 을 이용하여 CSV 파일을 읽고, 그 외 지원되지 않는 포맷은 TFRecord로 변환하여 읽어들여야한다.
텍스트와 같은 말뭉치 데이터는 아무래도 사이즈가 커질 확률이 다분하다보니, TFRecord를 통해 변환하여 읽어들이는 것을 추천한다.
혹은 Apache Parquet 파일 포맷으로 담는 것도 괜찮은 방법이다.
이처럼 성능에 맞는 파일 포맷을 사용해야 효율적으로 데이터를 읽어들일 수 있다는 점에 유의해야한다.
DB에서 직접적으로 가져오는 것도 가능하나, 네트워크 비용이나 병목현상이 발생할 수 있다는 점에 유의해야한다.
이미지 형태의 데이터셋도 마찬가지로 TFRecord 파일로 변환하는 것이 필요하다. 다만 압축한 형태로 올리는 것이 좋다.
압축하여 TFRecord 로 변환하여 데이터를 읽을 경우 byte단위의 string 형태로 데이터를 보관할 것이다.
참고로 라벨도 함께 tf.Example로 저장할 수 있는데, generate_label_from_path 라는 함수를 사용하면 된다.
이상 글을 마치며, 다음 편에서는 Data Validation에 대해 다룰 것이다.
본 글은 아래 책을 읽고 재구성되었습니다.
https://learning.oreilly.com/library/view/building-machine-learning/9781492053187/
'Tech > MLOps' 카테고리의 다른 글
MLOps, 머신러닝 파이프라인 설계 - (5) Data Preprocessing (0) | 2022.01.02 |
---|---|
MLOps, 머신러닝 파이프라인 설계 - (4) Data Validation (1) | 2021.09.20 |
MLOps, 머신러닝 파이프라인 설계 - (2) TFX, Apache Beam 개요 (1) | 2021.09.01 |
MLOps, 머신러닝 파이프라인 설계 - (1) 개요 (1) | 2021.09.01 |
도커 - 네트워킹 / bridge와 overlay (0) | 2021.05.30 |