ストリーミング処理
[更新:2025年11月28日]
イベントをトリガーにしてストリームデータを即時収集し、加工・整形した上で分析処理に流し込み、その結果をデータレイクへ蓄積するシステムです。リアルタイムな状況把握や異常検知が容易となり、迅速な対応・自動化・業務効率化を実現する高即応性のデータ処理基盤を構築できます。
各機能のサービス対応
機能 |
対応サービス |
|---|---|
データレイク |
Azure Data Lake Storage Gen2 |
ETL/データ変換 |
Azure Data Factory |
ストリーミング処理 |
Azure Stream Analytics |
イベント処理 |
Azure Event Grid |
構築手順
Fluent Bitを利用したVMログのデータ転送
本セクションではFluent Bitを利用して、さくらのクラウド上に構築しているNginxサーバーのアクセスログを、ETL入力用ストレージに転送します。
参考情報:
Addonをデプロイする
ETL入力用に、Azure Data Lake Storage Gen2を作成します。
TOKEN='<your-api-token>'
curl -v \
--location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header "Authorization: Basic $TOKEN" \
--data '{
"location": "japaneast",
"performance": 1,
"redundancy": 1
}'
アクセス権限を設定する (RBAC)
利用者グループに付与する権限
ストレージ |
ロール |
目的 |
|---|---|---|
ETL入力用 |
ストレージ BLOB データ閲覧者 |
VMから転送されたデータの確認用 |
設定手順
Azure Portalで各ストレージアカウントを開く
「アクセス制御 (IAM)」を選択
「+ 追加」>「ロールの割り当ての追加」
該当ロールを選択し、「利用者」グループを追加
データレイクを準備する
コンテナーとディレクトリの設定名
ETL入力用のストレージに以下を設定します:
コンテナー名:
testディレクトリ名:
sample
設定手順
ストレージアカウントを開く
「コンテナー」>「+ コンテナーの追加」
コンテナー名に
testを入力し作成コンテナーを開き、「+ ディレクトリの追加」
ディレクトリ名に
sampleを入力し作成
認証方法の変更
作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。
さくらのクラウドを準備する
さくらのクラウド上にNginxサーバーを作成し、FluentBitの設定を行うサンプルのTerraformとAnsibleを用意しています。アクセスログの転送間隔は5分間隔で転送されるように設定しています。
設定ファイルは、サンプルファイルを確認してください。
playbookに設定する環境変数
Fluent Bitを利用してVMのログデータをAzure Data Lake Storage Gen2に転送するには、Azure Data Lake Storage Gen2の環境変数、ストレージアカウント名が必要です。
設定手順
サンプルのplaybook.yml を開き、以下の値を変更する。
変数名 |
設定値 |
|---|---|
azure_account_name |
ETL入力用ストレージ名 |
azure_account_key |
ETL入力用ストレージのアクセスキー |
アクセスキーはAzure Portalを開き、ETL入力用ストレージ > セキュリティとネットワーク > アクセスキーから確認可能できる。
データ転送を確認する
アクセスログの転送を次の手順で確認します。
Azure Portalを開き、 ETL入力用ストレージ > test > sampleに遷移する。
JSONファイルのVMのアクセスログデータが転送されていることを確認する。
ETL処理を構築する
Addonをデプロイする
Azure Data Lake Storage Gen2の作成(ETL出力・ストリーミング入力用)
TOKEN='<your-api-token>'
curl -v \
--location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header "Authorization: Basic $TOKEN" \
--data '{
"location": "japaneast",
"performance": 1,
"redundancy": 1
}'
Azure Data Factoryの作成
curl -v \
--location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/etl' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header "Authorization: Basic $TOKEN" \
--data '{
"location": "japaneast"
}'
ユーザーグループにアクセス権限を設定する (RBAC)
ユーザーグループに付与する権限
「利用者」グループに以下の権限を付与します:
ストレージ |
ロール |
目的 |
|---|---|---|
ETL出力・ストリーミング入力用 |
ストレージ BLOB データ閲覧者 |
ETL結果の確認 |
設定手順
Azure Portalで各ストレージアカウントを開く
「アクセス制御 (IAM)」を選択
「+ 追加」>「ロールの割り当ての追加」
該当ロールを選択し、「利用者」グループを追加
マネージドIDにアクセス権限を設定する (RBAC)
Azure Data FactoryのマネージドIDに付与する権限
Data FactoryがETL入力用ストレージからデータを読み取り、ETL出力・ストリーミング入力用ストレージへ書き込めるよう以下の権限を付与します:
ストレージ |
ロール |
理由 |
|---|---|---|
ETL入力用 |
ストレージ BLOB データ閲覧者 |
パイプラインがソースデータを読み取るため |
ETL出力・ストリーミング入力用 |
ストレージ BLOB データ共同作成者 |
パイプラインが変換後のデータを書き込むため |
設定手順
ETL入力用ストレージへの読み取り権限と、ETL出力・ストリーミング入力用ストレージへの書き込み権限を付与します。
ETL入力用ストレージの「アクセス制御 (IAM)」を開く
Azure Data FactoryのマネージドIDに「ストレージ BLOB データ閲覧者」を付与
ETL出力・ストリーミング入力用ストレージの「アクセス制御(IAM)」を開く
Azure Data FactoryのマネージドIDに「ストレージ BLOB データ共同作成者」を付与
データレイクを準備する
コンテナーとディレクトリの設定名
ETL出力・ストリーミング入力用のストレージに以下を設定します。
コンテナー名:
testディレクトリ名:
sample
設定手順
ストレージアカウントを開く
「コンテナー」>「+ コンテナーの追加」
コンテナー名に
testを入力し作成コンテナーを開き、「+ ディレクトリの追加」
ディレクトリ名に
sampleを入力し作成
認証方法の変更
作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。
Azure Data Factoryパイプラインを構築する
パイプラインの作成
Azure Data Factory Studioを開く
「新規」>「パイプライン」を選択
「アクティビティ」>「データフロー」をキャンバスにドラッグ
ソース(入力)の設定
データセットの作成:
パイプラインの「設定」タブ>「新規」
「データフローの追加」を選択
データフローの「ソースの設定」タブ>「データセット >「新規」
サービス:
Azure Data Lake Storage Gen2フォーマット:
JSONリンクサービスの設定
認証の種類:
システム割り当てマネージドIDストレージアカウント: ETL入力用ストレージを選択
作成する
ファイルパス:
test/sampleスキーマのインポート:「なし」を選択
OKを押す
シンク(出力)の設定
データフローで「+ 」>「シンク」を追加
新しいデータセットを作成 (ソースと同様の手順)
リンクサービスでETL出力・ストリーミング入力用ストレージを選択
作成する
プロパティの設定で、ファイルパス:
test/sampleを入力スキーマのインポート:「なし」を選択
OKを押す
ファイルの出力オプションの設定
シンクの「設定」タブ>ファイルのオプションで「パターン」を選択
パターンのテキストウィンドウを選択し、「動的なコンテンツを追加」を選択
以下の式を入力し、「保存して終了」を選択
項目 |
設定値 |
|---|---|
ファイル名のオプション |
パターン |
パターン |
|
フィルター設定
VMから転送されたアクセスログのデータについて、ステータスコードが「404」のデータのみ抽出するように変換します。
データフロー画面で「+」>「フィルター」を追加
フィルター画面で「フィルター設定」>フィルターオンのテキストウィンドウを選択 >「式ビルダーを開く」
以下の式を入力して「保存して終了」
byName('code') == '404'
すべて発行
「すべて発行」を選択して変更を保存します。
ストレージイベントトリガーを設定する
トリガーの作成
ファイルアップロードを検知して自動的にパイプラインを実行するトリガーを設定します。
パイプライン画面で「トリガーの追加」>「新規/編集」
以下の設定を入力:
項目 |
設定値 |
|---|---|
名前 |
任意の名前 |
種類 |
ストレージイベント |
ストレージアカウント |
ETL入力用ストレージ |
コンテナー名 |
|
次で始まるBLOBパス |
|
イベント |
BLOBが作成されました ✓ |
空のBLOBを無視 |
はい |
トリガーの開始 |
作成時のトリガーの開始 ✓ |
「続行」>「OK」>「発行」
ETL処理の動作を確認する
トリガー実行の確認
Azure Data Factory Studioの「監視」>「トリガー実行」を開く
新しいトリガー実行が表示されることを確認
VMからのログ転送は5分間隔で実施されるため、トリガー実行まで5分ほどかかることがあります。
出力データの確認
ETL出力・ストリーミング入力用ストレージのtest/sampleディレクトリに変換後のファイルが作成されていることを確認します。
パイプラインのデータ変換処理は約5分ほどかかります。
内容を確認し、出力例のようにステータスコードが「404」のデータのみが抽出されていることを確認します。
{"@timestamp":"2025-11-11T03:23:18.321166Z","agent":"-","code":"404","host":"-","method":"GET","path":"/cgi-bin/luci/;stok=/locale","referer":"-","remote":"{CLIENT_IP}","size":"162","time":"11/Nov/2025:12:23:18 +0900","user":"-"}
{"@timestamp":"2025-11-11T05:25:48.381311Z","agent":"-","code":"404","host":"-","method":"GET","path":"/cgi-bin/luci/;stok=/locale","referer":"-","remote":"{CLIENT_IP}","size":"162","time":"11/Nov/2025:14:25:48 +0900","user":"-"}
ストリーミング処理の構築
Addonをデプロイする
Azure Data Lake Storage Gen2の作成(ストリーミング出力用)
TOKEN='<your-api-token>'
curl -v \
--location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/datalake' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header "Authorization: Basic $TOKEN" \
--data '{
"location": "japaneast",
"performance": 1,
"redundancy": 1
}'
Azure Stream Analyticsの作成
curl -v \
--location 'https://secure.sakura.ad.jp/cloud-test/zone/is1y/api/addon/1.0/analytics/stream' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header "Authorization: Basic $TOKEN" \
--data '{
"location": "japaneast",
"sku": "Standard"
}'
ユーザーグループにアクセス権限を設定する (RBAC)
ユーザーグループに付与する権限
「利用者」グループに以下の権限を付与します:
ストレージ |
ロール |
目的 |
|---|---|---|
ストリーミング出力用 |
ストレージ BLOB データ閲覧者 |
ストリーミング結果の確認 |
設定手順
Azure Portalでストリーミング出力用ストレージアカウントを開く
「アクセス制御 (IAM)」を選択
「+ 追加」>「ロールの割り当ての追加」
「ストレージ BLOB データ閲覧者」を選択し、「利用者」グループを追加
マネージドIDにアクセス権限を設定する (RBAC)
Azure Stream AnalyticsのマネージドIDに付与する権限
Stream Analyticsがストリーミング入力用ストレージからデータを読み取り、ストリーミング出力用ストレージへ書き込めるよう設定します。
ストレージ |
ロール |
理由 |
|---|---|---|
ETL出力・ストリーミング入力用 |
ストレージ BLOB データ閲覧者 |
Azure Stream Analyticsがソースデータを読み取るため |
ストリーミング出力用 |
ストレージ BLOB データ共同作成者 |
Azure Stream Analyticsが出力するデータを書き込むため |
設定手順
ETL出力・ストリーミング入力用ストレージへの読み取り権限とストリーミング出力用ストレージへの書き込み権限を付与します。
ETL出力・ストリーミング入力用ストレージの「アクセス制御 (IAM)」を開く
Azure Stream AnalyticsのマネージドIDに「ストレージ BLOB データ閲覧者」を付与
ストリーミング出力用ストレージの「アクセス制御 (IAM)」を開く
Azure Stream AnalyticsのマネージドIDに「ストレージ BLOB データ共同作成者」を付与
データレイクを準備する
コンテナーとディレクトリの設定名
ストリーミング出力用ストレージに以下を設定します:
コンテナー名:
testディレクトリ名:
sample
作成手順
ストレージアカウントを開く
「コンテナー」>「+ コンテナーの追加」
コンテナー名に
testを入力し作成コンテナーを開き、「+ ディレクトリの追加」
ディレクトリ名に
sampleを入力し作成
認証方法の変更
作成したコンテナーの認証方法を「Microsoft Entraユーザーアカウント」に変更します。
Azure Stream Analyticsを設定する
入力の設定
Azure Stream Analyticsジョブを開く
「入力」>「+ ストリーム入力の追加」>「Blob Storage/ADLS Gen2」を選択
以下の設定を入力:
項目 |
設定値 |
|---|---|
入力エイリアス |
|
ストレージアカウント |
ETL出力・ストリーミング入力用ストレージ |
コンテナー |
|
認証モード |
マネージドID:システム割り当て |
イベントシリアル化形式 |
JSON |
エンコード |
UTF-8 |
イベントの圧縮タイプ |
なし |
出力の設定
「出力」>「+ 追加」>「Blob Storage/ADLS Gen2」を選択
以下の設定を入力する
項目 |
設定値 |
|---|---|
出力エイリアス |
|
ストレージアカウント |
ストリーミング出力用ストレージ |
コンテナー |
|
認証モード |
マネージドID:システム割り当て |
イベントシリアル化形式 |
JSON |
フォーマット |
改行区切り |
エンコード |
UTF-8 |
書き込みモード |
結果が到着したときにアペンドする |
パスパターン |
|
クエリの設定
「クエリ」を選択
以下のクエリを入力:
SELECT
*
INTO
result
FROM
test
ジョブの開始
「概要」を選択
「開始」>「今すぐ」を選択してジョブを開始
Stream Analyticsジョブの「概要」タブで、状態が「実行中」になったことを確認する。
ストリーミング処理の動作を確認する
データの流れの確認
Fluent Bitを利用してVMのアクセスログデータの転送
Azure Data Factoryでパイプラインが自動実行されることを確認
ETL出力・ストリーミング入力用ストレージに変換後のファイルが作成されることを確認
Azure Stream Analyticsがデータを処理していることを確認する
ストリーミング出力用ストレージに結果が出力されることを確認する
Azure Stream Analyticsのメトリック
Azure Stream Analyticsでは、入出力イベント数などのパフォーマンスに関する様々なメトリックが使用できます。