Files
yakpanel-core/class_v2/btdockerModelV2/securityModel.py
2026-04-07 02:04:22 +05:30

1276 lines
73 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# coding: utf-8
# -------------------------------------------------------------------
# YakPanel
# -------------------------------------------------------------------
# Copyright (c) 2015-2099 YakPanel(www.yakpanel.com) All rights reserved.
# -------------------------------------------------------------------
# Author: lwh <lwh@yakpanel.com>
# -------------------------------------------------------------------
import fnmatch
import os, sys, time
import json
import subprocess
from _stat import S_ISREG, S_ISDIR, S_ISLNK
from gevent.lock import BoundedSemaphore
import contextlib
# ------------------------------
# Docker安全检测
# ------------------------------
BASE_PATH = "/www/server/panel"
os.chdir(BASE_PATH)
sys.path.insert(0, "class/")
import public, re
from btdockerModelV2.dockerBase import dockerBase
from public.validate import Param
def auto_progress(func):
    """
    @name Auto-advance the progress bar (decorator)
    @author lwh<2024-1-23>

    Wraps a scan method so that after it returns, ``self.progress_percent``
    is advanced by ``self.scan_percent``.
    """
    from functools import wraps

    @wraps(func)  # fix: preserve the wrapped function's name/docstring
    def wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        self.progress_percent += self.scan_percent
        return result
    return wrapper
class DockerImageInspector:
    """Inspect a Docker image's metadata, history and filesystem without running it.

    Filesystem operations (listdir/walk/stat/open) work by creating a
    throw-away container from the image and piping ``docker export`` through
    ``tar``; the temporary container is removed when the operation finishes.
    """

    def __init__(self):
        self.image_info = None   # cached result of `docker inspect` (first record)
        self.image_id = None     # id or name of the image being inspected
        self._container_lock = BoundedSemaphore(1)  # serializes temp-container lifecycle
        self._active_containers = set()             # temp containers not yet removed

    def open_image_by_id(self, image_id):
        """Select the image to inspect; returns self so calls can be chained."""
        self.image_id = image_id
        return self

    def get_image_info(self):
        """Run `docker inspect` on the selected image.

        Caches and returns the first inspect record (dict), or None on failure.
        """
        try:
            # NOTE(review): image_id is interpolated into the command line;
            # callers must pass a trusted image id/name.
            cmd = f"docker inspect {self.image_id}"
            result = subprocess.run(cmd.split(), capture_output=True, text=True)
            if result.returncode == 0:
                self.image_info = json.loads(result.stdout)[0]
                return self.image_info
            return None
        except Exception as e:
            print(f"获取镜像信息失败: {str(e)}")
            return None

    def ocispec_v1(self):
        """Build an OCI-spec-like dict (config + history) for the image.

        Returns {} when the image cannot be inspected.
        """
        if not self.image_info:
            self.get_image_info()
        if not self.image_info:
            return {}
        try:
            config = self.image_info.get('Config', {})
            # NOTE(review): both fields read 'Platform'; `docker inspect`
            # exposes 'Architecture' and 'Os' separately — confirm intended keys.
            oci_spec = {
                'created': self.image_info.get('Created'),
                'architecture': self.image_info.get('Platform', 'amd64'),  # default amd64
                'os': self.image_info.get('Platform', 'linux'),            # default linux
                'config': {
                    'Env': config.get('Env', []),
                    'Cmd': config.get('Cmd', []),
                    'WorkingDir': config.get('WorkingDir', ''),
                    'Entrypoint': config.get('Entrypoint', []),
                    'ExposedPorts': config.get('ExposedPorts', {}),
                    'Volumes': config.get('Volumes', {}),
                },
                'history': []
            }
            try:
                # `docker history` prints one creating command per layer.
                cmd = f"docker history --no-trunc --format '{{{{.CreatedBy}}}}' {self.image_id}"
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                if result.returncode == 0:
                    for line in result.stdout.strip().split('\n'):
                        if not line:  # skip blank lines
                            continue
                        oci_spec['history'].append({
                            'created': '',  # per-layer timestamps are not available here
                            'created_by': line,
                            'empty_layer': line.startswith('#(nop)')  # metadata-only layer
                        })
            except Exception as e:
                print(f"获取历史记录失败: {str(e)}")
            return oci_spec
        except Exception as e:
            print(f"获取OCI规范信息失败: {str(e)}")
            return {}

    def reporefs(self):
        """Return the image's repo:tag reference list (may be empty)."""
        if not self.image_info:
            self.get_image_info()
        refs = []
        if self.image_info and 'RepoTags' in self.image_info:
            refs.extend(self.image_info['RepoTags'])
        return refs

    def id(self):
        """Return the image id/name this inspector was opened with."""
        return self.image_id

    @contextlib.contextmanager
    def _temp_container(self):
        """Yield the id of a temporary (created, never started) container.

        The lock is held for the container's whole lifetime, so only one
        export/extract operation runs at a time.
        """
        container_id = None
        try:
            with self._container_lock:
                cmd = f"docker create {self.image_id}"
                result = subprocess.run(cmd.split(), capture_output=True, text=True)
                if result.returncode != 0:
                    raise Exception(f"Failing to create a temporary container: {result.stderr}")
                container_id = result.stdout.strip()
                self._active_containers.add(container_id)
                yield container_id
        finally:
            if container_id:
                with self._container_lock:
                    try:
                        subprocess.run(['docker', 'rm', container_id],
                                       capture_output=True,
                                       check=True)
                        self._active_containers.remove(container_id)
                    except Exception:
                        # Removal failed — keep the id so cleanup() can retry later.
                        pass

    def listdir(self, path):
        """List names of entries directly under `path` inside the image.

        Returns [] on any error (e.g. docker unavailable).
        """
        try:
            with self._temp_container() as container_id:
                cmd = f"docker export {container_id} | tar -t"
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                entries = set()
                base = path.strip('/')
                for line in result.stdout.splitlines():
                    line = line.strip().strip('/')
                    if not line:
                        continue
                    rel_path = os.path.relpath(line, base)
                    if rel_path.startswith('..'):
                        continue  # entry lies outside the requested directory
                    parts = rel_path.split('/')
                    if len(parts) == 1:  # direct child only
                        entries.add(parts[0])
                return list(entries)
        except Exception:
            return []

    def walk(self, top):
        """os.walk-style generator over the image filesystem rooted at `top`.

        Yields (full_path, dirs, files); on failure yields one (top, [], []).
        """
        try:
            with self._temp_container() as container_id:
                cmd = f"docker export {container_id} | tar -t"
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
                top_rel = top.strip('/')
                dir_tree = {}
                for line in result.stdout.splitlines():
                    line = line.strip().strip('/')
                    if not line or not line.startswith(top_rel):
                        continue
                    parts = line.split('/')
                    current_path = ''
                    for i, part in enumerate(parts):
                        parent_path = current_path
                        current_path = os.path.join(current_path, part) if current_path else part
                        dir_tree.setdefault(current_path, {'dirs': set(), 'files': set()})
                        # fix: the parent node (including the '' root) may not
                        # exist yet — the original indexed it directly and
                        # raised KeyError on every multi-component path.
                        parent = dir_tree.setdefault(parent_path, {'dirs': set(), 'files': set()})
                        # Intermediate components are directories, the last a file.
                        if i < len(parts) - 1:
                            parent['dirs'].add(part)
                        else:
                            parent['files'].add(part)
                for dirpath in sorted(dir_tree.keys()):
                    if not dirpath.startswith(top_rel):
                        continue
                    full_path = '/' + dirpath
                    yield full_path, sorted(dir_tree[dirpath]['dirs']), sorted(dir_tree[dirpath]['files'])
        except Exception:
            yield top, [], []

    @staticmethod
    def _parse_symbolic_mode(mode_str):
        """Convert a `tar -tv`/`ls -l` mode string (e.g. '-rw-r--r--') to st_mode bits."""
        type_bits = {'-': 0o100000, 'd': 0o040000, 'l': 0o120000,
                     'b': 0o060000, 'c': 0o020000, 'p': 0o010000, 's': 0o140000}
        mode = type_bits.get(mode_str[0], 0) if mode_str else 0
        # Chars 1..9 are user/group/other rwx; '-' denies, 'S'/'T' mark
        # setuid/setgid/sticky without the exec bit.
        for i, ch in enumerate(mode_str[1:10]):
            if ch not in '-ST':
                mode |= 1 << (8 - i)
        if len(mode_str) >= 10:
            if mode_str[3] in 'sS':
                mode |= 0o4000  # setuid
            if mode_str[6] in 'sS':
                mode |= 0o2000  # setgid
            if mode_str[9] in 'tT':
                mode |= 0o1000  # sticky
        return mode

    def stat(self, path):
        """Return a stat-like object (st_mode/st_size/st_uid/st_gid) for `path`.

        Parses `tar -tv` verbose output, whose fields are
        ``mode owner/group size date time name``.  Raises when the file is
        not present in the image.

        fix: the original split the line into the wrong fields (uid←size,
        gid←date, size←time) and called int(mode, 8) on the symbolic mode
        string, which always raised ValueError.
        """
        with self._temp_container() as container_id:
            cmd = f"docker export {container_id} | tar -tvf - {path.lstrip('/')}"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            if result.returncode != 0 or not result.stdout.strip():
                raise Exception("The file does not exist")
            fields = result.stdout.strip().splitlines()[0].split(None, 5)
            mode_str, owner, size = fields[0], fields[1], fields[2]
            uid_s, _, gid_s = owner.partition('/')
            # tar may print user/group names instead of numeric ids; fall back to 0
            uid = int(uid_s) if uid_s.isdigit() else 0
            gid = int(gid_s) if gid_s.isdigit() else 0

            class StatResult:
                def __init__(self, st_mode, st_size, st_uid, st_gid):
                    self.st_mode = st_mode
                    self.st_size = st_size
                    self.st_uid = st_uid
                    self.st_gid = st_gid

            return StatResult(self._parse_symbolic_mode(mode_str), int(size), uid, gid)

    def open(self, path, mode="rb"):
        """Extract a single file from the image, returning a file-like wrapper.

        `mode` is accepted for API compatibility; data is always bytes.
        Raises when extraction fails.
        """
        with self._temp_container() as container_id:
            cmd = f"docker export {container_id} | tar -xOf - {path.lstrip('/')}"
            result = subprocess.run(cmd, shell=True, capture_output=True)
            if result.returncode != 0:
                raise Exception("Unable to read the file")

            class FileWrapper:
                """Minimal read-only in-memory file object."""

                def __init__(self, data):
                    self.data = data
                    self.position = 0

                def read(self, size=-1):
                    # fix: honour position/size like a real file object; a
                    # no-arg read() still returns everything remaining.
                    if size is None or size < 0:
                        chunk = self.data[self.position:]
                    else:
                        chunk = self.data[self.position:self.position + size]
                    self.position += len(chunk)
                    return chunk

                def close(self):
                    pass

            return FileWrapper(result.stdout)

    def cleanup(self):
        """Remove any temporary containers earlier operations failed to delete."""
        with self._container_lock:
            for container_id in list(self._active_containers):
                try:
                    subprocess.run(['docker', 'rm', container_id],
                                   capture_output=True,
                                   check=True)
                    self._active_containers.remove(container_id)
                except Exception:
                    pass  # best-effort; leave the id for a later retry
class main(dockerBase):
    """Docker/image security scanning: runs scan passes and streams results over a websocket."""
    progress_percent = 0  # overall scan progress (percent)
    ids_percent = 0
    scan_percent = 0  # per-step increment applied by the @auto_progress decorator
    scan_score = 100  # remaining security score; findings deduct points
    progress_content = "Initializing scan..."  # current progress message
    sys_ver = ""  # OS type string
    requirements_list = ["veinmind", "veinmind-common"]
    docker_obj = ""  # docker inspector object
    image_name = ""  # name of the image currently being scanned
    send_time = ""  # timestamp of the last ws message (float once a scan starts)
    start_time = ""  # scan start time
def short_string(self, text):
"""
@name 缩短字符串为40个字符
"""
if len(text) <= 40:
return text
else:
return text[:37] + "..."
def short_string1(self, text):
"""
@name 缩短字符串中间部分保留前后
"""
if len(text) <= 30:
return text
else:
return text[:15] + "..." + text[-15:]
def get_image_list(self, get):
"""
@name 获取镜像id列表
@author lwh<2024-1-23>
@return
"""
# public.print_log("Get the image id list")
if not hasattr(get, "_ws"):
return True
from btdockerModelV2 import imageModel
image_list = imageModel.main().image_list(get=public.dict_obj())['message']
# public.print_log(image_list)
get._ws.send(public.GetJson({"end": True, "image_list": image_list}))
def send_image_ws(self, get, msg, detail="", repair="", status=1, end=False):
"""
@name 发送ws信息
@author lwh<2024-01-23>
@param msg string 扫描内容
@param status int 风险情况1无风险2告警3危险
@param repair string 修复方案
@param end bool 是否结束
"""
now_time = time.time()
# 判断间隔时间是否小于100ms
if now_time-self.send_time <= 0.1 and not end and status == 1:
return
self.send_time = now_time
# 根据风险情况进行扣分
if status == 2:
score = 2
elif status == 3:
score = 5
else:
score = 0
# msg扫描内容score当前分数type类型docker/image
get._ws.send(public.GetJson({"end": end, "image_name": self.image_name, "status": status, "detail": detail,
"msg": msg, "repair": repair, "score": score, "type": "image"}))
def send_docker_ws(self, get, msg, repair="", status=1, end=True):
"""
@name 发送ws信息
@author lwh<2024-01-23>
@param msg string 扫描内容
@param status int 风险情况1无风险2告警3危险
@param repair string 修复方案
@param end bool 是否结束
"""
# 根据风险情况进行扣分
if status == 2:
score = 2
elif status == 3:
score = 5
else:
score = 0
# msg扫描内容progress扫描进度score当前分数
get._ws.send(public.GetJson({"end": end, "image_name": self.image_name, "status": status, "msg": msg, "repair": repair, "score": score, "type": "docker"}))
def reduce_core(self, score):
"""
@name 减少总分
@author lwh<2024-01-23>
@param score int 需要减少的分数
@return self.scan_score int 所剩分数
"""
self.scan_score -= score
if self.scan_score < 0:
return 0
return self.scan_score
def fix_libdl_dependency(self):
"""修复libdl.so依赖"""
try:
# 检查系统中的libdl文件
result, _ = public.ExecShell("find /usr/lib /usr/lib64 /lib /lib64 -name 'libdl.so*' 2>/dev/null")
libdl_files = [x for x in result.strip().split('\n') if x]
if not libdl_files:
# 如果没有找到任何libdl.so文件需要安装
sys_ver = public.get_os_version()
if "Ubuntu" in sys_ver or "Debian" in sys_ver:
public.ExecShell("apt-get update && apt-get install -y libc6-dev")
elif "CentOS" in sys_ver:
public.ExecShell("yum install -y glibc-devel")
# 重新检查
result, _ = public.ExecShell("find /usr/lib /usr/lib64 /lib /lib64 -name 'libdl.so*' 2>/dev/null")
libdl_files = [x for x in result.strip().split('\n') if x]
if not libdl_files:
return False
# 找到libdl.so.2文件
libdl_so2 = None
for f in libdl_files:
if 'libdl.so.2' in f:
libdl_so2 = f
break
if not libdl_so2:
return False
# 创建软链接
target_dirs = ['/usr/lib', '/usr/lib64', '/lib', '/lib64']
for dir_path in target_dirs:
if os.path.exists(dir_path):
link_path = os.path.join(dir_path, 'libdl.so')
if not os.path.exists(link_path):
public.ExecShell(f"ln -sf {libdl_so2} {link_path}")
return True
except Exception as e:
return False
def image_safe_scan(self, get):
"""
@name 镜像安全扫描入口函数
@author lwh@yakpanel.com
@time 2024-01-22
@param _ws
@return 返回服务器扫描项
"""
public.set_module_logs('docker', 'image_safe_scan', 1)
if not hasattr(get, "_ws"):
return True
if not hasattr(get, "image_id"):
return True
# 获取检测镜像
image_id = get.image_id
# 初始化时间
self.send_time = time.time()
try:
# 自定义一个新的DockerImageInspector
docker_obj = DockerImageInspector()
# 打开镜像
image = docker_obj.open_image_by_id(image_id=image_id)
# 获取ref镜像名
refs = image.reporefs()
if len(refs) > 0:
self.image_name = refs[0]
else:
self.image_name = image.id()
public.print_log("Detecting:{}".format(self.image_name))
self.send_image_ws(get, msg=public.lang("Scanning {} exception history command", self.image_name))
self.scan_history(get, image)
self.send_image_ws(get, msg=public.lang("Scanning {} sensitive information", self.image_name))
self.scan_sensitive(get, image)
self.send_image_ws(get, msg=public.lang("Scanning {} backdoor", self.image_name))
self.scan_backdoor(get, image)
self.send_image_ws(get, msg=public.lang("Scanning {} container escapes"))
self.scan_escape(get, image)
self.send_image_ws(get, msg=public.lang("{}Scan completed", self.image_name), end=True)
except Exception as e:
self.send_image_ws(get, msg=public.lang("Scanning the image failed:{}", str(e)), end=True)
def scan_history(self, get, image):
"""
@name 异常历史命令
@author lwh@yakpanel.com
@time 2024-01-22
"""
instruct_set = (
"FROM", "CMD", "RUN", "LABEL", "MAINTAINER", "EXPOSE", "ENV", "ADD", "COPY", "ENTRYPOINT", "VOLUME", "USER",
"WORKDIR", "ARG", "ONBUILD", "STOPSIGNAL", "HEALTHCHECK", "SHELL")
rules = {
"rules": [{"description": "Miner Repo", "instruct": "RUN", "match": ".*(xmrig|ethminer|miner)\\.git.*"},
{"description": "Unsafe Path", "instruct": "ENV", "match": "PATH=.*(|:)(/tmp|/dev/shm)"},
{
"description": "MySQL Shell Installation",
"instruct": "CMD"
}
]
}
ocispec = image.ocispec_v1()
if 'history' in ocispec.keys() and len(ocispec['history']) > 0:
for history in ocispec['history']:
if 'created_by' in history.keys():
created_by = history['created_by']
created_by_split = created_by.split("#(nop)")
if len(created_by_split) > 1:
command = "#(nop)".join(created_by_split[1:])
command = command.lstrip()
command_split = command.split()
if len(command_split) == 2:
instruct = command_split[0]
command_content = command_split[1]
# 如果是JSON格式的字符串尝试解析
if command_content.startswith('[') and command_content.endswith(']'):
import json
parsed_content = json.loads(command_content)
if isinstance(parsed_content, list):
command_content = ' '.join(parsed_content)
# 移除引号和方括号
command_content = command_content.strip('[]"\' ')
# command_content = " ".join(command_split[1:])
for r in rules["rules"]:
if r["instruct"] == instruct:
if re.match(r["match"], command_content):
self.send_image_ws(get,status=0, msg=public.lang("Suspicious abnormal history command found"), detail=public.lang("It was found that the image has an abnormal historical command [{}], which may implant malware or code into the host system when the container is running, causing security risks.",self.short_string(command_content)), repair=public.lang("1.It is recommended to check whether the command is required for normal business<br/>2.It is recommended to choose official and reliable infrastructure to avoid unnecessary losses."))
break
else:
instruct = command_split[0]
command_content = " ".join(command_split[1:])
for r in rules["rules"]:
if r["instruct"] == instruct:
if re.match(r["match"], command_content):
self.send_image_ws(get,status=0, msg=public.lang("Suspicious abnormal history command found"), detail=public.lang("It was found that the image has an abnormal historical command [{}], which may implant malware or code into the host system when the container is running, causing security risks.",self.short_string(command_content)), repair=public.lang("1.It is recommended to check whether the command is required for normal business<br/>2.It is recommended to choose official and reliable infrastructure to avoid unnecessary losses."))
break
else:
command_split = created_by.split()
if command_split[0] in instruct_set:
for r in rules["rules"]:
if r["instruct"] == command_split[0]:
if re.match(r["match"], " ".join(command_split[1:])):
self.send_image_ws(get,status=0, msg=public.lang("Suspicious abnormal history command found"), detail=public.lang("It was found that the image has an abnormal historical command [{}], which may implant malware or code into the host system when the container is running, causing security risks.",self.short_string(" ".join(command_split[1:]))), repair=public.lang("1.It is recommended to check whether the command is required for normal business<br/>2.It is recommended to choose official and reliable infrastructure to avoid unnecessary losses."))
break
else:
for r in rules["rules"]:
if r["instruct"] == "RUN":
if re.match(r["match"], created_by):
self.send_image_ws(get,status=0, msg=public.lang("Suspicious abnormal history command found"), detail=public.lang("It was found that the image has an abnormal historical command [{}], which may implant malware or code into the host system when the container is running, causing security risks.",self.short_string(created_by)), repair=public.lang("1.It is recommended to check whether the command is required for normal business<br/>2.It is recommended to choose official and reliable infrastructure to avoid unnecessary losses."))
break
def scan_sensitive(self, get, image):
"""
@name 扫描镜像敏感数据
@author lwh<2024-1-22>
"""
# 敏感数据规则
rules = {"whitelist": {
"paths": ["/usr/**", "/lib/**", "/lib32/**", "/bin/**", "/sbin/**", "/var/lib/**", "/var/log/**",
"**/node_modules/**/*.md", "**/node_modules/**/test/**", "**/service/iam/examples_test.go",
"**/grafana/public/build/*.js"]}, "rules": [
{"id": 1, "name": "gitlab_personal_access_token", "description": "GitLab Personal Access Token",
"match": "glpat-[0-9a-zA-Z_\\-]{20}", "level": "high"},
{"id": 2, "name": "AWS", "description": "AWS Access Token", "match": "AKIA[0-9A-Z]{16}", "level": "high"},
{"id": 3, "name": "PKCS8 private key", "description": "PKCS8 private key",
"match": "-----BEGIN PRIVATE KEY-----", "level": "high"},
{"id": 4, "name": "RSA private key", "description": "RSA private key",
"match": "-----BEGIN RSA PRIVATE KEY-----", "level": "high"},
{"id": 5, "name": "SSH private key", "description": "SSH private key",
"match": "-----BEGIN OPENSSH PRIVATE KEY-----", "level": "high"},
{"id": 6, "name": "PGP private key", "description": "PGP private key",
"match": "-----BEGIN PGP PRIVATE KEY BLOCK-----", "level": "high"},
{"id": 7, "name": "Github Personal Access Token", "description": "Github Personal Access Token",
"match": "ghp_[0-9a-zA-Z]{36}", "level": "high"},
{"id": 8, "name": "Github OAuth Access Token", "description": "Github OAuth Access Token",
"match": "gho_[0-9a-zA-Z]{36}", "level": "high"},
{"id": 9, "name": "SSH (DSA) private key", "description": "SSH (DSA) private key",
"match": "-----BEGIN DSA PRIVATE KEY-----", "level": "high"},
{"id": 10, "name": "SSH (EC) private key", "description": "SSH (EC) private key",
"match": "-----BEGIN EC PRIVATE KEY-----", "level": "high"},
{"id": 11, "name": "Github App Token", "description": "Github App Token",
"match": "(ghu|ghs)_[0-9a-zA-Z]{36}", "level": "high"},
{"id": 12, "name": "Github Refresh Token", "description": "Github Refresh Token",
"match": "ghr_[0-9a-zA-Z]{76}", "level": "high"},
{"id": 13, "name": "Shopify shared secret", "description": "Shopify shared secret",
"match": "shpss_[a-fA-F0-9]{32}", "level": "high"},
{"id": 14, "name": "Shopify access token", "description": "Shopify access token",
"match": "shpat_[a-fA-F0-9]{32}", "level": "high"},
{"id": 15, "name": "Shopify custom app access token", "description": "Shopify custom app access token",
"match": "shpca_[a-fA-F0-9]{32}", "level": "high"},
{"id": 16, "name": "Shopify private app access token", "description": "Shopify private app access token",
"match": "shppa_[a-fA-F0-9]{32}", "level": "high"},
{"id": 17, "name": "Slack token", "description": "Slack token", "match": "xox[baprs]-([0-9a-zA-Z]{10,48})?",
"level": "high"},
{"id": 18, "name": "Stripe", "description": "Stripe", "match": "(?i)(sk|pk)_(test|live)_[0-9a-z]{10,32}",
"level": "high"}, {"id": 19, "name": "PyPI upload token", "description": "PyPI upload token",
"match": "pypi-AgEIcHlwaS5vcmc[A-Za-z0-9-_]{50,1000}", "level": "high"},
{"id": 20, "name": "Google (GCP) Service-account", "description": "Google (GCP) Service-account",
"match": "\\\"type\\\": \\\"service_account\\\"", "level": "medium"},
{"id": 21, "name": "Password in URL", "description": "Password in URL",
"match": "[a-zA-Z]{3,10}:\\/\\/[^$][^:@\\/\\n]{3,20}:[^$][^:@\\n\\/]{3,40}@.{1,100}", "level": "high"},
{"id": 22, "name": "Heroku API Key", "description": "Heroku API Key",
"match": "(?i)(?:heroku)(?:[0-9a-z\\-_\\t .]{0,20})(?:[\\s|']|[\\s|\"]){0,3}(?:=|>|:=|\\|\\|:|<=|=>|:)(?:'|\\\"|\\s|=|\\x60){0,5}([0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12})(?:['|\\\"|\\n|\\r|\\s|\\x60]|$)",
"level": "high"}, {"id": 23, "name": "Slack Webhook", "description": "Slack Webhook",
"match": "https://hooks.slack.com/services/T[a-zA-Z0-9_]{8}/B[a-zA-Z0-9_]{8,12}/[a-zA-Z0-9_]{24}",
"level": "medium"},
{"id": 24, "name": "Twilio API Key", "description": "Twilio API Key", "match": "SK[0-9a-fA-F]{32}",
"level": "high"}, {"id": 25, "name": "Age secret key", "description": "Age secret key",
"match": "AGE-SECRET-KEY-1[QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L]{58}", "level": "high"},
{"id": 26, "name": "Facebook token", "description": "Facebook token",
"match": "(?i)(facebook[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{32})['\\\"]",
"level": "high"}, {"id": 27, "name": "Twitter token", "description": "Twitter token",
"match": "(?i)(twitter[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{35,44})['\\\"]",
"level": "high"},
{"id": 28, "name": "Adobe Client ID (Oauth Web)", "description": "Adobe Client ID (Oauth Web)",
"match": "(?i)(adobe[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{32})['\\\"]",
"level": "medium"}, {"id": 29, "name": "Adobe Client Secret", "description": "Adobe Client Secret",
"match": "(p8e-)(?i)[a-z0-9]{32}", "level": "high"},
{"id": 30, "name": "Alibaba AccessKey ID", "description": "Alibaba AccessKey ID",
"match": "(LTAI5t)(?i)[a-z0-9]{18}", "level": "medium", "lock": True},
{"id": 31, "name": "Alibaba Secret Key", "description": "Alibaba Secret Key",
"match": "(?i)(alibaba[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{30})['\\\"]",
"level": "high"}, {"id": 32, "name": "Asana Client ID", "description": "Asana Client ID",
"match": "(?i)(asana[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([0-9]{16})['\\\"]",
"level": "medium"},
{"id": 33, "name": "Asana Client Secret", "description": "Asana Client Secret",
"match": "(?i)(asana[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{32})['\\\"]",
"level": "high"}, {"id": 34, "name": "Atlassian API token", "description": "Atlassian API token",
"match": "(?i)(atlassian[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{24})['\\\"]",
"level": "high"},
{"id": 35, "name": "Bitbucket client ID", "description": "Bitbucket client ID",
"match": "(?i)(bitbucket[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{32})['\\\"]",
"level": "medium"}, {"id": 36, "name": "Bitbucket client secret", "description": "Bitbucket client secret",
"match": "(?i)(bitbucket[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9_\\-]{64})['\\\"]",
"level": "high"},
{"id": 37, "name": "Beamer API token", "description": "Beamer API token",
"match": "(?i)(beamer[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"](b_[a-z0-9=_\\-]{44})['\\\"]",
"level": "high"}, {"id": 38, "name": "Clojars API token", "description": "Clojars API token",
"match": "(CLOJARS_)(?i)[a-z0-9]{60}", "level": "high"},
{"id": 39, "name": "Contentful delivery API token", "description": "Contentful delivery API token",
"match": "(?i)(contentful[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9\\-=_]{43})['\\\"]",
"level": "high"},
{"id": 40, "name": "Contentful preview API token", "description": "Contentful preview API token",
"match": "(?i)(contentful[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9\\-=_]{43})['\\\"]",
"level": "high"}, {"id": 41, "name": "Databricks API token", "description": "Databricks API token",
"match": "dapi[a-h0-9]{32}", "level": "high"},
{"id": 42, "name": "Discord API key", "description": "Discord API key",
"match": "(?i)(discord[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-h0-9]{64})['\\\"]",
"level": "high"}, {"id": 43, "name": "Discord client ID", "description": "Discord client ID",
"match": "(?i)(discord[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([0-9]{18})['\\\"]",
"level": "medium"},
{"id": 44, "name": "Discord client secret", "description": "Discord client secret",
"match": "(?i)(discord[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9=_\\-]{32})['\\\"]",
"level": "high"}, {"id": 45, "name": "Doppler API token", "description": "Doppler API token",
"match": "['\\\"](dp\\.pt\\.)(?i)[a-z0-9]{43}['\\\"]", "level": "high"},
{"id": 46, "name": "Dropbox API secret/key", "description": "Dropbox API secret/key",
"match": "(?i)(dropbox[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{15})['\\\"]",
"level": "high"},
{"id": 47, "name": "Dropbox short lived API token", "description": "Dropbox short lived API token",
"match": "(?i)(dropbox[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"](sl\\.[a-z0-9\\-=_]{135})['\\\"]",
"level": "high"},
{"id": 48, "name": "Dropbox long lived API token", "description": "Dropbox long lived API token",
"match": "(?i)(dropbox[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"][a-z0-9]{11}(AAAAAAAAAA)[a-z0-9\\-_=]{43}['\\\"]",
"level": "high"}, {"id": 49, "name": "Duffel API token", "description": "Duffel API token",
"match": "['\\\"]duffel_(test|live)_(?i)[a-z0-9_-]{43}['\\\"]", "level": "high"},
{"id": 50, "name": "Dynatrace API token", "description": "Dynatrace API token",
"match": "['\\\"]dt0c01\\.(?i)[a-z0-9]{24}\\.[a-z0-9]{64}['\\\"]", "level": "high"},
{"id": 51, "name": "EasyPost API token", "description": "EasyPost API token",
"match": "['\\\"]EZAK(?i)[a-z0-9]{54}['\\\"]", "level": "high"},
{"id": 52, "name": "EasyPost test API token", "description": "EasyPost test API token",
"match": "['\\\"]EZTK(?i)[a-z0-9]{54}['\\\"]", "level": "high"},
{"id": 53, "name": "Fastly API token", "description": "Fastly API token",
"match": "(?i)(fastly[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9\\-=_]{32})['\\\"]",
"level": "high"}, {"id": 54, "name": "Finicity client secret", "description": "Finicity client secret",
"match": "(?i)(finicity[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{20})['\\\"]",
"level": "high"},
{"id": 55, "name": "Finicity API token", "description": "Finicity API token",
"match": "(?i)(finicity[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{32})['\\\"]",
"level": "high"}, {"id": 56, "name": "Flutterweave public key", "description": "Flutterweave public key",
"match": "(?i)FLWPUBK_TEST-[a-h0-9]{32}-X", "level": "medium"},
{"id": 57, "name": "Flutterweave secret key", "description": "Flutterweave secret key",
"match": "(?i)FLWSECK_TEST-[a-h0-9]{32}-X", "level": "high"},
{"id": 58, "name": "Flutterweave encrypted key", "description": "Flutterweave encrypted key",
"match": "FLWSECK_TEST[a-h0-9]{12}", "level": "high"},
{"id": 59, "name": "Frame.io API token", "description": "Frame.io API token",
"match": "fio-u-(?i)[a-z0-9-_=]{64}", "level": "high"},
{"id": 60, "name": "GoCardless API token", "description": "GoCardless API token",
"match": "['\\\"]live_(?i)[a-z0-9-_=]{40}['\\\"]", "level": "high"},
{"id": 61, "name": "Grafana API token", "description": "Grafana API token",
"match": "['\\\"]eyJrIjoi(?i)[a-z0-9-_=]{72,92}['\\\"]", "level": "high"},
{"id": 62, "name": "Hashicorp Terraform user/org API token",
"description": "Hashicorp Terraform user/org API token",
"match": "['\\\"](?i)[a-z0-9]{14}\\.atlasv1\\.[a-z0-9-_=]{60,70}['\\\"]", "level": "high"},
{"id": 63, "name": "Hashicorp Vault batch token", "description": "Hashicorp Vault batch token",
"match": "b\\.AAAAAQ[0-9a-zA-Z_-]{156}", "level": "high"},
{"id": 64, "name": "Hubspot API token", "description": "Hubspot API token",
"match": "(?i)(hubspot[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-h0-9]{8}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{12})['\\\"]",
"level": "high"}, {"id": 65, "name": "Intercom API token", "description": "Intercom API token",
"match": "(?i)(intercom[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9=_]{60})['\\\"]",
"level": "high"},
{"id": 66, "name": "Intercom client secret/ID", "description": "Intercom client secret/ID",
"match": "(?i)(intercom[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-h0-9]{8}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{12})['\\\"]",
"level": "high"},
{"id": 67, "name": "Ionic API token", "description": "Ionic API token", "match": "ion_(?i)[a-z0-9]{42}",
"level": "high"}, {"id": 68, "name": "Linear API token", "description": "Linear API token",
"match": "lin_api_(?i)[a-z0-9]{40}", "level": "high"},
{"id": 69, "name": "Linear client secret/ID", "description": "Linear client secret/ID",
"match": "(?i)(linear[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{32})['\\\"]",
"level": "high"}, {"id": 70, "name": "Lob API Key", "description": "Lob API Key",
"match": "(?i)(lob[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]((live|test)_[a-f0-9]{35})['\\\"]",
"level": "high"},
{"id": 71, "name": "Lob Publishable API Key", "description": "Lob Publishable API Key",
"match": "(?i)(lob[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]((test|live)_pub_[a-f0-9]{31})['\\\"]",
"level": "high"}, {"id": 72, "name": "Mailchimp API key", "description": "Mailchimp API key",
"match": "(?i)(mailchimp[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-f0-9]{32}-us20)['\\\"]",
"level": "high"},
{"id": 73, "name": "Mailgun private API token", "description": "Mailgun private API token",
"match": "(?i)(mailgun[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"](key-[a-f0-9]{32})['\\\"]",
"level": "high"},
{"id": 74, "name": "Mailgun public validation key", "description": "Mailgun public validation key",
"match": "(?i)(mailgun[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"](pubkey-[a-f0-9]{32})['\\\"]",
"level": "high"},
{"id": 75, "name": "Mailgun webhook signing key", "description": "Mailgun webhook signing key",
"match": "(?i)(mailgun[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-h0-9]{32}-[a-h0-9]{8}-[a-h0-9]{8})['\\\"]",
"level": "high"}, {"id": 76, "name": "Mapbox API token", "description": "Mapbox API token",
"match": "(?i)(pk\\.[a-z0-9]{60}\\.[a-z0-9]{22})", "level": "high"},
{"id": 77, "name": "messagebird-api-token", "description": "MessageBird API token",
"match": "(?i)(messagebird[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{25})['\\\"]",
"level": "high"},
{"id": 78, "name": "MessageBird API client ID", "description": "MessageBird API client ID",
"match": "(?i)(messagebird[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-h0-9]{8}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{4}-[a-h0-9]{12})['\\\"]",
"level": "medium"}, {"id": 79, "name": "New Relic user API Key", "description": "New Relic user API Key",
"match": "['\\\"](NRAK-[A-Z0-9]{27})['\\\"]", "level": "high"},
{"id": 80, "name": "New Relic user API ID", "description": "New Relic user API ID",
"match": "(?i)(newrelic[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([A-Z0-9]{64})['\\\"]",
"level": "medium"}, {"id": 81, "name": "New Relic ingest browser API token",
"description": "New Relic ingest browser API token",
"match": "['\\\"](NRJS-[a-f0-9]{19})['\\\"]", "level": "high"},
{"id": 82, "name": "npm access token", "description": "npm access token",
"match": "['\\\"](npm_(?i)[a-z0-9]{36})['\\\"]", "level": "high"},
{"id": 83, "name": "Planetscale password", "description": "Planetscale password",
"match": "pscale_pw_(?i)[a-z0-9\\-_\\.]{43}", "level": "high"},
{"id": 84, "name": "Planetscale API token", "description": "Planetscale API token",
"match": "pscale_tkn_(?i)[a-z0-9\\-_\\.]{43}", "level": "high"},
{"id": 85, "name": "Postman API token", "description": "Postman API token",
"match": "PMAK-(?i)[a-f0-9]{24}\\-[a-f0-9]{34}", "level": "high"},
{"id": 86, "name": "Pulumi API token", "description": "Pulumi API token", "match": "pul-[a-f0-9]{40}",
"level": "high"}, {"id": 87, "name": "Rubygem API token", "description": "Rubygem API token",
"match": "rubygems_[a-f0-9]{48}", "level": "high"},
{"id": 88, "name": "Sendgrid API token", "description": "Sendgrid API token",
"match": "SG\\.(?i)[a-z0-9_\\-\\.]{66}", "level": "high"},
{"id": 89, "name": "Sendinblue API token", "description": "Sendinblue API token",
"match": "xkeysib-[a-f0-9]{64}\\-(?i)[a-z0-9]{16}", "level": "high"},
{"id": 90, "name": "Shippo API token", "description": "Shippo API token",
"match": "shippo_(live|test)_[a-f0-9]{40}", "level": "high"},
{"id": 91, "name": "Linkedin Client secret", "description": "Linkedin Client secret",
"match": "(?i)(linkedin[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z]{16})['\\\"]",
"level": "high"}, {"id": 92, "name": "Linkedin Client ID", "description": "Linkedin Client ID",
"match": "(?i)(linkedin[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{14})['\\\"]",
"level": "medium"},
{"id": 93, "name": "Twitch API token", "description": "Twitch API token",
"match": "(?i)(twitch[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}['\\\"]([a-z0-9]{30})['\\\"]",
"level": "high"}, {"id": 94, "name": "Typeform API token", "description": "Typeform API token",
"match": "(?i)(typeform[a-z0-9_ .\\-,]{0,25})(=|>|:=|\\|\\|:|<=|=>|:).{0,5}(tfp_[a-z0-9\\-_\\.=]{59})",
"level": "high"},
{"id": 95, "name": "Social Security Number", "description": "Social Security Number",
"match": "\\d{3}-\\d{2}-\\d{4}", "level": "low"},
{"id": 96, "name": "Version Control File", "description": "Version Control File",
"filepath": ".*\\/\\.(git|svn)$", "level": "high"},
{"id": 97, "name": "Config File", "description": "Config File", "filepath": ".*\\/config\\.ini$",
"level": "medium"},
{"id": 99, "name": "Desktop Services Store", "description": "Desktop Services Store",
"filepath": " .*\\/\\.DS_Store$", "level": "low"},
{"id": 100, "name": "MySQL client command history file", "description": "MySQL client command history file",
"filepath": ".*\\/\\.(mysql|psql|irb)_history$", "level": "low"},
{"id": 101, "name": "Recon-ng web reconnaissance framework API key database",
"description": "Recon-ng web reconnaissance framework API key database",
"filepath": ".*\\/\\.recon-ng\\/keys\\.db$", "level": "medium"},
{"id": 102, "name": "DBeaver SQL database manager configuration file",
"description": "DBeaver SQL database manager configuration file",
"filepath": ".*\\/\\.dbeaver-data-sources\\.xml$", "level": "low"},
{"id": 103, "name": "S3cmd configuration file", "description": "S3cmd configuration file",
"filepath": ".*\\/\\.s3cfg$", "level": "low"},
{"id": 104, "name": "Ruby On Rails secret token configuration file",
"description": "If the Rails secret token is known, it can allow for remote code execution. (http://www.exploit-db.com/exploits/27527/)",
"filepath": ".*\\/secret_token\\.rb$", "level": "high"}, {"id": 105, "name": "OmniAuth configuration file",
"description": "The OmniAuth configuration file might contain client application secrets.",
"filepath": ".*\\/omniauth\\.rb$",
"level": "high"},
{"id": 106, "name": "Carrierwave configuration file",
"description": "Can contain credentials for online storage systems such as Amazon S3 and Google Storage.",
"filepath": ".*\\/carrierwave\\.rb$", "level": "high"},
{"id": 107, "name": "Potential Ruby On Rails database configuration file",
"description": "Might contain database credentials.", "filepath": ".*\\/database\\.yml$", "level": "high"},
{"id": 108, "name": "Django configuration file",
"description": "Might contain database credentials, online storage system credentials, secret keys, etc.",
"filepath": ".*\\/settings\\.py$", "level": "low"},
{"id": 109, "name": "PHP configuration file", "description": "Might contain credentials and keys.",
"filepath": ".*\\/config(\\.inc)?\\.php$", "level": "low"},
{"id": 110, "name": "Jenkins publish over SSH plugin file",
"description": "Jenkins publish over SSH plugin file",
"filepath": ".*\\/jenkins\\.plugins\\.publish_over_ssh\\.BapSshPublisherPlugin\\.xml$", "level": "high"},
{"id": 111, "name": "Potential Jenkins credentials file",
"description": "Potential Jenkins credentials file", "filepath": ".*\\/credentials\\.xml$",
"level": "high"}, {"id": 112, "name": "Apache htpasswd file", "description": "Apache htpasswd file",
"filepath": ".*\\/\\.htpasswd$", "level": "low"},
{"id": 113, "name": "Configuration file for auto-login process",
"description": "Might contain username and password.", "filepath": ".*\\/\\.(netrc|git-credentials)$",
"level": "high"}, {"id": 114, "name": "Potential MediaWiki configuration file",
"description": "Potential MediaWiki configuration file",
"filepath": ".*\\/LocalSettings\\.php$", "level": "high"},
{"id": 115, "name": "Rubygems credentials file",
"description": "Might contain API key for a rubygems.org account.",
"filepath": ".*\\/\\.gem\\/credentials$", "level": "high"},
{"id": 116, "name": "Potential MSBuild publish profile", "description": "Potential MSBuild publish profile",
"filepath": ".*\\/\\.pubxml(\\.user)?$", "level": "low"},
{"id": 117, "name": "Potential Tencent Accesskey", "description": "Might contain Tencent Accesskey",
"match": "AKID(?i)[a-z0-9]{32}", "level": "high"},
{"id": 118, "name": "Potential aws Accesskey", "description": "Might contain aws Accesskey",
"match": "(A3T[A-Z0-9]|AKIA|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}", "level": "high"},
{"id": 119, "name": "Potential UCloud Accesskey", "description": "Might contain UCloud Accesskey",
"match": "JDC_[A-z,0-9]{28}", "level": "high"},
{"id": 120, "name": "JWT TOKEN", "description": "Might JWT Token ",
"match": "ey[0-9a-zA-Z]{30,34}\\.ey[0-9a-zA-Z-\\/_]{30,500}\\.[0-9a-zA-Z-\\/_]{10,200}={0,2}",
"level": "medium"}, {"id": 121, "name": "Google API", "description": "Might Google API Key ",
"match": "AIza[0-9A-Za-z\\-_]{35}", "level": "medium"},
{"id": 122, "name": "gitlab_pipeline_trigger_token", "description": "GitLab Pipeline Trigger Token",
"match": "glptt-[0-9a-zA-Z_\\-]{20}", "level": "high"},
{"id": 123, "name": "gitlab_runner_registration_token", "description": "GitLab Runner Registration Token",
"match": "GR1348941[0-9a-zA-Z_\\-]{20}", "level": "high"},
{"id": 124, "name": "Flutterwave public key", "description": "Flutterwave public key",
"match": "FLWPUBK_TEST-(?i)[a-h0-9]{32}-X", "level": "high"},
{"id": 125, "name": "Flutterwave secret key", "description": "Flutterwave secret key",
"match": "FLWSECK_TEST-(?i)[a-h0-9]{32}-X", "level": "high"},
{"id": 126, "name": "Flutterwave encrypted key", "description": "Flutterwave encrypted key",
"match": "FLWSECK_TEST[a-h0-9]{12}", "level": "high"},
{"id": 127, "name": "github-app-token", "description": "GitHub App Token",
"match": "(ghu|ghs)_[0-9a-zA-Z]{36}", "level": "high"},
{"id": 128, "name": "github-fine-grained-pat", "description": "GitHub Fine-Grained Personal Access Token",
"match": "github_pat_[0-9a-zA-Z_]{82}", "level": "high"},
{"id": 129, "name": "grafana-cloud-api-token", "description": "Grafana cloud api token",
"match": "glc_[A-Za-z0-9+/]{32,400}={0,2}", "level": "high"},
{"id": 130, "name": "grafana-service-account-token", "description": "Grafana service account token",
"match": "glsa_[A-Za-z0-9]{32}_[A-Fa-f0-9]{8}", "level": "high"},
{"id": 131, "name": "prefect-api-token", "description": "Prefect API token", "match": "pnu_[a-z0-9]{36}",
"level": "medium"}]}
# System directories excluded from the sensitive-information scan:
# standard OS paths that are unlikely to hold user secrets and are
# expensive to walk (NOTE(review): "home" is excluded too — confirm
# user home directories are intentionally out of scope here).
EXCLUDE_DIRS = {
    "usr", "lib", "lib32", "lib64", "boot", "run", "media",
    "proc", "sys", "dev", "bin", "sbin", "home"
}
def get_scan_dirs():
    """Return the top-level image directories to scan.

    Lists "/" inside the image and drops every entry found in
    EXCLUDE_DIRS; falls back to scanning "/" itself when the image
    root cannot be listed at all (best-effort).
    """
    try:
        entries = image.listdir("/")
    except Exception:
        return ["/"]
    return ["/" + name for name in entries if name not in EXCLUDE_DIRS]
def check_file_content(filepath, content, rules):
    """Check file content against the sensitive-information rules.

    Decodes *content* leniently as UTF-8, runs every rule that carries a
    "match" regex, and on the first hit pushes a finding over the
    websocket (status=2) and stops.

    :param filepath: image path of the file (used in the report only)
    :param content:  raw file bytes read from the image
    :param rules:    rule set dict with a "rules" list
    :return: True when some rule matched, False otherwise (including on
             any unexpected decode/regex error — the scan is best-effort).
    """
    try:
        text_content = content.decode('utf-8', errors='ignore')
        for rule in rules["rules"]:
            if "match" in rule and re.search(rule["match"], text_content):
                self.send_image_ws(
                    get,
                    msg=f"Discover sensitive information: {filepath}",
                    detail=f"{rule['description']}: Sensitive information was found in the file {filepath}.",
                    repair="1. Check and clean up sensitive information \n 2. Use environment variables or a secure key management system",
                    status=2
                )
                return True
        return False
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
    # are no longer swallowed; any other error means "no match".
    except Exception:
        return False
def check_path_pattern(path, rules):
    """Check an image path against the sensitive-path ("filepath") rules.

    On the first matching rule a finding is pushed over the websocket
    (status=2) and True is returned; otherwise False.

    :param path:  absolute path inside the image
    :param rules: rule set dict with a "rules" list
    """
    for rule in rules["rules"]:
        if "filepath" in rule and re.match(rule["filepath"], path):
            # BUGFIX: classify the path via the image filesystem, not the
            # host — the original os.path.isdir(path) inspected the
            # scanner host, which is wrong for paths that only exist
            # inside the image.
            try:
                is_dir = S_ISDIR(image.stat(path).st_mode)
            except Exception:
                is_dir = False
            self.send_image_ws(
                get,
                msg=f"Discover sensitive {'a directory' if is_dir else 'a file'}: {path}",
                detail=f"{rule['description']}: A sensitive path {path} has been detected, which may pose a security risk",
                repair="1. Check and delete unnecessary sensitive files/directories\n2. Limit access permissions\n3. Encrypt the storage of sensitive information",
                status=2
            )
            return True
    return False
# Drive the scan. Every nesting level swallows its own errors so a
# single unreadable path cannot abort the whole (best-effort) scan.
try:
    # 1. Gather the top-level directories to scan
    scan_dirs = get_scan_dirs()
    # 2. Walk each directory
    for base_dir in scan_dirs:
        try:
            for root, dirs, files in image.walk(base_dir):
                # Depth limit: only the first two path components are
                # visited ("/a/b" splits into 3 parts; deeper is skipped).
                if len(root.split("/")) > 3:
                    continue
                # Push a progress message to the frontend
                self.send_image_ws(get, msg=f"Scanning: {root}")
                # Skip empty directories
                if not dirs and not files:
                    continue
                # Check directory names against the path rules
                for dirname in dirs[:]:  # iterate over a copy of dirs
                    dirpath = os.path.join(root, dirname)
                    check_path_pattern(dirpath, rules)
                # Check files
                for filename in files:
                    try:
                        filepath = os.path.join(root, filename)
                        # Skip whitelisted files
                        if any(fnmatch.fnmatch(filepath, wp) for wp in rules["whitelist"]["paths"]):
                            continue
                        # Check file status: regular files up to 10 MiB only
                        try:
                            f_stat = image.stat(filepath)
                            if not S_ISREG(f_stat.st_mode) or f_stat.st_size > 10 * 1024 * 1024:
                                continue
                            # Read the file content from the image
                            with image.open(filepath, mode="rb") as f:
                                content = f.read()
                            # Skip empty files
                            if not content:
                                continue
                            # Path rules first; a path hit skips the
                            # (more expensive) content scan
                            if check_path_pattern(filepath, rules):
                                continue
                            check_file_content(filepath, content, rules)
                        except Exception as e:
                            continue
                    except Exception as e:
                        continue
        except Exception as e:
            continue
except Exception as e:
    # public.print_log(f"|===========扫描失败: {str(e)}")
    pass
def scan_backdoor(self, get, image):
    """
    Scan the image filesystem for common persistence backdoors.

    Runs five independent, best-effort detectors against the image
    filesystem handle:
      - bashrc():     malicious aliases / reverse shells in shell rc files
      - crontab():    reverse-shell commands scheduled via cron
      - service():    reverse-shell commands in systemd unit files
      - sshd():       sshd soft-link (symlink) backdoors
      - tcpwrapper(): reverse shells hooked into hosts.allow / hosts.deny

    Findings are pushed to the frontend via self.send_image_ws(status=3).
    @author lwh<2024-1-23>

    :param get:   request context forwarded to send_image_ws
    :param image: image handle exposing walk/open/stat/lstat/evalsymlink
    """
    # Generic reverse-shell signatures shared by all detectors below.
    backdoor_regex_list1 = [
        # reverse shell
        r'''(nc|ncat|netcat)\b.*(-e|--exec|-c)\b.*?\b(ba|da|z|k|c|a|tc|fi|sc)?sh\b''',
        r'''python\w*\b.*\bsocket\b.*?\bconnect\b.*?\bsubprocess\b.*?\bsend\b.*?\bstdout\b.*?\bread\b''',
        r'''python\w*\b.*\bsocket\b.*?\bconnect\b.*?\bos\.dup2\b.*?\b(call|spawn|popen)\b\s*\([^)]+?\b(ba|da|z|k|c|a|tc|fi|sc)?sh\b''',
        r'''(sh|bash|dash|zsh)\b.*-c\b.*?\becho\b.*?\bsocket\b.*?\bwhile\b.*?\bputs\b.*?\bflush\b.*?\|\s*tclsh\b''',
        r'''(sh|bash|dash|zsh)\b.*-c\b.*?\btelnet\b.*?\|&?.*?\b(ba|da|z|k|c|a|tc|fi|sc)?sh\b.*?\|&?\s*telnet\b''',
        r'''(sh|bash|dash|zsh)\b.*-c\b.*?\bcat\b.*?\|&?.*?\b(ba|da|z|k|c|a|tc|fi|sc)?sh\b.*?\|\s*(nc|ncat)\b''',
        r'''(sh|bash|dash|zsh)\b.*sh\s+(-i)?\s*>&?\s*/dev/(tcp|udp)/.*?/\d+\s+0>&\s*(1|2)''',
    ]
    # Shared remediation advice (hoisted: the same text was repeated inline
    # at every report site in the original).
    repair_text = "1. Enter the container deployed using the image and delete the malicious code under the file<br/>2. Check whether the container has been invaded, and update the access token or account password of the business in the container<br/>3. It is recommended to replace the official image or Other trusted image deployment containers"

    def bashrc():
        """Detect backdoors planted in user shell rc files (~/.bashrc etc.)."""
        backdoor_regex_list = [r'''alias\s+ssh=[\'\"]{0,1}strace''', r'''alias\s+sudo=''']
        bashrc_dirs = ["/home", "/root"]
        for bashrc_dir in bashrc_dirs:
            for root, dirs, files in image.walk(bashrc_dir):
                for file in files:
                    # Only shell rc files such as .bashrc / .zshrc / .kshrc
                    if not re.match(r'''^\.[\w]*shrc$''', file):
                        continue
                    filepath = os.path.join(root, file)
                    try:
                        # `with` (was a leaked open handle in the original)
                        with image.open(filepath, mode="r") as f:
                            f_content = f.read()
                        for backdoor_regex in backdoor_regex_list:
                            if re.search(backdoor_regex, f_content):
                                self.send_image_ws(get, msg=public.lang("Found bashrc backdoor{}", filepath),
                                                   detail=public.lang("It was found that the image has bashrc backdoor: [{}], malicious code content: <br/>{}", filepath, self.short_string(f_content)),
                                                   repair=public.lang(repair_text), status=3)
                        for backdoor_regex in backdoor_regex_list1:
                            if re.search(backdoor_regex, f_content):
                                self.send_image_ws(get, msg=public.lang("Backdoor file found{}", filepath),
                                                   detail=public.lang("It was found that the bashrc backdoor file [{}] exists in the image, and the malicious code content is:<br/>{}", filepath, self.short_string(f_content)),
                                                   repair=public.lang(repair_text), status=3)
                    except FileNotFoundError:
                        continue
                    # Narrowed from BaseException: keep best-effort without
                    # swallowing KeyboardInterrupt/SystemExit.
                    except Exception:
                        pass

    def crontab():
        """Detect backdoor commands scheduled through the system crontab."""
        cron_list = ["/etc/crontab", "/etc/cron.hourly", "/etc/cron.daily", "/etc/cron.weekly", "/etc/cron.monthly",
                     "/etc/cron.d"]
        environment_regex = r'''[a-zA-Z90-9]+\s*=\s*[^\s]+$'''
        cron_regex = r'''((\d{1,2}|\*)\s+){5}[a-zA-Z0-9]+\s+(.*)'''
        # BUGFIX: the two patterns below were missing a separating comma, so
        # Python concatenated them into a single regex with two '^' anchors
        # that could never match anything.
        backdoor_regex_list = [
            # download
            r'''^(wget|curl)\b''',
            # mrig
            r'''^([\w0-9]*mrig[\w0-9]*)\b''',
        ]

        def detect_crontab_content(cron_f, cron_path):
            """Scan one crontab stream; *cron_path* is the path reported.

            BUGFIX: the original referenced the enclosing loop variable
            ``filepath``, which is stale (or unbound -> NameError) when
            scanning a plain file such as /etc/crontab.
            """
            result_dict = {}
            for line in cron_f.readlines():
                line = line.strip().replace("\n", "")
                # Skip environment assignments such as PATH=/usr/bin
                if re.match(environment_regex, line):
                    continue
                m = re.match(cron_regex, line)
                if not m:
                    continue
                # cron_regex always yields 3 groups; group 3 is the command
                cmdline1 = m.group(3)
                for backdoor_regex in backdoor_regex_list1:
                    if re.search(backdoor_regex, cmdline1):
                        self.send_image_ws(get, msg=public.lang("cron backdoor discovered{}", cron_path),
                                           detail=public.lang("It was found that the cron backdoor [{}] exists in the image, and the malicious code content is:<br/>{}", cron_path, self.short_string(cmdline1)),
                                           repair=public.lang(repair_text),
                                           status=3)
                        result_dict[backdoor_regex] = cmdline1
            return result_dict

        for cron in cron_list:
            try:
                # Directories (/etc/cron.d etc.) are walked; plain files
                # (/etc/crontab) are scanned directly.
                cron_stat = image.stat(cron)
                if S_ISDIR(cron_stat.st_mode):
                    for root, dirs, files in image.walk(cron):
                        for file in files:
                            filepath = os.path.join(root, file)
                            with image.open(filepath) as f:
                                detect_crontab_content(f, filepath)
                elif S_ISREG(cron_stat.st_mode):
                    with image.open(cron) as f:
                        detect_crontab_content(f, cron)
            except FileNotFoundError:
                continue

    def service():
        """Detect backdoor commands embedded in systemd unit files."""
        service_dir_list = ["/etc/systemd/system"]
        for service_dir in service_dir_list:
            for root, dirs, files in image.walk(service_dir):
                for file in files:
                    try:
                        filepath = os.path.join(root, file)
                        with image.open(filepath, mode="r") as f:
                            f_content = f.read()
                        for backdoor_regex in backdoor_regex_list1:
                            if re.search(backdoor_regex, f_content):
                                self.send_image_ws(get, msg=public.lang("Found system backdoor:{}", filepath),
                                                   detail=public.lang("It was found that the systemd backdoor file [{}] exists in the image, and the malicious code content is:<br/>{}", filepath, self.short_string(f_content)),
                                                   repair=public.lang(repair_text),
                                                   status=3)
                    except FileNotFoundError:
                        continue
                    except Exception:
                        pass

    def sshd():
        """Detect sshd soft-link backdoors (common symlink backdoors)."""
        # PAM "rootok" binary names: a symlink with one of these names
        # pointing at sshd is the classic password-less sshd backdoor.
        rootok_list = ("su", "chsh", "chfn", "runuser")
        sshd_dirs = ["/home", "/root", "/tmp"]
        for sshd_dir in sshd_dirs:
            for root, dirs, files in image.walk(sshd_dir):
                for f in files:
                    try:
                        filepath = os.path.join(root, f)
                        f_lstat = image.lstat(filepath)
                        if not S_ISLNK(f_lstat.st_mode):
                            continue
                        f_link = image.evalsymlink(filepath)
                        f_exename = filepath.split("/")[-1]
                        f_link_exename = f_link.split("/")[-1]
                        if f_exename in rootok_list and f_link_exename == "sshd":
                            self.send_image_ws(get, msg=public.lang("Found sshd backdoor{}", filepath),
                                               detail=public.lang("Found the sshd soft link backdoor: {}, the file hits the malicious feature [exe={};link_file={}]", filepath, f_exename, f_link),
                                               repair=public.lang(repair_text), status=3)
                    except FileNotFoundError:
                        continue
                    except Exception:
                        pass

    def tcpwrapper():
        """Detect reverse shells hooked into the TCP-wrapper config files."""
        wrapper_config_file_list = ['/etc/hosts.allow', '/etc/hosts.deny']
        for config_filepath in wrapper_config_file_list:
            try:
                with image.open(config_filepath, mode="r") as f:
                    f_content = f.read()
                for backdoor_regex in backdoor_regex_list1:
                    if re.search(backdoor_regex, f_content):
                        self.send_image_ws(get, msg=public.lang("Found the tcpwrapper backdoor{}", config_filepath),
                                           detail=public.lang("It was found that the tcpwrapper backdoor file [{}] exists in the image, and the malicious code content is:<br/>{}", config_filepath, self.short_string(f_content)),
                                           repair=public.lang(repair_text), status=3)
            except FileNotFoundError:
                continue
            except Exception:
                pass

    # Run all backdoor detectors
    bashrc()
    crontab()
    service()
    sshd()
    tcpwrapper()
def scan_privilege_escalation(self, get, image):
    """
    Scan the image for privilege-escalation risks.

    Stub: no checks are implemented yet — the body is this docstring only.
    @author lwh<2024-01-24>

    :param get:   request context (unused until checks are implemented)
    :param image: image filesystem handle (unused until checks are implemented)
    """
def scan_escape(self, get, image):
    """
    Scan the image for container-escape risks.

    Currently only checks /etc/sudoers for non-privileged users granted
    sudo rights on binaries with well-known shell-escape tricks
    (GTFOBins-style: less, vim, find, ...). Findings are pushed via
    self.send_image_ws(status=3).
    @author lwh<2024-01-24>

    :param get:   request context forwarded to send_image_ws
    :param image: image handle used to read /etc/sudoers
    """
    def sudoers():
        """Flag escape-capable sudo grants for ordinary (non-root) users."""
        # Captures (user, command-spec) from lines like
        # "bob ALL=(ALL) /usr/bin/vim"
        sudo_regex = r"(\w{1,})\s\w{1,}=\(.*\)\s(.*)"
        unsafe_sudo_files = ["wget", "find", "cat", "apt", "zip", "xxd", "time", "taskset", "git", "sed", "pip", "tmux", "scp", "perl", "bash", "less", "awk", "man", "vim", "env", "ftp"]
        try:
            with image.open("/etc/sudoers", mode="r") as f:
                lines = f.readlines()
            for line in lines:
                line = line.strip()
                if line.startswith("#"):
                    continue
                matches = re.findall(sudo_regex, line)
                if len(matches) != 1:
                    continue
                user, sudo_command = matches[0]
                # Privileged accounts are expected to have full sudo rights.
                if user.lower() in ["admin", "sudo", "root"]:
                    continue
                for unsafe_sudo_file in unsafe_sudo_files:
                    # BUGFIX: match whole words instead of substrings — the
                    # old `in` test false-positived on e.g. "catalog" (cat)
                    # or "timedatectl" (time). \b still matches "/usr/bin/cat"
                    # and "apt-get".
                    if re.search(r"\b{}\b".format(re.escape(unsafe_sudo_file)), sudo_command.lower()):
                        self.send_image_ws(get, msg=public.lang("Users found to be at risk of escape{}", user), detail=public.lang("The username {} may complete container escape through the command [{}], allowing the attacker to obtain access rights to the host or other containers. Malicious content:<br/>{}", user, sudo_command, line), status=3, repair=public.lang("1. It is recommended to delete the image or no longer use it<br/>2. If there is already a business using the image, enter the container environment and delete the content of the user {} in the /etc/sudoers file.", user))
                        break
        except Exception:
            # /etc/sudoers may be absent or unreadable in the image;
            # the scan is best-effort.
            return
    # Run the checks
    sudoers()
def scan_log4j2(self, get, image):
    """
    Check whether the image is affected by the log4j2 vulnerability.

    Stub: detection is not implemented yet; the commented-out code below
    is a veinmind SDK exploration kept for reference.
    @author lwh<2024-01-26>

    :param get:   request context (unused until checks are implemented)
    :param image: image filesystem handle (unused until checks are implemented)
    """
    # def veinmind():
    #     from veinmind import docker
    #     client = docker.Docker()
    #     ids = client.list_image_ids()
    #     # public.print_log(ids)
    #     for id in ids:
    #         image = client.open_image_by_id(id)
    #         # public.print_log("image id: " + image.id())
    #         for ref in image.reporefs():
    #             public.print_log("image ref: " + ref)
    #         for repo in image.repos():
    #             public.print_log("image repo: " + repo)
    #         # public.print_log("image ocispec: " + str(image.ocispec_v1()))
if __name__ == '__main__':
# obj = main()
# get = public.dict_obj()
# obj.get_safe_scan(get=get)
# sudo_regex = r"(\w{1,})\s\w{1,}=\(.*\)\s(.*)"
# unsafe_sudo_files = ["wget", "find", "cat", "apt", "zip", "xxd", "time", "taskset", "git", "sed", "pip", "ed",
# "tmux", "scp", "perl", "bash", "less", "awk", "man", "vi", "vim", "env", "ftp", "all"]
# try:
# with open("/tmp/sudoers.test", "r") as f:
# lines = f.readlines()
# for line in lines:
# line = line.strip()
# if line.startswith("#"):
# continue
# matches = re.findall(sudo_regex, line)
# if len(matches) == 1:
# user, sudo_file = matches[0]
# if user.lower() in ["admin", "sudo", "root"]:
# continue
# print(user.lower(), sudo_file.lower())
# for unsafe_sudo_file in unsafe_sudo_files:
# if unsafe_sudo_file in sudo_file.lower():
# print("用户有问题:{}".format(user))
# break
# except Exception as e:
# print(e)
# 初始化安装检测SDK
try:
from veinmind import docker
except Exception as e:
# public.print_log("Importing veinmind failed:{}".format(e))
requirements_list = ["veinmind"]
shell_command = "btpip install --no-dependencies {}".format(" ".join(requirements_list))
public.ExecShell(shell_command)
sys_ver = public.get_os_version()
# self.send_image_ws(get, msg="正在初始化检测引擎中,首次加载耗时较长...", status=1)
if "Ubuntu" in sys_ver or "Debian" in sys_ver:
public.WriteFile("/etc/apt/sources.list.d/libveinmind.list",
"deb [trusted=yes] https://download.veinmind.tech/libveinmind/apt/ ./")
# public.print_log("Updating apt-get")
public.ExecShell("apt-get update")
time.sleep(1)
public.ExecShell("apt-get install -y libveinmind-dev")
time.sleep(1)
elif "CentOS" in sys_ver:
public.WriteFile("/etc/yum.repos.d/libveinmind.repo", """[libveinmind]
name=libVeinMind SDK yum repository
baseurl=https://download.veinmind.tech/libveinmind/yum/
enabled=1
gpgcheck=0""")
public.ExecShell("yum makecache", timeout=10)
public.ExecShell("yum install -y libveinmind-devel", timeout=10)
else:
pass
# public.print_log("不支持的系统版本")
# return public.returnMsg(False, public.lang("不支持的系统版本"))
# self.send_image_ws(get, msg="正在检查依赖库是否存在...", status=1)
result, err = public.ExecShell("whereis libdl.so")
result = result.strip().split(" ")
# public.print_log("libdl.so库的情况{}".format(result))
if len(result) <= 1:
# public.print_log("缺少libdl.so库")
result, err = public.ExecShell("whereis libdl.so.2")
result = result.strip().split(" ")
# public.print_log("libdl.so.2库的情况:{}".format(result))
if len(result) <= 1:
# public.print_log("缺少libdl.so库需要安装libdl.so或libdl.so2")
public.returnMsg(False, "The libdl.so library is missing, you need to install libdl.so or libdl.so2")
else:
# 建立libdl.so软链接至libdl.so.2
for lib in result[1:]:
ln_command = "ln -s {} {}".format(lib, lib[:-2])
public.print_log("Soft link in progress{}".format(ln_command))
public.ExecShell(ln_command)
from veinmind import docker
# public.print_log("执行成功")