Quark Method Reference
quark.core.quark.py
The module provides methods to perform the five-stage check and to generate Quark reports.
Method | Function |
---|---|
find_previous_method | Find children under a specified parent of a method. |
find_intersection | Find the intersection between two sets of methods. |
method_recursive_search | Find the intersection between the parents of two sets of methods. |
find_api_usage | Find methods in the specified class or its subclasses that match the method name and descriptor. |
_evaluate_method | Evaluate the executions of opcodes in the target method and return the usage of registers. |
check_parameter_on_single_method | Check if the two methods use the same parameter. |
check_parameter | Check for the usage of the same parameter between two lists of methods. |
check_parameter_values | Check if the parameter values match the specified patterns and keywords. |
check_sequence | Check if the mutual parent of the two methods calls any first method before any second method. |
run | Check the APK at five levels to analyze if it meets the rules. |
get_json_report | Get a report of the analysis of the APK in JSON format. |
generate_json_report | Generate details of the analysis of the APK according to a rule. |
add_table_row | Add the analysis of the APK according to a rule to the summary report. |
show_summary_report | Show the summary report of the APK. |
 | Show the tabular report summarizing the statistical information of the APK. |
 | Show the report summarizing the result of the five-level check and the confidence of the APK. |
 | Show all call graphs of the APK. |
 | Show rule classification data in a table, a graphic, or JSON format. |
find_previous_method
The algorithm of find_previous_method
The find_previous_method method uses a DFS algorithm that walks upward from base_method through its callers and collects every method on the way that parent_function calls directly, appending each one to the specified wrapper. The search goes on recursively until there are no more levels or all candidates have been processed.
1. Initialize an empty set "visited_methods" if it is not provided.
2. Get a set "method_set" using "self.apkinfo.upperfunc(base_method)".
3. Add "base_method" to the "visited_methods" set.
4. If "method_set" is not None, check if "parent_function" is in "method_set".
- If yes, append "base_method" to "wrapper".
- If no, iterate through each item in "method_set".
  - If the item is in "visited_methods", skip it and continue to the next item.
  - If not, call "find_previous_method" again with the current item, "parent_function", "wrapper", and "visited_methods".
The code of find_previous_method
def find_previous_method(
    self, base_method, parent_function, wrapper, visited_methods=None
):
    """
    Find the methods under parent_function that lead to base_method and
    append them to wrapper.

    :param base_method: the base function which needs to be searched.
    :param parent_function: the top-level function which calls the basic function.
    :param wrapper: list used to track each function.
    :param visited_methods: set of methods that have already been tested.
    :return: None
    """
    if visited_methods is None:
        visited_methods = set()

    method_set = self.apkinfo.upperfunc(base_method)
    visited_methods.add(base_method)

    if method_set is not None:
        if parent_function in method_set:
            wrapper.append(base_method)
        else:
            for item in method_set:
                # Skip methods that have already been tested.
                if item in visited_methods:
                    continue
                self.find_previous_method(
                    item, parent_function, wrapper, visited_methods
                )
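The traversal can be tried on a toy call graph. The following is a minimal sketch rather than Quark's implementation: the CALLERS dictionary and the standalone upperfunc helper are hypothetical stand-ins for self.apkinfo.upperfunc, and methods are plain strings instead of MethodObjects.

```python
from typing import Dict, List, Optional, Set

# Hypothetical caller map: method -> set of methods that call it.
CALLERS: Dict[str, Set[str]] = {
    "sendTextMessage": {"SmsHelper.send"},
    "SmsHelper.send": {"Worker.report", "Main.onCreate"},
    "Worker.report": {"Main.onCreate"},
    "Main.onCreate": set(),
}


def upperfunc(method: str) -> Set[str]:
    """Stand-in for apkinfo.upperfunc: return the callers of a method."""
    return CALLERS.get(method, set())


def find_previous_method(
    base_method: str,
    parent_function: str,
    wrapper: List[str],
    visited_methods: Optional[Set[str]] = None,
) -> None:
    if visited_methods is None:
        visited_methods = set()

    method_set = upperfunc(base_method)
    visited_methods.add(base_method)

    if parent_function in method_set:
        # base_method is called directly by parent_function.
        wrapper.append(base_method)
    else:
        # Otherwise climb one level up through every unvisited caller.
        for item in sorted(method_set):
            if item in visited_methods:
                continue
            find_previous_method(
                item, parent_function, wrapper, visited_methods
            )


wrapper: List[str] = []
find_previous_method("sendTextMessage", "Main.onCreate", wrapper)
print(wrapper)
```

In this toy graph, Main.onCreate does not call sendTextMessage itself, so the search climbs to SmsHelper.send, finds Main.onCreate among its callers, and records SmsHelper.send as the wrapper.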
find_intersection
The algorithm of find_intersection
The find_intersection method takes in two sets, first_method_set and second_method_set, and finds their intersection. If the two sets share no method directly, it widens the search recursively through method_recursive_search.
Here is the process of find_intersection.
1. Check that the input sets are not empty.
If one of the sets is empty, raise a ValueError.
2. Use the & operator to find the intersection of the two sets.
If the intersection is not empty, return the resulting set.
3. If the intersection is empty, call the method_recursive_search
function with the current depth and the input sets.
4. The method_recursive_search function expands both sets with their
upper-level methods and searches for the intersection again,
repeating until the depth exceeds MAX_SEARCH_LAYER.
- If the intersection is found, return the resulting set.
- Otherwise, return None.
The code of find_intersection
def find_intersection(self, first_method_set, second_method_set, depth=1):
    """
    Find first_method_set ∩ second_method_set.
    {MethodAnalysis, MethodAnalysis, ...}

    :param first_method_set: first set that contains each MethodAnalysis.
    :param second_method_set: second set that contains each MethodAnalysis.
    :param depth: current depth of the recursive search.
    :return: a set of first_method_set ∩ second_method_set or None.
    """
    # Check that neither set is empty
    if not first_method_set or not second_method_set:
        raise ValueError("Set is Null")

    # find ∩
    result = first_method_set & second_method_set
    if result:
        return result
    else:
        return self.method_recursive_search(
            depth, first_method_set, second_method_set
        )
method_recursive_search
The algorithm of method_recursive_search
The method_recursive_search algorithm finds the intersection between
two sets of methods. Specifically, the algorithm expands each set by
recursively adding their respective upper-level method objects until it
finds an intersection or the depth reaches MAX_SEARCH_LAYER.
Here is the process of method_recursive_search.
1. The method_recursive_search function takes three arguments: depth, first_method_set, and second_method_set.
2. If depth+1 > MAX_SEARCH_LAYER, return None.
3. Create next_level_set_1 and next_level_set_2 as copies of first_method_set and second_method_set, respectively.
4. Expand next_level_set_1 and next_level_set_2 by adding their respective upper-level methods.
5. Call find_intersection recursively with next_level_set_1, next_level_set_2, and depth+1 as arguments.
- If an intersection is found, return the result.
- If no intersection is found, continue searching until depth > MAX_SEARCH_LAYER.
The code of method_recursive_search
def method_recursive_search(
    self, depth, first_method_set, second_method_set
):
    # Not found same method usage, try to find the next layer.
    depth += 1
    if depth > MAX_SEARCH_LAYER:
        return None

    # Append first layer into next layer.
    next_level_set_1 = first_method_set.copy()
    next_level_set_2 = second_method_set.copy()

    # Extend the xref from function into next layer.
    for method in first_method_set:
        if self.apkinfo.upperfunc(method):
            next_level_set_1 = (
                self.apkinfo.upperfunc(method) | next_level_set_1
            )
    for method in second_method_set:
        if self.apkinfo.upperfunc(method):
            next_level_set_2 = (
                self.apkinfo.upperfunc(method) | next_level_set_2
            )

    return self.find_intersection(
        next_level_set_1, next_level_set_2, depth
    )
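The interplay between find_intersection and method_recursive_search is easier to follow on a small example. The sketch below is an illustration under assumptions: CALLERS and upperfunc are hypothetical stand-ins for the APK call graph, methods are plain strings, and the value 3 for MAX_SEARCH_LAYER is chosen only for this example.

```python
from typing import Dict, Optional, Set

MAX_SEARCH_LAYER = 3  # assumed value, for this sketch only

# Hypothetical caller map: method -> set of methods that call it.
CALLERS: Dict[str, Set[str]] = {
    "getCellLocation": {"LocationHelper.read"},
    "sendTextMessage": {"SmsHelper.send"},
    "LocationHelper.read": {"Main.onCreate"},
    "SmsHelper.send": {"Main.onCreate"},
}


def upperfunc(method: str) -> Set[str]:
    """Stand-in for apkinfo.upperfunc: return the callers of a method."""
    return CALLERS.get(method, set())


def find_intersection(
    first: Set[str], second: Set[str], depth: int = 1
) -> Optional[Set[str]]:
    if not first or not second:
        raise ValueError("Set is Null")
    result = first & second
    if result:
        return result
    return method_recursive_search(depth, first, second)


def method_recursive_search(
    depth: int, first: Set[str], second: Set[str]
) -> Optional[Set[str]]:
    depth += 1
    if depth > MAX_SEARCH_LAYER:
        return None
    # Keep the current layer and add the callers of every method in it.
    next_1 = set(first)
    next_2 = set(second)
    for method in first:
        next_1 |= upperfunc(method)
    for method in second:
        next_2 |= upperfunc(method)
    return find_intersection(next_1, next_2, depth)


mutual = find_intersection(
    upperfunc("getCellLocation"), upperfunc("sendTextMessage")
)
print(mutual)
```

The direct callers of the two APIs differ, so the first intersection is empty. One expansion adds Main.onCreate to both sets, and the second call to find_intersection returns it as the mutual parent.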
find_api_usage
The algorithm of find_api_usage
find_api_usage searches for methods with method_name and descriptor_name that belong to either the class_name or its subclass. It returns a list that contains matching methods.
Here is the process of find_api_usage.
1. Initialize an empty "method_list".
2. Search for an exact match of the method by its "class_name", "method_name", and "descriptor_name".
- If found, return a list with the matching method.
3. Create a list of potential methods with matching "method_name" and "descriptor_name".
4. Filter the list of potential methods to include only those without bytecode, that is, methods for which "get_method_bytecode" yields no instruction.
5. Check if the class of each potential method is a subclass of the given "class_name".
- If yes, add the method to "method_list".
6. Return "method_list".
Here is the flowchart of find_api_usage.
The code of find_api_usage
def find_api_usage(self, class_name, method_name, descriptor_name):
    method_list = []

    # Source method
    source_method = self.apkinfo.find_method(
        class_name, method_name, descriptor_name
    )
    if source_method:
        return [source_method]

    # Potential Method
    potential_method_list = [
        method
        for method in self.apkinfo.all_methods
        if method.name == method_name
        and method.descriptor == descriptor_name
    ]

    # Keep only the methods that have no bytecode
    potential_method_list = [
        method
        for method in potential_method_list
        if not next(self.apkinfo.get_method_bytecode(method), None)
    ]

    # Check if each method's class is a subclass of the given class
    for method in potential_method_list:
        current_class_set = {method.class_name}

        while current_class_set and not current_class_set.intersection(
            {class_name, "Ljava/lang/Object;"}
        ):
            next_class_set = set()
            for clazz in current_class_set:
                next_class_set.update(
                    self.apkinfo.superclass_relationships[clazz]
                )
            current_class_set = next_class_set

        current_class_set.discard("Ljava/lang/Object;")
        if current_class_set:
            method_list.append(method)

    return method_list
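The subclass check in step 5 climbs the superclass relationships one level at a time. The sketch below isolates that loop as a minimal, hedged illustration: the SUPERCLASSES mapping and the class names in it are hypothetical, is_subclass_of is a helper invented for this example, and a missing class is treated as having no superclass.

```python
from typing import Dict, Set

ROOT = "Ljava/lang/Object;"

# Hypothetical superclass map: class -> set of its direct superclasses.
SUPERCLASSES: Dict[str, Set[str]] = {
    "Lcom/example/MySmsManager;": {"Landroid/telephony/SmsManager;"},
    "Landroid/telephony/SmsManager;": {ROOT},
    "Lcom/example/Util;": {ROOT},
}


def is_subclass_of(candidate: str, class_name: str) -> bool:
    """Mirror the loop above: climb until class_name or Object is reached."""
    current = {candidate}
    while current and not current & {class_name, ROOT}:
        next_level: Set[str] = set()
        for clazz in current:
            next_level.update(SUPERCLASSES.get(clazz, set()))
        current = next_level
    # Reaching only Object means class_name is not an ancestor.
    current.discard(ROOT)
    return bool(current)


print(is_subclass_of(
    "Lcom/example/MySmsManager;", "Landroid/telephony/SmsManager;"
))
print(is_subclass_of("Lcom/example/Util;", "Landroid/telephony/SmsManager;"))
```

With single inheritance, as in this toy mapping, the set that survives after discarding Ljava/lang/Object; is non-empty only when the climb stopped at class_name.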
_evaluate_method
The algorithm of _evaluate_method
The _evaluate_method method evaluates the execution of opcodes in the target method and returns a matrix representing the usage of each involved register. The method takes one parameter, method, which is the method to be evaluated.
Here is the process of _evaluate_method.
1. Create a PyEval object with the apkinfo attribute of the instance. PyEval is the class that handles the evaluation of opcodes.
2. Loop through the bytecode objects in the target method by calling the get_method_bytecode method of the apkinfo attribute.
3. Extract the mnemonic (opcode), registers, and parameter from the bytecode_obj and create an instruction list containing these elements.
4. Convert all elements of the instruction list to strings (in case there are MUTF8String objects).
5. Check if the opcode (the first element of instruction) is in the eval dictionary of the pyeval object.
- If it is, call the corresponding function with the instruction as its argument.
6. Once the loop is finished, call the show_table method of the pyeval object to return the matrix representing the usage of each involved register.
Here is the flowchart of _evaluate_method.
The code of _evaluate_method
def _evaluate_method(self, method) -> List[List[str]]:
    """
    Evaluate the execution of the opcodes in the target method and return
    the usage of each involved register.

    :param method: Method to be evaluated
    :return: Matrix that holds the usage of the registers
    """
    pyeval = PyEval(self.apkinfo)

    for bytecode_obj in self.apkinfo.get_method_bytecode(method):
        # ['new-instance', 'v4', Lcom/google/progress/SMSHelper;]
        instruction = [bytecode_obj.mnemonic]
        if bytecode_obj.registers is not None:
            instruction.extend(bytecode_obj.registers)
        if bytecode_obj.parameter is not None:
            instruction.append(bytecode_obj.parameter)

        # for the case of MUTF8String
        instruction = [str(x) for x in instruction]

        if instruction[0] in pyeval.eval:
            pyeval.eval[instruction[0]](instruction)

    return pyeval.show_table()
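The core of this method is a dispatch table that maps each opcode to a handler. The sketch below shows only that pattern under stated assumptions: ToyEval is a hypothetical, much smaller stand-in for PyEval, it supports two opcodes chosen for illustration, and it returns a plain dictionary instead of the register usage matrix.

```python
from typing import Callable, Dict, List, Optional


class ToyEval:
    """Hypothetical mini evaluator that tracks the value held by each register."""

    def __init__(self) -> None:
        self.registers: Dict[str, Optional[str]] = {}
        # Dispatch table: opcode mnemonic -> handler.
        self.eval: Dict[str, Callable[[List[str]], None]] = {
            "const-string": self.const_string,
            "move": self.move,
        }

    def const_string(self, instruction: List[str]) -> None:
        _, register, value = instruction
        self.registers[register] = value

    def move(self, instruction: List[str]) -> None:
        _, destination, source = instruction
        self.registers[destination] = self.registers.get(source)


def evaluate(instructions: List[List[object]]) -> Dict[str, Optional[str]]:
    toy = ToyEval()
    for raw in instructions:
        instruction = [str(x) for x in raw]
        # Opcodes without a handler are skipped, as in the loop above.
        if instruction[0] in toy.eval:
            toy.eval[instruction[0]](instruction)
    return toy.registers


table = evaluate(
    [["const-string", "v0", "hello"], ["move", "v1", "v0"], ["nop"]]
)
print(table)
```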
check_parameter_on_single_method
The algorithm of check_parameter_on_single_method
The check_parameter_on_single_method function checks whether two methods use the same parameter.
Here is the process of check_parameter_on_single_method.
1. Define a method named check_parameter_on_single_method, which takes 6 parameters:
* self: a reference to the current object, indicating that this method is defined in a class
* usage_table: a table for storing the usage of called functions
* first_method: the first API or the method calling the first API
* second_method: the second API or the method calling the second API
* keyword_item_list: a list of keywords used to determine if the parameter meets specific conditions, None by default
* regex: a Boolean that tells whether the keywords are regular expressions, False by default
2. Obtain the patterns of first_method and second_method based on the given input, and store them in first_method_pattern and second_method_pattern, respectively.
3. Define a generator matched_records. Use the filter function to filter register_usage_records to include only those matched records used by both first_method and second_method.
4. Use a for loop to process the matched records one by one.
5. If keyword_item_list is provided, call method check_parameter_values to check if the matched record contains keywords in keyword_item_list.
- If any keyword matches, yield the matched record and matched_keyword_list.
- If no keyword matches, skip the record.
6. If keyword_item_list is not provided, yield the matched record with None in place of the keyword list.
7. Because it uses yield, this method is a generator that processes data and returns results at the same time.
Here is the flowchart of check_parameter_on_single_method.
The code of check_parameter_on_single_method
def check_parameter_on_single_method(
    self,
    usage_table,
    first_method,
    second_method,
    keyword_item_list=None,
    regex=False,
) -> Generator[Tuple[str, List[str]], None, None]:
    """Check the usage of the same parameter between two methods.

    :param usage_table: the usage of the involved registers
    :param first_method: the first API or the method calling the first APIs
    :param second_method: the second API or the method calling the second
     APIs
    :param keyword_item_list: keywords required to be present in the usage,
     defaults to None
    :param regex: treat the keywords as regular expressions, defaults to
     False
    :yield: the matched usage record and the list of matched keywords
     (None if no keywords are given)
    """
    first_method_pattern = PyEval.get_method_pattern(
        first_method.class_name, first_method.name, first_method.descriptor
    )
    second_method_pattern = PyEval.get_method_pattern(
        second_method.class_name,
        second_method.name,
        second_method.descriptor,
    )

    register_usage_records = (
        c_func
        for table in usage_table
        for val_obj in table
        for c_func in val_obj.called_by_func
    )

    matched_records = filter(
        lambda r: first_method_pattern in r and second_method_pattern in r,
        register_usage_records,
    )

    for record in matched_records:
        if keyword_item_list and list(keyword_item_list):
            matched_keyword_list = self.check_parameter_values(
                record,
                (first_method_pattern, second_method_pattern),
                keyword_item_list,
                regex,
            )
            if matched_keyword_list:
                yield (record, matched_keyword_list)
        else:
            yield (record, None)
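The filtering step keeps only the usage records that mention both method patterns. The sketch below shows that idea for the case without keywords. It is an illustration under assumptions: the record strings and the FIRST and SECOND patterns are made up for this example, and matched_records is a hypothetical helper rather than part of Quark.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical method patterns and usage records, for illustration only.
FIRST = "Lcom/a;->open(Ljava/lang/String;)V"
SECOND = "Lcom/b;->send(Ljava/lang/String;)V"

records: List[str] = [
    f"{SECOND}({FIRST}(/sdcard/secret.txt))",  # mentions both patterns
    f"{SECOND}(hello)",                        # mentions only the second
]


def matched_records(
    usage_records: List[str], first_pattern: str, second_pattern: str
) -> Iterator[Tuple[str, Optional[List[str]]]]:
    """Yield each record that contains both patterns, with no keyword list."""
    for record in usage_records:
        if first_pattern in record and second_pattern in record:
            yield (record, None)


results = list(matched_records(records, FIRST, SECOND))
print(len(results))
```

Because the helper is a generator, a caller that only needs to know whether any match exists can stop after the first result, for example with next(matched_records(...), None).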
check_parameter
The algorithm of check_parameter
The function check_parameter is designed to check for the usage of the same parameter between two methods.
Here is the process of check_parameter.
1. Check if parent_function, first_method_list or second_method_list is None.
- If True, raise a TypeError exception.
2. Check if the keyword_item_list parameter exists and has elements.
- If False, set keyword_item_list to None.
3. Initialize the state variable to False.
4. Evaluate the opcode of the parent_function by calling self._evaluate_method and store the result to usage_table.
5. Iterate through the combinations of methods from the first_method_list and second_method_list.
6. Call self.check_parameter_on_single_method with usage_table to check if the two methods use the same parameters.
- If True,
  - Record the corresponding call graph analysis.
  - Record the mapping between the parent function and the wrapper method.
  - Set the state variable to True.
7. Once the iteration finishes, return the state variable.
Here is the flowchart of check_parameter.
The code of check_parameter
def check_parameter(
    self,
    parent_function,
    first_method_list,
    second_method_list,
    keyword_item_list=None,
    regex=False,
):
    """
    Check the usage of the same parameter between two methods.

    :param parent_function: function that calls the first function and
    second functions at the same time.
    :param first_method_list: function which calls before the second
    method.
    :param second_method_list: function which calls after the first method.
    :param keyword_item_list: keywords required to be present in the usage,
    defaults to None.
    :param regex: treat the keywords as regular expressions, defaults to
    False.
    :return: True or False
    """
    if parent_function is None:
        raise TypeError("Parent function is None.")

    if first_method_list is None or second_method_list is None:
        raise TypeError("First or second method list is None.")

    if keyword_item_list:
        keyword_item_list = list(keyword_item_list)
        if not any(keyword_item_list):
            keyword_item_list = None

    # Evaluate the opcode in the parent function
    usage_table = self._evaluate_method(parent_function)

    # Check if any of the target methods (the first and second methods)
    # used the same registers.
    state = False
    for first_call_method in first_method_list:
        for second_call_method in second_method_list:
            result_generator = self.check_parameter_on_single_method(
                usage_table,
                first_call_method,
                second_call_method,
                keyword_item_list,
                regex,
            )
            found = next(result_generator, None) is not None

            # Build for the call graph
            if found:
                call_graph_analysis = {
                    "parent": parent_function,
                    "first_call": first_call_method,
                    "second_call": second_call_method,
                    "apkinfo": self.apkinfo,
                    "first_api": self.quark_analysis.first_api,
                    "second_api": self.quark_analysis.second_api,
                    "crime": self.quark_analysis.crime_description,
                }
                self.quark_analysis.call_graph_analysis_list.append(
                    call_graph_analysis
                )

                # Record the mapping between the parent function and the
                # wrapper method
                self.quark_analysis.parent_wrapper_mapping[
                    parent_function.full_name
                ] = self.apkinfo.get_wrapper_smali(
                    parent_function,
                    first_call_method,
                    second_call_method,
                )

                state = True

    return state
check_parameter_values
The algorithm of check_parameter_values
The function check_parameter_values is designed to check if the parameter values in the source string match the specified patterns and keywords. Then, it collects the matched strings into a set and returns them as a list.
Here is the process of check_parameter_values.
1. Create an empty set matched_string_set.
2. Use tools.get_parenthetic_contents to extract the content in the parentheses that follow each pattern in the pattern_list from the source_str. Store the results in the parameter_strs list.
3. Use zip to pair up the parameter_strs and keyword_item_list and iterate over them.
4. For each pairing of parameter_str and keyword_item, perform the following operations:
- If keyword_item is None, skip the pairing.
- For each keyword in keyword_item, perform the following operations:
  - If regex is True,
    - Use re.findall to search parameter_str for matching strings and store them in matched_strings.
    - If matched_strings has any matching strings, add all nonempty strings from matched_strings to the matched_string_set.
  - If regex is False, add the keyword to the matched_string_set if it appears in parameter_str.
5. Once the iteration finishes, return a list of the nonempty strings from the matched_string_set, which represents all the matched results.
Here is the flowchart of check_parameter_values.
The code of check_parameter_values
@staticmethod
def check_parameter_values(
    source_str, pattern_list, keyword_item_list, regex=False
) -> List[str]:
    matched_string_set = set()

    parameter_strs = [
        tools.get_parenthetic_contents(
            source_str, source_str.index(pattern) + len(pattern)
        )
        for pattern in pattern_list
    ]

    for parameter_str, keyword_item in zip(
        parameter_strs, keyword_item_list
    ):
        if keyword_item is None:
            continue

        for keyword in keyword_item:
            if regex:
                matched_strings = re.findall(keyword, parameter_str)
                if any(matched_strings):
                    matched_strings = filter(bool, matched_strings)
                    matched_strings = list(matched_strings)

                    element = matched_strings[0]
                    if isinstance(
                        element, collections.abc.Sequence
                    ) and not isinstance(element, str):
                        for str_list in matched_strings:
                            matched_string_set.update(str_list)
                    else:
                        matched_string_set.update(matched_strings)
            else:
                if str(keyword) in parameter_str:
                    matched_string_set.add(keyword)

    return [e for e in list(matched_string_set) if bool(e)]
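To see the keyword matching in action, the sketch below reimplements the idea in a self-contained form. Several parts are assumptions made for illustration: parenthetic_contents is a hypothetical stand-in for tools.get_parenthetic_contents, the FIRST and SECOND patterns and the source string are made up, and the regex branch assumes keywords without capture groups.

```python
import re
from typing import List, Optional, Sequence

FIRST = "Lcom/a;->open(Ljava/lang/String;)V"
SECOND = "Lcom/b;->send(Ljava/lang/String;)V"
# Hypothetical usage record: SECOND receives the result of FIRST.
SOURCE = f"{SECOND}({FIRST}(/sdcard/secret.txt))"


def parenthetic_contents(text: str, start: int) -> str:
    """Return the text inside the balanced parentheses opening at start."""
    if text[start] != "(":
        raise ValueError("start must point at an opening parenthesis")
    depth = 0
    for index in range(start, len(text)):
        if text[index] == "(":
            depth += 1
        elif text[index] == ")":
            depth -= 1
            if depth == 0:
                return text[start + 1 : index]
    raise ValueError("unbalanced parentheses")


def matched_keywords(
    source: str,
    patterns: Sequence[str],
    keyword_items: Sequence[Optional[List[str]]],
    regex: bool = False,
) -> List[str]:
    matched = set()
    # One argument string per pattern: the parentheses right after it.
    params = [
        parenthetic_contents(source, source.index(p) + len(p))
        for p in patterns
    ]
    for param, keywords in zip(params, keyword_items):
        if keywords is None:
            continue
        for keyword in keywords:
            if regex:
                # Assumes no capture groups, so findall returns strings.
                matched.update(m for m in re.findall(keyword, param) if m)
            elif keyword in param:
                matched.add(keyword)
    return sorted(matched)


plain = matched_keywords(SOURCE, (FIRST, SECOND), [["secret"], None])
by_regex = matched_keywords(
    SOURCE, (FIRST, SECOND), [[r"/sdcard/\w+\.txt"], None], regex=True
)
print(plain, by_regex)
```

In plain mode the keyword itself is collected, while in regex mode the text that matched the expression is collected.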
check_sequence
The algorithm of check_sequence
The function check_sequence checks if mutual_parent calls any first method before any second method. If yes, check_sequence records the mapping between mutual_parent and the matched methods and returns True.
Here is the process of check_sequence.
1. Initialize the variable state as False.
2. Iterate the method pairs formed by first_method_list and second_method_list.
3. From mutual_parent, find method calls that call any method in the pair. Then collect them into the list seq_table.
4. Check if the length of seq_table is less than 2.
- If True, continue to the next iteration.
5. Sort seq_table according to the offsets of the method calls. Then name the sorted list as method_list_need_check.
6. Check if the method pair is a sublist of method_list_need_check.
- If True,
  - Set state to True.
  - Record the mapping between mutual_parent and the method pair in quark_analysis.
7. Return state.
Here is the flowchart of check_sequence.
The code of check_sequence
def check_sequence(
    self, mutual_parent, first_method_list, second_method_list
):
    """
    Check if the first function appeared before the second function.

    :param mutual_parent: function that calls the first function and second functions at the same time.
    :param first_method_list: the first show up function, which is a MethodAnalysis
    :param second_method_list: the second show up function, which is a MethodAnalysis
    :return: True or False
    """
    state = False

    for first_call_method in first_method_list:
        for second_call_method in second_method_list:
            seq_table = [
                (call, number)
                for call, number in self.apkinfo.lowerfunc(mutual_parent)
                if call in (first_call_method, second_call_method)
            ]

            if len(seq_table) < 2:
                # Not Found sequence in same_method
                continue

            # sorting based on the value of the number
            seq_table.sort(key=operator.itemgetter(1))
            # seq_table would look like: [(getLocation, 1256), (sendSms, 1566), (sendSms, 2398)]

            method_list_need_check = [x[0] for x in seq_table]
            sequence_pattern_method = [
                first_call_method,
                second_call_method,
            ]

            if tools.contains(
                sequence_pattern_method, method_list_need_check
            ):
                state = True

                # Record the mapping between the parent function and the wrapper method
                self.quark_analysis.parent_wrapper_mapping[
                    mutual_parent.full_name
                ] = self.apkinfo.get_wrapper_smali(
                    mutual_parent, first_call_method, second_call_method
                )

    return state
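The ordering test can be tried on a list of (call, offset) pairs. The sketch below is a hedged illustration: contains_in_order is a hypothetical stand-in for tools.contains that tests for a contiguous sublist, and the call names and offsets are made up.

```python
import operator
from typing import List, Tuple


def contains_in_order(pattern: List[str], sequence: List[str]) -> bool:
    """Hypothetical stand-in for tools.contains: contiguous sublist test."""
    size = len(pattern)
    return any(
        sequence[i : i + size] == pattern
        for i in range(len(sequence) - size + 1)
    )


def calls_first_before_second(
    calls: List[Tuple[str, int]], first: str, second: str
) -> bool:
    # Keep only the calls to the two methods of interest.
    seq_table = [(call, off) for call, off in calls if call in (first, second)]
    if len(seq_table) < 2:
        return False
    # Order the calls by their offset inside the parent method.
    seq_table.sort(key=operator.itemgetter(1))
    ordered = [call for call, _ in seq_table]
    return contains_in_order([first, second], ordered)


# Hypothetical calls made by a parent method, as (callee, offset) pairs.
calls = [
    ("sendSms", 1566),
    ("getLocation", 1256),
    ("log", 1300),
    ("sendSms", 2398),
]
print(calls_first_before_second(calls, "getLocation", "sendSms"))
print(calls_first_before_second(calls, "sendSms", "getLocation"))
```

After filtering and sorting, the call order is getLocation, sendSms, sendSms, so the pair (getLocation, sendSms) is found while the reversed pair is not.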
run
The algorithm of run
The function run checks the APK file at five levels to analyze whether it meets the rules.
Here is the process of run.
1. Clean the results of the previous analysis.
2. Store the 'crime' description in the analysis result.
3. Level 1 Check: Permission requested
- Check if the input file is a DEX file.
  - If Yes, set the first item of check_item in rule_obj to True.
  - If No, check if the permissions of the APK include the permissions in the rule.
    - If Yes, set the first item of check_item to True.
    - If No, the function exits.
4. Level 2 Check: Native API call
- Check if the APK uses any of the two native APIs in the rule.
  - If Yes, set the second item of check_item to True and store information about the calls of the two native APIs in the analysis result.
  - If No, the function exits.
5. Level 3 Check: Certain combination of native API
- Check if the APK uses both native APIs in the rule.
  - If Yes, set the third item of check_item to True and store the calls of the two native APIs in the analysis result.
  - If No, the function exits.
6. Level 4 Check: Calling sequence of native API
- Check if there are any mutual parent functions between each combined API call of the two native APIs.
  - If Yes, check if any mutual parent function calls the first method before the second method.
    - If Yes, set the fourth item of check_item to True and store information about the parent functions in the analysis result.
  - If No, the function exits.
7. Level 5 Check: APIs that handle the same register
- Check if the native APIs in the rule handle the same registers.
  - If Yes, set the fifth item of check_item to True and store the parent functions in the analysis result.
  - If No, the function exits.
Here is the flowchart of run.
The code of run
def run(self, rule_obj):
    """
    Run the five levels check to get the y_score.

    :param rule_obj: the instance of the RuleObject.
    :return: None
    """
    self.quark_analysis.clean_result()
    self.quark_analysis.crime_description = rule_obj.crime

    # Level 1: Permission Check
    if self.apkinfo.ret_type == "DEX":
        rule_obj.check_item[0] = True
    elif set(rule_obj.permission).issubset(set(self.apkinfo.permissions)):
        rule_obj.check_item[0] = True
    else:
        # Exit if the level 1 stage check fails.
        return

    # Level 2: Single Native API Check
    api_1_method_name = rule_obj.api[0]["method"]
    api_1_class_name = rule_obj.api[0]["class"]
    api_1_descriptor = rule_obj.api[0]["descriptor"]

    api_2_method_name = rule_obj.api[1]["method"]
    api_2_class_name = rule_obj.api[1]["class"]
    api_2_descriptor = rule_obj.api[1]["descriptor"]

    first_api_list = self.find_api_usage(
        api_1_class_name, api_1_method_name, api_1_descriptor
    )
    second_api_list = self.find_api_usage(
        api_2_class_name, api_2_method_name, api_2_descriptor
    )

    if not first_api_list and not second_api_list:
        # Exit if the level 2 stage check fails.
        return
    else:
        rule_obj.check_item[1] = True
        if first_api_list:
            self.quark_analysis.level_2_result.append(first_api_list[0])
        if second_api_list:
            self.quark_analysis.level_2_result.append(second_api_list[0])

    # Level 3: Both Native API Check
    if not (first_api_list and second_api_list):
        # Exit if the level 3 stage check fails.
        return

    self.quark_analysis.first_api = first_api_list[0]
    self.quark_analysis.second_api = second_api_list[0]
    rule_obj.check_item[2] = True

    self.quark_analysis.level_3_result = [set(), set()]

    # Level 4: Sequence Check
    for first_api in first_api_list:
        for second_api in second_api_list:
            # Looking for the first layer of the upper function
            first_api_xref_from = self.apkinfo.upperfunc(first_api)
            second_api_xref_from = self.apkinfo.upperfunc(second_api)

            self.quark_analysis.level_3_result[0].update(
                first_api_xref_from
            )
            self.quark_analysis.level_3_result[1].update(
                second_api_xref_from
            )

            if not first_api_xref_from:
                print_warning(
                    f"Unable to find the upperfunc of {first_api}"
                )
                continue
            if not second_api_xref_from:
                print_warning(
                    f"Unable to find the upperfunc of {second_api}"
                )
                continue

            mutual_parent_function_list = self.find_intersection(
                first_api_xref_from, second_api_xref_from
            )

            if mutual_parent_function_list is None:
                # Exit if the level 4 stage check fails.
                return

            for parent_function in mutual_parent_function_list:
                first_wrapper = []
                second_wrapper = []

                self.find_previous_method(
                    first_api, parent_function, first_wrapper
                )
                self.find_previous_method(
                    second_api, parent_function, second_wrapper
                )

                if self.check_sequence(
                    parent_function, first_wrapper, second_wrapper
                ):
                    rule_obj.check_item[3] = True
                    self.quark_analysis.level_4_result.append(
                        parent_function
                    )

                    keyword_item_list = (
                        rule_obj.api[i].get("keyword", None)
                        for i in range(2)
                    )

                    # Level 5: Handling The Same Register Check
                    if self.check_parameter(
                        parent_function,
                        first_wrapper,
                        second_wrapper,
                        keyword_item_list=keyword_item_list,
                    ):
                        rule_obj.check_item[4] = True
                        self.quark_analysis.level_5_result.append(
                            parent_function
                        )
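The staged structure of run, where each level is attempted only if the previous one passed, can be sketched in a few lines. This is a deliberately simplified illustration, not Quark's control flow: permission_check and run_stages are hypothetical helpers, the permission names are examples, and levels 2 to 5 are replaced by fixed outcomes, whereas the real levels 4 and 5 run inside nested loops over API pairs and parent functions.

```python
from typing import Callable, List, Sequence


def permission_check(
    ret_type: str,
    rule_permissions: Sequence[str],
    apk_permissions: Sequence[str],
) -> bool:
    """Level 1: DEX files pass; APKs must request every rule permission."""
    if ret_type == "DEX":
        return True
    return set(rule_permissions).issubset(set(apk_permissions))


def run_stages(stages: Sequence[Callable[[], bool]]) -> List[bool]:
    """Run the stages in order and stop at the first one that fails."""
    check_item = [False] * len(stages)
    for index, stage in enumerate(stages):
        if not stage():
            break
        check_item[index] = True
    return check_item


stages = [
    lambda: permission_check(
        "APK",
        ["android.permission.SEND_SMS"],
        ["android.permission.SEND_SMS", "android.permission.INTERNET"],
    ),
    lambda: True,   # level 2, fixed outcome for the example
    lambda: False,  # level 3, fixed outcome for the example
    lambda: True,   # level 4, never reached
    lambda: True,   # level 5, never reached
]
check_item = run_stages(stages)
print(check_item)
```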
get_json_report
The algorithm of get_json_report
The function get_json_report generates a report of the analysis performed on the APK file, in JSON format.
Here is the process of get_json_report.
1. Create a Weight object with the total score and weight from the analysis result.
2. Calculate the threat level with the Weight object and store the result in the variable warning.
3. Loop through a list of threat levels and check if the variable warning contains any of the threat levels.
- If Yes, set the variable warning to the threat level, which filters out the color code around it.
4. Return a report with various pieces of information:
- The MD5 hash of the APK
- The filename of the APK
- The file size of the APK
- The threat level of the APK
- The total score of the analysis result
- The JSON report of the analysis result
Here is the flowchart of get_json_report.
The code of get_json_report
def get_json_report(self):
    """
    Get quark report including summary and detail with json format.

    :return: json report
    """
    w = Weight(
        self.quark_analysis.score_sum, self.quark_analysis.weight_sum
    )
    warning = w.calculate()

    # Filter out color code in threat level
    for level in ["Low Risk", "Moderate Risk", "High Risk"]:
        if level in warning:
            warning = level

    return {
        "md5": self.apkinfo.md5,
        "apk_filename": self.apkinfo.filename,
        "size_bytes": self.apkinfo.filesize,
        "threat_level": warning,
        "total_score": self.quark_analysis.score_sum,
        "crimes": self.quark_analysis.json_report,
    }
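The loop over threat levels exists to strip terminal color codes from the string before it goes into the report. The sketch below shows the effect on an ANSI-colored string. It is a minimal illustration: plain_threat_level is a hypothetical helper, and the colored input is an assumed example of what a colored threat level might look like.

```python
THREAT_LEVELS = ["Low Risk", "Moderate Risk", "High Risk"]


def plain_threat_level(warning: str) -> str:
    """Return the bare threat level found in a possibly colored string."""
    for level in THREAT_LEVELS:
        if level in warning:
            return level
    # No known level found: leave the string untouched.
    return warning


colored = "\x1b[32mLow Risk\x1b[0m"  # assumed ANSI-colored input
print(plain_threat_level(colored))
```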
generate_json_report
The algorithm of generate_json_report
The function generate_json_report generates a JSON report based on the information extracted from the rule_obj instance.
Here is the process of generate_json_report.
1. Calculate the confidence percentage by counting the number of True values in check_item and multiplying by 20. Store the result as confidence.
2. Store the number of True values in check_item as conf. Use conf to calculate the weight of the rule with the get_score method.
3. Assign the score attribute's value to the score variable.
4. Check the first item in check_item.
- If True, assign the permission attribute to permissions.
- Otherwise, assign an empty list.
5. Check the second item in check_item.
- If True, populate the API list with dictionaries from quark_analysis.level_2_result.
6. Check the third item in check_item.
- If True, assign the API attribute's value to the combination variable.
7. Define two empty lists:
- sequnce_show_up
- same_operation_show_up
8. Check if the fourth item in check_item is True and the quark_analysis.level_4_result list is not empty.
- If True, populate the sequnce_show_up list with dictionaries containing full_name attributes and their corresponding values from quark_analysis.parent_wrapper_mapping.
9. Check if the fifth item in check_item is True and the quark_analysis.level_5_result list is not empty.
- If True, populate the same_operation_show_up list with dictionaries containing full_name attributes and their corresponding values from quark_analysis.parent_wrapper_mapping.
10. Create a dictionary called crime, containing the following attributes:
- rule: filename of rule in rule_obj
- crime: description of the crime in rule_obj
- label: the label in rule_obj
- score: the score in rule_obj
- weight: the weight calculated with get_score of rule_obj
- confidence: the number of True values in check_item multiplied by 20, as a percentage
- permissions: the permission in rule_obj
- native_api: list with dictionaries from quark_analysis.level_2_result
- combination: the value of the api attribute of rule_obj
- sequence: sequnce_show_up, information about the items in quark_analysis.level_4_result
- register: same_operation_show_up, information about the items in quark_analysis.level_5_result
11. Append the crime dictionary to the json_report attribute of quark_analysis.
12. Add the weight to the weight_sum attribute of quark_analysis.
13. Add the score to the score_sum attribute of quark_analysis.
Here is the flowchart of generate_json_report.
The code of generate_json_report
def generate_json_report(self, rule_obj):
    """
    Show the json report.
    :param rule_obj: the instance of the RuleObject
    :return: None
    """
    # Count the confidence
    confidence = str(rule_obj.check_item.count(True) * 20) + "%"
    conf = rule_obj.check_item.count(True)
    weight = rule_obj.get_score(conf)
    score = rule_obj.score
    # Assign level 1 examine result
    permissions = rule_obj.permission if rule_obj.check_item[0] else []
    # Assign level 2 examine result
    api = []
    if rule_obj.check_item[1]:
        for item2 in self.quark_analysis.level_2_result:
            api.append(
                {
                    "class": str(item2.class_name),
                    "method": str(item2.name),
                    "descriptor": str(item2.descriptor),
                }
            )
    # Assign level 3 examine result
    combination = []
    if rule_obj.check_item[2]:
        combination = rule_obj.api
    # Assign level 4 - 5 examine result if exist
    sequnce_show_up = []
    same_operation_show_up = []
    # Check examination has passed level 4
    if self.quark_analysis.level_4_result and rule_obj.check_item[3]:
        for item4 in self.quark_analysis.level_4_result:
            sequnce_show_up.append(
                {
                    item4.full_name: self.quark_analysis.parent_wrapper_mapping[
                        item4.full_name
                    ]
                }
            )
    # Check examination has passed level 5
    if self.quark_analysis.level_5_result and rule_obj.check_item[4]:
        for item5 in self.quark_analysis.level_5_result:
            same_operation_show_up.append(
                {
                    item5.full_name: self.quark_analysis.parent_wrapper_mapping[
                        item5.full_name
                    ]
                }
            )
    crime = {
        "rule": rule_obj.rule_filename,
        "crime": rule_obj.crime,
        "label": rule_obj.label,
        "score": score,
        "weight": weight,
        "confidence": confidence,
        "permissions": permissions,
        "native_api": api,
        "combination": combination,
        "sequence": sequnce_show_up,
        "register": same_operation_show_up,
    }
    self.quark_analysis.json_report.append(crime)
    # add the weight
    self.quark_analysis.weight_sum += weight
    # add the score
    self.quark_analysis.score_sum += score
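The confidence arithmetic used above can be tried in isolation. The following is a minimal sketch, assuming a five-element check_item list. The helper name confidence_of and the sample values are hypothetical and not part of Quark; only the count(True) * 20 expression comes from the code above.

```python
def confidence_of(check_item):
    """Return the confidence string for a list of five stage results."""
    # Each passed stage (True) contributes 20 percentage points.
    return f"{check_item.count(True) * 20}%"


# Hypothetical stage results: stages 1 to 3 passed, stages 4 and 5 did not.
sample_check_item = [True, True, True, False, False]
sample_confidence = confidence_of(sample_check_item)
print(sample_confidence)
```

The same count of True values, before it is turned into a percentage, is what the code above passes to rule_obj.get_score to obtain the weight.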
add_table_row
The algorithm of add_table_row
The function add_table_row adds a row to the summary report table.
Here is the process of add_table_row.
1. Call the add_row method of summary_report_table with a list that contains:
- name: the filename of the rule in rule_obj
- rule_obj.crime: the description of the crime in rule_obj
- confidence: the number of True values in check_item multiplied by 20
- score: the score in rule_obj
- weight: the weight of the rule, passed in by the caller
2. The add_row method appends these values as a new row in summary_report_table.
Here is the flowchart of add_table_row.
The code of add_table_row
def add_table_row(self, name, rule_obj, confidence, score, weight):
    self.quark_analysis.summary_report_table.add_row(
        [
            name,
            green(rule_obj.crime),
            yellow(confidence),
            score,
            red(weight),
        ]
    )
show_summary_report
The algorithm of show_summary_report
The function show_summary_report adds the result of a rule to the summary report.
Here is the process of show_summary_report.
1. Calculate the confidence by counting the occurrences of True in rule_obj.check_item and multiplying the count by 20 to get a percentage.
2. Calculate the weight by passing the count of True values to rule_obj.get_score, and retrieve score and rule_filename from rule_obj.
3. Check if a threshold is provided.
- If YES, call add_table_row only when the confidence percentage is greater than or equal to the threshold.
- If NO, call add_table_row unconditionally.
4. Update the quark_analysis instance by adding the calculated weight and score to weight_sum and score_sum.
Here is the flowchart of show_summary_report.
The code of show_summary_report
def show_summary_report(self, rule_obj, threshold=None):
    """
    Show the summary report.
    :param rule_obj: the instance of the RuleObject.
    :return: None
    """
    # Count the confidence
    confidence = f"{rule_obj.check_item.count(True) * 20}%"
    conf = rule_obj.check_item.count(True)
    weight = rule_obj.get_score(conf)
    score = rule_obj.score
    name = rule_obj.rule_filename
    if threshold:
        if rule_obj.check_item.count(True) * 20 >= int(threshold):
            self.add_table_row(name, rule_obj, confidence, score, weight)
    else:
        self.add_table_row(name, rule_obj, confidence, score, weight)
    # add the weight
    self.quark_analysis.weight_sum += weight
    # add the score
    self.quark_analysis.score_sum += score
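The threshold decision in show_summary_report can be sketched as a small standalone predicate. This is a minimal sketch, not Quark code: the function name passes_threshold is hypothetical, and it only mirrors the if/else branch shown above.

```python
def passes_threshold(check_item, threshold=None):
    """Decide whether a rule's row would be added to the summary table."""
    confidence = check_item.count(True) * 20
    if threshold:
        # A threshold was given: keep the row only at or above it.
        # int() accepts both numbers and numeric strings such as "60".
        return confidence >= int(threshold)
    # No threshold: every rule gets a row.
    return True


three_stages = [True, True, True, False, False]
print(passes_threshold(three_stages, "60"))
print(passes_threshold(three_stages, "80"))
print(passes_threshold(three_stages))
```

Note that the weight and score are added to weight_sum and score_sum whether or not the row is shown.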
show_label_report
The algorithm of show_label_report
The function show_label_report generates a tabular report that summarizes statistical information for each label.
Here is the process of show_label_report.
1. Clear label_report_table and initialize label_desc.
2. Iterate through the all_labels dictionary.
3. For each label, load its confidence values into an array.
4. Check if table_version is "max".
- If YES, set the table header and add a row with the label, its description, the number of rules, and the maximum confidence.
- If NO, set the table header and add a row that also includes the average confidence, the standard deviation, and the number of rules with confidence >= 80%.
Here is the flowchart of show_label_report.
The code of show_label_report
def show_label_report(self, rule_path, all_labels, table_version):
    """
    Show the report based on label, last column represents max confidence for that label
    :param rule_path: the path where may be present the file label_desc.csv.
    :param all_labels: dictionary containing label:<array of confidence values associated to that label>
    :return: None
    """
    label_desc = {}
    # clear table to manage max/detail version
    self.quark_analysis.label_report_table.clear()
    if os.path.isfile(os.path.join(rule_path, "label_desc.csv")):
        # associate to each label a description
        col_list = ["label", "description"]
        # csv file on form <label,description>
        # put this file in the folder of rules (it must not be a json file since it could create conflict with management of rules)
        # remove temporarily
        #df = pd.read_csv(
        #    os.path.join(rule_path, "label_desc.csv"), usecols=col_list
        #)
        #
        #label_desc = dict(zip(df["label"], df["description"]))
    for label_name in all_labels:
        confidences = np.array(all_labels[label_name])
        if table_version == "max":
            self.quark_analysis.label_report_table.field_names = [
                "Label",
                "Description",
                "Number of rules",
                "MAX Confidence %",
            ]
            self.quark_analysis.label_report_table.add_row(
                [
                    green(label_name),
                    yellow(label_desc.get(label_name, "-")),
                    (len(confidences)),
                    red(np.max(confidences)),
                ]
            )
        else:
            self.quark_analysis.label_report_table.field_names = [
                "Label",
                "Description",
                "Number of rules",
                "MAX Confidence %",
                "AVG Confidence",
                "Std Deviation",
                "# of Rules with Confidence >= 80%",
            ]
            self.quark_analysis.label_report_table.add_row(
                [
                    green(label_name),
                    yellow(label_desc.get(label_name, "-")),
                    (len(confidences)),
                    red(np.max(confidences)),
                    magenta(round(np.mean(confidences), 2)),
                    lightblue(round(np.std(confidences), 2)),
                    lightyellow(np.count_nonzero(confidences >= 80)),
                ]
            )
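The per-label statistics in the detailed table can be reproduced without NumPy. The sketch below is an illustration only: it swaps in the standard library statistics module, the helper name label_statistics and the sample confidences are hypothetical, and the dictionary keys are chosen for this example rather than taken from Quark.

```python
from statistics import mean, pstdev


def label_statistics(confidences):
    """Summarize the confidence values collected for one label."""
    return {
        "rules": len(confidences),
        "max": max(confidences),
        "avg": round(mean(confidences), 2),
        # pstdev is the population standard deviation, the same quantity
        # that np.std computes with its default arguments.
        "std": round(pstdev(confidences), 2),
        # Number of rules whose confidence is at least 80%.
        "high": sum(1 for c in confidences if c >= 80),
    }


# Hypothetical confidences of four rules that share one label.
sample = label_statistics([100, 80, 60, 40])
print(sample)
```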
show_detail_report
The algorithm of show_detail_report
The function show_detail_report prints a report summarizing the result of the five-level check and the confidence of the APK.
Here is the process of show_detail_report.
1. Calculate the confidence of the APK by multiplying the number of passed levels by 20.
2. Check if the APK passed level 1.
- If passed, show the matched permissions.
3. Check if the APK passed level 2.
- If passed, show the matched APIs.
4. Check if the APK passed level 3.
- If passed, show the matched API combinations.
5. Check if the APK passed level 4.
- If passed, show the matched API sequences.
6. Check if the APK passed level 5.
- If passed, show the matched API sequences that use the same register.
Here is the flowchart of show_detail_report.
The code of show_detail_report
def show_detail_report(self, rule_obj):
    """
    Show the detail report.
    :param rule_obj: the instance of the RuleObject.
    :return: None
    """
    # Count the confidence
    print("")
    print(f"Confidence: {rule_obj.check_item.count(True) * 20}%")
    print("")
    if rule_obj.check_item[0]:
        colorful_report("1.Permission Request")
        for permission in rule_obj.permission:
            print(f"\t\t {permission}")
    if rule_obj.check_item[1]:
        colorful_report("2.Native API Usage")
        for api in self.quark_analysis.level_2_result:
            print(f"\t\t {api.full_name}")
    if rule_obj.check_item[2]:
        colorful_report("3.Native API Combination")
        for numbered_api, method_list in zip(
            ("First API", "Second API"), self.quark_analysis.level_3_result
        ):
            print(f"\t\t {numbered_api} show up in:")
            if method_list:
                for comb_method in method_list:
                    print(f"\t\t {comb_method.full_name}")
            else:
                print("\t\t None")
    if rule_obj.check_item[3]:
        colorful_report("4.Native API Sequence")
        print("\t\t Sequence show up in:")
        for seq_method in self.quark_analysis.level_4_result:
            print(f"\t\t {seq_method.full_name}")
    if rule_obj.check_item[4]:
        colorful_report("5.Native API Use Same Parameter")
        for seq_operation in self.quark_analysis.level_5_result:
            print(f"\t\t {seq_operation.full_name}")
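Which sections of the detail report appear depends only on check_item. As a rough sketch, assuming the five section titles printed by the code above, the selection can be expressed as follows. The names STAGE_TITLES and passed_stage_titles are hypothetical helpers for this illustration.

```python
# Section titles as printed by show_detail_report, in stage order.
STAGE_TITLES = (
    "1.Permission Request",
    "2.Native API Usage",
    "3.Native API Combination",
    "4.Native API Sequence",
    "5.Native API Use Same Parameter",
)


def passed_stage_titles(check_item):
    """Return the titles of the sections that would be printed."""
    return [title for title, passed in zip(STAGE_TITLES, check_item) if passed]


# Hypothetical result: only the first two stages passed.
shown = passed_stage_titles([True, True, False, False, False])
print(shown)
```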
show_call_graph
The algorithm of show_call_graph
The function show_call_graph creates a call graph for each element in call_graph_analysis_list and displays messages to indicate progress.
Here is the process of show_call_graph.
1. Display the message "Creating Call Graph..." in cyan.
2. Create a call graph for each element in call_graph_analysis_list.
3. Display the message "Call Graph Completed" in green.
Here is the flowchart of show_call_graph.
The code of show_call_graph
def show_call_graph(self, output_format=None):
    print_info("Creating Call Graph...")
    for (
        call_graph_analysis
    ) in self.quark_analysis.call_graph_analysis_list:
        call_graph(call_graph_analysis, output_format)
    print_success("Call Graph Completed")
show_rule_classification
The algorithm of show_rule_classification
The function show_rule_classification extracts rule classification data, highlighting the links between parent functions and associated crimes. It then displays this data in table, JSON, and graphical formats.
Here is the process of show_rule_classification.
1. Call the print_info function to display "Rules Classification".
2. Invoke the get_rule_classification_data function
- Use self.quark_analysis.call_graph_analysis_list and MAX_SEARCH_LAYER as parameters.
- Store the returned rule classification data in the data_bundle variable.
3. Call the output_parent_function_table function to display tables on the console.
- The first column of the table is "Parent Function", and the second column displays the name of that parent function.
- Subsequent rows list the "Crime Description" associated with that parent function.
4. Call the output_parent_function_json function to output a JSON file named "rules_classification.json".
- The structure of the file is a list containing multiple dictionaries. Each dictionary has two keys: parent and crime.
5. Call the output_parent_function_graph function to create a PNG format graphic file.
- This graphic displays the reference relationships between parent functions and the crime descriptions associated with each parent function.
Here is the flowchart of show_rule_classification.
The code of show_rule_classification
def show_rule_classification(self):
    print_info("Rules Classification")
    data_bundle = get_rule_classification_data(
        self.quark_analysis.call_graph_analysis_list, MAX_SEARCH_LAYER
    )
    output_parent_function_table(data_bundle)
    output_parent_function_json(data_bundle)
    output_parent_function_graph(data_bundle)
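The JSON output described in step 4 is a list of dictionaries with the keys parent and crime. The sketch below shows one possible shape of such a file. The parent names and crime descriptions are invented, and the value types (a string for parent, a list of strings for crime) are assumptions, since the code of output_parent_function_json is not shown here.

```python
import json

# Hypothetical data: names, descriptions, and value types are assumptions.
rules_classification = [
    {
        "parent": "Lcom/example/Foo;->send()V",
        "crime": ["Read contacts", "Send SMS"],
    },
    {
        "parent": "Lcom/example/Bar;->upload()V",
        "crime": ["Read location"],
    },
]

# Serialize to JSON text and parse it back to check the round trip.
serialized = json.dumps(rules_classification, indent=4)
restored = json.loads(serialized)
print(serialized)
```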
quark.utils.graph.py
wrapper_lookup
The algorithm of wrapper_lookup
The wrapper_lookup function finds the method that calls the specified native API under the specified method.
1. Create a stack that stores the specified method.
2. While the stack is not empty, repeat steps 3, 4, and 5. When the stack is empty, return an empty list.
3. Check if the top element of the stack has been visited.
- If YES, pop the top element and continue with the next iteration.
- If NO, mark the top element as visited.
4. Find the methods that the top element calls. We refer to them as submethods.
5. Check if the specified native API is one of the submethods.
- If YES, return a list containing the top element.
- If NO, push the submethods, except Android APIs, onto the stack.
Here is the flowchart of wrapper_lookup.
The code of wrapper_lookup
def wrapper_lookup(apkinfo, method, native_api):
    visited_method_list = set()
    stack = [method]
    while stack:
        parent = stack[-1]
        if parent not in visited_method_list:
            visited_method_list.add(parent)
            submethods = {reference[0] for reference in apkinfo.lowerfunc(parent)}
            if native_api in submethods:
                return [parent]
            next_level = filter(lambda m: not m.is_android_api(), submethods)
            stack.extend(next_level)
        else:
            stack.pop()
    return []
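The same stack-based search can be tried on a toy call graph. This is a minimal sketch under stated assumptions: a plain dictionary call_map stands in for apkinfo.lowerfunc, a set of names stands in for is_android_api, and the function name find_wrapper and all method names are hypothetical.

```python
def find_wrapper(call_map, method, native_api, android_apis=frozenset()):
    """Find a method reachable from `method` that directly calls `native_api`."""
    visited = set()
    stack = [method]
    while stack:
        parent = stack[-1]
        if parent in visited:
            # Already expanded: drop it and look at the next candidate.
            stack.pop()
            continue
        visited.add(parent)
        submethods = call_map.get(parent, ())
        if native_api in submethods:
            return [parent]
        # Descend only into methods that are not Android APIs.
        stack.extend(m for m in submethods if m not in android_apis)
    return []


# Hypothetical call graph: entry -> helper -> send_wrapper -> sendTextMessage.
call_map = {
    "entry": ["helper", "log"],
    "helper": ["send_wrapper"],
    "send_wrapper": ["sendTextMessage"],
}
wrapper = find_wrapper(call_map, "entry", "sendTextMessage", android_apis={"log"})
print(wrapper)
```

The visited set is what keeps the loop from running forever on recursive call chains: a method is expanded once and popped the next time it reaches the top of the stack.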
show_comparison_graph
The algorithm of show_comparison_graph
The show_comparison_graph function generates and displays a radar chart of the maximum confidence per rule label, which compares the behaviors of the designated APK samples.
1. The function "show_comparison_graph" takes four parameters:
- title: Text to be displayed as the chart's title.
- labels: Categories to be represented on the radar chart.
- malware_confidences: A mapping of malware identifiers to their respective lists of confidence scores.
- font_size: The textual size for the chart, defaulting to 22 if not specified.
2. Initialize the figure
- Create a blank figure object "fig" using "go.Figure()".
3. Set the layout of the figure using "fig.update_layout"
- Define the polar coordinate system with the radial axis visible, ranging from 0 to 100, and having a tick every 20 units.
- Enable the legend display.
- Set the chart title with bold text.
- Set the font size.
- Center the title horizontally.
- Configure the position and order of the legend.
4. Add data to the figure
- For each "malware_name" in the "malware_confidences" dictionary, create a radar chart data trace.
- Use "go.Scatterpolar", specifying the radius as the array of confidence values and the angle as the labels.
- Set the fill mode to "toself" to create a closed radar area.
- Name each trace as "malware_name" and customize the line width.
5. Display the figure
- Display the figure using the "fig.show()" method.
6. Check and create a storage directory
- Check if the directory "behaviors_comparison_radar_chart" exists.
- If YES, proceed to the next step.
- If NO, create the "behaviors_comparison_radar_chart" directory.
7. Save the figure as an image
- Save the figure as a JPEG image in the "behaviors_comparison_radar_chart" directory.
Here is the flowchart of show_comparison_graph.
The code of show_comparison_graph
def show_comparison_graph(title, labels, malware_confidences, font_size=22):
    """
    show radar chart based on max label confidence of several malwares
    :param title: title of the graph to be displayed
    :param labels: labels to be shown on the radar chart
    :param malware_confidences: dictionary with structure, malware_name=[
    array of confidences to be shown on radar chart]
    :return: None
    """
    fig = go.Figure()
    # plot the graph with specific layout
    fig.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 100], dtick=20)),
        showlegend=True,
        title={
            "text": f"<b>{title}</b>",
        },
        font=dict(size=font_size),
        title_x=0.5,
        legend=dict(
            y=0.5,
            x=0.8,
            traceorder="normal",
        ),
    )
    for malware_name in malware_confidences:
        fig.add_trace(
            go.Scatterpolar(
                r=malware_confidences[malware_name],
                theta=labels,
                fill="toself",
                name=malware_name,
                line=dict(
                    width=4,
                ),
            )
        )
    fig.show()
    if not os.path.exists("behaviors_comparison_radar_chart"):
        os.mkdir("behaviors_comparison_radar_chart")
    fig.write_image("behaviors_comparison_radar_chart/compariso_image.jpeg")
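The malware_confidences argument maps each sample name to one confidence value per label, in the same order as labels. One way such an input could be prepared is sketched below. The helper build_radar_inputs, the sample names, and the nested input structure are all hypothetical; treating a missing label as 0 is an assumption of this sketch, not documented Quark behavior.

```python
def build_radar_inputs(labels, per_malware_label_confidences):
    """Reduce per-label confidence lists to one maximum value per label."""
    return {
        name: [max(by_label.get(label, [0])) for label in labels]
        for name, by_label in per_malware_label_confidences.items()
    }


labels = ["sms", "location", "network"]
# Hypothetical analysis results: confidences of all rules under each label.
raw = {
    "sample_a.apk": {"sms": [60, 100], "location": [40], "network": [80, 20]},
    "sample_b.apk": {"sms": [20], "network": [100]},
}
malware_confidences = build_radar_inputs(labels, raw)
print(malware_confidences)
```

Each resulting list has exactly len(labels) entries, which is what the r and theta arguments of the radar trace need in order to line up.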
call_graph
The algorithm of call_graph
The call_graph function generates a call graph based on the two native APIs.
1. Check if the two methods in the call graph analysis are the native APIs themselves.
- If NO, find the methods that call the native APIs under the two methods. We refer to them as wrappers.
- If YES, continue to the following steps.
2. Initialize the call graph and draw it in the following steps.
3. Draw the node representing the mutual parent function of the two native APIs.
4. Draw the nodes representing the wrappers.
5. Draw the edges representing the method calls between the wrappers.
6. Draw the nodes representing the two native APIs.
7. Draw the edges representing the method calls from the mutual parent function, through the wrappers if any, to the native APIs.
Here is the flowchart of call_graph.
The code of call_graph
def call_graph(call_graph_analysis, output_format="png"):
    """
    Generating a call graph based on two native Android APIs.
    """
    parent_function = call_graph_analysis["parent"]
    apkinfo = call_graph_analysis["apkinfo"]
    first_call = call_graph_analysis["first_call"]
    second_call = call_graph_analysis["second_call"]
    first_api = call_graph_analysis["first_api"]
    second_api = call_graph_analysis["second_api"]
    crime = call_graph_analysis["crime"]
    if first_call != first_api:
        first_wrapper = wrapper_lookup(apkinfo, first_call, first_api)
    if second_call != second_api:
        second_wrapper = wrapper_lookup(apkinfo, second_call, second_api)
    # Initialize the Digraph object
    dot = Digraph(
        filename=f"{parent_function.name}_{first_call.name}_{second_call.name}",
        node_attr={"fontname": "Courier New Bold"},
        comment="Quark-Engine Call Graph Result",
        format=output_format,
        graph_attr={
            "label": f"Potential Malicious Activity: {crime}",
            "labelloc": "top",
            "center": "true",
        },
    )
    dot.attr(compound="true")
    with dot.subgraph(name="cluster_mutual") as mutual_parent_function_description:
        mutual_parent_function_description.attr(
            style="rounded",
            penwidth="1",
            fillcolor="white",
            fontname="Courier New",
            shape="box",
        )
        mutual_parent_function_description.attr(
            label="Mutual Parent Function", fontname="Courier New Bold"
        )
        # mutual parent function node
        p, r = str(parent_function.descriptor).split(")")
        mutual_parent_function_description.node(
            f"{parent_function.full_name}",
            label=f"Access: {parent_function.access_flags}\nClass: {parent_function.class_name}\nMethod: {parent_function.name}\n Parameter: {p})\n Return: {r}",
            shape="none",
            fontcolor="blue",
            fontname="Courier New",
        )
    with dot.subgraph(name="cluster_0") as wrapper:
        wrapper.attr(label="Wrapped Functions", fontname="Courier New Bold")
        wrapper.attr(style="rounded", penwidth="1", fillcolor="red", shape="box")
        # Build the first call nodes
        if first_call != first_api:
            for wp_func in first_wrapper:
                p, r = str(wp_func.descriptor).split(")")
                wrapper.node(
                    f"{wp_func.full_name}",
                    label=f"Access: {wp_func.access_flags}\nClass: {wp_func.class_name}\nMethod: {wp_func.name}\n Parameter: {p})\n Return: {r}",
                    style="rounded",
                    fontcolor="blue",
                    penwidth="1",
                    fillcolor="white",
                    fontname="Courier New",
                    shape="none",
                )
            # wrapper -> wrapper
            for i in range(len(first_wrapper) - 1, 0, -1):
                wrapper.edge(
                    f"{first_wrapper[i].full_name}",
                    f"{first_wrapper[i - 1].full_name}",
                    "calls",
                    fontname="Courier New",
                )
        if second_call != second_api:
            for wp_func in second_wrapper:
                p, r = str(wp_func.descriptor).split(")")
                wrapper.node(
                    f"{wp_func.full_name}",
                    label=f"Access: {wp_func.access_flags}\nClass: {wp_func.class_name}\nMethod: {wp_func.name}\n Parameter: {p})\n Return: {r}",
                    style="rounded",
                    fontcolor="blue",
                    penwidth="1",
                    fillcolor="white",
                    fontname="Courier New",
                    shape="none",
                )
            # wrapper -> wrapper
            for i in range(len(second_wrapper) - 1, 0, -1):
                wrapper.edge(
                    f"{second_wrapper[i].full_name}",
                    f"{second_wrapper[i - 1].full_name}",
                    "calls",
                    fontname="Courier New",
                )
    with dot.subgraph(name="cluster_1") as native_call_subgraph:
        native_call_subgraph.attr(
            style="rounded",
            penwidth="1",
            fillcolor="white",
            fontname="Courier New",
            shape="box",
        )
        native_call_subgraph.attr(label="Native API Calls", fontname="Courier New Bold")
        # Native API Calls
        native_call_subgraph.node(
            f"{first_api.full_name}",
            label=f"Class: {first_api.class_name}\nMethod: {first_api.name}",
            fontcolor="blue",
            shape="none",
            fontname="Courier New",
        )
        native_call_subgraph.node(
            f"{second_api.full_name}",
            label=f"Class: {second_api.class_name}\nMethod: {second_api.name}",
            fontcolor="blue",
            shape="none",
            fontname="Courier New",
        )
    # mutual parent function -> the first node of each node
    if first_call != first_api:
        dot.edge(
            f"{parent_function.full_name}",
            f"{first_wrapper[-1].full_name}",
            "First Call",
            fontname="Courier New",
        )
        dot.edge(
            f"{first_wrapper[0].full_name}",
            f"{first_api.full_name}",
            "calls",
            fontname="Courier New",
        )
    else:
        dot.edge(
            f"{parent_function.full_name}",
            f"{first_api.full_name}",
            "First Call",
            fontname="Courier New",
        )
    if second_call != second_api:
        dot.edge(
            f"{parent_function.full_name}",
            f"{second_wrapper[-1].full_name}",
            "Second Call",
            fontname="Courier New",
        )
        dot.edge(
            f"{second_wrapper[0].full_name}",
            f"{second_api.full_name}",
            "calls",
            fontname="Courier New",
        )
    else:
        dot.edge(
            f"{parent_function.full_name}",
            f"{second_api.full_name}",
            "Second Call",
            fontname="Courier New",
        )
    dot.render(
        f"call_graph_image/{parent_function.name}_{first_call.name}_{second_call.name}"
    )
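The edge layout for one of the two calls can be illustrated without Graphviz. The sketch below is a simplified model, not Quark code: it uses plain strings for methods, returns the edges as (source, target, label) tuples instead of drawing them, and the helper name call_edges is hypothetical. It follows the indexing used above, where the parent points at the last element of the wrapper list and the first element points at the native API.

```python
def call_edges(parent, call, api, wrapper, label):
    """List (source, target, label) edges for one of the two API calls."""
    if call == api:
        # The parent calls the native API directly, so no wrapper is drawn.
        return [(parent, api, label)]
    # The parent points at the last element of the wrapper list.
    edges = [(parent, wrapper[-1], label)]
    # Chain the wrappers from the last element down to the first.
    for i in range(len(wrapper) - 1, 0, -1):
        edges.append((wrapper[i], wrapper[i - 1], "calls"))
    # The first element is the one that calls the native API.
    edges.append((wrapper[0], api, "calls"))
    return edges


# Hypothetical names: one wrapper between the parent and the API.
edges = call_edges("parent", "helper", "sendTextMessage", ["helper"], "First Call")
print(edges)
```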
quark.utils.output.py
get_rule_classification_data
The algorithm of get_rule_classification_data
The get_rule_classification_data function returns the crimes in the call graph analysis list and the cross-references of their parent functions.
1. Collect the crimes in the call graph analysis list.
2. Search for cross-references of their parent functions within the specified depth.
3. Return the results of the two steps above.
Here is the flowchart of get_rule_classification_data.
The code of get_rule_classification_data
def get_rule_classification_data(call_graph_analysis_list, search_depth):
    return _collect_crime_description(
        call_graph_analysis_list
    ), _search_cross_references(call_graph_analysis_list, search_depth)
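The internals of _search_cross_references are not shown in this section, so the following is only a generic illustration of a search "within the specified depth", not the Quark implementation. It assumes a hypothetical dictionary callers_of that maps a method name to the names of the methods that call it, and the helper name callers_within_depth is likewise invented.

```python
def callers_within_depth(callers_of, start, search_depth):
    """Collect the callers of `start` reachable within `search_depth` levels."""
    found = set()
    frontier = {start}
    for _ in range(search_depth):
        next_frontier = set()
        for method in frontier:
            for caller in callers_of.get(method, ()):
                # Record each caller once and never report the start itself.
                if caller not in found and caller != start:
                    found.add(caller)
                    next_frontier.add(caller)
        frontier = next_frontier
        if not frontier:
            # Nothing new was discovered, so deeper levels are empty too.
            break
    return found


# Hypothetical caller chain: root calls a, a calls b, b calls c.
callers_of = {"c": ["b"], "b": ["a"], "a": ["root"]}
print(sorted(callers_within_depth(callers_of, "c", 2)))
```

Bounding the number of levels keeps the search cheap on large call graphs, at the cost of missing callers that sit further away than the chosen depth.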