
Kubeflow Pipeline(KFP)과 KServe SDK는 ML 모델 학습 및 배포를 자동화하는 데 사용됩니다.
- Kubeflow Pipeline(KFP) → ML Workflow를 정의하고 실행
- KServe SDK → 모델을 KServe를 통해 배포
이 두 개를 연동하면, 모델을 학습하고 KServe를 통해 자동으로 배포하는 파이프라인을 만들 수 있습니다.
Kubeflow Pipeline 기본 개념
Kubeflow Pipeline을 정의하는 주요 구성 요소는 다음과 같습니다.
코드 예시
from kfp import dsl
from kfp.components import create_component_from_func
# a + b 연산을 수행하는 함수
def add_numbers(a: int, b: int) -> int:
return a + b
# Kubeflow Component로 변환
add_op = create_component_from_func(add_numbers)
# Kubeflow Pipeline 정의
@dsl.pipeline(
name="Addition Pipeline",
description="Adds two numbers"
)
def add_pipeline(a: int = 3, b: int = 5):
add_task = add_op(a, b)
# Pipeline 실행을 위한 YAML 파일 생성
if __name__ == "__main__":
from kfp.compiler import Compiler
Compiler().compile(add_pipeline, "add_pipeline.yaml")
@dsl.pipeline
파이프라인을 정의하는 데코레이터
- 이 코드는 Kubeflow가 파이프라인을 인식하도록 해줍니다.
- add_pipeline() 함수 안에서 실행될 작업을 DAG(Directed Acyclic Graph) 형태로 정의합니다.
@dsl.pipeline(
name="Addition Pipeline",
description="Adds two numbers"
)
def add_pipeline(a: int = 3, b: int = 5):
add_task = add_op(a, b)
➡️ 의미: add_pipeline()이 실행되면 add_numbers(a, b) 연산이 수행됨.
def add_numbers(a, b)
Kubeflow에서 실행할 연산을 정의
- 여기서 a + b를 수행하는 함수가 정의됩니다.
def add_numbers(a: int, b: int) -> int:
return a + b
➡️ 의미: a와 b를 입력받아 합계를 반환하는 단순한 함수.
create_component_from_func
함수를 Kubeflow Component로 변환
- create_component_from_func()를 사용하면 일반 Python 함수를 Kubeflow의 Task로 등록할 수 있습니다.
add_op = create_component_from_func(add_numbers)
➡️ 의미: add_numbers()를 Kubeflow에서 실행 가능한 컴포넌트로 변환.
Compiler().compile
Kubeflow Pipeline 실행을 위한 YAML 파일 생성
- add_pipeline.yaml이라는 YAML 파일을 생성하여, Kubeflow에서 실행할 수 있도록 함.
from kfp.compiler import Compiler
Compiler().compile(add_pipeline, "add_pipeline.yaml")
➡️ 의미: Kubeflow UI에서 이 YAML을 업로드하면 파이프라인을 실행 가능.
kfp.Client()
Kubeflow 서버에 Pipeline 실행 요청
- kfp.Client()를 사용하면 Kubeflow 서버에 직접 Pipeline을 실행할 수 있습니다.
client = kfp.Client()
client.create_run_from_pipeline_func(
add_pipeline, arguments={"a": 10, "b": 20}
)
➡️ 의미: a=10, b=20 값을 사용하여 파이프라인을 실행.
✅ 정리
코드역할
@dsl.pipeline | 파이프라인을 정의하는 데코레이터 |
def add_numbers(a, b) | a + b 연산을 수행하는 함수 |
create_component_from_func | 함수를 Kubeflow Component로 변환 |
Compiler().compile() | Pipeline을 실행 가능한 YAML 파일로 변환 |
kfp.Client() | Kubeflow 서버에 실행 요청 |
컴포넌트 추가
아래는 Kubeflow Pipeline에서 a + b를 구한 후 그 값을 c로 나누는 두 개의 컴포넌트를 추가한 예제입니다.
- 첫 번째 컴포넌트 (add_numbers) → a + b 계산
- 두 번째 컴포넌트 (divide_result) → (a + b) / c 계산
a + b를 계산 후 c로 나누는 Kubeflow Pipeline
from kfp import dsl
from kfp.components import create_component_from_func
# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
return a + b
# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
return sum_value / c if c != 0 else float('inf') # 0으로 나누기 방지
# (3) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)
# (4) Kubeflow Pipeline 정의
@dsl.pipeline(
name="Addition and Division Pipeline",
description="Adds two numbers and divides the result by a third number"
)
def add_divide_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0):
add_task = add_op(a, b) # a + b 수행
divide_task = divide_op(add_task.output, c) # (a + b) / c 수행
# (5) 파이프라인 YAML 생성
if __name__ == "__main__":
from kfp.compiler import Compiler
Compiler().compile(add_divide_pipeline, "add_divide_pipeline.yaml")
✅ 설명
- add_numbers(a, b) → a + b를 계산
- divide_result(sum_value, c) → sum_value / c 수행
병렬 처리 추가
아래는 Kubeflow Pipeline에서 a + b를 계산한 후,
- 동시에 c로 나누기 (divide_result)
- 동시에 d로 곱하기 (multiply_result)
를 수행하도록 만든 최종 코드입니다.
✅ 최종 코드: a + b를 구한 후 c로 나누고, 동시에 d로 곱하는 Kubeflow Pipeline
from kfp import dsl
from kfp.components import create_component_from_func
# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
return a + b
# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
return sum_value / c if c != 0 else float('inf') # 0으로 나누기 방지
# (3) (a + b) 값을 받아서 d로 곱하는 함수
def multiply_result(sum_value: float, d: float) -> float:
return sum_value * d
# (4) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)
multiply_op = create_component_from_func(multiply_result)
# (5) Kubeflow Pipeline 정의
@dsl.pipeline(
name="Addition, Division, and Multiplication Pipeline",
description="Adds two numbers, then divides by c and multiplies by d in parallel"
)
def add_divide_multiply_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0, d: float = 2.0):
add_task = add_op(a, b) # a + b 수행
# (a + b) 값을 받은 후, 동시에 두 개의 연산을 수행
divide_task = divide_op(sum_value=add_task.output, c=c) # (a + b) / c 수행
multiply_task = multiply_op(sum_value=add_task.output, d=d) # (a + b) * d 수행
# (6) 파이프라인 YAML 생성
if __name__ == "__main__":
from kfp.compiler import Compiler
Compiler().compile(add_divide_multiply_pipeline, "add_divide_multiply_pipeline.yaml")
✅ 설명
- add_numbers(a, b) → a + b를 계산
- divide_result(sum_value, c) → sum_value / c 수행
- multiply_result(sum_value, d) → sum_value * d 수행
- Pipeline에서 add_task.output을 동시에 두 개의 컴포넌트에 전달
divide_task = divide_op(sum_value=add_task.output, c=c)
multiply_task = multiply_op(sum_value=add_task.output, d=d)
add_task.output이 divide_task와 multiply_task에 동시에 전달됨
+---------------------+
| add_numbers(a, b) | --> (a + b)
+---------------------+
|
+-------------+-------------+
| |
+--v------------------+ +----------------------+
| divide_result(sum, c) | | multiply_result(sum, d) |
+----------------------+ +----------------------+
(a + b) / c 실행 (a + b) * d 실행
아래는 Kubeflow Pipeline에서 a + b를 구한 후,
- 동시에 c로 나누기 (divide_result)
- 동시에 d로 곱하기 (multiply_result)
- 그 후 divide_result와 multiply_result의 결과를 다시 더하는 (sum_results)
✅ 병렬처리 결과 취합하기
✅ 최종 코드: a + b → (c로 나누기 + d로 곱하기) → 결과를 다시 더하기
from kfp import dsl
from kfp.components import create_component_from_func
# (1) a + b 연산을 수행하는 함수
def add_numbers(a: float, b: float) -> float:
return a + b
# (2) (a + b) 값을 받아서 c로 나누는 함수
def divide_result(sum_value: float, c: float) -> float:
return sum_value / c if c != 0 else float('inf') # 0으로 나누기 방지
# (3) (a + b) 값을 받아서 d로 곱하는 함수
def multiply_result(sum_value: float, d: float) -> float:
return sum_value * d
# (4) (a + b) / c 결과와 (a + b) * d 결과를 더하는 함수
def sum_results(divide_output: float, multiply_output: float) -> float:
return divide_output + multiply_output
# (5) Kubeflow 컴포넌트로 변환
add_op = create_component_from_func(add_numbers)
divide_op = create_component_from_func(divide_result)
multiply_op = create_component_from_func(multiply_result)
sum_op = create_component_from_func(sum_results)
# (6) Kubeflow Pipeline 정의
@dsl.pipeline(
name="Full Computation Pipeline",
description="Computes (a + b), then does (a + b)/c and (a + b)*d in parallel, then sums the results"
)
def full_pipeline(a: float = 10.0, b: float = 20.0, c: float = 5.0, d: float = 2.0):
add_task = add_op(a, b) # (a + b) 수행
# (a + b) 값을 받은 후, 동시에 두 개의 연산을 수행
divide_task = divide_op(sum_value=add_task.output, c=c) # (a + b) / c 수행
multiply_task = multiply_op(sum_value=add_task.output, d=d) # (a + b) * d 수행
# divide_task와 multiply_task가 끝난 후, 결과를 더하는 작업 수행
sum_task = sum_op(divide_output=divide_task.output, multiply_output=multiply_task.output)
# (7) 파이프라인 YAML 생성
if __name__ == "__main__":
from kfp.compiler import Compiler
Compiler().compile(full_pipeline, "full_pipeline.yaml")
파이프라인 실행 방법
위의 파이썬 파일을 실행하면 full-computation-pipeline.yaml 파일이 생성됨
해당 yaml 파일을 업로드하여 pipeline을 실행

'AI > Kubeflow' 카테고리의 다른 글
Kubeflow 구성요소(Pipeline , experiment , runs , component) (0) | 2025.03.07 |
---|