Commit 4fc9f57

Merge pull request #43 from showuon/ha
add Flink HA document and example
2 parents acb7b0a + d8e06b4 commit 4fc9f57

File tree

4 files changed: +439 -0 lines changed
ha-example/README.md

Lines changed: 191 additions & 0 deletions
# Flink cluster High Availability configuration

A Flink cluster deployment consists of operators, job managers, and task managers.
However, the deployment is not configured for high availability (HA) by default; additional configuration is needed to achieve it.

## Flink Kubernetes Operator

The Flink Kubernetes operator manages the Flink cluster Deployments.
It supports high availability by adding a standby operator instance and using leader election to ensure that only one instance is _in charge_ of the Flink cluster Deployments at any one time.
To enable leader election, we need to add the following two mandatory parameters to the Kubernetes operator's configuration:
```
kubernetes.operator.leader-election.enabled: true
kubernetes.operator.leader-election.lease-name: flink-operator-lease
```
The lease name must be unique in the deployed namespace.

When installing the Flink Kubernetes operator using `helm`, we need to add these two parameters to the existing default configuration string.
This can be done via the command line using the `--set` flag:
```
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set podSecurityContext=null \
  --set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
  --set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
  --set replicas=2 \
  --set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.leader-election.enabled:\ true
kubernetes.operator.leader-election.lease-name:\ flink-operator-lease" \
  -n flink
```
Note: The `replicas` configuration should be set to the total number of Flink Kubernetes operator replicas you want.
This includes the leader plus the number of standby instances you want.
One standby (two replicas in total) is usually sufficient.

After running this command, you should see that there are two instances of the Flink Kubernetes operator running:
```
kubectl get pod -n flink | grep flink-kubernetes-operator
NAME                                       READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-6cd86cc8-fmb2x   2/2     Running   0          3h
flink-kubernetes-operator-6cd86cc8-g298v   2/2     Running   0          3h
```

When checking the `lease` resource, we can see the holder (leader) operator's name:
```
kubectl get lease -n flink
NAME                   HOLDER                                     AGE
flink-operator-lease   flink-kubernetes-operator-6cd86cc8-g298v   45h
```

You can test the high availability fail-over between the instances by deleting the leader operator pod.
You should then see the holder change to the other instance (or to the newly created instance if the creation happens quickly).

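For example, a minimal way to observe the fail-over, assuming the pod and lease names from the output above:
```
# Delete the current leader (the HOLDER shown in the lease above)
kubectl delete pod flink-kubernetes-operator-6cd86cc8-g298v -n flink

# Watch the lease and wait for the HOLDER to switch to another instance
kubectl get lease flink-operator-lease -n flink --watch
```
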
## Flink Job Manager

The Job Manager ensures consistency during recovery across Task Managers.
For the Job Manager itself to recover consistently,
an external service must store a minimal amount of recovery metadata (like “ID of last committed checkpoint”),
as well as information needed to elect and lock which Job Manager is the leader (to avoid split-brain situations).

To configure the Job Managers in your Flink cluster for high availability, add the following settings to the configuration in your `FlinkDeployment` CR:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:latest
  flinkConfiguration:
    # job manager HA settings
    high-availability.type: KUBERNETES
    high-availability.storageDir: s3://test/ha
```

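To actually run a standby Job Manager alongside the leader, the `FlinkDeployment` CR also needs more than one Job Manager replica. A minimal sketch, assuming the operator's `spec.jobManager.replicas` field (which requires the HA settings above and matches the two job manager pods shown in the example later):

```yaml
spec:
  jobManager:
    # one leader plus one standby Job Manager
    replicas: 2
```
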
## Task Manager

[Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/) is Flink’s primary fault-tolerance mechanism, wherein a snapshot of your job’s state is persisted periodically to some durable location.
In the case of a failure of a Task running your job's code, Flink will restart the Task from the most recent checkpoint and resume processing.
Although not strictly related to HA of the Flink cluster, it is important to enable checkpointing in production deployments to ensure fault tolerance.
By default, checkpointing is not enabled.
You can enable it by setting the checkpointing interval in the `FlinkDeployment` CR like this:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:latest
  flinkConfiguration:
    # checkpointing settings
    execution.checkpointing.interval: 1min
    state.checkpoints.dir: s3://test/cp
```
The settings above will checkpoint the Task state every minute to the S3 path provided.

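Checkpoints configured this way are used for failure recovery and are typically cleaned up when a job is cancelled. If you also want the latest checkpoint retained on cancellation, Flink's externalized checkpoint retention can be added as well; a sketch, assuming the same `flinkConfiguration` block as above:

```yaml
flinkConfiguration:
  # keep the last checkpoint in s3://test/cp even if the job is cancelled
  execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```
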
## Example: Making the `recommendation-app` fault tolerant and highly available

Here, we will use the [recommendation-app](../recommendation-app) as an example to demonstrate job manager HA.

1. Install the Flink Kubernetes operator with leader election enabled:
```
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set podSecurityContext=null \
  --set defaultConfiguration."log4j-operator\.properties"=monitorInterval\=30 \
  --set defaultConfiguration."log4j-console\.properties"=monitorInterval\=30 \
  --set replicas=2 \
  --set defaultConfiguration."flink-conf\.yaml"="kubernetes.operator.leader-election.enabled:\ true
kubernetes.operator.leader-election.lease-name:\ flink-operator-lease" \
  -n flink
```
2. Follow the [guide](minio-install/README.md) to deploy a local S3-compatible storage service using MinIO and add a bucket named `test`.
3. Replace the `MINIO_POD_ID` in `recommendation-app-HA/flink-deployment-ha.yaml` with the output of this command:
```
kubectl get pod minio -n flink --template={{.status.podIP}}
```
4. Deploy the `FlinkDeployment` CR with HA configured:
```
kubectl apply -f recommendation-app-HA/flink-deployment-ha.yaml -n flink
```
5. There should be 2 recommendation-app job manager pods and 1 task manager pod deployed:
```
kubectl get pod -l app=recommendation-app -n flink
NAME                                  READY   STATUS    RESTARTS   AGE
recommendation-app-76b6854f98-4qcz4   1/1     Running   0          3m59s
recommendation-app-76b6854f98-9zb24   1/1     Running   0          3m59s
recommendation-app-taskmanager-1-1    1/1     Running   0          2m5s
```
6. Browse the MinIO console to make sure the metadata of the job manager is uploaded to `s3://test/ha`.
7. Find out which job manager pod is the leader

To do this, check the pod logs.
If a job manager is the standby, the log will indicate that it is watching and waiting to become the leader.
```
kubectl logs -n flink recommendation-app-76b6854f98-4qcz4 --tail=2 -f
2024-12-11 08:31:22,729 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for flink/recommendation-app-cluster-config-map, watching id:3c7e23f2-fcaa-4f47-8623-fa1ebe9609ea
2024-12-11 08:31:22,729 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for flink/recommendation-app-cluster-config-map, watching id:ea191b50-a1ce-43c6-9bfc-a61f739fa8b3
```
8. Delete the leader job manager pod, and monitor the logs of the standby pod

Keep the step 7 command running, and delete the leader pod in another terminal.
```
kubectl logs -n flink recommendation-app-76b6854f98-4qcz4 --tail=2 -f
...
2024-12-11 08:50:16,438 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected for recommendation-app-cluster-config-map.
2024-12-11 08:50:16,518 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected 907d6eda-7619-45c5-97c6-e2a6243f56f6 for recommendation-app-cluster-config-map.
2024-12-11 08:50:16,524 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://10.244.1.30:8081 was granted leadership with leaderSessionID=87a6a102-c77b-4ec2-b263-b20a60921c9e
2024-12-11 08:50:16,524 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is granted leadership with session id 87a6a102-c77b-4ec2-b263-b20a60921c9e.
```
You should see the leadership change to the other pod.
9. Make sure the checkpointing files are successfully uploaded to `s3://test/cp` via the MinIO console.
10. Monitor the sink topic in Kafka

Run the console consumer to read the results from the sink topic:
```
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.recommended.products --from-beginning
user-36,"83,172,77,41,128,168","2024-12-11 08:32:35"
user-63,"15,141,160,64,23,143","2024-12-11 08:32:35"
user-0,"83,172,77,41,128,168","2024-12-11 08:32:40"
...
user-93,"14,110","2024-12-11 09:00:20"
user-75,128,"2024-12-11 09:00:25"
```
Results are emitted over time. Keep this terminal open; we'll check it again later.
11. Delete the task manager pod
```
kubectl delete pod/recommendation-app-taskmanager-1-1 -n flink
pod "recommendation-app-taskmanager-1-1" deleted
```
12. Make sure the newly created task manager pod is loading the checkpoint
```
kubectl logs recommendation-app-taskmanager-2-1 -n flink -f | grep test/cp
...
2024-12-11 09:02:53,271 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Starting to restore from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/0a34bfc6-188c-405f-8778-114caa6029ec, 0a34bfc6-188c-405f-8778-114caa6029ec [1209621 bytes]}.
2024-12-11 09:02:54,576 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Starting to restore from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/6b904fdf-d657-4a34-ab72-8f455c1aa578, 6b904fdf-d657-4a34-ab72-8f455c1aa578 [111579 bytes]}.
2024-12-11 09:02:54,577 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Starting to restore from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/0c1c98c1-c423-4e62-b16c-fba2056e730d, 0c1c98c1-c423-4e62-b16c-fba2056e730d [49189 bytes]}.
...
2024-12-11 09:02:55,280 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Finished restoring from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/d804b45a-d7d3-4fd6-8837-83a1652c447d, d804b45a-d7d3-4fd6-8837-83a1652c447d [48588 bytes]}.
2024-12-11 09:02:55,572 INFO org.apache.flink.runtime.state.heap.HeapRestoreOperation [] - Finished restoring from state handle: KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}}, stateHandle=RelativeFileStateHandle State: s3://test/cp/61f9e79ca6a301ed97cc4c1c6197accf/chk-28/ea11d00d-c79b-49d4-b1ff-a73ad01b7197, ea11d00d-c79b-49d4-b1ff-a73ad01b7197 [48167 bytes]}.
...
```
13. Check the sink topic consumer output; it should continue from minutes ago, not from the beginning:
```
kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
  ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.recommended.products --from-beginning
user-36,"83,172,77,41,128,168","2024-12-11 08:32:35"
user-63,"15,141,160,64,23,143","2024-12-11 08:32:35"
...
user-75,128,"2024-12-11 09:00:25"
# pod deleted
user-69,64,"2024-12-11 08:45:15"
...
```

ha-example/minio-install/README.md

Lines changed: 43 additions & 0 deletions
# MinIO installation

This folder contains an example MinIO deployment YAML; the instructions are based on the [MinIO documentation](https://min.io/docs/minio/kubernetes/upstream/index.html).

1. Deploy MinIO with the default configuration
```
kubectl apply -f minio.yaml -n flink
```
2. Access the MinIO S3 Console
```
kubectl port-forward deployment/minio 9000 9090 -n flink
```
3. Create a bucket via the MinIO S3 Console

Open `http://localhost:9090`, log in with `minioadmin/minioadmin`, then go to `Buckets` -> `Create Bucket` to create a bucket.
4. Monitor the files in the bucket

Click on the `Object Browser` to view the files in the buckets.

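Alternatively, the bucket can be created from the command line; a sketch, assuming the MinIO client (`mc`) is installed locally and the port-forward from step 2 is still running:
```
# point mc at the port-forwarded MinIO API and create the `test` bucket
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/test
```
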
After MinIO is deployed and the bucket is created, the Flink configuration can be set like this in the `FlinkDeployment` CR:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: recommendation-app
spec:
  image: quay.io/streamshub/flink-sql-runner:v0.0.1
  flinkVersion: v1_19
  flinkConfiguration:
    # MinIO settings
    s3.access-key: minioadmin
    s3.secret-key: minioadmin
    s3.endpoint: http://MINIO_POD_ID:9000
    s3.path.style.access: "true"
    high-availability.storageDir: s3://test/ha
    state.checkpoints.dir: s3://test/cp
```
Note:
1. This assumes there is a bucket named `test` in MinIO.
2. The `MINIO_POD_ID` can be found via:
```
kubectl get pod minio -n flink --template={{.status.podIP}}
```
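For convenience, the replacement can be done in one step; a sketch, assuming GNU `sed` and the manifest path used in the HA walkthrough (`recommendation-app-HA/flink-deployment-ha.yaml`):
```
# substitute the MinIO pod IP into the FlinkDeployment manifest
MINIO_POD_IP=$(kubectl get pod minio -n flink --template={{.status.podIP}})
sed -i "s/MINIO_POD_ID/${MINIO_POD_IP}/" recommendation-app-HA/flink-deployment-ha.yaml
```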

ha-example/minio-install/minio.yaml

Lines changed: 32 additions & 0 deletions
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: minio
  name: minio
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: quay.io/minio/minio:latest
          command:
            - /bin/bash
            - -c
          args:
            - minio server /data --console-address :9090
          volumeMounts:
            - mountPath: /data
              name: localvolume # Corresponds to the `spec.volumes` Persistent Volume
      volumes:
        - name: localvolume
          hostPath: # MinIO generally recommends using locally-attached volumes
            path: /mnt/disk1/data # Specify a path to a local drive or volume on the Kubernetes worker node
            type: DirectoryOrCreate # The path to the last directory must exist
