Siap produksi dan Alarm SSH cloud agnostik di Slack
Dengan pencarian sederhana di google mungkin untuk melihat bahwa sangat sulit untuk menemukan materi berkualitas baik pada Alarm SSH, sederhana karena saat ini orang cenderung semakin bergantung pada Cloud untuk melakukan tugas-tugas sederhana.
Contoh yang bagus adalah AWS CloudWatch dengan grup lognya yang bagus tetapi bergantung pada Agen SSM.
Bayangkan satu host yang tidak boleh diakses oleh siapa pun dan target bernilai tinggi, jika perusahaan Anda memiliki Tim SOC, akses ke mesin ini dapat dipantau dan dikurangi tetapi jika tidak maka itu bisa menjadi mimpi buruk
Jadi tutorial ini bertujuan untuk mengajarkan bagaimana mungkin untuk tidak hanya menerima peringatan dari SSH di mesin cloud tanpa agen tetapi juga bagaimana menerima "Indikator kompromi" IOC pada peringatan tersebut .
Laboratorium Tangan
10 menit
- Mari kita mulai dengan baris kode sederhana
sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
sudo vim /etc/ssh/scripts/sshnotify.sh
#!/bin/bash
if [ "$PAM_TYPE" != "close_session" ]; then
host="$(hostname)"
python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit
sudo chmod +x /etc/ssh/scripts/sshnotify.sh
Harap perhatikan bahwa abuseipdb memiliki API gratis, Anda perlu membuat akun untuk mendapatkan kunci Anda.
Slack Webhook sedang digunakan.
sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-ip', action='store', dest='simple_ip',
help='Store ip')
parser.add_argument('-host', action='store', dest='simple_host',
help='Store host')
parser.add_argument('-user', action='store', dest='simple_user',
help='Store user')
results = parser.parse_args()
print(results.simple_ip)
print(results.simple_host)
querystring = {
'ipAddress': results.simple_ip,
'maxAgeInDays': '1'
}
# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'
headers = {
'Accept': 'application/json',
'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}
response = requests.request(method='GET', url=url, headers=headers, params=querystring)
# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
"blocks": [
{
"type": "image",
"image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
"alt_text": "inspiration"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Security report : *"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "IP: " + str(decodedResponse["data"]["ipAddress"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Domain: " + str(decodedResponse["data"]["domain"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Last date reported: " + str(decodedResponse["data"]["lastReportedAt"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Abuse confidence: " + str(decodedResponse["data"]["abuseConfidenceScore"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Country: " + str(decodedResponse["data"]["countryCode"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Usage type: " + str(decodedResponse["data"]["usageType"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Total Reports: " + str(decodedResponse["data"]["totalReports"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "ISP: " + str(decodedResponse["data"]["isp"])
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sumary*"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH login:* " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH Accessfrom IP* " + str(results.simple_ip)
}
}
]
}
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
sudo systemctl restart sshd

Karena IP saya bersih, tidak ada laporan yang datang darinya, tetapi dalam situasi nyata ini dapat membantu Anda memahami jika diperlukan respons insiden.