Julie의 Tech 블로그

MLOps, 머신러닝 파이프라인 설계 - (6) Model Training 본문

Tech/MLOps

MLOps, 머신러닝 파이프라인 설계 - (6) Model Training

Julie's tech 2022. 1. 10. 00:35
728x90

본 글은 MLOps에서 가장 꽃이라고 할 수 있는 단계, 모델 학습에 대해 살펴볼 것이다.

지난 글까지는 데이터를 수집 및 검증, 전처리 단계까지 살펴보았었다.

본격적으로 시작하기에 앞서, 이 글은 머신러닝 모델 학습에 관한 설명글이 아니며,

모델 학습과정을 MLOps 서비스를 통해 자동화할 수 있는 방법에 대해 다룰 것이다.

앞서 살펴본 데이터 수집, 검증, 전처리 단계를 통해 모델 학습에 필요한 형태로 데이터가 변형되어 준비되어있다고 생각하자.

그리고 우리는 모델이 이미 사전에 정의되어 구현된 상태로 설명을 시작할 것이다. 아래는 예시 코드인데,

예시는 Keras를 사용하여 텍스트를 처리하는 모델이고, Tensorflow Hub에 등록된 기학습된 모델에서 Transfer Learning하여 구현하였다.

import tensorflow as tf
import tensorflow_hub as hub

def get_model():

    # One-hot 카테고리 변수
    input_features = []
    for key, dim in ONE_HOT_FEATURES.items(): 
        input_features.append(
            tf.keras.Input(shape=(dim + 1,),
                           name=transformed_name(key)))

    # 버킷화된(Bucketized) 변수
    for key, dim in BUCKET_FEATURES.items():
        input_features.append(
            tf.keras.Input(shape=(dim + 1,),
                           name=transformed_name(key)))

    # 텍스트 변수
    input_texts = []
    for key in TEXT_FEATURES.keys():
        input_texts.append(
            tf.keras.Input(shape=(1,),
                           name=transformed_name(key),
                           dtype=tf.string))

    inputs = input_features + input_texts

    # Embed text features
    MODULE_URL = "https://tfhub.dev/google/universal-sentence-encoder/4"
    embed = hub.KerasLayer(MODULE_URL) 
    reshaped_narrative = tf.reshape(input_texts[0], [-1]) 
    embed_narrative = embed(reshaped_narrative)
    ff = tf.keras.layers.Reshape((512, ), input_shape=(1, 512))(embed_narrative)
    dense = tf.keras.layers.Dense(256, activation='relu')(ff)

    output = tf.keras.layers.Dense(1, activation='sigmoid')(dense)
    keras_model = tf.keras.models.Model(inputs, output) 

    keras_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                        loss='binary_crossentropy',
                        metrics=[
                            tf.keras.metrics.BinaryAccuracy()
                        ])
    return keras_model

위와 같이 모델을 정의하고 나면, TFX의 Trainer Component로 녹일 수 있어야한다.

이전 글인 전처리 파트에서 preprocessing_fn 을 정의하였듯이, Trainer Component에서도 run_fn 함수를 정의해야한다.

Trainer Component는 수행시에 모듈 파일 내에 정의된 run_fn 함수를 탐색하여 학습 과정을 시작하기 위한 시작점을 파악한다.

일반적으로 아래와 같은 순서로 구성된다.

1) 학습, 평가 데이터를 로드한다.

2) 모델 아키텍쳐를 정의하고, 모델을 컴파일한다

3) 모델을 학습한다

4) 다음 파이프라인 단계에서 평가될 모델을 export한다

이 함수의 예시는 아래와 같다. (위에서 정의한 예시 모델을 활용한 코드)

 

def run_fn(fn_args):

    # 로드할 데이터를 정의한다
    ## 여기서 등장하는 input_fn 역시 사전에 정의된 함수로, 배치 크기를 지정하여 배치사이즈만큼 로드한다
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
    train_dataset = input_fn(fn_args.train_files, tf_transform_output)
    eval_dataset = input_fn(fn_args.eval_files, tf_transform_output)

    # 모델 아키텍쳐를 불러와 컴파일한다
    model = get_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps) 

    signatures = {
        'serving_default':
            _get_serve_tf_examples_fn(
                model,
                tf_transform_output).get_concrete_function(
                    tf.TensorSpec(
                        shape=[None],
                        dtype=tf.string,
                        name='examples')
                )
    } 
    model.save(fn_args.serving_model_dir,
               save_format='tf', signatures=signatures)

모델의 signature를 정의하는 부분에 있어 메인 함수인 get_serve_tf_examples_fn 함수의 코드에 대해서도 살펴보자.

해당 함수는 모델이 export되어 deploy되었을 때, 매 예측 request마다 해당 함수로 정의된 것에 따라 raw데이터를 전처리하는 과정을 적용시켜 모델에 전달하여 예측값을 만들어내게 된다.

def get_serve_tf_examples_fn(model, tf_transform_output):

    # 전처리 단계의 output인 preprocessing graph를 로드한다
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(LABEL_KEY)
        # raw한 tf.Example 데이터를 파싱한다 
        parsed_features = tf.io.parse_example(
            serialized_tf_examples, feature_spec) 

        # 전처리를 한다
        transformed_features = model.tft_layer(parsed_features)
        # 모델에 전처리된 데이터를 input으로 주어 예측값을 받는다
        outputs = model(transformed_features) 
        return {'outputs': outputs}

    return serve_tf_examples_fn

위와 같이, Trainer Component는 아래를 input으로 받게 된다.

- Python 모듈 파일 (run_fn, input_fn, get_serve_tf_examples_fn 등)

- Transform Component에서 발생된 전처리된 데이터

- Transform Component에서 발생된 transform graph

- Validation Component에서 정의된 데이터 스키마

- 학습과 평가 단계(steps) 값 (cf. Trainer Component는 학습을 epoch단위로 하는게 아니라 단계(Steps) 단위로 진행)

코드로 표현하면 아래와 같다.

from tfx.components import Trainer
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor 
from tfx.proto import trainer_pb2

TRAINING_STEPS = 1000
EVALUATION_STEPS = 100

trainer = Trainer(
    ## 사전에 정의한 Python 모듈 파일 경로를 지정
    module_file=os.path.abspath("module.py"),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    ## 앞서 설명한 Trainer Component의 필요 input 항목들 정의 
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=TRAINING_STEPS),
    eval_args=trainer_pb2.EvalArgs(num_steps=EVALUATION_STEPS))

이후 아래와 같은 명령어로 간단하게 Trainer Component를 단독으로 수행(run)해볼 수 있다.

* 참고로 위에서 default executor가 아닌 GenericExecutor를 사용한 이유는 두 가지인데, default trainer_fn 함수 대신 run_fn 을 정의하였기 때문에, 해당 함수로 모델을 학습하고자 하기 위함이고, 두 번째는 해당 executor는 GCP AI Platform 환경에서 모델을 학습하게 된다. 만약 모델이 GPU나 TPUs와 같은 학습에 필요한 특정 하드웨어를 필요로 한다면, 내 로컬 환경이 아닌 클라우드 환경에서 학습할 수 있기 때문에 유용하다.

context.run(trainer)

Trainer Component는 위의 예시처럼 학습 하나만을 위해 사용될 수 있는 것은 아니고, 이전의 run과정에서 모델을 fine-tuning하기 위해 다시 사용하거나, 여러 모델을 동시에 학습할 때 사용할 수도 있다.

TensorBoard를 활용하여 모델의 학습 과정을 대시보드로 볼 수도 있다.

TensorBoard를 활용하기 위해서는 run_fn 함수에 callback 코드를 추가해야한다.

log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir, update_freq='batch')

모델 학습하는 코드 구간에도 callback 추가가 필요하다.

model.fit(
    train_dataset,
    steps_per_epoch=fn_args.train_steps,
    validation_data=eval_dataset,
    validation_steps=fn_args.eval_steps,
    callbacks=[tensorboard_callback])

TensorBoard를 노트북 환경에서 조회하고 싶다면, 모델 학습 로그에서 location을 얻어 TensorBoard로 그 값을 입력하면 된다.

model_dir = trainer.outputs['output'].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {model_dir} ## 이후 http://localhost:6006/ 을 접속하면 TensorBoard 조회 가능

마지막으로 TFX 파이프라인에서 모델의 학습 파라미터를 fine-tuning하는 방법에 대해 다뤄보겠다.

TFX 파이프라인에서는 Tranform에서 넘어온 데이터를 가장 최적의 하이퍼파라미터 값을 찾기 위해 학습하게 된다.

이 경우, 모델 정의하는 함수에서 하이퍼파라미터를 input 값으로 넣어주어야한다.

또는 TFX Tuner Component를 활용하여 fine-tuning을 적용해볼 수도 있다.

다음 글은 모델 분석과 모델을 평가하는 Component에 대해 다뤄볼 것이다.

반응형