Commit 071246c

Add slack notification (#44)
* Added the Lambda function for the Slack notification.
* Added the AWS resource for the Slack notification Lambda function and the Webhook URL parameter. Also added new parameters for the SendNotification task of the CD2RefreshStateMachine that will be passed to the Slack notification Lambda function.
* Reverted the SNS topic name because it is unnecessary.
* Added the Slack Webhook URL secret name parameter value.
* Removed the response.text due to a GitHub code scanning alert.
* Added the If condition when deciding where to get the SNS Topic ARN depending on the CreateNotificationTopic condition.
* Removed the Lambda function code because we will use the existing SNS topic.
* Removed the Lambda function AWS resource and Slack-related parameter because they are no longer needed.
* Updated the message format for SendNotification to match the one expected by the Lambda function named ubcla-notifications-sns-slack-notification.
* Added a comma in the JSON format for the SNS message.
* Updated the Message format for SendNotification because using the Sub function in the JSON caused an error during the CloudFormation stack update.
* Removed quotes in the Message.$ parameter.
* Fixed the cfn linting error.
* Revert "Removed the Lambda function code because we will use the existing SNS topic." This reverts commit b5118f8.
* Revert "Removed the Lambda function AWS resource and Slack related paramter because they are no longer needed." This reverts commit 7a37c39.
* Reverted the previous changes to the Message value for the SendNotification step.
* Added a function that processes the CD2 table data and outputs a human-readable message.
* Took out the inline IAM policy from SlackNotificationFunction and created a new IAM role for it.
* Removed unnecessary parameters from SendNotification that I added previously.
* Added a step to transform the string message into the real data.
* Fixed the Slack secret name references in the AWS resources.
* Removed the value for SlackWebHookURLSecretNameParameter. This value will be provided via the environment variables in AWS CodePipeline.
* Changed the Docker image source from docker.io to public.ecr.aws to prevent anonymous pull rate limits from Docker Hub during the CodeBuild.
* Added the AWS environment value to the SNS message title to tell whether the message is from staging or production.
* Commented out the complete tables because they make the notification too crowded. The failed tables are the more important information for the notification.
* Added the error message part to the sync_table payload message because this information is missing from the sync_table output. Added the parameter called failed_error_messages in the PivotResults state in the step function.
* Added lines to test whether failed_error_messages gets passed correctly from the step function.
* Changed the output payload from the PivotResult state to the SendNotification state to send the entire input data. We will process the CD2 data update result in the Lambda function.
* Added generate_error_string() to generate an error string. Added get_ecs_log_url() to get the CloudWatch log URL. Replaced the hardcoded error messages with generate_error_string() calls.
* Replaced the encoding for the forward slash in the CloudWatch log URL with the URL-encoded version.
* Encoded the forward slash in the CloudWatch log stream string.
* Fixed a typo in the CloudWatch log URL template string.
* Changed the error string format. Also added a condition to handle the case where the exception message is empty.
* Refactored the message processing part to reflect the input data format changes from the step function.
* Simplified the error format.
* Introduced red and green emojis in the Slack notification message.
* Added the complete tables with schema update.
* Shortened the message category headers.
* Removed failed_init and failed_sync because they are included under the failed section.
* Added 2 different thresholds for the number of failed tables. Added a conditional statement to provide different emojis depending on the number of failed tables. When there is a high number of failed tables, we alert everyone in the Slack notification channel.
* Updated the Slack emoji codes.
* Added the debugging message for testing.
* Revert "Added the dubugging message for testing." This reverts commit 496f107.
* Added a comment regarding the edge case where we use the exception class name when the error message is missing.
* Simplified the Slack notification title. Added the CloudFormation stack name to the title in case you have multiple stacksets for multiple Canvas environments.
* Merged the failed tables and errors in the notification message.
* Fixed the missing bold tag for the notification title.
* Moved get_secret_value() and send_to_slack() into the shared folder because these functions will be used in multiple places.
* Created a Lambda layer for functions shared between Lambda functions.
* Added a step to properly name the environment type in the notification title.
* Moved the environment name conversion code into a separate function and put it into the Lambda layer because it will be used in multiple Lambda functions. Removed sys.path.append() because it is not needed when referencing functions from Lambda layers.
* Added a Slack notification message in case the list_tables operation fails for any reason.
* Added the requests module for sending a Slack message.
* Added the red x emoji for the listTables error.
* Added the permission for ListTablesRole to access the Slack Webhook URL in AWS Secrets Manager for the Slack notification.
* Added comments.
* Replaced the print() statement with logger.exception().
* Removed the unnecessary instruction on what to do for the invalid_client error.
* Changed the LambdaLayer-related names.
* Removed the parameters that have been commented out.
1 parent 9df5e05 commit 071246c

12 files changed, +281 -26 lines changed

init_table/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM docker.io/library/python:3-alpine
+FROM public.ecr.aws/docker/library/python:3-alpine
 
 ARG UID=1012
 ARG GID=1012

lambda-layers/python/shared/__init__.py

Whitespace-only changes.
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+import boto3
+import requests
+from botocore.exceptions import ClientError
+import logging
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+def get_secret_value(secret_name, region):
+    client = boto3.client("secretsmanager", region)
+
+    try:
+        response = client.get_secret_value(SecretId=secret_name)
+
+        if 'SecretString' in response:
+            secret = response['SecretString']
+        else:
+            # Decode binary secret if necessary
+            secret = response['SecretBinary']
+
+        return secret
+
+    except ClientError as e:
+        logger.exception(f"Error while fetching secret: {e}")
+        return None
+
+def send_to_slack(message, slack_webhook_url):
+    """Send a message to Slack."""
+    try:
+        response = requests.post(slack_webhook_url, json={"text": message})
+
+        if not response.ok:
+            logger.error(f"Failed to send message to Slack: {response.status_code}")
+    except Exception as e:
+        logger.exception(f"An error occured during the send_to_slack() operation: {e}")
+
+def get_full_environment_name(environment_string):
+    if "stg" in environment_string.lower() or "stag" in environment_string.lower():
+        full_environment_name = "Staging"
+    elif "prod" in environment_string.lower():
+        full_environment_name = "Production"
+
+    return full_environment_name
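
In `get_full_environment_name` as added above, an input that contains none of `stg`, `stag`, or `prod` leaves `full_environment_name` unassigned, so the final `return` would raise `UnboundLocalError`. A minimal sketch of a variant with a fallback is shown below. Returning the input string unchanged is an assumption made for illustration; the commit itself defines no fallback.

```python
def get_full_environment_name(environment_string):
    """Map a short environment label to a display name.

    Falls back to the raw input when no known marker is found
    (assumed behaviour for illustration; the commit has no fallback).
    """
    lowered = environment_string.lower()
    if "stg" in lowered or "stag" in lowered:
        return "Staging"
    if "prod" in lowered:
        return "Production"
    # No known marker: show the label as given rather than failing.
    return environment_string
```

With this shape, an unexpected label such as `dev` would appear verbatim in the notification title instead of aborting the Lambda invocation.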

list_tables/app.py

Lines changed: 36 additions & 11 deletions
@@ -7,6 +7,7 @@
 from botocore.config import Config
 from dap.api import DAPClient
 from dap.dap_types import Credentials
+from shared.utils import get_secret_value, send_to_slack, get_full_environment_name
 
 region = os.environ.get('AWS_REGION')
 
@@ -23,28 +24,52 @@
 
 namespace = 'canvas'
 
+REGION = os.environ["AWS_REGION"]
+SLACK_WEBHOOK_URL_SECRET_NAME = os.getenv("SLACK_WEBHOOK_SECRET_NAME")
+STACK_NAME = os.environ["STACK_NAME"]
+SLACK_WEBHOOK_URL = get_secret_value(SLACK_WEBHOOK_URL_SECRET_NAME, REGION)
+
+def generate_error_message(input_error):
+    environment_name = get_full_environment_name(env)
+    red_cross_mark_emoji = ':x:'
+
+    sns_title = f"<!channel> *{STACK_NAME} ({environment_name})*\n"
+    message = f"{red_cross_mark_emoji} The ListTables step failed with the following error: \n {input_error}"
+
+    return sns_title + message
 
 @logger.inject_lambda_context(log_event=True)
 def lambda_handler(event, context: LambdaContext):
-    params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
+    try:
+        params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
+
+        dap_client_id = params['dap_client_id']
+        dap_client_secret = params['dap_client_secret']
 
-    dap_client_id = params['dap_client_id']
-    dap_client_secret = params['dap_client_secret']
+        logger.info(f"dap_client_id: {dap_client_id}")
 
-    logger.info(f"dap_client_id: {dap_client_id}")
+        credentials = Credentials.create(client_id=dap_client_id, client_secret=dap_client_secret)
 
-    credentials = Credentials.create(client_id=dap_client_id, client_secret=dap_client_secret)
+        os.chdir("/tmp/")
 
-    os.chdir("/tmp/")
+        tables = asyncio.get_event_loop().run_until_complete(async_get_tables(api_base_url, credentials, namespace))
 
-    tables = asyncio.get_event_loop().run_until_complete(async_get_tables(api_base_url, credentials, namespace))
+        # we can skip certain tables if necessary by setting an environment variable (comma-separated list)
+        skip_tables = os.environ.get('SKIP_TABLES', '').split(',')
 
-    # we can skip certain tables if necessary by setting an environment variable (comma-separated list)
-    skip_tables = os.environ.get('SKIP_TABLES', '').split(',')
+        tmap = list(map(lambda t: {'table_name': t, "state": "needs_sync"}, [t for t in tables if t not in skip_tables]))
 
-    tmap = list(map(lambda t: {'table_name': t, "state": "needs_sync"}, [t for t in tables if t not in skip_tables]))
+        return {'tables': tmap}
+    except Exception as e:
+        logger.exception(e)
+        message = generate_error_message(e)
 
-    return {'tables': tmap}
+        # Send a slack notification to alert any issue during the list_tables operation.
+        try:
+            send_to_slack(message, SLACK_WEBHOOK_URL)
+        except Exception as e:
+            logger.exception(f"Slack notification failed: {e}")
+            raise
 
 
 async def async_get_tables(api_base_url: str, credentials: Credentials, namespace: str):
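
The `list_tables` handler now alerts Slack when the ListTables step fails. Since `send_to_slack` in the shared module logs and swallows its own exceptions, one possible way to keep the failure visible to the caller is to always re-raise the original error after a best-effort notification. The sketch below illustrates that pattern only; `run_with_failure_alert`, `work`, and `notify` are hypothetical names that do not appear in the commit.

```python
import logging

logger = logging.getLogger(__name__)


def run_with_failure_alert(work, notify):
    """Run work(); on failure, send a best-effort alert and re-raise the original error."""
    try:
        return work()
    except Exception as error:
        logger.exception(error)
        try:
            notify(f":x: The ListTables step failed with the following error: \n {error}")
        except Exception:
            # A broken notifier must not mask the original failure.
            logger.exception("Slack notification failed")
        # Bare raise re-raises the exception caught by the outer handler.
        raise
```

Passing the notifier in as a callable keeps the pattern testable without a real webhook; in the Lambda it could be a small wrapper around `send_to_slack` and the webhook URL.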

list_tables/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 aws-lambda-powertools==2.43.1
 pysqlsync==0.8.2
 instructure-dap-client[postgresql]==1.0.0
+requests ~= 2.32.3

slack_notification/__init__.py

Whitespace-only changes.

slack_notification/app.py

Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@
+import os
+import sys
+import logging
+import ast
+
+from shared.utils import get_secret_value, send_to_slack, get_full_environment_name
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+REGION = os.environ["AWS_REGION"]
+ENVIRONMENT = os.environ["AWS_ENVIRONMENT"]
+SLACK_WEBHOOK_URL_SECRET_NAME = os.getenv("SLACK_WEBHOOK_SECRET_NAME")
+STACK_NAME = os.environ["STACK_NAME"]
+
+SLACK_WEBHOOK_URL = get_secret_value(SLACK_WEBHOOK_URL_SECRET_NAME, REGION)
+
+def process_table_update_message(message):
+    green_check_mark_emoji = ':white_check_mark:'
+    red_cross_mark_emoji = ':x:'
+    warning_mark_emoji = ':warning:'
+    failed_table_number_emoji = green_check_mark_emoji
+    failed_tables_number_lower_threshold = 2
+    failed_tables_number_upper_threshold = 10
+
+    environment_name = get_full_environment_name(ENVIRONMENT)
+
+    sns_title = f"*{STACK_NAME} ({environment_name})*\n"
+
+    #Transform the string message from the step function into the real data.
+    message = ast.literal_eval(message)
+
+    # Extract different table state information and error messages from the input data from the SNS topic.
+    complete_tables = [item["table_name"] for item in message if item.get("state") == "complete"]
+    complete_tables_with_schema_update = [item["table_name"] for item in message if item.get("state") == "complete_with_update"]
+    failed = [item for item in message if item.get("state") == "failed" or item.get("state") == "needs_init" or item.get("state") == "needs_sync"]
+    failed_tables = [item["table_name"] for item in failed if item.get("state") == "failed" or item.get("state") == "needs_init" or item.get("state") == "needs_sync"]
+    error_messages = [item.get("error_message") for item in failed]
+
+    number_of_failed_tables = len(failed_tables)
+
+    # Apply different emojis and the <!channel> tag depending on the number of errors.
+    if number_of_failed_tables > failed_tables_number_upper_threshold:
+        sns_title = "<!channel> " + sns_title
+        failed_table_number_emoji = red_cross_mark_emoji
+    elif number_of_failed_tables > failed_tables_number_lower_threshold:
+        failed_table_number_emoji = red_cross_mark_emoji
+    else:
+        failed_table_number_emoji = warning_mark_emoji
+
+    # Create a multi-line message for the slack notification.
+    message = (
+        f'{green_check_mark_emoji} Complete: {str(len(complete_tables))} \n'
+        f'{green_check_mark_emoji} Complete w/ Schema Update: {str(len(complete_tables_with_schema_update))} \n'
+        f'{failed_table_number_emoji} Failed: {str(number_of_failed_tables)} \n'
+        f'Failed Tables: \n' + '\n'.join(f'{i + 1}. {msg}' for i, msg in enumerate(error_messages))
+    )
+
+    message = sns_title + message
+
+    return message
+
+def lambda_handler(event, context):
+
+    # Get the SNS message payload
+    sns_message = event['Records'][0]['Sns']['Message']
+
+    sns_message = process_table_update_message(sns_message)
+
+    try:
+        send_to_slack(sns_message, SLACK_WEBHOOK_URL)
+    except Exception as e:
+        logger.exception(f"Slack notification failed: {e}")
+        raise
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+requests ~= 2.32.3
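
In `process_table_update_message` in `slack_notification/app.py`, `failed_table_number_emoji` starts as the green check mark, but every branch of the `if`/`elif`/`else` overwrites it, so a run with zero failed tables is reported with `:warning:`. A minimal sketch of the threshold logic with an explicit zero-failure case follows. The thresholds 2 and 10 come from the commit; the function name `pick_failed_emoji` and the tuple return shape are assumptions for illustration.

```python
def pick_failed_emoji(failed_count, lower=2, upper=10):
    """Return (emoji, alert_channel) for a given number of failed tables."""
    if failed_count == 0:
        # Nothing failed: keep the green check mark.
        return ":white_check_mark:", False
    if failed_count > upper:
        # Many failures: red cross and notify the whole channel.
        return ":x:", True
    if failed_count > lower:
        return ":x:", False
    # A small number of failures: warning only.
    return ":warning:", False
```

The boolean would decide whether `<!channel> ` is prepended to the title, mirroring what the commit does above the upper threshold.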

sync_table/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-FROM docker.io/library/python:3-alpine
+FROM public.ecr.aws/docker/library/python:3-alpine
 
 ARG UID=1012
 ARG GID=1012

sync_table/app.py

Lines changed: 42 additions & 3 deletions
@@ -13,6 +13,7 @@
 from dap.integration.database_errors import NonExistingTableError
 from dap.replicator.sql import SQLReplicator
 from pysqlsync.base import QueryException
+import requests
 
 region = os.environ.get("AWS_REGION")
 
@@ -31,6 +32,38 @@
 param_path = f"/{env}/canvas_data_2"
 api_base_url = os.environ.get("API_BASE_URL", "https://api-gateway.instructure.com")
 
+FUNCTION_NAME = 'sync_table'
+
+def get_ecs_log_url():
+    # Get region from env
+    region = os.environ.get('AWS_REGION', 'ca-central-1') # fallback if not set
+
+    # Get ECS metadata
+    metadata_uri = os.environ.get('ECS_CONTAINER_METADATA_URI_V4')
+    if not metadata_uri:
+        raise Exception("ECS_CONTAINER_METADATA_URI_V4 not set")
+
+    metadata = requests.get(f"{metadata_uri}/task").json()
+
+    log_group = metadata['Containers'][0]['LogOptions']['awslogs-group']
+    log_stream = metadata['Containers'][0]['LogOptions']['awslogs-stream']
+
+    log_url = (
+        f"https://{region}.console.aws.amazon.com/cloudwatch/home"
+        f"?region={region}#logsV2:log-groups/log-group/{log_group.replace('/', '$252F')}/log-events/{log_stream.replace('/', '$252F')}"
+    )
+
+    return log_url
+
+def generate_error_string(function_name, table_name, state, exception, cloudwatch_log_url):
+    if len(str(exception)) != 0:
+        return f"{table_name} - {function_name} - {state}, Error: {str(exception)} (<{cloudwatch_log_url}|CloudWatch Log>)"
+
+    # This is for the ProcessingError thrown by the tables: grading_period_groups and grading_periods.
+    # This particular error object doesn't have any error message. In this case, we use the name of the class of an exception object.
+    else:
+        return f"{table_name} - {function_name} - {state}, Error: {type(exception).__name__} (<{cloudwatch_log_url}|CloudWatch Log>)"
+
 def start(event):
     params = ssm_provider.get_multiple(param_path, max_age=600, decrypt=True)
 
@@ -52,6 +85,8 @@ def start(event):
         client_id=dap_client_id, client_secret=dap_client_secret
     )
 
+    cloudwatch_log_url = get_ecs_log_url()
+
     table_name = event["table_name"]
 
     logger.info(f"syncing table: {table_name}")
@@ -80,22 +115,27 @@ def start(event):
             except Exception as e:
                 logger.exception(e)
                 event["state"] = "failed"
+                # Make the each error as string.
+                event["error_message"] = generate_error_string(FUNCTION_NAME, table_name, event["state"], e, cloudwatch_log_url)
             finally:
                 restore_dependencies(db_name="cd2", table_name=table_name)
         else:
             event["state"] = "failed"
     except NonExistingTableError as e:
         logger.exception(e)
         event["state"] = "needs_init"
+        event["error_message"] = generate_error_string(FUNCTION_NAME, table_name, event["state"], e, cloudwatch_log_url)
     except ValueError as e:
         logger.exception(e)
         if "table not initialized" in str(e):
             event["state"] = "needs_init"
         else:
             event["state"] = "failed"
+        event["error_message"] = generate_error_string(FUNCTION_NAME, table_name, event["state"], e, cloudwatch_log_url)
     except Exception as e:
         logger.exception(e)
         event["state"] = "failed"
+        event["error_message"] = generate_error_string(FUNCTION_NAME, table_name, event["state"], e, cloudwatch_log_url)
 
     logger.info(f"event: {event}")
 
@@ -105,7 +145,6 @@ def start(event):
 async def sync_table(credentials, api_base_url, db_connection, namespace, table_name):
     async with DAPClient(api_base_url, credentials) as session:
         await SQLReplicator(session, db_connection).synchronize(namespace, table_name)
-
 
 
 def drop_dependencies(db_name, table_name):
@@ -176,9 +215,9 @@ def restore_dependencies(db_name, table_name):
     if token:
         stepfunctions.send_task_success(
             taskToken=token,
-            output=json.dumps(payload))
+            output=json.dumps(payload))
 
-    """
+    """
     if token and result['state'] == 'failed':
         stepfunctions.send_task_failure(
             taskToken=token,
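
The two helpers added to `sync_table/app.py`, `get_ecs_log_url()` and `generate_error_string()`, mix string formatting with an HTTP call to the ECS metadata endpoint. A minimal sketch of the pure string-building parts, separated so they can be exercised without running inside ECS, is shown below. The function names `build_cloudwatch_log_url` and `describe_exception` are hypothetical; the `$252F` replacement and the class-name fallback for empty messages are taken from the commit.

```python
def build_cloudwatch_log_url(region, log_group, log_stream):
    """Build a CloudWatch Logs console link, encoding '/' as '$252F' as the commit does."""
    encoded_group = log_group.replace("/", "$252F")
    encoded_stream = log_stream.replace("/", "$252F")
    return (
        f"https://{region}.console.aws.amazon.com/cloudwatch/home"
        f"?region={region}#logsV2:log-groups/log-group/{encoded_group}"
        f"/log-events/{encoded_stream}"
    )


def describe_exception(exception):
    """Return the exception message, or its class name when the message is empty."""
    return str(exception) or type(exception).__name__
```

In this arrangement, `get_ecs_log_url()` would only fetch the metadata and hand the log group and stream to the builder, and `generate_error_string()` could call `describe_exception()` once instead of branching on the message length.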
