오늘은 Flink를 Kubernetes에서 사용하는 방법 중 Kubernetes Operator를 사용하는 방법에 대해 알아보고자 한다.
Kubernetes Operator란 공식 문서에 나와 있는 것처럼 custom resource를 사용하여 애플리케이션이나 컴포넌트의 lifecycle을 관리하는 역할을 하는 software extensions이다. Apache 재단에 있는 Flink Kubernetes Operator를 사용하게 되면 JobManager와 TaskManager를 손쉽게 사용할 수 있으며 해당 Operator가 관리하는 CRD(custom resource definition)을 가지고 애플리케이션을 배포할 수 있다.
목차
- Flink Kubernetes Operator 소개
- Architecture
- FlinkDeployment & FlinkSessionJob에 대해
- Application mode VS Session mode
- Autoscaler
- Monitoring
Flink Kubernetes Operator 소개
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/
Flink Kubernetes Operator는 2024년 2월 기준으로 현재 1.7 버전까지 릴리즈가 되었고 Flink 1.18도 사용 가능하다. 해당 Operator를 이용하게 된다면 Application Mode의 경우 FlinkDeployment라는 kind만 이용해서 애플리케이션(JobManager와 TaskManager)을 배포할 수 있고 Session Mode의 경우엔 FlinkDeployment와 FlinkSessionJob을 사용해서 애플리케이션(JobManager와 TaskManager)을 배포할 수 있다. 또한, 통합 모니터링과 autoscaler 등 다양한 기능들을 제공하고 있다.
Operator를 설치하는 방법은 간단하며 해당 repo 를 참고해서 설치를 진행하면 된다.
# cert-manager가 이미 설치된 경우 skip
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.14.4/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/
# default values로 설치
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# values를 custom해서 설치
# helm install -f values.yaml flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
하지만 default value로 helm으로 operator를 설치했을 경우 image 오류가 떴었고 values 파일을 아래와 같이 수정해서 진행했을 때 문제가 없었다.
image:
repository: apache/flink-kubernetes-operator
pullPolicy: IfNotPresent
tag: latest
# If image digest is set then it takes precedence and the image tag will be ignored
# digest: ""
imagePullSecrets: []
Architecture
공식 사이트에 나와 있는 것처럼 Flink Kubernetes Operator Architecture는 위처럼 구성되어 있다. 해당 Architecture에서는 Namespace를 분리해서 설명하고 있지만 같은 Namespace에도 배포할 수 있으며 Operator의 경우엔 helm을 이용해서 설치할때 다음과 같은 명령어로 설치를 진행하면 된다.
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/
# default values로 설치
# helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace operator
# values를 custom해서 설치
# helm install -f values.yaml flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace operator
FlinkDeployment & FlinkSessionJob에 대해
FlinkDeployment와 FlinkSessionJob은 Flink Kubernetes Operator에서 제공되는 CRD이다. GitHub를 통해 자세하 리소스 스펙을 확인할 수 있다.
FlinkDeployment와 FlinkSessionJob의 이름과 비슷하게 하는 역할도 비슷한데 FlinkDeployment의 경우 cluster를 구성하기 위해 사용하는 CRD이고 FlinkSessionJob은 Session cluster를 구축했을 때 따로 Job을 제출하기 위한 CRD이다.
Flink Deployment는 아래처럼 작성했을 경우 Application cluster로 구축되게 되고 Application과 Session에 대한 내용은 다음 섹션을 참고하길 바란다. 스펙 참조
spec에는 cluster의 TaskManager와 JobManager의 리소스 스펙을 정의할 수 있고 flinkConfiguration을 통해 checkpoint나 task slot 개수 등 다양하게 설정할 수 있다.
(참고로 taskManager의 Session 모드의 경우 replicas 수는 의도한대로 동작하진 않고 parallelism 총 갯수에 맞춰서 taskManager가 배포된다.)
아래 버전은 cluster의 스펙에 job이 같이 들어간 구조로 되어서 Application으로 동작되게 되고 Session의 경우엔 아래 job을 제거하고 따로 FlinkSessionJob을 구성하면 된다.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: sample
spec:
image: flink:1.18
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "4"
serviceAccount: flink
jobManager:
resource:
memory: "1Gi"
cpu: 1
taskManager:
resource:
memory: "1Gi"
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: stateless
아래는 Job을 따로 분리할 경우 cluster에 Job을 제출하기 위한 FlinkSessionJob 샘플이다. 스펙 참조
jarURI의 경우 아래처럼 링크로 적용할 수 있고 위에처럼 file을 지정할 수 있다. (하지만 두개 다 테스트 해봤을 때 Application 모드는 local로만 적용할 수 있고 Session 모드의 경우에 링크가 가능하며 file의 경우에는 처음 prefix로 local이 아니라 file로 적용을 해야한다.)
아래 parallelism은 전체 파이프라인의 병렬화 정도를 적용하는 것이며 소스 코드 상에서 정의한 parallelism이 우선순위가 더 높다. 또한, 여기서 주의해야할 점은 deploymentName인데 이는 cluster name가 동일시 해야하며 같은 jar 파일로 여러 개의 job을 띄우려는 경우 jobName도 unique하게 해줘야한다.
추가로 upgradeMode는 해당 링크를 참조하면 되고 아래의 stateless의 경우 상태가 없는 것을 의미하며 재시작할 때도 이전의 상태를 불러오지 않고 시작하는 모드이다. 상태를 저장하기 위해서는 checkpoint 설정이 필요하며 default로는 checkpoint가 비활성화되어 있기 때문에 만약 몇 분 단위나 몇 시간 단위로 데이터 파이프라인의 전체 상태를 저장하길 원한다면 checkpoint를 활용하는 것을 추천한다. (savepoint도 업그레이드 목적으로 1일이나 몇 시간 단위로 주기적으로 해놓는 것이 좋을 수 있다.)
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: sample-job
spec:
deploymentName: sample
job:
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.17.2/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
parallelism: 4
upgradeMode: stateless
Application mode vs Session mode
Flink에서 application을(여기선 하나의 데이터 파이프라인을 의미) 실행시키는 방법에는 총 두가지가 있는데 그것이 이제 Application 방식과 Session 방식이다. 해당 참조
먼저, Application 방식의 경우 application을(여기선 하나의 데이터 파이프라인을 의미하며 job이라고도 부름) 실행할 때 Flink cluster와 job이 같이 실행되게 되고 둘의 lifecycle은 동일하게 따라가게 된다. 따라서 Application 방식은 Flink cluster 하나당 하나의 job만 배포할 수 있고 이는 Session과 비교했을 때 해당 job이 완전 격리가 가능해진다는 것을 의미한다.
반대로 Session 방식의 경우 Flink cluster와 job을 따로 실행시키게 되고 이 둘의 lifecycle은 서로 다르게 된다. Application 방식과는 다르게 Session 방식은 하나의 Flink cluster에 여러 개의 job을 띄울 수 있고 해당 job들은 서로 시스템 자원을 공유하게 된다. (이는 서로 간의 격리가 되지 않음을 의미)
따라서, 둘의 장단점을 비교하자면 아래와 같이 나타낼 수 있다.
Application mode | Session mode | |
pros | - 리소스 격리가 가능하다 - 해당 job만 쉽게 모니터링 할 수 있다. |
- 여러 개의 job을 실행 가능하다. - job을 빠르게 실행할 수 있다. |
cons | - 하나의 job을 실행시키는데 실행 시간이 오래 걸린다.(Flink cluster를 구성하는 시간도 포함) | - 리소스를 공유하기 때문에 job 끼리의 리소스 격리가 불가능하다. - 리소스 공유 특성 때문에 여러 개의 job이 있을 경우 해당 job의 리소스 사용량만 모니터링하기 어렵다. |
AutoScaler
이제까지 Flink Kubernetes Operator의 기본적인 구성 요소에 대해 알아봤으며 이제는 운영 시 고려해야할 확장성에 대해 알아보려고 한다. Flink에서는 AutoScaler라는 기능을 제공해주고 있으며 이는 Operator(데이터 파이프라인을 구성하는 하나의 요소라고 생각하면 됩니다)마다의 parallelism을 조절하는 기능으로 리소스 사용량을 Kubernetes Operator가 주기적으로 체크하다가 기준 선을 넘었을 때 해당 Operator parallelism을 조절하게 된다.
이는 병목 지점이 되는 Operator 구간만 parallelism을 조절해 해소시킬 수 있다는 장점이 있지만, 반대로 의도치 않게 parallelism을 조절해서 예기치 못하는 상황이 발생할 수도 있다는 문제점이 생길 수 있다. 따라서 이를 잘 고려하고 해당 기능을 사용하면 좋을 것 같고 만약 해당 Flink Kubernetes Operator를 안쓰게 된다면 Kubernetes의 HPA를 활용하길 바란다.(이를 Flink Kubernetes Operator에 적용해보려고 했으나 아키텍처상 FlinkDeployment는 JobManager만 scope가 되어있기 때문에 의도한 TaskManager가 늘어나진 않았다. -Session mode에서 사용-)
아래는 AutoScaler를 적용하는 예시이며 해당 내용은 FlinkDeployment에 추가해서 cluster를 구성하면 된다. 아래는 우리가 target utilization을 정의해서 parallelism을 조절할 수 있지만 Flink Kubernetes Operator에서는 알아서 자동으로 리소스 사용량을 보고 조절하는 버전도 존재한다.
flinkConfiguration:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: 1m
job.autoscaler.metrics.window: 5m
job.autoscaler.target.utilization: "0.7"
job.autoscaler.target.utilization.boundary: "0.1"
job.autoscaler.restart.time: 1m
pipeline.max-parallelism: "720"
아래는 위에서 언급한 자동으로 조절하는 버전이며 여기선 AutoScaler StandAlone이라고 부르고 있다.
flinkconfiguration:
jobmanager.scheduler : adaptive
job.autoscaler.enabled : true
job.autoscaler.scaling.enabled : true
job.autoscaler.stabilization.interval : 1m
job.autoscaler.metrics.window : 5m
job.autoscaler.restart.time: 1m
‼️ 참고로 autoscaler를 활용하기 위해서는 checkpoint 활성화는 필수
Monitoring
마지막으론 이제 monitoring 설정에 대해 다뤄보려고 한다.
Flink Kubernetes Operator에서는 편리하게 Prometheus와 Grafana로 모니터링 할 수 있는 가이드를 제공하고 있으며 다양한 Metric 요소도 존재하고 있다. 해당 참조
아래 Flink 공식 블로그에서 어떤 걸 모니터링하면 좋을 지 설명하고 있으니 참고하면 좋을 것 같다.
https://flink.apache.org/2019/02/21/monitoring-apache-flink-applications-101/
기본적으로 제공하고 있는 기능이라 FlinkDeployment에서 아래 요소만 찾아서 수정해주면 된다.
flinkConfiguration:
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
TaskManager에도 9249 port를 설정하기 위해 아래처럼 podTemplate을 재정의하였고 timezone을 맞추기 위해 한국 timezone을 컨테이너에 마운팅해주었다.
spec:
podTemplate:
spec:
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2Gi
limits:
ephemeral-storage: 2Gi
volumeMounts:
- mountPath: /etc/localtime
name: timezone-config
ports:
- containerPort: 9249
name: metrics
protocol: TCP
volumes:
- name: timezone-config
hostPath:
path: /usr/share/zoneinfo/Asia/Seoul
jobManager:
replicas: 1
resource:
memory: "1Gi"
cpu: 0.5
taskManager:
replicas: 2
resource:
memory: "1Gi"
cpu: 1
그 다음 아래의 helm charts를 설치해주면 되고 설정이 필요한 경우엔 마찬가지로 values를 수정해서 -f values.yaml을 커맨드에 추가해주면 된다.
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack
추가적으로 아래 yaml 만들어서 metric에 대한 selector르 만들어야 prometheus가 metric을 가져올 수 있다.
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: flink-cluster-metrics
labels:
release: prometheus
spec:
selector:
matchLabels:
app: flink-cluster
podMetricsEndpoints:
- port: metrics
이번 글에서는 전체적으로 Flink Kubernetes Operator 요소와 활용 방법에 대해 알아보았고 다음에는 Flink 개념에 대한 글을 더 써보려고 한다. 현재 기준으로 Flink는 1.19버전이 나왔고 Flink Kubernetes Operator는 1.8버전이 나왔다.(기존 1.7버전에는 Autoscaler가 Session mode에선 동작되지 않았는데 그게 1.8버전부터 지원이 가능하다고 한다.) 이렇듯 Flink는 계속해서 더 많은 기능을 지원하려고 노력하고 있고 실시간 데이터 처리 파트에서 높은 선호도를 보이고 있는 것 같다.
다음에는 클라우드 환경에서 Flink 서비스를 활용해보기 위해 AWS 상에서 Flink를 활용해보고 글을 작성해서 가져와보려고 한다.
참고문헌
https://github.com/apache/flink-kubernetes-operator/tree/main
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/
'AI & Data' 카테고리의 다른 글
Apache Flink에 대해.. (1) | 2024.01.24 |
---|---|
비지도 학습 (0) | 2021.11.07 |
Matplotlib 기초 학습 내용 (0) | 2021.10.30 |
머신러닝 기초 학습 내용 (0) | 2021.10.29 |
pandas 기초 학습 내용 (0) | 2021.10.29 |