พร้อมสำหรับการผลิตและการเตือน SSH ที่ไม่เชื่อเรื่องพระเจ้าบนคลาวด์บน Slack

Nov 29 2022
ด้วยการค้นหาอย่างง่ายบน google คุณสามารถดูได้ว่าการหาเนื้อหาที่มีคุณภาพดีบน SSH Alarms ทำได้ยากมาก เพราะทุกวันนี้ผู้คนมักจะพึ่งพา Cloud มากขึ้นเรื่อย ๆ เพื่อทำงานง่ายๆ ตัวอย่างที่ดีคือ AWS CloudWatch ที่มีกลุ่มบันทึกที่ยอดเยี่ยมแต่ต้องพึ่งพาตัวแทน SSM

ด้วยการค้นหาอย่างง่ายบน google คุณสามารถดูได้ว่าการหาเนื้อหาที่มีคุณภาพดีบน SSH Alarms ทำได้ยากมาก เพราะทุกวันนี้ผู้คนมักจะพึ่งพา Cloud มากขึ้นเรื่อย ๆ เพื่อทำงานง่ายๆ

ตัวอย่างที่ดีคือ AWS CloudWatch ที่มีกลุ่มบันทึกที่ยอดเยี่ยมแต่ต้องพึ่งพาตัวแทน SSM

ลองจินตนาการถึงโฮสต์หนึ่งที่ไม่มีใครควรเข้าถึงและเป้าหมายที่มีมูลค่าสูง หากบริษัทของคุณมีทีม SOC การเข้าถึงเครื่องนี้สามารถตรวจสอบและบรรเทาได้ แต่ถ้าไม่มีก็อาจกลายเป็นฝันร้ายได้

บทช่วยสอนนี้มีจุดประสงค์เพื่อสอนวิธีที่ไม่เพียงแค่รับการแจ้งเตือนจาก SSH ในเครื่องระบบคลาวด์โดยไม่มีตัวแทน แต่ยังรวมถึงวิธีรับ IOC “ตัวบ่งชี้การประนีประนอม” ในการแจ้งเตือนด้วย

แฮนด์ ออน แล็บ

10 นาที

  1. เริ่มต้นด้วยบรรทัดโค้ดง่ายๆ
  2. sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
    

sudo vim /etc/ssh/scripts/sshnotify.sh

#!/bin/bash

if [ "$PAM_TYPE" != "close_session" ]; then
        host="$(hostname)"
        python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit

sudo chmod +x /etc/ssh/scripts/sshnotify.sh

โปรดทราบว่า abuseipdb มี API ฟรี คุณต้องทำบัญชีเพื่อรับรหัสของคุณ
Slack Webhook กำลังใช้งานอยู่

sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse

parser = argparse.ArgumentParser()

parser.add_argument('-ip', action='store', dest='simple_ip',
                    help='Store ip')

parser.add_argument('-host', action='store', dest='simple_host',
                    help='Store host')

parser.add_argument('-user', action='store', dest='simple_user',
                    help='Store user')



results = parser.parse_args()

print(results.simple_ip)

print(results.simple_host)


querystring = {
    'ipAddress': results.simple_ip,
    'maxAgeInDays': '1'
    }

# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'



headers = {
    'Accept': 'application/json',
    'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}





response = requests.request(method='GET', url=url, headers=headers, params=querystring)

# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
 "blocks": [
  {
   "type": "image",
   "image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
   "alt_text": "inspiration"
  },
  {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*Security report : *"
   }
  },
  {
   "type": "divider"
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "IP:  " + str(decodedResponse["data"]["ipAddress"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Domain:  " + str(decodedResponse["data"]["domain"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Last date reported:  " + str(decodedResponse["data"]["lastReportedAt"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Abuse confidence:  " + str(decodedResponse["data"]["abuseConfidenceScore"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Country:  "  + str(decodedResponse["data"]["countryCode"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Usage type:  " + str(decodedResponse["data"]["usageType"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Total Reports:  " + str(decodedResponse["data"]["totalReports"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "ISP:  " + str(decodedResponse["data"]["isp"])

   }
  },
        {
   "type": "divider"
  },
        {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*Sumary*"
   }
  },
        {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*SSH login:*  " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
   }
  },
  {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*SSH Accessfrom IP*  " + str(results.simple_ip)
   }
  }
 ]
}





response = requests.post(
    webhook_url, data=json.dumps(slack_data),
    headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
    raise ValueError(
        'Request to slack returned an error %s, the response is:\n%s'
        % (response.status_code, response.text)
    )


sudo systemctl restart sshd

เนื่องจาก IP ของฉันสะอาดไม่มีรายงานใด ๆ เกิดขึ้น แต่ในสถานการณ์จริง มันสามารถช่วยให้คุณเข้าใจว่าการตอบสนองต่อเหตุการณ์นั้นจำเป็นหรือไม่