こんにちは、アトラエでインフラエンジニアをやっているくーまです。
今回は掲題の通り、Argo Eventsでの並列実行数の制御の方法について書いていこうと思います。
前提としては↓のようなものがあります。
- ArgoEventsを使用している
- ArgoWorkflowで処理を実行している
ArgoEventsを導入している場合、何かしら非同期に処理を行いたいといった要件が多いと思われます。
そういった要件の中には、処理に時間がかかったり、そこそこのコンピューティングリソースを必要とするものもあるでしょう。
そうなると必要とされるリソース量に応じてスケーリングしていきたいとなるわけですが、特にDBに負荷がかかる場合などはスケーリングの速度が追いつかなかったり、非同期処理のためだけにスケーリングに最適化されたDBを採用できなかったりという理由で、非同期処理の並列実行数を制御してしまうことが一番現実的、ということもあり得るのではないかと思います。
ということで今回は、ArgoEventsで並列実行数をどう制御するのかについて書いていきます。
同時実行数の制御はArgoEvents側ではなくArgoWorkflow側で行う
いきなりタイトルを破壊している感じの見出しになってしまいましたが、実際こうなりました。
色々と調べてみたところ、ArgoEvents側には同時実行数の制御を行う仕組みは備わっておらず、ArgoWorkflow等の実行側で制限をかけることを想定しているようです。
そのため、以下はArgoWorkflowでの同時実行数の制御の方法と、その挙動になります。
※当然ですが、ArgoWorkflowである必要はありません
同時実行数を制御するYAMLの記述
ArgoWorkflowにはsynchronizationという設定を入れることができます。
これは、該当するWorkflowのみに同時実行の制限を入れられる、というものです。
ワークフローAは同時実行に制限なし、ワークフローBは最大3つまで…みたいな感じにできる、ということです。
今回は実際の挙動まで書いていくため、サンプルのSensorのYAMLを載せます。
なお、今回は引数で指定した秒数sleepするコンテナを動かす想定で書いています。
また、EventSource等は良い感じに作成済みという前提で、Sensorのみ記載します。
AWS SQS → EventSource → EventBus → Sensor → Workflow → Pod
といった感じです。
SQSにJSONを詰め、そのメッセージ内容を引数としてsleepをかける感じです。
apiVersion: argoproj.io/v1alpha1 kind: Sensor metadata: name: test-sleep spec: template: serviceAccountName: test-sleep-sa eventBusName: test-sleep dependencies: - name: test-sleep eventSourceName: test-sleep eventName: test-sleep triggers: - template: name: test-sleep-wf k8s: group: argoproj.io version: v1alpha1 resource: workflows operation: create source: resource: apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: test-sleep- spec: entrypoint: test-sleep synchronization: semaphore: configMapKeyRef: name: test-sleep-conf key: PARALLELISM arguments: parameters: - name: time value: value templates: - name: test-sleep serviceAccountName: test-sleep-sa inputs: parameters: - name: time container: image: alpine:latest args: [sleep, '{{inputs.parameters.time}}'] resources: requests: memory: 100Mi cpu: 100m limits: memory: 100Mi cpu: 100m parameters: - src: dependencyName: test-sleep dataKey: body.time dest: spec.arguments.parameters.0.value
ここで大事なのは真ん中あたりにある
synchronization: semaphore: configMapKeyRef: name: test-sleep-conf key: PARALLELISM
ここの部分です。
ConfigMapで定義したPARALLELISMという設定値を読み込み、その数値の数までしか同時実行されないようになる、という記述です。
今回は3と指定してみます。
ArgoWorkflowの並列実行数制御の挙動
上記のYAMLをapplyし、実際に60秒sleepさせる感じのメッセージを5件SQSにつっこんでみました。
3並列と記載していることもあり、kubectl get workflow
をうち確認すると、以下のようなWorkflowの挙動になっていました。
ワークフロー自体はSensorによって作成されているものの、うち3つがRunning、残りの2つは未定義の状態となっています。
実際にdescribeでWorkflowの詳細を見てみると、以下のようにロックがかかっていることがわかります。
当然ですがPodは3つしか起動しておらず、それらの処理が終わり次第スタックしていた2つのWorkflowの処理が行われ、5つ全て完了するという形になりました。
注意点 (おまけ)
今回はAWS SQSを使いましたが、SQSには「デフォルトの可視性タイムアウト」という仕組みがあり、そこが要注意なので記載します。
要は「処理が一定期間たっても終了しない場合、何らかの理由で処理がうまく行かなかったと判断し、1度失敗したとみなす」というものです。
デフォルトの、と付いてる通り、処理実行側から「すません!ちょっと時間かかってるだけで俺大丈夫っす!」というアピールがあれば、それに合わせ期間を延長してくれます。
但し私の調べた範囲ではArgoWorkflowにて処理がスタックしている際に可視性タイムアウトの時間を伸ばす方法はありませんでした。
当然Workflowがスタックしている期間もSQSからしたらメッセージが取得されて処理されているように見えているわけなので、このタイムアウトの時間が消費されています。
そのため実行開始を待っている時間も織り込んだデフォルトの可視性タイムアウトの設定をする、ということは忘れないようにしないといけなさそうです。