QUICKGUARD ホームページ >

AWS Step Functions

2018.12.25

Cloudwatch Logs を 定期的に S3 へ保存 したい時、Cloudwatch Events + Lambda だけでも可能なのですが、Cloudwatch Logs の エクスポートタスクには複数同時に実行できないという制約があるため、連続してエクスポートを実行しようとするとエラーになってしまいます。
Lambdaは実行時間に制限がありますし、稼働時間に応じて課金される為、 関数の中で Wait するのは余計な実行時間がかかってしまうのが気になります。
そこで、Step Functions を使って Wait の処理を外だししてみます。
Step Functions からパラメータを渡すことができるので、関数の汎用性を上げることにもつながります。
今回の記事では、処理の全体像と、StepFunctionsの定義をご紹介します。


概要

使うAWSサービスの関連性としては、以下のようになっています。
AWS_stepfunctions

処理の流れとしては、大まかに以下のようになります。

  • Cloudwatch Events から Step Function 呼び出し
  • Step Function で 処理したいロググループ分、以下の処理を繰り返す

処理したいロググループのパラメータを指定してLambda のエクスポートタスク作成関数を呼び出し
Lambda のエクスポートタスク完了チェック関数を呼び出し、完了していなければ待ち、完了していたら次のロググループのエクスポートタスクを実行

Step Functionsの定義

今回作成したステートマシン図は以下です。

StatMachine

  • Initialize:Lambdaに渡すパラメータの初期化など
  • Select Log Group:実行対象のロググループを選択する
  • log-group-a~b-name:実行対象のロググループ名をパラメータ変数に入れる
  • log-group-a~b-s3prefix:実行対象のロググループの保存先のS3プレフィックスをパラメータ変数に入れる
  • Create Export Task:Lambdaのexport_log_to_s3関数を実行
  • Task Complete ?:Lambdaのcheck_export_task関数を実行し、エラーだった場合は指定秒待ってから再実行する

ビジュアル的に作成できるのかな?と期待させる見た目ですが、
実際にはフロー図を作成するようには作成できず、JSON でコードを書く必要があります。

ステートマシンステートマシンの定義は、以下の通りです。

{
    "StartAt": "Initialize",
    "States": {
      "Succeed": {
        "Type": "Succeed"
      },
      "Fail": {
        "Type": "Fail"
      },
      "Initialize": {
        "Type": "Pass",
        "Result": {
            "logGroupName": "",
            "s3BucketName": "sample-logs",
            "s3prefix": ""
          },
          "Next": "Select Log Group"
      },
      "Select Log Group": {
        "Type": "Choice",
        "Choices": [
          {
            "Variable": "$.logGroupName",
            "StringEquals": "",
            "Next": "log-group-a-name"
          },
          {
            "Variable": "$.logGroupName",
            "StringEquals": "/var/log/httpd/access_log",
            "Next": "log-group-b-name"
          },
          {
            "Variable": "$.logGroupName",
            "StringEquals": "/var/log/httpd/error_log",
            "Next": "log-group-c-name"
          },
          {
            "Variable": "$.logGroupName",
            "StringEquals": "/var/log/messages",
            "Next": "Succeed"
          }
        ],
        "Default": "Fail"
      },
      "log-group-a-name": {
        "Type": "Pass",
        "Result": "/var/log/httpd/access_log",
        "ResultPath": "$.logGroupName",
        "Next": "log-group-a-s3prefix"
      },
      "log-group-b-name": {
        "Type": "Pass",
        "Result": "/var/log/httpd/error_log",
        "ResultPath": "$.logGroupName",
        "Next": "log-group-b-s3prefix"
      },
      "log-group-c-name": {
        "Type": "Pass",
        "Result": "/var/log/messages",
        "ResultPath": "$.logGroupName",
        "Next": "log-group-c-s3prefix"
      },
      "log-group-a-s3prefix": {
        "Type": "Pass",
        "Result": "httpd-access_log",
        "ResultPath": "$.s3Prefix",
        "Next": "Create Export Task"
      },
      "log-group-b-s3prefix": {
        "Type": "Pass",
        "Result": "httpd-error_log",
        "ResultPath": "$.s3Prefix",
        "Next": "Create Export Task"
      },
      "log-group-c-s3prefix": {
        "Type": "Pass",
        "Result": "messages",
        "ResultPath": "$.s3Prefix",
        "Next": "Create Export Task"
      },
      "Create Export Task": {
        "Type": "Task",
        "Resource": "arn:aws:lambda:ap-northeast-1:1234567890:function:export_log_to_s3",
        "Catch": [
          {
            "ErrorEquals": ["ResourceNotFoundException"],
            "Next": "Select Log Group"
          },
          {
            "ErrorEquals": ["States.ALL"],
            "Next": "Fail"
          }
        ],
        "Next": "Task Complete ?"
      },
      "Task Complete ?": {
        "Type": "Task",
        "Resource": "arn:aws:lambda:ap-northeast-1:1234567890:function:check_export_task",
        "Retry": [
            {
                "ErrorEquals": [ "RUNNING", "PENDING" ],
                "IntervalSeconds": 3,
                "BackoffRate": 1.5,
                "MaxAttempts": 5
            }
        ],
        "Catch": [
            {
                "ErrorEquals": [ "States.ALL" ],
                "Next": "Fail"
            }
        ],
        "Next": "Select Log Group"
      }
    }
  }

"Task Complete ?""Retry"の部分に、Wait の処理が入っています。
再試行までのインターバルや最大試行回数も指定できますし、"BackoffRate"の部分で再試行の間隔を伸ばすことも可能で、なかなか細かい設定ができる事がわかります。


Step Functions は「視覚的なワークフローを構築できる」というキャッチフレーズがついていますが、
上記のステートマシンステートマシンの定義を見てもわかるように、ループ処理をするにも順に処理を記述しなければならないので、ちょっと面倒ですね。
JSON でワークフローを定義するのはちょっととっつきにくいですが、応用の効く機能だと思いますので、色々活用できると思います。