Gotowe do produkcji i niezależne od chmury Alarmy SSH na Slacku

Nov 29 2022
Dzięki prostemu wyszukiwaniu w Google można zobaczyć, że bardzo trudno jest znaleźć dobrej jakości materiały na temat alarmów SSH, proste, ponieważ obecnie ludzie coraz bardziej polegają na chmurze przy wykonywaniu prostych zadań. Dobrym przykładem jest AWS CloudWatch z grupami dzienników, które są świetne, ale zależą od agenta SSM.

Dzięki prostemu wyszukiwaniu w Google można zobaczyć, że bardzo trudno jest znaleźć dobrej jakości materiały na temat alarmów SSH, proste, ponieważ obecnie ludzie coraz bardziej polegają na chmurze przy wykonywaniu prostych zadań.

Dobrym przykładem jest AWS CloudWatch z grupami dzienników, które są świetne, ale zależą od agenta SSM.

Wyobraźmy sobie jednego hosta, do którego nikt nie powinien mieć dostępu, i jego cel o wysokiej wartości, jeśli Twoja firma ma zespół SOC, dostęp do tej maszyny może być monitorowany i ograniczany, ale jeśli nie, może stać się koszmarem

Dlatego ten samouczek ma na celu nauczenie, w jaki sposób można nie tylko otrzymywać alerty z SSH na maszynach w chmurze bez agentów, ale także jak otrzymywać „Wskaźniki kompromisu” IOC w alertach.

Ręka Na Laboratorium

10 minut

  1. Zacznijmy od prostej linii kodu
  2. sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd
    

sudo vim /etc/ssh/scripts/sshnotify.sh

#!/bin/bash

if [ "$PAM_TYPE" != "close_session" ]; then
        host="$(hostname)"
        python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit

sudo chmod +x /etc/ssh/scripts/sshnotify.sh

Zwróć uwagę, że abuseipdb ma darmowe API, musisz założyć konto, aby otrzymać klucz.
Slack Webhook jest używany.

sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse

parser = argparse.ArgumentParser()

parser.add_argument('-ip', action='store', dest='simple_ip',
                    help='Store ip')

parser.add_argument('-host', action='store', dest='simple_host',
                    help='Store host')

parser.add_argument('-user', action='store', dest='simple_user',
                    help='Store user')



results = parser.parse_args()

print(results.simple_ip)

print(results.simple_host)


querystring = {
    'ipAddress': results.simple_ip,
    'maxAgeInDays': '1'
    }

# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'



headers = {
    'Accept': 'application/json',
    'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}





response = requests.request(method='GET', url=url, headers=headers, params=querystring)

# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
 "blocks": [
  {
   "type": "image",
   "image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
   "alt_text": "inspiration"
  },
  {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*Security report : *"
   }
  },
  {
   "type": "divider"
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "IP:  " + str(decodedResponse["data"]["ipAddress"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Domain:  " + str(decodedResponse["data"]["domain"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Last date reported:  " + str(decodedResponse["data"]["lastReportedAt"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Abuse confidence:  " + str(decodedResponse["data"]["abuseConfidenceScore"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Country:  "  + str(decodedResponse["data"]["countryCode"])

   }
  },
  {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Usage type:  " + str(decodedResponse["data"]["usageType"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "Total Reports:  " + str(decodedResponse["data"]["totalReports"])

   }
  },
        {
   "type": "section",
   "text": {
    "type": "plain_text",
    "text": "ISP:  " + str(decodedResponse["data"]["isp"])

   }
  },
        {
   "type": "divider"
  },
        {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*Sumary*"
   }
  },
        {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*SSH login:*  " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
   }
  },
  {
   "type": "section",
   "text": {
    "type": "mrkdwn",
    "text": "*SSH Accessfrom IP*  " + str(results.simple_ip)
   }
  }
 ]
}





response = requests.post(
    webhook_url, data=json.dumps(slack_data),
    headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
    raise ValueError(
        'Request to slack returned an error %s, the response is:\n%s'
        % (response.status_code, response.text)
    )


sudo systemctl restart sshd

Ponieważ moje IP jest czyste, nie pochodzą z niego żadne raporty, ale w rzeczywistej sytuacji może pomóc ci zrozumieć, czy potrzebna jest reakcja na incydenty.