Julie의 Tech 블로그

MLOps, 머신러닝 파이프라인 설계 - (4) Data Validation 본문

Tech/MLOps

MLOps, 머신러닝 파이프라인 설계 - (4) Data Validation

Julie's tech 2021. 9. 20. 17:08
728x90

이번 편은 지난 편에 이어 머신러닝 파이프라인 단계 중 하나인 Data Validation(검증) 과 관련된 기술들에 대해 소개해볼 것이다.

지난번의 Data Ingestion 단계가 이루어지면, 인풋으로 들어온 데이터가 올바른지에 대해 검증하는 단계이다.

모델로 학습하기 이전에 이상치가 있는지, 데이터 범위에 맞게 분포가 형성되어있는지 등을 확인하게 된다.

Data Validation 단계에서는 아래 세 가지를 중점적으로 살펴보게 된다:

1. Data Anomaly 확인

2. Data Schema 변경건 확인

3. 이전 버전의 데이터와 주요 통계치가 유사한 수준에 있는지

이 세가지 포인트에 있어서 차이가 크게 발생하거나 문제가 있을 경우 워크플로우를 중단하여 운영자가 점검할 수 있도록 해준다.

이상치나 결측치를 확인하여 데이터 전처리에 어떻게 진행해야하는지 점검된 정보를 미리 전달할 수 있다는 점에도 의미가 있으나,

모델 학습시에 사용될 학습/평가 데이터를 랜덤하게 구분한 뒤 두 데이터 간 분포 차이가 어떠한지 등도 미리 알 수 있어 의미가 있다.

또한 데이터가 점점 변화(drift)한다는 점도 알아챌 수 있는데, 데이터가 변화하는 것에 따라 전처리도 업데이트되어야하기에 유용하다.

데이터가 변화하는 배경으로는 사회적인 트렌드가 변화하거나, 데이터의 시즌성이 있거나, 모델의 성능개선에 따른 반응데이터 고도화를 생각해볼 수 있다.

Tensorflow는 TFDV(TFX의 하위 프로젝트)라는 라이브러리를 통해 Data Validation에 필요한 툴을 제공한다.

TFDV는 Tensorflow의 TFRecordcsv 파일, 두 가지 포맷의 데이터를 인풋으로서 허용한다.

Apache Beam을 통해 시각화 자료를 제공하기도 한다.

Feature 관련 정보나 분포도 차트를 플랏팅한 결과를 볼 수 있는 TFDV Visualization

 

라이브러리 설치는 아래와 같이 간단하게 진행할 수 있다.

사실 TFX를 설치했다면 자동으로 설치되어있을테지만, TFDV만 단독으로 설치하고 싶을 경우 아래와 같이 진행가능하다.

$ pip install tensorflow-data-validation

Statistics

우선 데이터에 대한 통계치부터 생성해보자.

import tensorflow_data_validation as tfdv
// csv
stats = tfdv.generate_statistics_from_csv(
    data_location='/data/input_data.csv',
    delimiter=',')
// TFRecord
stats = tfdv.generate_statistics_from_tfrecord(
    data_location='/data/input_data.tfrecord')

generate_statistics_from를 실행할 경우 아래와 같은 결과를 볼 수 있다:

수치형 데이터의 경우 아래 통계치를 제공한다.

- count

- 결측치수

- 평균과 표준편차

- 최소 최대값

- 0인 값의 비율

카테고리형 변수의 경우 아래 통계치를 제공한다.

- count

- 결측치수

- unique 값 수

- 문자형 값의 평균 길이(length)

- 라벨별 sample count와 rank

 

Schema

Schema 정보를 생성할 수도 있다. 스키마는 컬럼별 자료형과 컬럼 이름을 포함하기도 하지만, 데이터 범위(boundary) 정보도 지닌다.

schema = tfdv.infer_schema(stats) // generates schema protocol
tfdv.display_schema(schema) // display schema

 

오른쪽 시각화 자료에서 Presence란 Nullable과 동일한 의미이다. 데이터가 없어도 되는지, 혹은 꼭 존재해야하는지를 표기하는 구분자이다.

Valency란 학습데이터 example마다 요구되는 값의 수 이다. 즉 카테고리형 변수의 경우 single 이란 1개 값만 존재해야한다는 것이다.

이렇게 TFDV가 자체적으로 추론한 스키마가 내가 정의하는 스키마와 다를 수 있다. 이에 따라 스키마 업데이트에 대한 기능도 제공한다.

아래 예시는 Schema 를 불러들여 sub_issue state라는 컬럼의 스키마를 업데이트하는 코드이다.

sub_issue의 결측치 최대 비율을 0.1로 지정하고, state의 카테고리 값 중에서 'AK'를 제외하는 코드이다.

schema = tfdv.load_schema_text(schema_location)

sub_issue_feature = tfdv.get_feature(schema, 'sub_issue') // 수치형 변수
sub_issue_feature.presence.min_fraction = 0.9
state_domain = tfdv.get_domain(schema, 'state') // 카테고리형 변수
state_domain.value.remove('AK')

tfdv.write_schema_text(schema, schema_location)
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)

이렇게 생성된 데이터 스키마와 통계치를 바탕으로 현재 인풋된 데이터에 문제가 있는지 없는지를 판별하는 단계가 필요하다.

학습데이터와 평가데이터 둘 간의 통계를 비교하기도 하는데, 이는 평가데이터가 학습데이터와 얼마나 유사한지를 볼 수 있다.

// TFRecord의 학습/평가 데이터로 통계치를 만들어 visualize하는 코드
train_stats = tfdv.generate_statistics_from_tfrecord(
    data_location=train_tfrecord_filename)
val_stats = tfdv.generate_statistics_from_tfrecord(
    data_location=val_tfrecord_filename)

tfdv.visualize_statistics(lhs_statistics=val_stats, rhs_statistics=train_stats,
                          lhs_name='VAL_DATASET', rhs_name='TRAIN_DATASET')

Train set vs Valid set

 

 

Anomalies

위 시각화 자료로는 구체적으로 얼마만큼 차이가 발생하는지, 어떤 변수에서 차이가 나는지 일일이 확인하기 어려울 수 있다.

따라서 anomaly를 탐색해주는 아래 코드를 통해 구체화해볼 수 있다.

anomalies = tfdv.validate_statistics(statistics=val_stats, schema=schema)

tfdv.display_anomalies(anomalies)

Anomaly Info는 아래와 같이 굉장히 구체적인 수준으로 제공된다.

anomaly_info {
  key: "company"
  value {
    description: "The feature was present in fewer examples than expected."
    severity: ERROR
    short_description: "Column dropped"
    reason {
      type: FEATURE_TYPE_LOW_FRACTION_PRESENT
      short_description: "Column dropped"
      description: "The feature was present in fewer examples than expected."
    }
    path {
      step: "company"
    }
  }
}

Skew and drift

TFDV는 "skew comparator"라는 빌트인 함수를 통해 두 데이터셋 간의 통계치 중 큰 차이를 보이는 점을 발견해낼 수 있도록 해준다.

통계학에서 정의하는 "skew"의 기준을 따르는 것이 아니라, 두 데이터셋의 serving_statistics간 차이를 L-infinity norm으로 본 것이다

* L-inifinity norm이란 두 벡터간의 차이를 표현하는 개념인데, 벡터의 entry중에서 절댓값 중 최대치를 의미한다.

예를 들어 [2,4,-1]과 [9,1,8] 두 벡터간의 L-infinity norm은 두 벡터간 차이인 [-7,3,-9]의 절대최대치인 9가 된다.

company라는 feature에 대해 skew여부를 판단하면 아래와 같은 결과를 추출해낼 수 있다.

tfdv.get_feature(schema,
                 'company').skew_comparator.infinity_norm.threshold = 0.01
skew_anomalies = tfdv.validate_statistics(statistics=train_stats,
                                          schema=schema,
                                          serving_statistics=serving_stats)

TFDV는 drift_comparator라는 함수를 통해 동일한 데이터 타입의 두 데이터셋 통계치를 비교할 수 있게 해준다.

tfdv.get_feature(schema,
                 'company').drift_comparator.infinity_norm.threshold = 0.01
drift_anomalies = tfdv.validate_statistics(statistics=train_stats_today,
                                           schema=schema,
                                           previous_statistics=\
                                               train_stats_yesterday)

L-infinity norm은 drift comparator과 skew comparator 모두 제공된다.

Biased Dataset

여기서 Bias란 실생활 데이터(real world data)를 대표하지 못하는 샘플 데이터를 의미한다.

우리가 수집하는 데이터는 실생활 데이터 전부가 아닌, 그 일부를 가져오는 것이기 때문에, selection bias가 발생할 수 있다.

앞서 살펴본 feature별 시각화 자료나 anomaly protocol로 데이터셋의 bias를 찾아볼 수 있다.

Large Datasets with GCP

규모가 큰 데이터셋의 경우 클라우드 환경에 적재되어있을 수 있다. TFDV는 GCP 환경에서도 동작이 가능하도록 구현되어있다.

기본적으로 앞서 살펴본 함수들 코드 그대로 사용할 수 있지만, Google Cloud Credentials나 temp 혹은 결과 파일이 떨어질 bucket 이름/경로를 지정해주는 등의 몇 가지 추가 사항들이 있다.

from apache_beam.options.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, StandardOptions)

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = '<YOUR_GCP_PROJECT_ID>'
google_cloud_options.job_name = '<YOUR_JOB_NAME>'
google_cloud_options.staging_location = 'gs://<YOUR_GCP_BUCKET>/staging'
google_cloud_options.temp_location = 'gs://<YOUR_GCP_BUCKET>/tmp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

from apache_beam.options.pipeline_options import SetupOptions

setup_options = options.view_as(SetupOptions)
setup_options.extra_packages = [
    '/path/to/tensorflow_data_validation'
    '-0.22.0-cp37-cp37m-manylinux2010_x86_64.whl'] // latest TFDV package

data_set_path = 'gs://<YOUR_GCP_BUCKET>/train_reviews.tfrecord'
output_path = 'gs://<YOUR_GCP_BUCKET>/'
tfdv.generate_statistics_from_tfrecord(data_set_path,
                                       output_path=output_path,
                                       pipeline_options=options)

GCP 서비스 중 Dataflow라는 서비스에 TFDV 라이브러리를 설치하여 동일하게 시각화 자료를 추출하거나 통계치를 생성해볼 수 있다.

Dataflow console
Parallelize and distribute data validation tasks

 

Integrate TFDV into TFX (machine learning pipeline)

지금까지는 TFDV 라이브러리 단독으로 살펴보았다면, 이제는 TFDV를 통한 과정을 기존 머신러닝 파이프라인에 통합하는 이야기를 해볼 것이다.

앞선 글에서 살펴본 StatisticsGen 함수가 기본적인 통계 자료를 제공하는데, 이를 통해 시각화하는 것 역시 가능하다.

SchemaGen 함수는 schema를 생성해준다.

from tfx.components import StatisticsGen
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)
context.show(statistics_gen.outputs['statistics'])


from tfx.components import SchemaGen
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)
context.run(schema_gen)

스키마와 통계치를 생성했다면, ExampleValidator로 데이터셋에 대해 검증을 해볼 수 있다.

이 함수는 기본적으로 우리가 앞서 살펴본 skew나 drift를 탐색하고, anomaly를 찾아주겠지만, 내가 특정한 anomaly조건으로 데이터를 검증하고 싶을 경우 custom component를 만들어 검증할 수 있다.

from tfx.components import ExampleValidator

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

이상 Data Validation에 대해 전반적인 과정을 살펴보았다.

다음 편에서는 데이터 전처리, Data Preprocessing과 관련하여 글을 써내려갈 것이다.


참고자료

https://learning.oreilly.com/library/view/building-machine-learning/9781492053187/

반응형