Alarmes SSH prêtes pour la production et indépendantes du cloud sur Slack
Avec une simple recherche sur Google, il est possible de voir qu'il est très difficile de trouver du matériel de bonne qualité sur les alarmes SSH, simple parce que de nos jours, les gens ont tendance à dépendre de plus en plus du Cloud pour effectuer des tâches simples.
Un bon exemple est AWS CloudWatch avec ses groupes de journaux qui sont excellents mais qui dépendent de l'agent SSM.
Imaginons un hôte auquel personne n'est censé y accéder et sa cible de grande valeur, si votre entreprise dispose d'une équipe SOC, l'accès à cette machine peut être surveillé et atténué, mais sinon, cela peut devenir un cauchemar
Ce tutoriel a donc pour but d'enseigner comment il est possible non seulement de recevoir des alertes de SSH dans des machines cloud sans agents mais aussi comment recevoir des IOC "Indicateurs de compromission" sur les alertes.
Laboratoire pratique
10 minutes
- Commençons par une simple ligne de code
sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
sudo vim /etc/ssh/scripts/sshnotify.sh
#!/bin/bash
if [ "$PAM_TYPE" != "close_session" ]; then
host="$(hostname)"
python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit
sudo chmod +x /etc/ssh/scripts/sshnotify.sh
Veuillez noter que abuseipdb a une API gratuite, vous devez créer un compte pour obtenir votre clé.
Slack Webhook est utilisé.
sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-ip', action='store', dest='simple_ip',
help='Store ip')
parser.add_argument('-host', action='store', dest='simple_host',
help='Store host')
parser.add_argument('-user', action='store', dest='simple_user',
help='Store user')
results = parser.parse_args()
print(results.simple_ip)
print(results.simple_host)
querystring = {
'ipAddress': results.simple_ip,
'maxAgeInDays': '1'
}
# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'
headers = {
'Accept': 'application/json',
'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}
response = requests.request(method='GET', url=url, headers=headers, params=querystring)
# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
"blocks": [
{
"type": "image",
"image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
"alt_text": "inspiration"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Security report : *"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "IP: " + str(decodedResponse["data"]["ipAddress"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Domain: " + str(decodedResponse["data"]["domain"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Last date reported: " + str(decodedResponse["data"]["lastReportedAt"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Abuse confidence: " + str(decodedResponse["data"]["abuseConfidenceScore"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Country: " + str(decodedResponse["data"]["countryCode"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Usage type: " + str(decodedResponse["data"]["usageType"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Total Reports: " + str(decodedResponse["data"]["totalReports"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "ISP: " + str(decodedResponse["data"]["isp"])
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sumary*"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH login:* " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH Accessfrom IP* " + str(results.simple_ip)
}
}
]
}
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
sudo systemctl restart sshd

Comme mon adresse IP est propre, aucun rapport n'en provient, mais dans une situation réelle, cela peut vous aider à comprendre si une réponse à un incident est nécessaire.