main.py
import base64
import functions_framework
import requests
import json
# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def hello_pubsub(cloud_event):
# Print out the data from Pub/Sub, to prove that it worked
data = base64.b64decode(cloud_event.data["message"]["data"])
# バイト文字列ならデコード
if isinstance(data, bytes):
data = data.decode('utf-8')
# アラート名のデフォルト値を設定
budget_display_name = 'アラート'
# 配列に変換して必要なデータを取得
if isinstance(data, str):
data_json = json.loads(data)
if 'budgetDisplayName' in data_json:
budget_display_name = data_json['budgetDisplayName']
# messageの作成
message = f"【{budget_display_name}】\n```\n{data}\n```"
# Slackに投稿
requests.post(
url='SlackのwebhookURL',
data=json.dumps({
"text": message,
})
)
requirements.txt
functions-framework==3.*
requests