AI/Kubeflow

Kubeflow Pipeline, KServe SDK

칼쵸쵸 2025. 3. 7. 14:37
반응형

 

Kubeflow Pipeline(KFP)과 KServe SDK는 ML 모델 학습 및 배포를 자동화하는 데 사용됩니다.

  • Kubeflow Pipeline(KFP) → ML Workflow를 정의하고 실행
  • KServe SDK → 모델을 KServe를 통해 배포

이 두 개를 연동하면, 모델을 학습하고 KServe를 통해 자동으로 배포하는 파이프라인을 만들 수 있습니다.

 

Kubeflow Pipeline 기본 개념

Kubeflow Pipeline을 정의하는 주요 구성 요소는 다음과 같습니다.

 

코드 예시

from kfp import dsl
from kfp.components import create_component_from_func

# a + b 연산을 수행하는 함수
def add_numbers(a: int, b: int) -> int:
    return a + b

# Kubeflow Component로 변환
add_op = create_component_from_func(add_numbers)

# Kubeflow Pipeline 정의
@dsl.pipeline(
    name="Addition Pipeline",
    description="Adds two numbers"
)
def add_pipeline(a: int = 3, b: int = 5):
    add_task = add_op(a, b)

# Pipeline 실행을 위한 YAML 파일 생성
if __name__ == "__main__":
    from kfp.compiler import Compiler
    Compiler().compile(add_pipeline, "add_pipeline.yaml")

 

@dsl.pipeline

파이프라인을 정의하는 데코레이터

  • 이 코드는 Kubeflow가 파이프라인을 인식하도록 해줍니다.
  • add_pipeline() 함수 안에서 실행될 작업을 DAG(Directed Acyclic Graph) 형태로 정의합니다.
@dsl.pipeline(
    name="Addition Pipeline",
    description="Adds two numbers"
)
def add_pipeline(a: int = 3, b: int = 5):
    add_task = add_op(a, b)

 

➡️ 의미: add_pipeline()이 실행되면 add_numbers(a, b) 연산이 수행됨.

 

def add_numbers(a, b)

Kubeflow에서 실행할 연산을 정의

  • 여기서 a + b를 수행하는 함수가 정의됩니다.
 
def add_numbers(a: int, b: int) -> int:
    return a + b

➡️ 의미: a와 b를 입력받아 합계를 반환하는 단순한 함수.

 

create_component_from_func

함수를 Kubeflow Component로 변환

  • create_component_from_func()를 사용하면 일반 Python 함수를 Kubeflow의 Task로 등록할 수 있습니다.
add_op = create_component_from_func(add_numbers)

➡️ 의미: add_numbers()를 Kubeflow에서 실행 가능한 컴포넌트로 변환.

 

Compiler().compile

Kubeflow Pipeline 실행을 위한 YAML 파일 생성

  • add_pipeline.yaml이라는 YAML 파일을 생성하여, Kubeflow에서 실행할 수 있도록 함.
from kfp.compiler import Compiler
Compiler().compile(add_pipeline, "add_pipeline.yaml")

 

➡️ 의미: Kubeflow UI에서 이 YAML을 업로드하면 파이프라인을 실행 가능.

 

 

kfp.Client()

Kubeflow 서버에 Pipeline 실행 요청

  • kfp.Client()를 사용하면 Kubeflow 서버에 직접 Pipeline을 실행할 수 있습니다.
client = kfp.Client()
client.create_run_from_pipeline_func(
    add_pipeline, arguments={"a": 10, "b": 20}
)

 

➡️ 의미: a=10, b=20 값을 사용하여 파이프라인을 실행.

 

정리

코드역할

@dsl.pipeline 파이프라인을 정의하는 데코레이터
def add_numbers(a, b) a + b 연산을 수행하는 함수
create_component_from_func 함수를 Kubeflow Component로 변환
Compiler().compile() Pipeline을 실행 가능한 YAML 파일로 변환
kfp.Client() Kubeflow 서버에 실행 요청

 

 

 

컴포넌트 추가

 

아래는 Kubeflow Pipeline에서 a + b를 구한 후 그 값을 c로 나누는 두 개의 컴포넌트를 추가한 예제입니다.

  • 첫 번째 컴포넌트 (add_numbers) → a + b 계산
  • 두 번째 컴포넌트 (divide_result) → (a + b) / c 계산

a + b를 계산 후 c로 나누는 Kubeflow Pipeline

 

from kfp import dsl
from kfp.components import create_component_from_func

# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
    return a + b

# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
    return sum_value / c if c != 0 else float('inf')  # 0으로 나누기 방지

# (3) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)

# (4) Kubeflow Pipeline 정의
@dsl.pipeline(
    name="Addition and Division Pipeline",
    description="Adds two numbers and divides the result by a third number"
)
def add_divide_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0):
    add_task = add_op(a, b)  # a + b 수행
    divide_task = divide_op(add_task.output, c)  # (a + b) / c 수행

# (5) 파이프라인 YAML 생성
if __name__ == "__main__":
    from kfp.compiler import Compiler
    Compiler().compile(add_divide_pipeline, "add_divide_pipeline.yaml")

 

 설명

  1. add_numbers(a, b) → a + b를 계산
  2. divide_result(sum_value, c) → sum_value / c 수행

 

병렬 처리 추가

아래는 Kubeflow Pipeline에서 a + b를 계산한 후,

  • 동시에 c로 나누기 (divide_result)
  • 동시에 d로 곱하기 (multiply_result)

를 수행하도록 만든 최종 코드입니다.

 

최종 코드: a + b를 구한 후 c로 나누고, 동시에 d로 곱하는 Kubeflow Pipeline

from kfp import dsl
from kfp.components import create_component_from_func

# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
    return a + b

# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
    return sum_value / c if c != 0 else float('inf')  # 0으로 나누기 방지

# (3) (a + b) 값을 받아서 d로 곱하는 함수
def multiply_result(sum_value: float, d: float) -> float:
    return sum_value * d

# (4) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)
multiply_op = create_component_from_func(multiply_result)

# (5) Kubeflow Pipeline 정의
@dsl.pipeline(
    name="Addition, Division, and Multiplication Pipeline",
    description="Adds two numbers, then divides by c and multiplies by d in parallel"
)
def add_divide_multiply_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0, d: float = 2.0):
    add_task = add_op(a, b)  # a + b 수행

    # (a + b) 값을 받은 후, 동시에 두 개의 연산을 수행
    divide_task = divide_op(sum_value=add_task.output, c=c)  # (a + b) / c 수행
    multiply_task = multiply_op(sum_value=add_task.output, d=d)  # (a + b) * d 수행

# (6) 파이프라인 YAML 생성
if __name__ == "__main__":
    from kfp.compiler import Compiler
    Compiler().compile(add_divide_multiply_pipeline, "add_divide_multiply_pipeline.yaml")

 

설명

  1. add_numbers(a, b) → a + b를 계산
  2. divide_result(sum_value, c) → sum_value / c 수행
  3. multiply_result(sum_value, d) → sum_value * d 수행
  4. Pipeline에서 add_task.output을 동시에 두 개의 컴포넌트에 전달
     
divide_task = divide_op(sum_value=add_task.output, c=c)
multiply_task = multiply_op(sum_value=add_task.output, d=d)

 

 

add_task.output이 divide_task와 multiply_task에 동시에 전달

 

     +---------------------+
     |  add_numbers(a, b)  |  --> (a + b)
     +---------------------+
                 |
   +-------------+-------------+
   |                           |
+--v------------------+   +----------------------+
| divide_result(sum, c) |   | multiply_result(sum, d) |
+----------------------+   +----------------------+
   (a + b) / c 실행            (a + b) * d 실행

 

 

아래는 Kubeflow Pipeline에서 a + b를 구한 후,

  • 동시에 c로 나누기 (divide_result)
  • 동시에 d로 곱하기 (multiply_result)
  • 그 후 divide_result와 multiply_result의 결과를 다시 더하는 (sum_results)

✅ 병렬처리 결과 취합하기

최종 코드: a + b → (c로 나누기 + d로 곱하기) → 결과를 다시 더하기

from kfp import dsl
from kfp.components import create_component_from_func

# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
    return a + b

# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
    return sum_value / c if c != 0 else float('inf')  # 0으로 나누기 방지

# (3) (a + b) 값을 받아서 d로 곱하는 함수
def multiply_result(sum_value: float, d: float) -> float:
    return sum_value * d

# (4) (a + b) / c 결과와 (a + b) * d 결과를 더하는 함수
def sum_results(divide_output: float, multiply_output: float) -> float:
    return divide_output + multiply_output

# (5) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)
multiply_op = create_component_from_func(multiply_result)
sum_op = create_component_from_func(sum_results)

# (6) Kubeflow Pipeline 정의
@dsl.pipeline(
    name="Full Computation Pipeline",
    description="Computes (a + b), then does (a + b)/c and (a + b)*d in parallel, then sums the results"
)
def full_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0, d: float = 2.0):
    add_task = add_op(a, b)  # (a + b) 수행

    # (a + b) 값을 받은 후, 동시에 두 개의 연산을 수행
    divide_task = divide_op(sum_value=add_task.output, c=c)  # (a + b) / c 수행
    multiply_task = multiply_op(sum_value=add_task.output, d=d)  # (a + b) * d 수행

    # divide_task와 multiply_task가 끝난 후, 결과를 더하는 작업 수행
    sum_task = sum_op(divide_output=divide_task.output, multiply_output=multiply_task.output)

# (7) 파이프라인 YAML 생성
if __name__ == "__main__":
    from kfp.compiler import Compiler
    Compiler().compile(full_pipeline, "full_pipeline.yaml")

 

 

파이프라인 실행 방법

위의 파이썬 파일을 실행하면 full-computation-pipeline.yaml 파일이 생성됨

해당 yaml 파일을 업로드하여 pipeline을 실행

반응형

'AI > Kubeflow' 카테고리의 다른 글

Kubeflow 구성요소(Pipeline , experiment , runs , component)  (0) 2025.03.07