环境配置
编译 ollvm
先将 GitHub 上的 ollvm 下载下来。
git clone -b llvm-4.0 https://github.com/obfuscator-llvm/obfuscator.git
创建文件夹,作为编译 ollvm 的目录。
mkdir ollvm
cd ollvm
加载 ollvm 项目。
cmake -DCMAKE_BUILD_TYPE=Release -DLLVM_INCLUDE_TESTS=OFF ~/obfuscator
然后编译项目。
make -j4
ubuntu18.04 我的环境是直接编译成功了,但是对于 ubuntu20.04 ,我这里遇到了两个报错。 第一个报错: 这个错误是因为本机的 gcc 和 g++ 版本是 9.x.x ,改为 8.x.x 就好了。
安装 8.x.x 的编译器
sudo apt-get install gcc-8 g++-8 -y
利用 linux 软件版本管理命令 update-alternatives 更改优先级。
先要在 update-alternatives 工具中注册
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-8 8
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-8 8
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-9 9
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-9 9
gcc 切换版本 默认gcc 8
sudo update-alternatives --config gcc
gcc 版本已成功切换 同理, g++ 切换版本 默认g++ 8
sudo update-alternatives --config g++
第二个报错: 貌似也是编译器版本的问题,不过可以通过修改源码解决。
首先找到出错的位置
sudo gedit ~/obfuscator/include/llvm/ExecutionEngine/Orc/OrcRemoteTargetClient.h
修改 char 为 uint8_t 即可。
添加字符串加密模块
这里采用上海交大密码与计算机安全实验室GoSSIP小组设计的基于LLVM 4.0的孤挺花混淆框架。
首先找到字符串加密模块 提取出该文件,放到 obfuscator 相同目录下 在 Obfuscation下的 CMakeLists.txt 将 StringObfuscation.cpp 添加到编译库中 在 include/llvm/Transforms/Obfuscation下增加字符串混淆的头文件 StringObfuscation.h 内容为:
namespace llvm {
Pass* createStringObfuscation(bool flag);
}
最后只需要在 /lib/Transforms/IPO 下的 PassManagerBuilder.cpp 将字符串加密的编译选项添加进去即可
-
添加 #include "llvm/Transforms/Obfuscation/StringObfuscation.h" 引用 -
在合适的地方插入以下两条函数声明,即编译时的编译参数 -mllvm -sobf : static cl::opt<std::string>
Seed("seed", cl::init(""),
cl::desc("seed for the random"));
static cl::opt<bool>
StringObf("sobf", cl::init(false),
cl::desc("Enable the string obfuscation"));
-
在 PassManagerBuilder::PassManagerBuilder() 构造函数中添加随机数因子的初始化
if(!Seed.empty()) {
if(!llvm::cryptoutils->prng_seed(Seed.c_str()))
exit(1);
}
-
最后将该 pass 添加进 void PassManagerBuilder::populateModulePassManager 中即可 MPM.add(createStringObfuscation(StringObf));
注意别加到前面的 if 判断中。 最后重新进行前面的编译过程即可。
基本使用
在bin目录下执行以下指令:./clang test.c -o test -mllvm -sub -mllvm -fla -mllvm -bcf -mllvm -sobf
-mllvm -sub :激活指令替换 -mllvm -sub_loop=3 :如果激活了传递,则在函数上应用3次。默认值:1-mllvm -fla :激活控制流平坦化 -mllvm -split :激活基本块分割。在一起使用时改善展平。 -mllvm -split_num=3 :如果激活了传递,则在每个基本块上应用3次。默认值:1-mllvm -bcf :激活虚假控制流程 -mllvm -bcf_loop=3 :如果激活了传递,则在函数上应用3次。默认值:1 -mllvm -bcf_prob=40 :如果激活了传递,基本块将以40%的概率进行模糊处理。默认值:30-mllvm -sobf :编译时候添加选项开启字符串加密
指令替换
指令替换将正常的二元运算指令替换为等效而更加复杂的指令序列,已达到混淆运算过程的目的。
加法替换
a = b + c 有 4 种替换方案
- addNeg
a=b-(-c) - addDoubleNeg
a=-(-b+(-c)) - addRand
r=rand();a=b+r;a=a+c;a=a-r; - addRand2
r=rand();a=b-r;a=a+c;a=a+r;
减法替换
a=b-c 有 3 种替换方案
- subNeg
a=b+(-c) - subRand
r=rand();a=b+r;a=a-c;a=a-r; - subRand2
r=rand();a=b-r;a=a-c;a=a+r;
与替换
a=b&c 有 2 种替换方案
- addSubstitute
a=(b^~c)&b - andSubstituteRand
a=~(~b|~c)&(r|~r)
或替换
a=b|c 有 2 种替换方案
- orSubstitute
a=(b&c)|(b^c) - orSubstituteRand
a=~(~b&~c)&(r|~r)
异或替换
a=b^c 有 2 种替换方案
- xorSubstitute
a=(~a&b)|(a&~b) - xorSubstituteRand
a=(b^r)^(c^r)=(~b&r|b&~r)^(~c&r|c&~r)
控制流平坦化
控制流平坦化原理
混淆之后的 CFG 图是这样的: 可以抽象为下面这个结构: 平坦化过程其实就是把源代码划分代码块后将其改为 switch 结构。
以上面的图片为例,原本代码为:
序言;
真实块1;
if (condition) {
真实块2;
} else {
真实块3;
}
retn 块;
平坦化之后代码为:
序言
switchVar = 2;
while (true) {
case 2: {
真实块1
switchVar = condition ? 5 : 3;
break;
}
case 5: {
真实块2
switchVar = 1;
break;
}
case 3: {
真实块3
switchVar = 1;
break;
}
case 1: {
retn 块
return;
}
}
利用angr符号执行去除控制流平坦化
参考
静态分析CFG
整个程序的 CFG 可以通过 CFGFast 函数得到:
cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
与 IDA 不同的是,angr 的 CFG 会将 call 指令也视为跳转的一种,作为基本块的最后一条指令,并且 IDA 中的 CFG 是以函数为单位的,而不是整个程序。所以我们要通过 angrmangement 中的 to_supergraph 函数将 angr 的 CFG 单个函数的 CFG ,再转化为类似 IDA 的 CFG ,代码如下:
def get_cfg():
cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
function_cfg = cfg.functions.get(start).transition_graph
super_cfg = to_supergraph(function_cfg)
return super_cfg
这样我们就得到了一个类似 IDA 的 CFG ,接着我们要通过分析平坦化后的控制流的结构规律来识别出各类型的基本块,识别方法如下:
-
序言/入口块(Prologue):没有前驱块的基本块即是入口块 for node in cfg.nodes:
if cfg.in_degree(node) == 0:
prologue_node = node
-
返回块(Return):没有后继块的基本块即是返回块,返回块可能有多个 retn_nodes = []
for node in cfg.nodes:
if cfg.out_degree(node) == 0:
retn_nodes.append(node)
-
主分发器(Main dispatcher):入口块的后继块即为主分发器 main_dispatcher_node = list(cfg.successors(prologue_node))[0]
-
预分发器(Predispatcher):主分发器的前驱块,且不为入口块 for node in cfg.predecessors(main_dispatcher_node):
if node.addr != prologue_node.addr:
predispatcher_node = node
break
-
真实块(Relevant blocks):预分发器的前驱块,为了后续处理方便,这里也把入口块算作真实块 for node in cfg.predecessors(main_dispatcher_node):
if node.addr != prologue_node.addr:
predispatcher_node = node
break
-
子分发器/无用块(Sub dispatchers):除上述基本块之外的基本块都为子分发器,因为子分发器再恢复之后的控制流中不起任何作用,所以也叫作无用块,之后要被 nop 掉 relevant_nodes = [prologue_node]
sub_dispatcher_nodes = []
for node in cfg.nodes:
if node in cfg.predecessors(predispatcher_node):
relevant_nodes.append(node)
elif node != prologue_node and node not in retn_nodes:
sub_dispatcher_nodes.append(node)
重建控制流
利用 angr 符号执行,确定各个真实块之间的关系。 首先获取所有有效的代码块的起始地址,包括序言块,真实块和返回块。
relevant_addrs = set.union({node.addr for node in relevant_nodes}, {node.addr for node in retn_nodes})
创建记录控制流的 flow,用于接下来存放每个块与其后继块的地址的映射。
flow = defaultdict(list)
预处理代码块:
- 由于去混淆是以函数为单位的,因此对于函数调用,需要将其 hook 掉。
for insn in block.capstone.insns:
if insn.mnemonic == 'call':
proj.hook(insn.address, hook=nop_proc, length=5)
print('Hook [%s\t%s] at %#x' % (insn.mnemonic, insn.op_str, insn.address))
- 按照是否存在条件跳转将代码块分类,并记录条件跳转的类型和跳转地址。
elif insn.mnemonic.startswith('cmov'):
has_branch = True
patch_addrs[block_addr] = insn.address
cmov_types[block_addr] = insn.mnemonic
针对前面划分的两类真实块分别处理,建立控制流:
-
有一个确定的后继块 这种情况直接符号执行到下一个真实块。 simgr = proj.factory.simgr(state)
simgr.step()
while len(simgr.active):
for active in simgr.active:
if active.addr in relevant_addrs:
flow[block_addr].append(active.addr)
return
simgr.step()
-
有两个后继块,跳转到哪个由某个条件决定 通过statement断点监控了VEX IR中的ITE指令,通过修改ITE指令中的临时变量改变符号执行的状态。然后符号执行确定后继块。 def modify_ITE_cond(state):
expressions = list(state.scratch.irsb.statements[state.inspect.statement].expressions)
if len(expressions) != 0 and isinstance(expressions[0], pyvex.expr.ITE):
state.scratch.temps[expressions[0].cond.tmp] = modify_cond
state.inspect._breakpoints['statement'] = []
state.inspect.b('statement', when=BP_BEFORE, action=modify_ITE_cond)
Patch程序
首先将子分发器全部 nop 掉,因为这些基本块在我们重建之后的控制流中不起任何作用:
for node in sub_dispatcher_nodes:
fill_nops(node.addr, node.size)
print('Fill nops from %#x to %#x' % (node.addr, node.addr + node.size))
对于没有分支的真实块,直接让他跳转到对应的后继真实块。
注意入口块要做一个特殊处理,因为入口块的最后一条指令并不是jmp指令,所以要从主分发块的头部进行Patch。
for node in relevant_nodes:
childs = flow[node.addr]
if len(childs) == 1:
if node.addr == prologue_node.addr:
patch_addr = node.addr + node.size
else:
patch_addr = node.addr + node.size - 5
fill_jmp(patch_addr, childs[0])
print('Patch jmp %#x at %#x' % (childs[0], patch_addr))
对于有分支的基本块,则根据 cmov 指令的类型进行 Patch
elif len(childs) == 2:
patch_addr = patch_addrs[node.addr]
cmov_type = cmov_types[node.addr]
fill_nops(patch_addr, node.addr + node.size - patch_addr)
fill_jx(patch_addr, childs[0], cmov_type)
fill_jmp(patch_addr + 6, childs[1])
print('Patch jz %#x at %#x' % (childs[0], patch_addr))
print('Patch jmp %#x at %#x' % (childs[1], patch_addr + 6))
完整代码
from collections import defaultdict
import angr
from angr.state_plugins.inspect import BP_BEFORE
from angrmanagement.utils.graph import to_supergraph
import argparse
import sys
import claripy
import logging
import pyvex
from keystone import *
logging.getLogger('angr.storage.memory_mixins.default_filler_mixin').setLevel(logging.ERROR)
def get_cfg():
cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
function_cfg = cfg.functions.get(start).transition_graph
super_cfg = to_supergraph(function_cfg)
return super_cfg
def analyse_blocks():
retn_nodes = []
for node in cfg.nodes:
if cfg.in_degree(node) == 0:
prologue_node = node
elif cfg.out_degree(node) == 0:
retn_nodes.append(node)
main_dispatcher_node = list(cfg.successors(prologue_node))[0]
for node in cfg.predecessors(main_dispatcher_node):
if node.addr != prologue_node.addr:
predispatcher_node = node
break
relevant_nodes = [prologue_node]
sub_dispatcher_nodes = []
for node in cfg.nodes:
if node in cfg.predecessors(predispatcher_node):
relevant_nodes.append(node)
elif node != prologue_node and node not in retn_nodes:
sub_dispatcher_nodes.append(node)
return prologue_node, main_dispatcher_node, sub_dispatcher_nodes, retn_nodes, relevant_nodes, predispatcher_node
def preprocess(block_addr):
def nop_proc(state):
pass
block = proj.factory.block(block_addr)
has_branch = False
for insn in block.capstone.insns:
if insn.mnemonic == 'call':
proj.hook(insn.address, hook=nop_proc, length=5)
print('Hook [%s\t%s] at %#x' % (insn.mnemonic, insn.op_str, insn.address))
elif insn.mnemonic.startswith('cmov'):
has_branch = True
patch_addrs[block_addr] = insn.address
cmov_types[block_addr] = insn.mnemonic
return has_branch
def symbolic_execute(block_addr, modify_cond=None):
def modify_ITE_cond(state):
expressions = list(state.scratch.irsb.statements[state.inspect.statement].expressions)
if len(expressions) != 0 and isinstance(expressions[0], pyvex.expr.ITE):
state.scratch.temps[expressions[0].cond.tmp] = modify_cond
state.inspect._breakpoints['statement'] = []
state = proj.factory.blank_state(addr=block_addr, remove_options={
angr.sim_options.LAZY_SOLVES})
if modify_cond is not None:
state.inspect.b('statement', when=BP_BEFORE, action=modify_ITE_cond)
simgr = proj.factory.simgr(state)
simgr.step()
while len(simgr.active):
for active in simgr.active:
if active.addr in relevant_addrs:
flow[block_addr].append(active.addr)
return
simgr.step()
print('Error at block %#x' % block_addr)
def fill_nops(addr, size):
offset = addr - base_addr
content[offset:offset + size] = b'\x90' * size
def fill_jmp(src, dest):
offset = src - base_addr
if dest != src + 5:
content[offset] = 0xE9
content[offset + 1:offset + 5] = (dest - src - 5).to_bytes(4, 'little', signed=True)
else:
fill_nops(src, 5)
def get_jx_opcode(jx_type):
ks = Ks(KS_ARCH_X86, KS_MODE_32)
code, count = ks.asm(f'{jx_type} 0xFFFFFFFF')
return b''.join(map(lambda x: x.to_bytes(1, sys.byteorder), code[0:2]))
def fill_jx(src, dest, cmov_type):
offset = src - base_addr
content[offset:offset + 2] = get_jx_opcode(cmov_type.replace('cmov', 'j'))
content[offset + 2:offset + 6] = (dest - src - 6).to_bytes(4, 'little', signed=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Deobfuscate OLLVM Control Flow Flatten')
parser.add_argument('-f', '--file', help='binary to deobfuscate')
parser.add_argument('-s', '--start', help='start address of the deobfuscation')
parser.add_argument('-o', '--out', help='output file path')
args = parser.parse_args()
if args.file is None or args.start is None or args.out is None:
parser.print_help()
sys.exit(0)
filename = args.file
start = int(args.start, 16)
proj = angr.Project(filename, load_options={'auto_load_libs': False})
print('**************** Step-1 Static Analysis(1/3) ****************')
cfg = get_cfg()
prologue_node, main_dispatcher_node, sub_dispatcher_nodes, retn_nodes, relevant_nodes, predispatcher_node = analyse_blocks()
print('Prologue block at %#x' % prologue_node.addr)
print('Main dispatcher block at %#x' % main_dispatcher_node.addr)
print('Sub dispatcher blocks at ', [hex(node.addr) for node in sub_dispatcher_nodes])
print('Return blocks at ', [hex(node.addr) for node in retn_nodes])
print('Relevant blocks at ', [hex(node.addr) for node in relevant_nodes])
print('Predispatcher blocks at %#x' % predispatcher_node.addr)
print('**************** Step-2 Recover Control Flow(2/3) ****************')
relevant_addrs = set.union({node.addr for node in relevant_nodes}, {node.addr for node in retn_nodes})
patch_addrs = {}
cmov_types = {}
flow = defaultdict(list)
for node in relevant_nodes:
block_addr = node.addr
has_branch = preprocess(block_addr)
if has_branch:
symbolic_execute(block_addr, modify_cond=claripy.BVV(1, 1))
symbolic_execute(block_addr, modify_cond=claripy.BVV(0, 1))
else:
symbolic_execute(block_addr)
for node in relevant_nodes:
block_addr = node.addr
print('Real successors of block %#x: ' % block_addr, [hex(child) for child in flow[block_addr]])
print('**************** Step-3 Patch Binary(3/3) ****************')
base_addr = proj.loader.main_object.mapped_base
with open(filename, 'rb') as file:
content = bytearray(file.read())
for node in sub_dispatcher_nodes:
fill_nops(node.addr, node.size)
print('Fill nops from %#x to %#x' % (node.addr, node.addr + node.size))
for node in relevant_nodes:
childs = flow[node.addr]
if len(childs) == 1:
if node.addr == prologue_node.addr:
patch_addr = node.addr + node.size
else:
patch_addr = node.addr + node.size - 5
fill_jmp(patch_addr, childs[0])
print('Patch jmp %#x at %#x' % (childs[0], patch_addr))
elif len(childs) == 2:
patch_addr = patch_addrs[node.addr]
cmov_type = cmov_types[node.addr]
fill_nops(patch_addr, node.addr + node.size - patch_addr)
fill_jx(patch_addr, childs[0], cmov_type)
fill_jmp(patch_addr + 6, childs[1])
print('Patch jz %#x at %#x' % (childs[0], patch_addr))
print('Patch jmp %#x at %#x' % (childs[1], patch_addr + 6))
else:
print('Error')
sys.exit(-1)
with open(args.out, 'wb') as file:
file.write(content)
虚假控制流
虚假控制流
使用一些 IDA 分析不出的永真和永假的判断。
利用angr符号执行去除虚假控制流
参考 大体思路就是用 angr 执行一遍,标记一下那些没有执行到的代码块,然后 nop 掉。
获取代码块
blocks = set()
cfg = get_cfg(func_addr)
for node in cfg.nodes:
blocks.add(node.addr)
利用 angr 符号执行识别出不可达的基本块
state = proj.factory.blank_state(addr=func_addr)
simgr = proj.factory.simgr(state)
while len(simgr.active):
for active in simgr.active:
blocks.discard(active.addr)
block = proj.factory.block(active.addr)
for insn in block.capstone.insns:
if insn.mnemonic == 'call':
next_func_addr = int(insn.op_str, 16)
proj.hook(next_func_addr, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
print('Hook [%s\t%s] at %#x' % (insn.mnemonic, insn.op_str, insn.address))
simgr.step()
将不可达基本快 nop 掉
for block_addr in blocks:
patch_nops(proj.factory.block(block_addr))
完整代码
import angr
from angrmanagement.utils.graph import to_supergraph
import argparse
import logging
import os
def patch_nops(block):
offset = block.addr - proj.loader.main_object.mapped_base
binfile[offset: offset + block.size] = b'\x90' * block.size
print('Patch nop at block %#x' % block.addr)
def get_cfg(func_addr):
cfg = proj.analyses.CFGFast(normalize=True, force_complete_scan=False)
function_cfg = cfg.functions.get(func_addr).transition_graph
super_cfg = to_supergraph(function_cfg)
return super_cfg
def deobfu_func(func_addr):
blocks = set()
cfg = get_cfg(func_addr)
for node in cfg.nodes:
blocks.add(node.addr)
print([hex(b) for b in blocks])
state = proj.factory.blank_state(addr=func_addr)
simgr = proj.factory.simgr(state)
while len(simgr.active):
for active in simgr.active:
blocks.discard(active.addr)
block = proj.factory.block(active.addr)
for insn in block.capstone.insns:
if insn.mnemonic == 'call':
next_func_addr = int(insn.op_str, 16)
proj.hook(next_func_addr, angr.SIM_PROCEDURES["stubs"]["ReturnUnconstrained"](), replace=True)
print('Hook [%s\t%s] at %#x' % (insn.mnemonic, insn.op_str, insn.address))
simgr.step()
for block_addr in blocks:
patch_nops(proj.factory.block(block_addr))
if __name__ == '__main__':
logging.getLogger('cle').setLevel(logging.ERROR)
logging.getLogger('angr').setLevel(logging.ERROR)
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--file', required=True, help='File to deobfuscate')
parser.add_argument('-s', '--start', type=lambda x: int(x, 0), help='Starting address of target function')
args = parser.parse_args()
proj = angr.Project(args.file, load_options={"auto_load_libs": False})
start = args.start
if start == None:
main = proj.loader.find_symbol('main')
if main == None:
parser.error('Can\'t find the main function, please provide argument -s/--start')
start = main.rebased_addr
with open(args.file, 'rb') as file:
binfile = bytearray(file.read())
deobfu_func(func_addr=start)
fname, ext = os.path.splitext(args.file)
with open(fname + '_recovered' + ext, 'wb') as file:
file.write(binfile)
print('Deobfuscation success!')
·
|