Alarmas SSH independientes de la nube y listas para producción en Slack
Con una simple búsqueda en google es posible ver que es muy difícil encontrar material de buena calidad sobre Alarmas SSH, sencillo porque hoy en día la gente tiende a depender cada vez más de la Nube para realizar tareas sencillas.
Un buen ejemplo es AWS CloudWatch con sus grupos de registros que son geniales pero dependen del Agente de SSM.
Imaginemos un host al que se supone que nadie debe acceder y su objetivo de alto valor, si su empresa tiene un equipo SOC, el acceso a esta máquina puede monitorearse y mitigarse, pero si no, puede convertirse en una pesadilla.
Por lo tanto, este tutorial tiene como objetivo enseñar cómo es posible no solo recibir alertas de SSH en máquinas en la nube sin agentes, sino también cómo recibir "Indicadores de compromiso" de IOC en las alertas.
Laboratorio manual
10 minutos
- Comencemos con una simple línea de código.
sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
sudo vim /etc/ssh/scripts/sshnotify.sh
#!/bin/bash
if [ "$PAM_TYPE" != "close_session" ]; then
host="$(hostname)"
python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit
sudo chmod +x /etc/ssh/scripts/sshnotify.sh
Tenga en cuenta que abuseipdb tiene una API gratuita, debe crear una cuenta para obtener su clave.
Slack Webhook se está utilizando.
sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-ip', action='store', dest='simple_ip',
help='Store ip')
parser.add_argument('-host', action='store', dest='simple_host',
help='Store host')
parser.add_argument('-user', action='store', dest='simple_user',
help='Store user')
results = parser.parse_args()
print(results.simple_ip)
print(results.simple_host)
querystring = {
'ipAddress': results.simple_ip,
'maxAgeInDays': '1'
}
# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'
headers = {
'Accept': 'application/json',
'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}
response = requests.request(method='GET', url=url, headers=headers, params=querystring)
# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
"blocks": [
{
"type": "image",
"image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
"alt_text": "inspiration"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Security report : *"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "IP: " + str(decodedResponse["data"]["ipAddress"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Domain: " + str(decodedResponse["data"]["domain"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Last date reported: " + str(decodedResponse["data"]["lastReportedAt"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Abuse confidence: " + str(decodedResponse["data"]["abuseConfidenceScore"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Country: " + str(decodedResponse["data"]["countryCode"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Usage type: " + str(decodedResponse["data"]["usageType"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Total Reports: " + str(decodedResponse["data"]["totalReports"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "ISP: " + str(decodedResponse["data"]["isp"])
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sumary*"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH login:* " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH Accessfrom IP* " + str(results.simple_ip)
}
}
]
}
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
sudo systemctl restart sshd

Como mi IP está limpia, no se generan informes, pero en una situación real puede ayudarlo a comprender si se necesita una respuesta a incidentes.