Atrae Tech Blog

People Tech Companyの株式会社アトラエのテックブログです。

Argo Eventsでの並列実行数の制御方法

こんにちは、アトラエでインフラエンジニアをやっているくーまです。

今回は掲題の通り、Argo Eventsでの並列実行数の制御の方法について書いていこうと思います。

前提としては↓のようなものがあります。

  • ArgoEventsを使用している
  • ArgoWorkflowで処理を実行している

ArgoEventsを導入している場合、何かしら非同期に処理を行いたいといった要件が多いと思われます。

そういった要件の中には、処理に時間がかかったり、そこそこのコンピューティングリソースを必要とするものもあるでしょう。

そうなると必要とされるリソース量に応じてスケーリングしていきたいとなるわけですが、特にDBに負荷がかかる場合などはスケーリングの速度が追いつかなかったり、非同期処理のためだけにスケーリングに最適化されたDBを採用できなかったりという理由で、非同期処理の並列実行数を制御してしまうことが一番現実的、ということもあり得るのではないかと思います。

ということで今回は、ArgoEventsで並列実行数をどう制御するのかについて書いていきます。

同時実行数の制御はArgoEvents側ではなくArgoWorkflow側で行う

いきなりタイトルを破壊している感じの見出しになってしまいましたが、実際こうなりました。

色々と調べてみたところ、ArgoEvents側には同時実行数の制御を行う仕組みは備わっておらず、ArgoWorkflow等の実行側で制限をかけることを想定しているようです。

そのため、以下はArgoWorkflowでの同時実行数の制御の方法と、その挙動になります。

※当然ですが、ArgoWorkflowである必要はありません

同時実行数を制御するYAMLの記述

ArgoWorkflowにはsynchronizationという設定を入れることができます。

これは、該当するWorkflowのみに同時実行の制限を入れられる、というものです。

ワークフローAは同時実行に制限なし、ワークフローBは最大3つまで…みたいな感じにできる、ということです。

今回は実際の挙動まで書いていくため、サンプルのSensorのYAMLを載せます。

なお、今回は引数で指定した秒数sleepするコンテナを動かす想定で書いています。

また、EventSource等は良い感じに作成済みという前提で、Sensorのみ記載します。

アーキテクチャ

AWS SQS → EventSource → EventBus → Sensor → Workflow → Pod

といった感じです。

SQSにJSONを詰め、そのメッセージ内容を引数としてsleepをかける感じです。

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: test-sleep
spec:
  template:
    serviceAccountName: test-sleep-sa
  eventBusName: test-sleep
  dependencies:
  - name: test-sleep
    eventSourceName: test-sleep
    eventName: test-sleep

  triggers:
  - template:
      name: test-sleep-wf
      k8s:
        group: argoproj.io
        version: v1alpha1
        resource: workflows
        operation: create
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: test-sleep-
            spec:
              entrypoint: test-sleep
              synchronization:
                semaphore:
                  configMapKeyRef:
                    name: test-sleep-conf
                    key: PARALLELISM

              arguments:
                parameters:
                - name: time
                  value: value
              templates:
              - name: test-sleep
                serviceAccountName: test-sleep-sa
                inputs:
                  parameters:
                  - name: time
                container:
                  image: alpine:latest
                  args: [sleep, '{{inputs.parameters.time}}']
                  resources:
                    requests:
                      memory: 100Mi
                      cpu: 100m
                    limits:
                      memory: 100Mi
                      cpu: 100m
        parameters:
        - src:
            dependencyName: test-sleep
            dataKey: body.time
          dest: spec.arguments.parameters.0.value

ここで大事なのは真ん中あたりにある

              synchronization:
                semaphore:
                  configMapKeyRef:
                    name: test-sleep-conf
                    key: PARALLELISM

ここの部分です。

ConfigMapで定義したPARALLELISMという設定値を読み込み、その数値の数までしか同時実行されないようになる、という記述です。

今回は3と指定してみます。

ArgoWorkflowの並列実行数制御の挙動

上記のYAMLをapplyし、実際に60秒sleepさせる感じのメッセージを5件SQSにつっこんでみました。

3並列と記載していることもあり、kubectl get workflowをうち確認すると、以下のようなWorkflowの挙動になっていました。

f:id:atrae_tech:20211214015758p:plain

ワークフロー自体はSensorによって作成されているものの、うち3つがRunning、残りの2つは未定義の状態となっています。

実際にdescribeでWorkflowの詳細を見てみると、以下のようにロックがかかっていることがわかります。

f:id:atrae_tech:20211214021329p:plain

当然ですがPodは3つしか起動しておらず、それらの処理が終わり次第スタックしていた2つのWorkflowの処理が行われ、5つ全て完了するという形になりました。

注意点 (おまけ)

今回はAWS SQSを使いましたが、SQSには「デフォルトの可視性タイムアウト」という仕組みがあり、そこが要注意なので記載します。

要は「処理が一定期間たっても終了しない場合、何らかの理由で処理がうまく行かなかったと判断し、1度失敗したとみなす」というものです。

デフォルトの、と付いてる通り、処理実行側から「すません!ちょっと時間かかってるだけで俺大丈夫っす!」というアピールがあれば、それに合わせ期間を延長してくれます。

但し私の調べた範囲ではArgoWorkflowにて処理がスタックしている際に可視性タイムアウトの時間を伸ばす方法はありませんでした。

当然Workflowがスタックしている期間もSQSからしたらメッセージが取得されて処理されているように見えているわけなので、このタイムアウトの時間が消費されています。

そのため実行開始を待っている時間も織り込んだデフォルトの可視性タイムアウトの設定をする、ということは忘れないようにしないといけなさそうです。