พร้อมสำหรับการผลิตและการเตือน SSH ที่ไม่เชื่อเรื่องพระเจ้าบนคลาวด์บน Slack
ด้วยการค้นหาอย่างง่ายบน google คุณสามารถดูได้ว่าการหาเนื้อหาที่มีคุณภาพดีบน SSH Alarms ทำได้ยากมาก เพราะทุกวันนี้ผู้คนมักจะพึ่งพา Cloud มากขึ้นเรื่อย ๆ เพื่อทำงานง่ายๆ
ตัวอย่างที่ดีคือ AWS CloudWatch ที่มีกลุ่มบันทึกที่ยอดเยี่ยมแต่ต้องพึ่งพาตัวแทน SSM
ลองจินตนาการถึงโฮสต์หนึ่งที่ไม่มีใครควรเข้าถึงและเป้าหมายที่มีมูลค่าสูง หากบริษัทของคุณมีทีม SOC การเข้าถึงเครื่องนี้สามารถตรวจสอบและบรรเทาได้ แต่ถ้าไม่มีก็อาจกลายเป็นฝันร้ายได้
บทช่วยสอนนี้มีจุดประสงค์เพื่อสอนวิธีที่ไม่เพียงแค่รับการแจ้งเตือนจาก SSH ในเครื่องระบบคลาวด์โดยไม่มีตัวแทน แต่ยังรวมถึงวิธีรับ IOC “ตัวบ่งชี้การประนีประนอม” ในการแจ้งเตือนด้วย
แฮนด์ ออน แล็บ
10 นาที
- เริ่มต้นด้วยบรรทัดโค้ดง่ายๆ
sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
sudo vim /etc/ssh/scripts/sshnotify.sh
#!/bin/bash
if [ "$PAM_TYPE" != "close_session" ]; then
host="$(hostname)"
python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit
sudo chmod +x /etc/ssh/scripts/sshnotify.sh
โปรดทราบว่า abuseipdb มี API ฟรี คุณต้องทำบัญชีเพื่อรับรหัสของคุณ
Slack Webhook กำลังใช้งานอยู่
sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-ip', action='store', dest='simple_ip',
help='Store ip')
parser.add_argument('-host', action='store', dest='simple_host',
help='Store host')
parser.add_argument('-user', action='store', dest='simple_user',
help='Store user')
results = parser.parse_args()
print(results.simple_ip)
print(results.simple_host)
querystring = {
'ipAddress': results.simple_ip,
'maxAgeInDays': '1'
}
# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'
headers = {
'Accept': 'application/json',
'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}
response = requests.request(method='GET', url=url, headers=headers, params=querystring)
# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
"blocks": [
{
"type": "image",
"image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
"alt_text": "inspiration"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Security report : *"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "IP: " + str(decodedResponse["data"]["ipAddress"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Domain: " + str(decodedResponse["data"]["domain"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Last date reported: " + str(decodedResponse["data"]["lastReportedAt"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Abuse confidence: " + str(decodedResponse["data"]["abuseConfidenceScore"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Country: " + str(decodedResponse["data"]["countryCode"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Usage type: " + str(decodedResponse["data"]["usageType"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Total Reports: " + str(decodedResponse["data"]["totalReports"])
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "ISP: " + str(decodedResponse["data"]["isp"])
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sumary*"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH login:* " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH Accessfrom IP* " + str(results.simple_ip)
}
}
]
}
response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
sudo systemctl restart sshd
เนื่องจาก IP ของฉันสะอาดไม่มีรายงานใด ๆ เกิดขึ้น แต่ในสถานการณ์จริง มันสามารถช่วยให้คุณเข้าใจว่าการตอบสนองต่อเหตุการณ์นั้นจำเป็นหรือไม่