my_debugger.py
# -*- coding: cp949 -*-
from ctypes import *
from my_debugger_defines import *
kernel32 = windll.kernel32
class debugger():
def __init__(self):
self.h_process = None
self.pid = None
self.debugger_active = False
self.h_thread = None
self.context = None
self.exception = None
self.exception_address = None
self.breakpoints = {}
self.first_breakpoint = True
# Here let's determine and store
# the default page size for the system
# determine the system page size.
def load(self,path_to_exe):
# dwCreation flag determines how to create the process
# set creation_flags = CREATE_NEW_CONSOLE if you want
# to see the calculator GUI
creation_flags = DEBUG_PROCESS
# instantiate the structs
startupinfo = STARTUPINFO()
process_information = PROCESS_INFORMATION()
# The following two options allow the started process
# to be shown as a separate window. This also illustrates
# how different settings in the STARTUPINFO struct can affect
# the debuggee.
startupinfo.dwFlags = 0x1
startupinfo.wShowWindow = 0x0
# We then initialize the cb variable in the STARTUPINFO struct
# which is just the size of the struct itself
startupinfo.cb = sizeof(startupinfo)
if kernel32.CreateProcessA(path_to_exe,
None,
None,
None,
None,
creation_flags,
None,
None,
byref(startupinfo),
byref(process_information)):
print "[*] We have successfully launched the process!"
print "[*] PID : %d" % process_information.dwProcessId
# 새로 생성한 프로세스의 핸들을 구한 후
# 나중에 접근하기 위해 저장한다.
self.h_process = self.open_process(process_information.dwProcessId)
else:
print "[*] Error : 0x%08x." % kernel32.GetLastError()
def open_process(self,pid):
# PROCESS_ALL_ACCESS = 0x0x001F0FFF
h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS,False,pid)
return h_process
def open_thread(self, thread_id): # thread_id를 인자로 전달해주면 해당 thread의 핸들을 리턴한다.
h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
if h_thread is not None:
return h_thread
else:
print "[*] Could not obtain a valid thread handle."
return False
def enumerate_threads(self):
thread_entry = THREADENTRY32() # Thread32First 실행인자로 전달되는 구조체
thread_list = []
snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
if snapshot is not None:
# 먼저 구조체의 크기를 설정해야 한다.
thread_entry.dwSize = sizeof(thread_entry)
success = kernel32.Thread32First(snapshot, byref(thread_entry))
while success:
if thread_entry.th32OwnerProcessID == self.pid:
thread_list.append(thread_entry.th32ThreadID)
success = kernel32.Thread32Next(snapshot, byref(thread_entry))
# Thread32First 함수 실행 후에는 Thread32Next 함수로 스레드 리스트를 끝까지 열거한다.
kernel32.CloseHandle(snapshot)
return thread_list
else:
return False
def get_thread_context (self, thread_id=None, h_thread=None):
context = CONTEXT() # 디버그 레지스터에 대한 정보를 담고있는 CONTEXT 구조체이다.
context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
# 스레드의 핸들을 구한다.
if h_thread is None:
self.h_thread = self.open_thread(thread_id)
if kernel32.GetThreadContext(self.h_thread, byref(context)):
# 레지스터 정보를 얻기 위해서 GetThreadContext 함수를 사용한다.
# 첫 인자는 레지스터를 얻고자 하는 스레드의 핸들이며, 두번째는 정보가 저장되어야 하는 context의 포인터이다.
return context
else:
return False
def attach(self,pid):
self.h_process = self.open_process(pid)
# 프로세스에 대한 어태치를 시도한다.
# 실패하면 호출을 종료한다.
if kernel32.DebugActiveProcess(pid):
self.debugger_active = True
self.pid = int(pid)
else:
print "[*] Unable to attach to the process."
def run(self):
# 이제 디버기에 대한 디버그 이벤트를 처리해야 한다.
# debugging events
while self.debugger_active == True:
self.get_debug_event()
def get_debug_event(self):
resultMap = {
1:"EXCEPTION_DEBUG_EVENT",
2:"CREATE_THREAD_DEBUG_EVENT",
3:"CREATE_PROCESS_DEBUG_EVENT",
4:"EXIT_THREAD_DEBUG_EVENT",
5:"EXIT_PROCESS_DEBUG_EVENT",
6:"LOAD_DLL_DEBUG_EVENT",
7:"UNLOAD_DLL_DEBUG_EVENT",
8:"OUTPUT_DEBUG_STRING_EVENT",
9:"RIP_EVENT"
}
debug_event = DEBUG_EVENT()
continue_status = DBG_CONTINUE
if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
# 스레드의 컨텍스터 정보를 구한다.
self.h_thread = self.open_thread(debug_event.dwThreadId)
self.context = self.get_thread_context(h_thread = self.h_thread)
self.debug_event = debug_event
print "Event Code: %d : %s / Thread Id: %d" % (debug_event.dwDebugEventCode,resultMap.get(debug_event.dwDebugEventCode), debug_event.dwThreadId)
if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
#exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
self.exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
if self.exception == EXCEPTION_ACCESS_VIOLATION:
print "Access Violation Detected."
elif self.exception == EXCEPTION_BREAKPOINT:
continue_status = self.exception_handler_breakpoint()
elif self.exception == EXCEPTION_GUARD_PAGE:
print "Guard Page Access Detected."
elif self.exception == EXCEPTION_SINGLE_STEP:
print "Single Stepping."
kernel32.ContinueDebugEvent(
debug_event.dwProcessId,
debug_event.dwThreadId,
continue_status)
def detach(self):
if kernel32.DebugActiveProcessStop(self.pid):
print "[*] Finished debugging. Exiting..."
return True
else:
print "There was an error"
return False
def exception_handler_breakpoint(self):
#print "[+] Inside the breakpoint handler."
print "Exception Address: 0x%08x" % self.exception_address
if not self.breakpoints.has_key(self.exception_address):
if self.first_breakpoint == True:
self.first_breakpoint = False
print "[+] Hit the first breakpoint."
return DBG_CONTINUE
else:
print "[+] Hit user defined breakpoint."
self.write_process_memory(self.exception_address,self.breakpoints[self.exception_address])
self.context = self.get_thread_context(h_thread=self.h_thread)
self.context.Eip -= 0x01 # Error!
kernel32.SetThreadContext(self.h_thread,byref(self.context))
continue_status = DBG_CONTINUE
return continue_status
def read_process_memory(self,address,length):
data = ""
read_buf = create_string_buffer(length)
count = c_ulong(0)
if not kernel32.ReadProcessMemory(self.h_process,address,read_buf,length,byref(count)):
return False
else:
data = data + read_buf.raw
return data
def write_process_memory(self,address,data):
count = c_ulong(0)
length = len(data)
c_data = c_char_p(data[count.value:])
if not kernel32.WriteProcessMemory(self.h_process,address,c_data,length,byref(count)):
return False
else:
return True
def bp_set(self,address):
if not self.breakpoints.has_key(address):
try:
#원래의 바이트 값을 저장
original_byte = self.read_process_memory(address,1)
# cc opcode write
self.write_process_memory(address,"\xCC")
# 리스트에 bp 추가
self.breakpoints[address] = (original_byte)
print self.breakpoints[address]
except:
return False
return True
def resolve_func(self,dll,function):
handle = kernel32.GetModuleHandleA(dll)
address = kernel32.GetProcAddress(handle,function)
kernel32.CloseHandle(handle)
return address
my_test.py
# -*- coding: cp949 -*-
import my_debugger
f=open("pid.txt",'r')
pid=f.readline()
debugger = my_debugger.debugger()
printf_address = debugger.resolve_func("msvcrt.dll","printf")
debugger.attach(int(pid))
#pid = raw_input("Enter the PID of the process to attach to: ")
print "pid : %s" % pid
print "[+] Address of printf : 0x%08x" % printf_address
debugger.bp_set(printf_address)
debugger.run()
debugger.detach()
'스터디 > └ 소스파일들' 카테고리의 다른 글
| Memory Breakpoint 까지의 소스 (0) | 2015.01.17 |
|---|---|
| Hardware Breakpoint 까지 소스 (0) | 2015.01.17 |
| my_test.py (0) | 2015.01.15 |
| my_debugger_defines.py (0) | 2015.01.15 |
| my_debugger.py (0) | 2015.01.15 |
my_debugger.py