일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 메타버스
- AWS
- 미국석사
- 머신러닝 파이프라인
- transformer
- BERT이해
- 언어모델
- chatGPT
- Collaborative Filtering Bandit
- 클라우드자격증
- aws자격증
- 네트워크
- 클라우드
- MSCS
- COFIBA
- 머신러닝
- MAB
- 중국플랫폼
- 추천시스템
- llm
- nlp
- HTTP
- 플랫폼
- MLOps
- 자연어처리
- BANDiT
- TFX
- docker
- BERT
- RecSys
- Today
- Total
Julie의 Tech 블로그
MLOps, 머신러닝 파이프라인 설계 - (5) Data Preprocessing 본문
본 글은 MLOps의 데이터 전처리 단계인 Preprocessing 과정에 대해 다뤄볼 것이다.
이전에는 데이터의 수집, 수집된 데이터의 정합성 검증 과정까지 다루었다면,
이제 본격적으로 데이터를 주입하여 모델에 input하기 전까지의 과정을 다루게 되는 것이다.
글을 시작하기에 앞서 본 글은 Tensorflow TFX 라이브러리를 기반으로 설명을 하고 있는데,
MLOps의 프레임워크 서비스로 꼭 모든 파이프라인 단계를 빌딩해야한다는 것은 아니다.
기존에 numpy 나 pandas로 전처리를 하고 있었다면, 그로 충분히 사용할 수도 있다.
Why Data Preprocessing?
우리가 흔히 수집하는 데이터는 모델이 인식할 수 있는 포맷대로 수집되지 않는다.
예를 들어 모델의 정답지로 사용하는 라벨 데이터도 'Yes' 혹은 'No'로 수집되는데, 이를 모델이 인식할 수 있는 0 또는 1로 변경해주어야한다.
당연한 사실이지만 TFT라이브러리를 통해 모델 전처리 단계를 배포하게 된다면,
전체 파이프라인을 TFX로 통합하여 관리할 수 있다는 장점이 있다.
TFX는 대시보드로 통계치와 같은 정보를 제공하고 있기 때문에, 각 단계별 정상적으로 수행이 되는지 모니터링도 가능하다.
앞서 살펴봤듯이, 데이터 수집 시 데이터 확인 과정에 있어 정상 범주에서 벗어난 데이터가 많이 수집될 경우 노티를 받을 수 있다.
Data Preprocessing with TFT
TFT는 Tensorflow Transform의 약자로, TFDV와 같은 TFX 프로젝트의 일환이다.
TFT는 사전에 정의된 데이터셋 스키마에 따라 수집된 데이터를 가공처리하여, 아래와 같은 두 종류의 output을 만든다:
1) TFRecord 포맷으로 학습 / 평가용 데이터셋 두 가지를 생성한다. (이후 단계인 Trainer 컴포넌트에 사용됨)
2) 전처리 그래프(preprocessing graph), 머신러닝 모델을 export할 때 사용된다
아래 그림에서도 볼 수 있듯이 TFT에서 가장 핵심적인 역할을 하는 함수는 preprocessing_fn 이다.
해당 함수는 raw 데이터에 적용하고자 하는 다양한 변형 함수들을 제공하고 있다.
우리는 우선 아래와 같이 예시용으로 정의해보자.
def preprocessing_fn(inputs):
x = inputs['x']
x_normalized = tft.scale_to_0_1(x)
return {
'x_xf': x_normalized
}
TFT는 데이터를 버켓화하는 tft.bucketize, Z스코어로 변환하는 tft.scale_to_z_score와 같은 함수를 제공하기도 한다.
TFT Installation
TFX 패키지를 설치했다면, 의존성 중 하나로 포함되기 때문에 따로 별도 설치할 필요는 없다. 별도 설치를 원한다면, 명령어 수행으로 가능하다.
Best practices
교재에서 몇 가지 주의 혹은 노하우들을 공유하였다. TFT를 이용하여 전처리 단계를 생성할 때 참고하면 좋을 것이다.
1. 자료형을 고려하라
TFT는 tf.string, tf.float32, tf.int64 중 하나로 데이터를 처리하게 되는데, 만약 머신러닝 모델이 위 데이터 타입들을 받아들이지 못한다면 (예를 들어 BERT 모델은 tf.int32 로 input값을 받음) 자료형을 변환하여 머신러닝 모델에게 주어야할 것이다.
2. 전처리는 배치로 동작한다
전처리 함수를 직접 프로그래밍할 경우, 전처리는 한 번에 한 row만 수행하는 것이 아니라, 배치형으로 수행된다는 것을 인지해야한다. 이로 인해 preprocessing_fn 함수 결과를 Tensor나 SparseTensor로 다시 처리해야될 수도 있다.
3. TF 명령어로 작성하라
preprocessing_fn 함수 동작시에는 내부 명령어는 모두 TensorFlow 명령어여야한다. 문자열의 대문자를 소문자로 변경하고 싶을 경우 lower()이 아닌 tf.strings.lower() 함수를 사용해야한다.
TFT Functions
다음은 TFT에서 제공하는 대표적인 함수들 몇 가지를 소개할텐데, 함수 이름이 직관적인 경우 설명을 생략했다.
- tft.scale_to_z_score()
- tft.bucketize()
feature를 bin으로 버킷화하기 위해 사용하는 함수. bin이나 bucket index를 반환하게 된다.
- tft.pca()
- tft.compute_and_apply_vocabulary()
feature 컬럼의 모든 유니크한 값을 조사한 뒤 가장 빈번하게 사용되는 값들을 index화하여 매핑해둔다. 이 index 매핑값은 feature를 숫자형으로 표현하려고 할 때 사용될 수 있다. NLP모델에 사용되기도 한다.
- tft.apply_saved_model()
TFT에서는 NLP 모델에 사용하기 좋은 함수들도 제공한다 (tft.ngrams(), tft.bag_of_words(), tft.tfidf() 등)
하지만 이에 대해선 따로 설명을 덧붙이지 않겠다.
이미지 데이터 처리하는데에도 사용할 수 있는 함수들이 있다.
TensorFlow는 이미 이미지 전처리 명령어들을 제공하고 있는데, tf.images와 tf.io API들이 그에 해당한다.
TFT Execution
앞서 preprocessing_fn 함수를 정의하는 방법에 대해 다루었다면, 이제는 Transform을 어떻게 수행(execute)하느냐에 대해 이야기해볼 것이다.
전처리 단계만 단독으로 수행할 수도 있고, TFX 컴포넌트 중 하나로 수행해볼 수도 있다.
양쪽의 경우 모두 Apache Beam이나 Google Cloud의 Dataflow 서비스를 이용하여 수행할 수 있다.
우선 단독으로 수행하는 경우에 대해 살펴보자.
아래와 같이 x라는 이름의 컬럼 하나로만 구성된 작은 raw데이터를 tf.float32 타입으로 지정하여 메타 데이터를 생성해보자.
raw_data = [
{'x': 1.20},
{'x': 2.99},
{'x': 100.00}
]
import tensorflow as tf
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
raw_data_metadata = dataset_metadata.DatasetMetadata(
schema_utils.schema_from_feature_spec({
'x': tf.io.FixedLenFeature([], tf.float32),
}))
이 데이터를 이제 TFT를 이용하여 변환해보자. TFRecord 데이터로 output을 만들어 저장할 것이다.
이 때 Apache Beam에서 제공하는 AnalyzeAndTransformDataset 함수를 사용할텐데, 이 함수는 데이터셋을 분석하고 변환하는 두 단계를 수행하는 함수이다. Python의 context manager인 tft_beam.Context 를 이용하여 수행된다.
* 컨텍스트 매니저란 원하는 타이밍에 정확하게 리소스를 할당하고 제공하는 역할을 한다, 우리가 가장 많이 사용하는 Python의 컨텍스트 문은 파일을 읽고 쓸 때 사용하는 with 문이다.
import tempfile
import tensorflow_transform.beam.impl as tft_beam
with beam.Pipeline() as pipeline:
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
tfrecord_file = "/your/tf_records_file.tfrecord"
raw_data = (
pipeline | beam.io.ReadFromTFRecord(tfrecord_file))
transformed_dataset, transform_fn = (
(raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
preprocessing_fn))
이 변환 결과를 출력해보면, 아래와 같이 x_xf 컬럼이 리턴되는 것을 볼 수 있다.
transformed_data, transformed_metadata = transformed_dataset
print(transformed_data)
[
{'x_xf': 0.0},
{'x_xf': 0.018117407},
{'x_xf': 1.0}
]
이처럼 preprocessing_fn 만 잘 정의해주면 무척 손쉽게 데이터 전처리가 가능하다.
TFT Integration
우리의 머신러닝 프로젝트에 TFT를 적용하는 방법에 대해 예시를 통해 설명을 하려고 한다.
예시 데이터셋은 one-hot 인코딩용 feature, bucket화 할 feature, 문자열 feature 이렇게 세 가지가 있다.
## Feature 예시 및 정의
import tensorflow as tf
import tensorflow_transform as tft
LABEL_KEY = "consumer_disputed"
# Feature name, feature dimensionality.
ONE_HOT_FEATURES = {
"product": 11,
"sub_product": 45,
"company_response": 5,
"state": 60,
"issue": 90
}
# Feature name, bucket count.
BUCKET_FEATURES = {
"zip_code": 10
}
# Feature name, value is unused.
TEXT_FEATURES = {
"consumer_complaint_narrative": None
}
## 변형된 feature를 담을 변수명 정의하기
def transformed_name(key):
return key + '_xf'
## Sparse한 변수들은 missing value 채우기
def fill_in_missing(x):
default_value = '' if x.dtype == tf.string or to_string else 0
if type(x) == tf.SparseTensor:
x = tf.sparse.to_dense(
tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
default_value)
return tf.squeeze(x, axis=1)
## 각 변수별로 전처리할 함수 정의
# one-hot encoding
def convert_num_to_one_hot(label_tensor, num_labels=2):
one_hot_tensor = tf.one_hot(label_tensor, num_labels)
return tf.reshape(one_hot_tensor, [-1, num_labels])
# bucketize
def convert_zip_code(zip_code):
if zip_code == '':
zip_code = "00000"
zip_code = tf.strings.regex_replace(zip_code, r'X{0,5}', "0")
zip_code = tf.strings.to_number(zip_code, out_type=tf.float32)
return zip_code
# preprocessing_fn 정의
## 1. One-hot encoding : 카테고리 이름을 compute_and_apply_vocabulary 함수를 이용하여 인덱스화
## 카테고리 값을 인덱스화된 카테고리 이름별로 one-hot 인코딩
## 2. Bucketize : zip code 개별 값은 너무 sparse하기 때문에 10개의 bucket(bin)으로 만든 뒤, bucket index는 one-hot encoding
## 3. Text : 문자열은 따로 변환할 필요가 없어 sparse할 경우를 대비하여 missing value처리한 뒤 dense feature로 가공
def preprocessing_fn(inputs):
outputs = {}
for key in ONE_HOT_FEATURES.keys():
dim = ONE_HOT_FEATURES[key]
index = tft.compute_and_apply_vocabulary(
fill_in_missing(inputs[key]), top_k=dim + 1)
outputs[transformed_name(key)] = convert_num_to_one_hot(
index, num_labels=dim + 1)
for key, bucket_count in BUCKET_FEATURES.items():
temp_feature = tft.bucketize(
convert_zip_code(fill_in_missing(inputs[key])),
bucket_count,
always_return_num_quantiles=False)
outputs[transformed_name(key)] = convert_num_to_one_hot(
temp_feature,
num_labels=bucket_count + 1)
for key in TEXT_FEATURES.keys():
outputs[transformed_name(key)] = \
fill_in_missing(inputs[key])
outputs[transformed_name(LABEL_KEY)] = fill_in_missing(inputs[LABEL_KEY])
return outputs
마무리
이번 글에서는 TFT를 이용하여 모델의 전처리 과정을 셋팅하는 방법에 대해 다루었다.
preprocessing_fn 함수를 설명하며 그 과정을 디테일화하였고, 예시 코드를 통해 TFT를 어떻게 적용하는 지에 대해서도 알 수 있었다.
다음 글에서는 가장 중요한 모델 학습 단계를 MLOps로 설계 및 구현하는 방법에 대해 다룰 것이다.
'Tech > MLOps' 카테고리의 다른 글
MLOps, 머신러닝 파이프라인 설계 - (6) Model Training (0) | 2022.01.10 |
---|---|
MLOps, 머신러닝 파이프라인 설계 - (4) Data Validation (1) | 2021.09.20 |
ML Ops, 머신러닝 파이프라인 설계 - (3) Data Ingestion (0) | 2021.09.07 |
MLOps, 머신러닝 파이프라인 설계 - (2) TFX, Apache Beam 개요 (1) | 2021.09.01 |
MLOps, 머신러닝 파이프라인 설계 - (1) 개요 (1) | 2021.09.01 |