ストリーミング処理

[更新:2025年11月28日]

stream_processing

イベントをトリガーにしてストリームデータを即時収集し、加工・整形した上で分析処理に流し込み、その結果をデータレイクへ蓄積するシステムです。リアルタイムな状況把握や異常検知が容易となり、迅速な対応・自動化・業務効率化を実現する高即応性のデータ処理基盤を構築できます。

各機能のサービス対応

機能

対応サービス

データレイク

Azure Data Lake Storage Gen2

ETL/データ変換

Azure Data Factory

ストリーミング処理

Azure Stream Analytics

イベント処理

Azure Event Grid

構築手順

Fluent Bitを利用したVMログのデータ転送

本セクションではFluent Bitを利用して、さくらのクラウド上に構築しているNginxサーバーのアクセスログを、ETL入力用ストレージに転送します。

参考情報:

Addonをデプロイする

ETL入力用に、Azure Data Lake Storage Gen2を作成します。

TOKEN='<your-api-token>'

curl -v \
  --location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Basic $TOKEN" \
  --data '{
    "location": "japaneast",
    "performance": 1,
    "redundancy": 1
  }'

アクセス権限を設定する (RBAC)

利用者グループに付与する権限

ストレージ

ロール

目的

ETL入力用

ストレージ BLOB データ閲覧者

VMから転送されたデータの確認用

設定手順
  1. Azure Portalで各ストレージアカウントを開く

  2. 「アクセス制御 (IAM)」を選択

  3. 「+ 追加」>「ロールの割り当ての追加」

  4. 該当ロールを選択し、「利用者」グループを追加

ユーザーグループRBAC設定

データレイクを準備する

コンテナーとディレクトリの設定名

ETL入力用のストレージに以下を設定します:

  • コンテナー名: test

  • ディレクトリ名: sample

設定手順
  1. ストレージアカウントを開く

  2. 「コンテナー」>「+ コンテナーの追加」

コンテナー作成
  1. コンテナー名にtestを入力し作成

  2. コンテナーを開き、「+ ディレクトリの追加」

  3. ディレクトリ名にsampleを入力し作成

ディレクトリ作成
認証方法の変更

作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。

認証方法の変更

さくらのクラウドを準備する

さくらのクラウド上にNginxサーバーを作成し、FluentBitの設定を行うサンプルのTerraformとAnsibleを用意しています。アクセスログの転送間隔は5分間隔で転送されるように設定しています。

設定ファイルは、サンプルファイルを確認してください。

playbookに設定する環境変数

Fluent Bitを利用してVMのログデータをAzure Data Lake Storage Gen2に転送するには、Azure Data Lake Storage Gen2の環境変数、ストレージアカウント名が必要です。

設定手順
  1. サンプルのplaybook.yml を開き、以下の値を変更する。

変数名

設定値

azure_account_name

ETL入力用ストレージ名

azure_account_key

ETL入力用ストレージのアクセスキー

  1. アクセスキーはAzure Portalを開き、ETL入力用ストレージ > セキュリティとネットワーク > アクセスキーから確認可能できる。

アクセスキーの確認

データ転送を確認する

アクセスログの転送を次の手順で確認します。

  1. Azure Portalを開き、 ETL入力用ストレージ > test > sampleに遷移する。

  2. JSONファイルのVMのアクセスログデータが転送されていることを確認する。

VMアクセスログデータの確認

ETL処理を構築する

Addonをデプロイする

Azure Data Lake Storage Gen2の作成(ETL出力・ストリーミング入力用)
TOKEN='<your-api-token>'

curl -v \
  --location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Basic $TOKEN" \
  --data '{
    "location": "japaneast",
    "performance": 1,
    "redundancy": 1
  }'
Azure Data Factoryの作成
curl -v \
  --location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/etl' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Basic $TOKEN" \
  --data '{
    "location": "japaneast"
  }'

ユーザーグループにアクセス権限を設定する (RBAC)

ユーザーグループに付与する権限

「利用者」グループに以下の権限を付与します:

ストレージ

ロール

目的

ETL出力・ストリーミング入力用

ストレージ BLOB データ閲覧者

ETL結果の確認

設定手順
  1. Azure Portalで各ストレージアカウントを開く

  2. 「アクセス制御 (IAM)」を選択

  3. 「+ 追加」>「ロールの割り当ての追加」

  4. 該当ロールを選択し、「利用者」グループを追加

ユーザーグループRBAC設定

マネージドIDにアクセス権限を設定する (RBAC)

Azure Data FactoryのマネージドIDに付与する権限

Data FactoryがETL入力用ストレージからデータを読み取り、ETL出力・ストリーミング入力用ストレージへ書き込めるよう以下の権限を付与します:

ストレージ

ロール

理由

ETL入力用

ストレージ BLOB データ閲覧者

パイプラインがソースデータを読み取るため

ETL出力・ストリーミング入力用

ストレージ BLOB データ共同作成者

パイプラインが変換後のデータを書き込むため

設定手順

ETL入力用ストレージへの読み取り権限と、ETL出力・ストリーミング入力用ストレージへの書き込み権限を付与します。

  1. ETL入力用ストレージの「アクセス制御 (IAM)」を開く

  2. Azure Data FactoryのマネージドIDに「ストレージ BLOB データ閲覧者」を付与

Data Factory RBAC設定(ストレージBLOB閲覧者)
  1. ETL出力・ストリーミング入力用ストレージの「アクセス制御(IAM)」を開く

  2. Azure Data FactoryのマネージドIDに「ストレージ BLOB データ共同作成者」を付与

Data Factory RBAC設定(ストレージBLOB共同作成者)

データレイクを準備する

コンテナーとディレクトリの設定名

ETL出力・ストリーミング入力用のストレージに以下を設定します。

  • コンテナー名: test

  • ディレクトリ名: sample

設定手順
  1. ストレージアカウントを開く

  2. 「コンテナー」>「+ コンテナーの追加」

コンテナー作成
  1. コンテナー名にtestを入力し作成

  2. コンテナーを開き、「+ ディレクトリの追加」

  3. ディレクトリ名にsampleを入力し作成

ディレクトリ作成
認証方法の変更

作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。

認証方法の変更

Azure Data Factoryパイプラインを構築する

パイプラインの作成
  1. Azure Data Factory Studioを開く

  2. 「新規」>「パイプライン」を選択

パイプライン作成
  1. 「アクティビティ」>「データフロー」をキャンバスにドラッグ

データフロー選択
ソース(入力)の設定

データセットの作成:

  1. パイプラインの「設定」タブ>「新規」

パイプラインのデータフロー設定
  1. 「データフローの追加」を選択

  2. データフローの「ソースの設定」タブ>「データセット >「新規」

ソースの新規データセット作成
  • サービス: Azure Data Lake Storage Gen2

  • フォーマット: JSON

  • リンクサービスの設定

    • 認証の種類: システム割り当てマネージドID

    • ストレージアカウント: ETL入力用ストレージを選択

  1. 作成する

ソース設定
  • ファイルパス: test/sample

  • スキーマのインポート:「なし」を選択

  1. OKを押す

ソースのファイルパス・オプション設定
シンク(出力)の設定
  1. データフローで「+ 」>「シンク」を追加

  2. 新しいデータセットを作成 (ソースと同様の手順)

  3. リンクサービスでETL出力・ストリーミング入力用ストレージを選択

  4. 作成する

  5. プロパティの設定で、ファイルパス: test/sampleを入力

  6. スキーマのインポート:「なし」を選択

  7. OKを押す

シンク設定
ファイルの出力オプションの設定
  1. シンクの「設定」タブ>ファイルのオプションで「パターン」を選択

  2. パターンのテキストウィンドウを選択し、「動的なコンテンツを追加」を選択

AddonA5SinkFileOptionPatternMenu
  1. 以下の式を入力し、「保存して終了」を選択

項目

設定値

ファイル名のオプション

パターン

パターン

concat('data_', toString(currentTimestamp(), 'yyyyMMddHHmmss'), '.json')

AddonA5SinkFileOptionPatternSetting
フィルター設定

VMから転送されたアクセスログのデータについて、ステータスコードが「404」のデータのみ抽出するように変換します。

  1. データフロー画面で「+」>「フィルター」を追加

フィルターの追加
  1. フィルター画面で「フィルター設定」>フィルターオンのテキストウィンドウを選択 >「式ビルダーを開く」

式ビルダー画面
  1. 以下の式を入力して「保存して終了」

byName('code') == '404'
式ビルダーの設定
すべて発行

「すべて発行」を選択して変更を保存します。

発行

ストレージイベントトリガーを設定する

トリガーの作成

ファイルアップロードを検知して自動的にパイプラインを実行するトリガーを設定します。

  1. パイプライン画面で「トリガーの追加」>「新規/編集」

ストレージイベントトリガーの追加
  1. 以下の設定を入力:

項目

設定値

名前

任意の名前

種類

ストレージイベント

ストレージアカウント

ETL入力用ストレージ

コンテナー名

/test/

次で始まるBLOBパス

sample/

イベント

BLOBが作成されました ✓

空のBLOBを無視

はい

トリガーの開始

作成時のトリガーの開始 ✓

トリガー設定1 トリガー設定2
  1. 「続行」>「OK」>「発行」

ETL処理の動作を確認する

トリガー実行の確認
  1. Azure Data Factory Studioの「監視」>「トリガー実行」を開く

  2. 新しいトリガー実行が表示されることを確認

VMからのログ転送は5分間隔で実施されるため、トリガー実行まで5分ほどかかることがあります。

トリガー実行ログ
出力データの確認

ETL出力・ストリーミング入力用ストレージtest/sampleディレクトリに変換後のファイルが作成されていることを確認します。

パイプラインのデータ変換処理は約5分ほどかかります。

ETLで変換したデータの出力結果

内容を確認し、出力例のようにステータスコードが「404」のデータのみが抽出されていることを確認します。

{"@timestamp":"2025-11-11T03:23:18.321166Z","agent":"-","code":"404","host":"-","method":"GET","path":"/cgi-bin/luci/;stok=/locale","referer":"-","remote":"{CLIENT_IP}","size":"162","time":"11/Nov/2025:12:23:18 +0900","user":"-"}
{"@timestamp":"2025-11-11T05:25:48.381311Z","agent":"-","code":"404","host":"-","method":"GET","path":"/cgi-bin/luci/;stok=/locale","referer":"-","remote":"{CLIENT_IP}","size":"162","time":"11/Nov/2025:14:25:48 +0900","user":"-"}

ストリーミング処理の構築

Addonをデプロイする

Azure Data Lake Storage Gen2の作成(ストリーミング出力用)
TOKEN='<your-api-token>'

curl -v \
  --location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Basic $TOKEN" \
  --data '{
    "location": "japaneast",
    "performance": 1,
    "redundancy": 1
  }'
Azure Stream Analyticsの作成
curl -v \
  --location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/stream' \
  --header 'Content-Type: application/json' \
  --header 'Accept: application/json' \
  --header "Authorization: Basic $TOKEN" \
  --data '{
    "location": "japaneast",
    "sku": "Standard"
  }'

ユーザーグループにアクセス権限を設定する (RBAC)

ユーザーグループに付与する権限

「利用者」グループに以下の権限を付与します:

ストレージ

ロール

目的

ストリーミング出力用

ストレージ BLOB データ閲覧者

ストリーミング結果の確認

設定手順
  1. Azure Portalでストリーミング出力用ストレージアカウントを開く

  2. 「アクセス制御 (IAM)」を選択

  3. 「+ 追加」>「ロールの割り当ての追加」

  4. 「ストレージ BLOB データ閲覧者」を選択し、「利用者」グループを追加

利用者のRBAC付与(ストリーミング出力用ストレージ)

マネージドIDにアクセス権限を設定する (RBAC)

Azure Stream AnalyticsのマネージドIDに付与する権限

Stream Analyticsがストリーミング入力用ストレージからデータを読み取り、ストリーミング出力用ストレージへ書き込めるよう設定します。

ストレージ

ロール

理由

ETL出力・ストリーミング入力用

ストレージ BLOB データ閲覧者

Azure Stream Analyticsがソースデータを読み取るため

ストリーミング出力用

ストレージ BLOB データ共同作成者

Azure Stream Analyticsが出力するデータを書き込むため

設定手順

ETL出力・ストリーミング入力用ストレージへの読み取り権限とストリーミング出力用ストレージへの書き込み権限を付与します。

  1. ETL出力・ストリーミング入力用ストレージの「アクセス制御 (IAM)」を開く

  2. Azure Stream AnalyticsのマネージドIDに「ストレージ BLOB データ閲覧者」を付与

Azure Stream AnalyticsのRBAC付与(ETL出力用ストレージ)
  1. ストリーミング出力用ストレージの「アクセス制御 (IAM)」を開く

  2. Azure Stream AnalyticsのマネージドIDに「ストレージ BLOB データ共同作成者」を付与

Azure Stream AnalyticsのRBAC付与(ストリーミング出力用ストレージ)

データレイクを準備する

コンテナーとディレクトリの設定名

ストリーミング出力用ストレージに以下を設定します:

  • コンテナー名: test

  • ディレクトリ名: sample

作成手順
  1. ストレージアカウントを開く

  2. 「コンテナー」>「+ コンテナーの追加」

コンテナーの追加
  1. コンテナー名にtestを入力し作成

  2. コンテナーを開き、「+ ディレクトリの追加」

  3. ディレクトリ名にsampleを入力し作成

ディレクトリの追加
認証方法の変更

作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。

コンテナーの認証方法の変更

Azure Stream Analyticsを設定する

入力の設定
  1. Azure Stream Analyticsジョブを開く

入力の構成
  1. 「入力」>「+ ストリーム入力の追加」>「Blob Storage/ADLS Gen2」を選択

  2. 以下の設定を入力:

項目

設定値

入力エイリアス

test

ストレージアカウント

ETL出力・ストリーミング入力用ストレージ

コンテナー

test

認証モード

マネージドID:システム割り当て

イベントシリアル化形式

JSON

エンコード

UTF-8

イベントの圧縮タイプ

なし

入力設定1 入力設定2
出力の設定
  1. 「出力」>「+ 追加」>「Blob Storage/ADLS Gen2」を選択

出力の構成
  1. 以下の設定を入力する

項目

設定値

出力エイリアス

result

ストレージアカウント

ストリーミング出力用ストレージ

コンテナー

test

認証モード

マネージドID:システム割り当て

イベントシリアル化形式

JSON

フォーマット

改行区切り

エンコード

UTF-8

書き込みモード

結果が到着したときにアペンドする

パスパターン

sample/

出力設定1 出力設定2
クエリの設定
  1. 「クエリ」を選択

  2. 以下のクエリを入力:

SELECT
*
INTO
  result
FROM
  test
AddonA5QuerySave.png
ジョブの開始
  1. 「概要」を選択

  2. 「開始」>「今すぐ」を選択してジョブを開始

ジョブの開始 ジョブの開始時刻の設定
  1. Stream Analyticsジョブの「概要」タブで、状態が「実行中」になったことを確認する。

ジョブ状態の確認

ストリーミング処理の動作を確認する

データの流れの確認
  1. Fluent Bitを利用してVMのアクセスログデータの転送

  2. Azure Data Factoryでパイプラインが自動実行されることを確認

ストレージイベントトリガーの確認
  1. ETL出力・ストリーミング入力用ストレージに変換後のファイルが作成されることを確認

ETL処理の確認
  1. Azure Stream Analyticsがデータを処理していることを確認する

  2. ストリーミング出力用ストレージに結果が出力されることを確認する

ストリーミング処理結果の確認

Azure Stream Analyticsのメトリック

Azure Stream Analyticsでは、入出力イベント数などのパフォーマンスに関する様々なメトリックが使用できます。

Stream Analyticsのメトリクス

関連情報: Azure Stream Analytics 監視データ リファレンス