Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syscalls linux32 #1170

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ env:
- MIASM_TEST_EXTRA_ARG="-o python -t qemu,long"
- MIASM_TEST_EXTRA_ARG="-o z3"
- MIASM_TEST_EXTRA_ARG="-o cparser"
- MIASM_EXTENTED_TESTS="ls_x64"
- MIASM_EXTENTED_TESTS_1="ls_x64"
- MIASM_EXTENTED_TESTS_2="shellcode_x86"
before_script:
- pip install -r optional_requirements.txt
# codespell
Expand All @@ -36,4 +37,5 @@ before_script:
- git clone https://github.com/cea-sec/miasm-extended-tests
script:
- test -z "$MIASM_TEST_EXTRA_ARG" || (cd test && flags=""; python --version |& grep -q "Python 3" || flags="-W error"; python $flags test_all.py $MIASM_TEST_EXTRA_ARG && git ls-files -o --exclude-standard)
- test -z "$MIASM_EXTENTED_TESTS" || (cd "miasm-extended-tests/$MIASM_EXTENTED_TESTS" && ./run.sh "$TRAVIS_BUILD_DIR")
- test -z "$MIASM_EXTENTED_TESTS_1" || (cd "miasm-extended-tests/$MIASM_EXTENTED_TESTS_1" && ./run.sh "$TRAVIS_BUILD_DIR")
- test -z "$MIASM_EXTENTED_TESTS_2" || (cd "miasm-extended-tests/$MIASM_EXTENTED_TESTS_2" && ./run.sh "$TRAVIS_BUILD_DIR")
63 changes: 63 additions & 0 deletions example/jitter/simu_sc_linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from argparse import ArgumentParser
from pdb import pm
from miasm.jitter.csts import PAGE_READ, PAGE_WRITE, EXCEPT_INT_XX, EXCEPT_ACCESS_VIOL, EXCEPT_PRIV_INSN
from miasm.analysis.machine import Machine
from miasm.os_dep.linux import environment, syscall
import logging


def code_sentinelle(jitter):
    """Callback ending the emulation: print "Done" and reset the jitter state.

    Registered below both as the breakpoint callback of the fake return
    address and as the access-violation exception handler.
    """
    print("Done")
    # NOTE(review): this script later calls `myjit.run(run_addr)` as a method;
    # confirm that assigning `run` (rather than a dedicated flag) is really
    # what stops the emulation loop on the targeted miasm version.
    jitter.run = False
    jitter.pc = 0
    return True

def priv(jitter):
    """Exception handler for privileged instructions: report it, return False."""
    print("Privilege Exception")
    return False


if __name__ == '__main__':
    # Command line: shellcode file, jitter engine, architecture, verbosity
    parser = ArgumentParser(description="Linux shellcode")
    parser.add_argument("filename", help="Shellcode filename")
    parser.add_argument("-j", "--jitter",
                        help="Jitter engine",
                        default="python")
    parser.add_argument("-a", "--arch",
                        help="Architecture (x86_32, x86_64, arml)",
                        choices=["x86_32", "x86_64", "arml"],
                        default="x86_32")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Verbose mode")
    args = parser.parse_args()

    myjit = Machine(args.arch).jitter(args.jitter)
    myjit.init_stack()

    # Read the whole shellcode; the context manager closes the file handle
    # (the previous bare open().read() never closed it)
    with open(args.filename, 'rb') as fdesc:
        data = fdesc.read()

    # Map the shellcode at a fixed address
    run_addr = 0x40000000
    myjit.vm.add_memory_page(run_addr, PAGE_READ | PAGE_WRITE, data)
    if args.verbose:
        myjit.set_trace_log()
    myjit.add_exception_handler(EXCEPT_PRIV_INSN, priv)
    # An access violation is handled like the end of the run
    myjit.add_exception_handler(EXCEPT_ACCESS_VIOL, code_sentinelle)

    # Log syscalls
    log = logging.getLogger('syscalls')
    log.setLevel(logging.DEBUG)

    # Enable syscall handling
    if args.arch == "x86_32":
        # Fake return address on the stack, with a breakpoint on it
        myjit.push_uint32_t(0x1337beef)
        myjit.add_breakpoint(0x1337beef, code_sentinelle)
        env = environment.LinuxEnvironment_x86_32()
        syscall.enable_syscall_handling(myjit, env, syscall.syscall_callbacks_x86_32)
    elif args.arch == "x86_64":
        # Fake return address on the stack, with a breakpoint on it
        myjit.push_uint64_t(0x1337beef)
        myjit.add_breakpoint(0x1337beef, code_sentinelle)
        env = environment.LinuxEnvironment_x86_64()
        syscall.enable_syscall_handling(myjit, env, syscall.syscall_callbacks_x86_64)
    else:
        env = environment.LinuxEnvironment_arml()
        syscall.enable_syscall_handling(myjit, env, syscall.syscall_callbacks_arml)
    myjit.run(run_addr)
15 changes: 15 additions & 0 deletions miasm/os_dep/linux/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def __init__(self, number, family, type_, protocol):
self.type_ = type_
self.protocol = protocol

def read(self, count):
    """Default socket read: no data is available, whatever @count is."""
    return bytes()

Comment on lines +186 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's not a good idea to have a default read here. Maybe we can raise an error from a pure abstract method, so that the user has to subclass this and implement their own read.
See for example

raise NotImplementedError("Abstract method")

The user can subclass their own LinuxEnvironment and set a brand new self.network

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea, but it seems hard to subclass, it means you have to implement a subclass of FileDescriptorSocket, Network and LinuxEnvironment, and make all this work together, right ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of. It will give something like:

class CustomFileDescriptorSocket(FileDescriptorSocket):
    def read(self, count):
        print("Turlututu")

class CustomNetworking(Networking):
    def socket(self, family, type_, protocol):
        fd = self.linux_env.next_fd()
        fdesc = CustomFileDescriptorSocket(fd, family, type_, protocol)
        self.linux_env.file_descriptors[fd] = fdesc
        return fd


class CustomLinuxEnvironment(LinuxEnvironment):
    def __init__(self):
        super(CustomLinuxEnvironment, self).__init__()
        self.network = CustomNetworking(self)

But maybe there is better: we could modify those classes to have a class variable which embed their needs. For example, for Networking:

class Networking(object):
    """Network abstraction"""

    fd_generator = FileDescriptorSocket
    def __init__(self, linux_env):
        self.linux_env = linux_env

    def socket(self, family, type_, protocol):
        fd = self.linux_env.next_fd()
        fdesc = self.fd_generator(fd, family, type_, protocol)
        self.linux_env.file_descriptors[fd] = fdesc
        return fd

So the "overhead" may just be:

class CustomNetworking(Networking):
    fd_generator = CustomFileDescriptorSocket

But I am not really sure if this is a suitable python pattern.
Or maybe Networking should take its generator as an init argument?
It's a problem we already face in the SandBox object, which depends on os, arch, ...

@commial @p-l- I am interested if you have some feedback on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the same problem as the multiple inheritance of Sandbox. IMHO, it looks more like what we've done in Jitcore with Cgen or SymbExecClass, which reflects your last proposal.

In my opinion, the question is "what do we want to provide, and what customization should be reasonably easy to implement?".
I agree that it should be easy to modify what the socket returns, its state, etc. I'm not sure that the more global Networking part needs that kind of customization possibility.

A pattern we can use would be to provide a kind of "socket factory" (sorry for this word, but it is what it is) that the Network would use to create its sockets.
It could be a function, taking as input the socket parameters and returning an instance with the socket "interface", i.e. a subclass of the socket fd.
It could also be a class, taking these parameters in __init__, and asked just after whether the creation succeeded or not (to keep the possibility to easily deny socket creation). I prefer the function solution, as it could make it easier to return a default implementation or implementations for several socket families.

This "factory function" is then an attribute of the Networking class, and could be replaced with a dedicated function / property.

If this pattern becomes more frequent in the Linux kernel stub implementation, we could have a "config-like" class containing several factory functions, or hooks.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the socket factory would be an attribute of Network ?

Using it would be something like :

class CustomFileDescriptorSocket(FileDescriptorSocket):
    def read(self, count):
        print("Chapeau pointu")

env = environment.LinuxEnvironment_x86_32()
env.network.socket_class = CustomFileDescriptorSocket

Is that correct ?


class FileSystem(object):
"""File system abstraction
Expand Down Expand Up @@ -365,6 +368,18 @@ def open_(self, path, flags, follow_link=True):
fdesc.blocks = (size + ((512 - (size % 512)) % 512)) // 512
return fd

def chmod(self, path, mode):
    """Stub for 'chmod' syscall

    Apply @mode to the sandbox file matching @path.
    Return 0 on success, -1 if the file does not exist or if the mode
    cannot be applied.
    """
    sb_path = self.resolve_path(path)
    try:
        os.chmod(sb_path, mode)
    except (OverflowError, ValueError, OSError):
        # OSError covers the missing-file case (FileNotFoundError is one of
        # its subclasses and does not exist on Python 2) as well as other
        # host-side failures such as a permission error, which must be
        # reported to the emulated program rather than crash the emulation.
        # OverflowError / ValueError: mode or path not acceptable for the host
        return -1
    return 0


class Networking(object):
"""Network abstraction"""
Expand Down
228 changes: 227 additions & 1 deletion miasm/os_dep/linux/syscall.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,193 @@ def _dump_struct_stat_arml(info):
return data


def sys_x86_32_execve(jitter, linux_env):
    """Stub for the x86_32 'execve' syscall

    Decode the pathname, argv and envp arguments and log them. Nothing is
    executed; 0 is returned to the emulated program.
    """
    # int execve(const char *pathname, char *const argv[],
    #            char *const envp[]);
    pathname_addr, argv_ptr, envp_ptr = jitter.syscall_args_systemv(3)
    pathname = jitter.get_c_str(pathname_addr)

    def read_string_array(array_ptr):
        """Return the strings of the NULL-terminated array of 32 bit
        pointers located at @array_ptr"""
        strings = []
        # Shellcodes commonly pass NULL for argv and / or envp: treat it as
        # an empty array instead of dereferencing address 0
        if array_ptr == 0:
            return strings
        str_addr = jitter.vm.get_u32(array_ptr)
        while str_addr != 0:
            strings.append(jitter.get_c_str(str_addr))
            array_ptr += 4
            str_addr = jitter.vm.get_u32(array_ptr)
        return strings

    # Both arrays are walked the same way (envp used to be read as if the
    # array pointer was itself the first string, skipping entry 0)
    argv = read_string_array(argv_ptr)
    envp = read_string_array(envp_ptr)
    log.debug("sys_execve(%s, [%s], [%s])", pathname,
              ", ".join(argv), ", ".join(envp))
    jitter.syscall_ret_systemv(0)


def sys_x86_64_execve(jitter, linux_env):
    """Stub for the x86_64 'execve' syscall

    Decode the pathname, argv and envp arguments and log them. Nothing is
    executed; 0 is returned to the emulated program.
    """
    # int execve(const char *pathname, char *const argv[],
    #            char *const envp[]);
    # TODO : merge that into a generic execve
    pathname_addr, argv_ptr, envp_ptr = jitter.syscall_args_systemv(3)
    pathname = jitter.get_c_str(pathname_addr)

    def read_string_array(array_ptr):
        """Return the strings of the NULL-terminated array of 64 bit
        pointers located at @array_ptr"""
        strings = []
        # Shellcodes commonly pass NULL for argv and / or envp: treat it as
        # an empty array instead of dereferencing address 0
        if array_ptr == 0:
            return strings
        str_addr = jitter.vm.get_u64(array_ptr)
        while str_addr != 0:
            strings.append(jitter.get_c_str(str_addr))
            array_ptr += 8
            str_addr = jitter.vm.get_u64(array_ptr)
        return strings

    # Both arrays are walked from the parsed syscall arguments (envp used to
    # be re-read through the 32 bit EDX register name, skipping entry 0)
    argv = read_string_array(argv_ptr)
    envp = read_string_array(envp_ptr)
    log.debug("sys_execve(%s, [%s], [%s])", pathname,
              ", ".join(argv), ", ".join(envp))
    jitter.syscall_ret_systemv(0)


def sys_x86_32_socket(jitter, linux_env):
# int socketcall(int call, unsigned long *args)
# Redirect to several other socket syscalls
# https://github.com/torvalds/linux/blob/master/include/uapi/linux/net.h
SOCKET_CALLS = {
1: "SYS_SOCKET",
2: "SYS_BIND",
3: "SYS_CONNECT",
4: "SYS_LISTEN",
5: "SYS_ACCEPT",
14: "SYS_SETSOCKOPT"
}

SOCKET_DOMAINS = {
0: "AF_UNSPEC",
1: "AF_UNIX",
2: "AF_INET",
3: "AF_AX25",
4: "AF_IPX",
5: "AF_APPLETALK",
6: "AF_NETROM",
7: "AF_BRIDGE",
8: "AF_AAL5",
9: "AF_X25", #Who cares ?
10: "AF_INET6",
11: "AF_MAX"
}

SOCKET_TYPE = {
1: "SOCK_STREAM",
2: "SOCK_DGRAM",
3: "SOCK_RAW"
}

if jitter.cpu.EBX not in SOCKET_CALLS.keys():
raise NotImplemented("SysCall Not Implemented")
if SOCKET_CALLS[jitter.cpu.EBX] == "SYS_SOCKET":
# int socket(int domain, int type, int protocol);
domain = jitter.vm.get_u32(jitter.cpu.ESP)
stype = jitter.vm.get_u32(jitter.cpu.ESP+4)
proto = jitter.vm.get_u32(jitter.cpu.ESP+8)
fd = linux_env.socket(domain, stype, proto)
log.debug("socket(%s, %s, %s)", SOCKET_DOMAINS[domain],
SOCKET_TYPE[stype], proto)
jitter.syscall_ret_systemv(fd)
elif SOCKET_CALLS[jitter.cpu.EBX] == "SYS_BIND":
# int bind(int sockfd, const struct sockaddr *addr,
# socklen_t addrlen);
fd = jitter.vm.get_u32(jitter.cpu.ESP)
socklen = jitter.vm.get_u32(jitter.cpu.ESP+8)
try:
sockaddr = jitter.vm.get_mem(
jitter.vm.get_u32(jitter.cpu.ESP+4),
socklen)
except RuntimeError:
# Not the exact size because shellcodes won't provide the full struct
sockaddr = jitter.vm.get_mem(jitter.vm.get_u32(jitter.cpu.ESP+4), 8)
family = struct.unpack("H", sockaddr[0:2])[0]
if family == 2:
# IPv4
port = struct.unpack(">H", sockaddr[2:4])[0]
ip = ".".join([str(i) for i in struct.unpack("BBBB", sockaddr[4:8])])
log.debug("socket_bind(fd, [%s, %i, %s], %i)", "AF_INET",
port, ip, socklen)
elif family == 10:
# IPv6
port = struct.unpack(">H", sockaddr[2:4])[0]
ip = ".".join([str(i) for i in struct.unpack("B"*16, sockaddr[8:24])])
log.debug("socket_bind(fd, [%s, %i, %s], %i)", "AF_INET6",
port, ip, socklen)
else:
log.debug("socket_bind(fd, sockaddr, socklen_t)")
jitter.syscall_ret_systemv(0)
elif SOCKET_CALLS[jitter.cpu.EBX] == "SYS_CONNECT":
# int connect(int sockfd, const struct sockaddr *addr,
# socklen_t addrlen);
fd = jitter.vm.get_u32(jitter.cpu.ESP)
socklen = jitter.vm.get_u32(jitter.cpu.ESP+8)
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using the socklen instead of a fixed length?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some cases, a shellcode would not have a full sockaddr struct but just the needed fields, what I have done in the next commit is getting the socklen and if it fails, I only get the first 8 bytes. Is that an ok trick to have it work ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, maybe we should behave like the kernel does so we will be close to a real environment.
If the kernel is ok with semi structures, maybe your patch is ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a first get_mem for the full structure, with a fallback to 8 bytes if the memory is not that large; it should be close to what the kernel is doing, I guess

sockaddr = jitter.vm.get_mem(
jitter.vm.get_u32(jitter.cpu.ESP+4),
socklen)
except RuntimeError:
# Not the exact size because shellcodes won't provide the full struct
sockaddr = jitter.vm.get_mem(jitter.vm.get_u32(jitter.cpu.ESP+4), 8)
family = struct.unpack("H", sockaddr[0:2])[0]
if family == 2:
# IPv4
port = struct.unpack(">H", sockaddr[2:4])[0]
ip = ".".join([str(i) for i in struct.unpack("BBBB", sockaddr[4:8])])
log.debug("socket_connect(fd, [%s, %i, %s], %i)", "AF_INET",
port, ip, socklen)
elif family == 10:
port = struct.unpack(">H", sockaddr[2:4])[0]
ip = ".".join([str(i) for i in struct.unpack("B"*16, sockaddr[8:24])])
log.debug("socket_connect(fd, [%s, %i, %s], %i)", "AF_INET6",
port, ip, socklen)
else:
log.debug("socket_connect(fd, sockaddr, socklen)")
jitter.syscall_ret_systemv(0)
Te-k marked this conversation as resolved.
Show resolved Hide resolved
elif SOCKET_CALLS[jitter.cpu.EBX] == "SYS_LISTEN":
# int listen(int sockfd, int backlog);
sockfd = jitter.vm.get_u32(jitter.cpu.ESP)
backlog = jitter.vm.get_u32(jitter.cpu.ESP+4)
log.debug("socket_listen(%x, %x)", sockfd, backlog)
jitter.syscall_ret_systemv(0)
elif SOCKET_CALLS[jitter.cpu.EBX] == "SYS_ACCEPT":
# int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
sockfd = jitter.vm.get_u32(jitter.cpu.ESP)
sockaddr = jitter.vm.get_u32(jitter.cpu.ESP+4)
addrlen = jitter.vm.get_u32(jitter.cpu.ESP+8)
log.debug("socket_accept(%x, %x, %x)", sockfd, sockaddr, addrlen)
jitter.syscall_ret_systemv(0)
elif SOCKET_CALLS[jitter.cpu.EBX] == "SYS_SETSOCKOPT":
# SYS_SETSOCKOPT
# int setsockopt(int sockfd, int level, int optname,
# const void *optval, socklen_t optlen);
sockfd = jitter.vm.get_u32(jitter.cpu.ESP)
level = jitter.vm.get_u32(jitter.cpu.ESP+4)
optname = jitter.vm.get_u32(jitter.cpu.ESP+8)
optval_addr = jitter.vm.get_u32(jitter.cpu.ESP+12)
optlen = jitter.vm.get_u32(jitter.cpu.ESP+16)
log.debug("socket_setsockopt(%x, %x, %x, %x, %x)", sockfd, level, optname,
optval_addr, optlen)
jitter.syscall_ret_systemv(0)
else:
raise NotImplemented()


def sys_generic_chmod(jitter, linux_env):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could really apply the chmod on the file located in the file sandbox file_sb ?

# int chmod(const char *pathname, mode_t mode);
path_addr, mode = jitter.syscall_args_systemv(2)
pathname = jitter.get_c_str(path_addr)
log.debug("sys_chmod(%s, %x)", pathname, mode)

# Stub
ret = linux_env.filesystem.chmod(pathname, mode)
jitter.syscall_ret_systemv(ret)


def sys_x86_64_rt_sigaction(jitter, linux_env):
# Parse arguments
sig, act, oact, sigsetsize = jitter.syscall_args_systemv(4)
Expand Down Expand Up @@ -531,7 +718,12 @@ def sys_generic_write(jitter, linux_env):

# Stub
data = jitter.vm.get_mem(buf, count)
jitter.syscall_ret_systemv(linux_env.write(fd, data))
r = linux_env.write(fd, data)
if r is None:
log.debug("-> write : failed")
jitter.syscall_ret_systemv(-1)
else:
jitter.syscall_ret_systemv(r)


def sys_x86_64_getdents(jitter, linux_env):
Expand Down Expand Up @@ -614,6 +806,13 @@ def sys_x86_64_newlstat(jitter, linux_env):
jitter.cpu.RAX = 0


def sys_generic_exit(jitter, linux_env):
    """Stub for the 'exit' syscall: log the exit status and clear jitter.run"""
    # void exit(int status);
    [status] = jitter.syscall_args_systemv(1)
    log.debug("sys_exit(%i)", status)
    # NOTE(review): presumably meant to stop the emulation loop; confirm that
    # `run` is the flag the jitter actually checks on the targeted version
    jitter.run = False


def sys_arml_lstat64(jitter, linux_env):
# Parse arguments
filename = jitter.cpu.R0
Expand Down Expand Up @@ -821,6 +1020,21 @@ def sys_generic_setgid(jitter, linux_env):
jitter.syscall_ret_systemv(0)


def sys_generic_setreuid(jitter, linux_env):
# Parse arguments
ruid, euid = jitter.syscall_args_systemv(2)
log.debug("sys_setreuid(%x, %x)", ruid, euid)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use the current Linux env uid/euid/gid of the linux env here?


# WARNING : no privilege check here
# Easy privilege escalation, just ask
if ruid > -1:
linux_env.user_uid = ruid
if euid > -1:
linux_env.user_euid = euid

jitter.syscall_ret_systemv(0)


def sys_generic_setuid(jitter, linux_env):
# Parse arguments
uid, = jitter.syscall_args_systemv(1)
Expand Down Expand Up @@ -899,7 +1113,16 @@ def sys_arml_gettimeofday(jitter, linux_env):


# Linux x86_32 syscall number -> handler implementing (or stubbing) it
syscall_callbacks_x86_32 = {
    0x1: sys_generic_exit,  # exit
    0x3: sys_generic_read,  # read
    0x4: sys_generic_write,  # write
    0x5: sys_generic_open,  # open
    0xB: sys_x86_32_execve,  # execve
    0xF: sys_generic_chmod,  # chmod
    0x46: sys_generic_setreuid,  # setreuid
    0x66: sys_x86_32_socket,  # socketcall (multiplexes the socket operations)
    0x7A: sys_x86_32_newuname,  # uname
    0x7D: sys_generic_mprotect,  # mprotect
}


Expand All @@ -925,15 +1148,18 @@ def sys_arml_gettimeofday(jitter, linux_env):
0x27: sys_x86_64_getpid,
0x29: sys_x86_64_socket,
0x2A: sys_x86_64_connect,
0x3B: sys_x86_64_execve,
0x3F: sys_x86_64_newuname,
0x48: sys_generic_fcntl64,
0x4E: sys_x86_64_getdents,
0x59: sys_x86_64_readlink,
0x5A: sys_generic_chmod,
0x63: sys_x86_64_sysinfo,
0x66: sys_generic_getuid,
0x68: sys_generic_getgid,
0x6B: sys_generic_geteuid,
0x6C: sys_generic_getegid,
0x71: sys_generic_setreuid,
0xE4: sys_x86_64_clock_gettime,
0x89: sys_x86_64_statfs,
0x9E: sys_x86_64_arch_prctl,
Expand Down