In this blog post, you will read about a Data Alerts system in Slack: a Slack bot that alerts users when it finds inconsistencies in data, using predefined BigQuery queries. After a brief introduction to what this system entails, this article elaborates on why one would use such a system and how it works in technical terms.
What does the system do?
In short, a set of Python scripts is run to control a couple of components, of which two are most important: BigQuery and Slack. To easily keep track of data quality, the system performs scheduled queries in BigQuery. If the output of such a query crosses a predefined threshold, a Slack bot/app is instructed to alert the user through a message.
Why use it?
With access to a growing amount of data to help us make decisions, we can easily get overwhelmed. As we sail the wild waters of data, keeping the ship afloat becomes increasingly difficult. Floating on an abundance of data, you might have had the sinking feeling that some of it is unreliable. To regain confidence, we need to maintain data quality; after all, we don't want to make decisions based on unreliable data. Think of the system as a lighthouse that warns our ship of incoming storms: it helps us catch potential issues before they spiral out of control.
Previously, Google gave us the option in Universal Analytics to create and manage custom alerts. Unfortunately, with the switch to GA4, this feature was not carried over. With this system, we can still set up alerts to do the data monitoring for us.
How does the system work?
This section will dive deeper into how the system works and go over the technical details. It will give a step-by-step explanation: from writing the queries to receiving an alert. To better understand all the steps, take a good look at the diagram below with an overview of how all components are connected. The system consists of the following components:
- Set of Python scripts. Sends requests, handles responses and performs all the logic to decide whether an alert should be triggered and sent.
- Google Sheets
- Google BigQuery
- Slack
- Google Cloud Platform (specifically, Google Cloud Functions and Cloud Scheduler)
The following steps are discussed:
0. Preparation
1. Google Cloud Scheduler sends a request to our Cloud Function
2. Our Cloud Function receives and handles the request
3. Python code requests alerttable data from BigQuery
4. BigQuery returns Google Sheet data to Python code
5. Handling each alert row input
6. Sending Slack alerts
7. Inform Google Cloud how many alerts were triggered
Step 0: Preparation
Before the system can run, a couple of things need to be set up and ready for our Python code to send them instructions.
Firstly, a Google Sheets file needs to be created; this is where the user inputs the query and alert variables. The sheet should have the following columns, in the order specified below:
- Yesterday (regular)
- Yesterday (intraday)
- Week before
- Difference percentage (by how much the two numbers may differ)
- Message
The first three columns contain queries that each return a single numeric value. The two ‘yesterday’ queries ought to be identical, except for the table they query: either a regular table or an intraday table. GA4 creates an intraday table for the most recent (live) data before converting it into a regular table. To avoid errors, our Python code checks which of the two tables is available and uses the corresponding query, as will be explained in later steps. The sketch below shows what such a query pair could look like.
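As an illustration, here is a minimal sketch of a ‘yesterday’ query pair against the GA4 BigQuery export, which names its daily tables events_YYYYMMDD and its live table events_intraday_YYYYMMDD. The project and dataset names are placeholders, and counting events is just one example of a metric.

-- 'Yesterday (regular)': count yesterday's events in the daily export table
SELECT COUNT(*) AS event_count
FROM `my-project.analytics_123456.events_*`
WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY));

-- 'Yesterday (intraday)': the same count, but against the intraday table
SELECT COUNT(*) AS event_count
FROM `my-project.analytics_123456.events_intraday_*`
WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY));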
Next, a BigQuery table should be created that connects to the Google Sheet, as shown in the diagram. When this table is queried, BigQuery fetches all data from the connected Google Sheet and returns it; in effect, you are querying the Google Sheet itself. A sketch of the DDL for such a table is shown below.
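As a sketch, a Sheets-backed table can be created with BigQuery's external table DDL. The project, dataset, table name and spreadsheet URL below are placeholders.

-- Create an external table backed by the Google Sheet (placeholder names and URL)
CREATE EXTERNAL TABLE `my-project.alerts.alerttable`
OPTIONS (
  format = 'GOOGLE_SHEETS',
  uris = ['https://docs.google.com/spreadsheets/d/[SHEET ID]'],
  skip_leading_rows = 1  -- skip the header row of the Sheet
);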
Moreover, we need to install the Slack App in a Slack Workspace and copy a Slack Bot Token. This token will be used in our Python code to make sure that Slack trusts us and allows the Python code to send messages. To enable the Slack App to send messages, we also need to add it to a channel and copy the channel ID to specify in our Python code where to send a message.
Lastly, we need to put our Python code into a Cloud Function so that the Cloud Scheduler can call it.
Step 1: Google Cloud Scheduler sends a request to our Cloud Function
At the scheduled time, Google Cloud Scheduler sends a request to the Cloud Function and tells it to run. The Cloud Function is called with a request object, in this case carrying a JSON body. The example below shows what this request body should contain.
"""
{
"slack_bot_token" : "[SLACK BOT TOKEN]",
"projectname" : "[GOOGLE CLOUD PROJECT NAME]",
"dataset" : "[NAME OF THE DATASET TO QUERY FROM (COMES AFTER PROJECTNAME)]",
"channelid" : "[SLACK CHANNEL ID]",
"alerttable" : "[PATH TO ALERTTABLE IN BIGQUERY]"
}
"""
Step 2: Our Cloud Function receives and handles the request
Once the Cloud Function has received the request, our Python code handles it by taking the values from the JSON object and logging to Google Cloud that this was successful. It then constructs objects for a BigQuery connector and a Slack connector, which are shown after the entry point sketch below.
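As a minimal sketch (the function name and variable names are assumptions, not the original code), the entry point of the Cloud Function could look like this:

def main(request):
    # Parse the JSON body that Cloud Scheduler posted to the Cloud Function
    body = request.get_json()
    print("Request parsed successfully")  # shows up in Cloud Logging

    # Construct the two connectors described below
    bq = BigQuery(body["projectname"], body["dataset"])
    slck = Slack(body["channelid"], body["slack_bot_token"])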
import google.auth as gauth
from google.cloud import bigquery as bq


class BigQueryAuthError(Exception):
    """Custom exception raised when the BigQuery connector cannot be initialized"""
    pass


class BigQuery:
    """
    A class for the BigQuery connector

    Attributes:
    ----------
    projectname : str
        Name of the BigQuery project that the connector is connected to
    client : Client
        BigQuery Client object, used to call BigQuery Client API functions

    Methods:
    ----------
    performQuery(query)
        Performs a given query using the connector
    intradayAvailable(tablename)
        Checks if an intraday table is available for yesterday's date, using the tablename passed as parameter
    """

    def __init__(self, projectname: str, dataset: str):
        """
        Parameters:
        ----------
        projectname : str
            Name of the BigQuery project to create the connector for
        dataset : str
            Name of the dataset to query from within the BigQuery project
        """
        self.projectname = projectname
        # We set the correct scopes when initializing our BigQuery client to ensure we can
        # query a table that is connected to a Drive document (our Sheet in this case)
        SCOPES = ["https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/cloud-platform"]
        # Using the gauth.default() method, we can fetch the credentials from the environment
        # variables that are set in Cloud Functions when assigning a certain service account.
        # This means we don't need a (JSON) key file for authentication!
        credentials, _ = gauth.default(scopes=SCOPES)
        self.client = bq.Client(project=projectname, credentials=credentials)
        # We use the client's get_dataset() method to check if the parameters passed to this
        # function, as well as the credentials, are correct
        try:
            print("Auth Success - fetched dataset: " + str(self.client.get_dataset(projectname + "." + dataset)))
        except Exception as e:
            # If not, we raise an exception
            raise BigQueryAuthError(
                "INVALID BIGQUERY VARIABLES: Error thrown while trying to get dataset from BigQuery project. "
                "Please take another look at your JSON body, especially the 'projectname' and 'dataset' values"
            ) from e
BigQuery Connector class
As shown in the code snippet above, the BigQuery class takes a ‘projectname’ and ‘dataset’ with which it initializes a BigQuery client. It does so by first getting the right credentials using the Google Auth library, and then passing these credentials to the BigQuery API client library. Finally, it checks that the function parameters, as well as the credentials, are valid. From then on, we can call the class's performQuery and intradayAvailable methods as described in the documentation of the code snippet above; a sketch of what these might look like is shown below.
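The bodies of these two methods aren't shown in the snippet above; here is a minimal sketch of what they might look like. It assumes the GA4 export's events_intraday_YYYYMMDD naming and a tablename parameter such as 'analytics_123456.events'; these details are assumptions, not the original code.

from datetime import date, timedelta
from google.api_core.exceptions import NotFound

# These would live inside the BigQuery class:
def performQuery(self, query):
    # Run the query and wait for the results, returned as a list of rows
    return list(self.client.query(query).result())

def intradayAvailable(self, tablename):
    # Check if an intraday table exists for yesterday's date,
    # e.g. 'my-project.analytics_123456.events_intraday_20240101'
    yesterday = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
    try:
        self.client.get_table("{p}.{t}_intraday_{d}".format(p=self.projectname, t=tablename, d=yesterday))
        return True
    except NotFound:
        return False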
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError


class SlackAuthError(Exception):
    """Custom exception raised when the Slack Bot Token or channel ID fails validation"""
    pass


class Slack:
    """
    A class for the Slack connector

    Attributes:
    -----------
    client : WebClient
        Slack WebClient object, initialised with the Slack Bot Token. Used to call Slack WebClient functions
    defaultchannel : str
        The default Slack channel ID to use if one isn't specified when calling a method or the specified ID is invalid

    Methods:
    ----------
    postBasicMessage(channelid, fallbacktext, header, message)
        Posts a basic message to the specified channel
    """

    def __init__(self, defaultchannelid: str, slackbottoken: str):
        """
        Initialises the Slack connector.
        Raises an exception if the Slack Bot Token or channel ID passed to the function is invalid

        Parameters:
        ----------
        defaultchannelid : str
            A default channel ID to use when none is specified in method calls
        slackbottoken : str
            The Slack Bot Token used to authenticate the WebClient
        """
        self.client = WebClient(token=slackbottoken)
        # We first use the client's auth_test() method to check if the Slack Bot Token is valid
        try:
            print("Results from auth test using provided Slack Bot Token: " + str(self.client.auth_test()))
        except SlackApiError as e:
            if e.response["error"] == "invalid_auth":
                raise SlackAuthError("INVALID TOKEN: The Slack Bot Token you provided fails the authentication test")
            else:
                raise SlackAuthError("An unknown error occurred while attempting a Slack auth test")
        # Lastly, we check if the channel ID is valid. If not, we raise an exception to tell our function to terminate
        if not self.validChannelID(defaultchannelid):
            raise SlackAuthError("INVALID CHANNEL ID: Error thrown while checking if Slack channel ID is valid")
        else:
            self.defaultchannel = defaultchannelid
Slack Connector class
The code snippet above shows the class for the Slack connector object. It uses the Slack Bot Token to initialize a Slack WebClient object from the Python Slack SDK and stores the Slack channel ID as the default channel. Next, it checks whether the provided token and channel ID are valid. From then on, we can call the postBasicMessage method to instruct our Slack app to send a message; a sketch of this method is shown below.
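The method body isn't shown in the snippet above; a minimal sketch, assuming Slack's Block Kit header/section layout and an optional channel ID argument, could be:

# This would live inside the Slack class:
def postBasicMessage(self, fallbacktext, header, message, channelid=None):
    # Fall back to the default channel if no (valid) channel ID is given
    channel = channelid if channelid and self.validChannelID(channelid) else self.defaultchannel
    # 'text' is the fallback shown in notifications that can't render blocks
    self.client.chat_postMessage(
        channel=channel,
        text=fallbacktext,
        blocks=[
            {"type": "header", "text": {"type": "plain_text", "text": header}},
            {"type": "section", "text": {"type": "mrkdwn", "text": message}},
        ],
    )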
Once the two connectors have been successfully initialized, Google Cloud is notified through a log message.
Step 3: Python code requests alerttable data from BigQuery
Using the BigQuery connector, we call performQuery with the alerttable location from the JSON object received earlier. It uses a simple ‘select all’ query, as shown below.
# Fetch all queries and their corresponding alert messages from the Google Sheet
# that is linked to the alerttable in BigQuery
rows = bq.performQuery("SELECT * FROM `{alerttable}`".format(alerttable=alerttable))
Step 4: BigQuery returns Google Sheet data to Python code
When BigQuery receives the request, it fetches all the data from the connected Google Sheet and returns it to our Python code. When the data is successfully received, Google Cloud is notified through a log message. We then use the BigQuery connector initialized earlier to check if an intraday table is available for the dataset that will be queried. Using this information, we construct an Alerthandler object, which then knows which of the two ‘yesterday’ queries to use.
class Alerthandler:
    """
    A class that handles all the functionality related to our alerts

    Methods:
    ----------
    extractAlertParams(row)
        Takes a row as argument and extracts and returns all the necessary values
    validQuery(query)
        Checks if the query is valid (doesn't contain any illegal statements)
    triggerAlert(yd, wb, th)
        Does a calculation to see if we should send an alert and composes a message if that is the case
    """

    def __init__(self, intradayAvailable: bool):
        """
        Initializes the Alerthandler object.
        Declares the rowindex to be 0 (tracks the row index currently being processed)
        Declares the alertcount to be 0 (tracks how many alerts have already been triggered)

        Parameters:
        ----------
        intradayAvailable : bool
            Boolean which indicates whether an intraday table is available for yesterday's date
        """
        self.rowindex = 0
        self.alertcount = 0
        self.intraday = intradayAvailable
Alerthandler class
Our Alerthandler is where the magic happens: it handles each row returned from the query and determines whether an alert should be triggered and a message sent. It also tracks which row it is currently handling, so that when something unexpected happens the error can be localized and a detailed log message can be sent to Google Cloud.
Step 5: Handling each alert row input
Step 5A: Extracting the alert parameters. Alerthandler's extractAlertParams method is called with a row from the alerttable as argument. It takes each value from the right column, checks that it has the right type, and returns all the values; a sketch is shown below.
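The original method body isn't shown here; a minimal sketch, assuming the column order of the Google Sheet described in step 0, could be:

# This would live inside the Alerthandler class:
def extractAlertParams(self, row):
    # Keep track of which row we are processing, for error localization
    self.rowindex += 1
    # Pick the regular or intraday 'yesterday' query, depending on availability
    query_yd = str(row[1]) if self.intraday else str(row[0])
    query_wb = str(row[2])
    diffp = float(row[3])  # difference percentage, e.g. 0.2 for 20%
    message = str(row[4])
    return query_yd, query_wb, diffp, message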
Step 5B: Check if the ‘yesterday’ and ‘week before’ queries are valid. Alerthandler's validQuery method is called to check if the queries are valid. A query is flagged as invalid, and an exception is raised, if it contains, for example, a “DELETE” statement. A sketch is shown below.
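A minimal sketch of such a check; which statements are disallowed beyond “DELETE” is an assumption:

# Statements we never want to see in an alert query (list is an assumption)
ILLEGAL_STATEMENTS = ("DELETE", "DROP", "INSERT", "UPDATE", "TRUNCATE")

# This would live inside the Alerthandler class:
def validQuery(self, query):
    # Reject queries containing mutating statements; alerts should only read data
    for statement in ILLEGAL_STATEMENTS:
        if statement in query.upper():
            raise ValueError("Illegal statement '{s}' in query at row {r}".format(s=statement, r=self.rowindex))
    return True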
Step 5C: Perform the queries by calling performQuery on the BigQuery connector. Here, the Python code sends another request to BigQuery for each of the queries, and BigQuery responds with the results, as sketched below.
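Since each query returns a single numeric value, the glue code could look roughly like this (the variable names are assumptions):

# Each query returns exactly one row with one numeric column
value_yd = float(bq.performQuery(query_yd)[0][0])
value_wb = float(bq.performQuery(query_wb)[0][0])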
Step 5D: Check if an alert should be triggered. Using the results returned before, Alerthandler's triggerAlert method is called. This method checks whether all values have the right type and whether the value that will be the denominator is non-zero, since dividing by zero would raise an error. It then performs a calculation to determine whether an alert should be triggered. As shown below, if the ‘yesterday’ value and the ‘week before’ value differ by more than the predefined percentage, it returns the message that should be sent, including details of the numbers that triggered the alert. Note that the ‘yesterday’ and ‘week before’ queries do not need to cover these exact time periods; they are merely a suggestion. The point is that one value is divided by the other and compared to a threshold, so one could do the same for a period of, for instance, a month.
if value_yd / value_wb < (1.0 - diffp):
    self.alertcount += 1
    return " is at least {diffp}% lower than last week \n Yesterday: {yd} \n Week before: {wb}".format(
        diffp=diffp * 100, yd=round(value_yd, 2), wb=round(value_wb, 2))
elif value_yd / value_wb > (1.0 + diffp):
    self.alertcount += 1
    return " is at least {diffp}% higher than last week \n Yesterday: {yd} \n Week before: {wb}".format(
        diffp=diffp * 100, yd=round(value_yd, 2), wb=round(value_wb, 2))
else:
    return None
Step 6: Sending Slack alerts
If the Alerthandler returns an alert message for a row, we call the Slack connector's postBasicMessage method to instruct the Slack app to send the message. In other words, our Python code sends a request to Slack to post a message in the channel we specified earlier. This method call is shown below; the ‘message_str’ variable is the message that was specified by the user in the Google Sheet.
if alert is not None:
    slck.postBasicMessage("alert! :rotating_light:", "Alert Triggered! :rotating_light:", message_str + alert)
Step 7: Inform Google Cloud how many alerts were triggered
Lastly, our Python code informs Google Cloud, through a log message, how many alerts were triggered.
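In a Cloud Function, anything printed to standard output ends up in Cloud Logging, so this can be as simple as the line below (the variable name is an assumption):

# Report back to Cloud Logging how many alerts were sent during this run
print("Run finished: {count} alert(s) were triggered".format(count=alerthandler.alertcount))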
Time to put it to use!
The Data Alerts system is simple yet effective. We define a set of queries in a Google Sheet, which we connect to BigQuery. The system runs this set of queries, and the number that each query returns is compared to that of a previous period. If the change exceeds a threshold that we have defined, the system automatically sends a Slack message to inform the relevant parties. Since we deploy this system as a Cloud Function, we can schedule it to run every morning. This way, we can start the day feeling confident.
Implementing the system can help monitor data quality and ensure our data remains reliable. Just like a lighthouse warns ships of incoming storms, our system alerts us of potential errors before they become major problems.
Safe voyage!
Feel like reading more data stories? Then take a look at our blog page. Always want to stay up to date? Be sure to follow us on LinkedIn!