Source code for pypixplore.remote

import xmlrpc.client as xmlrpcclient
import datetime
import time
import json
import requests
import concurrent.futures
from ratelimit import rate_limited
import pickle
import dbm
import os
import random as rd

[docs]class Index:
    """
    Connects with remote server. PyPI by default.
    """

    def __init__(self, server='https://pypi.python.org/pypi',
                 cache_path=os.path.join(os.path.expanduser('~'), '.pypiexplorer_cache')):
        self.client = xmlrpcclient.ServerProxy(server)
        self.cache = dbm.open(cache_path, 'c')
        # self.cache.reorganize()  # optimize the cache

    @rate_limited(10)
    def _get_JSON(self, package_name, update_cache=True):
        """
        Gets the JSON record for a given package.

        :param package_name: name of the package
        :return: dictionary
        """
        results = self.cache.get(package_name, None)
        # TODO: check if the package data has been updated since last time.
        if results is not None:
            data = pickle.loads(results)
            # print("fetched from cache")
        else:
            try:
                url = 'http://pypi.python.org/pypi/{}/json'.format(package_name)
                ans = requests.get(url, timeout=15)
                data = ans.json()
                if update_cache:
                    self._update_cache(package_name, data)
            except (ValueError, requests.exceptions.ConnectionError):
                data = []
            except Exception:
                # Any other failure (e.g. a timeout) also falls back to an empty record.
                data = []
        return data
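    # Usage sketch (illustrative, not part of the original module), assuming network
    # access to PyPI; "requests" below is just an example package name.
    #
    # >>> index = Index()
    # >>> record = index._get_JSON("requests")
    # >>> record["info"]["name"] if record else "no data"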
[docs]    def get_multiple_JSONs(self, pkg_list):
        output = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=150) as executor:
            # Start the load operations and mark each future with its package name
            future_to_url = {executor.submit(self._get_JSON, pkg_name, False): pkg_name
                             for pkg_name in pkg_list}
            for future in concurrent.futures.as_completed(future_to_url):
                pkg_name = future_to_url[future]
                try:
                    JSON = future.result()
                    output[pkg_name] = JSON
                except Exception as exc:
                    print('%r generated an exception: %s' % (pkg_name, exc))
        return output
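    # Usage sketch (illustrative, not part of the original module): fetching several
    # records concurrently; the package names below are only examples.
    #
    # >>> index = Index()
    # >>> records = index.get_multiple_JSONs(["requests", "flask", "numpy"])
    # >>> [name for name, data in records.items() if data]  # packages that resolved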
[docs]    def package_info(self, pkgn):
        a = self._get_JSON(pkgn)
        name = a["info"]["name"]
        description = a["info"]["description"]
        if len(description) > 2000:
            description = description[:2000] + " [...]"
        return name, description
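    # Usage sketch (illustrative, not part of the original module): package_info
    # returns a (name, description) tuple, with long descriptions truncated at
    # 2000 characters; "requests" is just an example package name.
    #
    # >>> index = Index()
    # >>> name, description = index.package_info("requests")
    # >>> description.endswith(" [...]")  # True only if the description was truncated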
    def _update_cache(self, package_name, data):
        # self.cache.insert(data)
        self.cache[package_name] = pickle.dumps(data)
[docs]    def get_latest_releases(self, package_name):
        return self.client.package_releases(package_name)
    # moved get_dependencies and dependency_graph to local.py, as they can't be obtained remotely
[docs]    def get_downloads(self, package_name):
        """
        Gets the number of downloads for a given package.

        :param package_name: name of the package
        :return: dictionary of download counts; keys are 'last_month', 'last_week' and 'last_day'
        """
        return self._get_JSON(package_name)["info"]["downloads"]
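    # Usage sketch (illustrative, not part of the original module); "requests" is just
    # an example package name, and the live PyPI JSON API may report placeholder
    # values for these counters.
    #
    # >>> index = Index()
    # >>> downloads = index.get_downloads("requests")
    # >>> sorted(downloads.keys())  # 'last_day', 'last_month', 'last_week'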
[docs]    def release_series(self, package_name):
        """
        Gets the most recent releases for a given package.

        :param package_name: name of the package
        :return: list of the 10 most recent releases of the package
        """
        releases_list = list(self._get_JSON(package_name)['releases'].keys())
        releases_list.sort(reverse=True)
        last_ten = releases_list[:10]
        return last_ten
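    # Usage sketch (illustrative, not part of the original module): the releases are
    # ordered by a reverse lexicographic sort of the version strings; "requests" is
    # just an example package name.
    #
    # >>> index = Index()
    # >>> index.release_series("requests")  # up to 10 version strings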
[docs]    def get_by_TROVE_classifier(self, trove):
        raise NotImplementedError
[docs]    def get_well_maintained(self):
        """
        Get packages which have had at least one release in the last six months,
        sorted by most recently updated.
        """
        raise NotImplementedError
[docs]    def count_releases(self, json, time_days):
        """
        Counts how many releases a package received within a given period of time, in days.

        :param json: the JSON record of a package
        :param time_days: the period of time, in days, over which releases are counted
        :return: the number of releases the package received in the given period
        """
        # NOTE: the 'json' parameter shadows the imported json module inside this method.
        time_days = int(time_days)
        if json == []:
            return 0
        keys = json["releases"].keys()
        # Sort the version strings (with the dots stripped) in reverse order, newest-looking first.
        order_process = {i.replace('.', ''): i for i in keys}
        keys_in_order = [order_process[i] for i in sorted(order_process.keys(), reverse=True)]
        count = 0
        for key in keys_in_order:
            if json["releases"][key] == []:
                break
            date = json["releases"][key][0]["upload_time"]
            date = datetime.datetime(int(date[0:4]), int(date[5:7]), int(date[8:10]), 0, 0, 0)
            current_time = time.strftime("%Y-%m-%d").split('-')
            current_time = datetime.datetime(int(current_time[0]), int(current_time[1]), int(current_time[2]), 0, 0, 0)
            difference = current_time - date
            if difference.days < time_days:
                count += 1
            else:
                break
        return count
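    # Usage sketch (illustrative, not part of the original module): count_releases
    # expects a JSON record as returned by _get_JSON; "requests" and the 365-day
    # window are just examples.
    #
    # >>> index = Index()
    # >>> record = index._get_JSON("requests")
    # >>> index.count_releases(record, 365)  # releases uploaded in the last year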
[docs]    def rank_of_packages_by_recent_release(self, time_days=30, list_size=None, rank_size=None):
        """
        Gets packages and ranks them by the number of releases in a period of time.

        :param time_days: the period of time, in days, passed on to count_releases
        :param list_size: if given, only the first -list_size- packages of the full package list are considered
        :param rank_size: if given, only the first -rank_size- entries of the rank are returned
        :return: the rank by recent releases, using the given time in days, -list_size- and -rank_size-
        """
        list_of_packages = self.client.list_packages()
        list_of_packages = list_of_packages[0:list_size]
        dict_package_json = self.get_multiple_JSONs(list_of_packages)
        dictionary = {i: self.count_releases(dict_package_json[i], time_days) for i in list_of_packages}
        rank = sorted(dictionary, key=dictionary.get, reverse=True)
        rank = rank[0:rank_size]
        return rank
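    # Usage sketch (illustrative, not part of the original module). Listing and
    # fetching packages hits both the XML-RPC and JSON APIs, so restricting
    # list_size keeps the call reasonably fast; the numbers below are arbitrary.
    #
    # >>> index = Index()
    # >>> index.rank_of_packages_by_recent_release(time_days=30, list_size=200, rank_size=10)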
[docs]    def get_len_response(self, response):
        if response.ok:
            count = len(json.loads(response.text))
        else:
            count = None
        return count
[docs]    def get_github_repo_by_name(self, hyperlink):
        parts = hyperlink.split('/')
        user = parts[-2]
        repo = parts[-1]
        return 'https://api.github.com/repos/{}/{}/'.format(user, repo)
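    # Usage sketch (illustrative, not part of the original module): the method only
    # rewrites the last two path segments of the homepage URL; the repository below
    # is a hypothetical example.
    #
    # >>> index = Index()
    # >>> index.get_github_repo_by_name("https://github.com/someuser/somerepo")
    # 'https://api.github.com/repos/someuser/somerepo/'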
[docs]    def get_git_stats(self, of='', package_name=''):
        if of == '':
            raise AttributeError('No information specified on "of:"')
        if package_name == '':
            raise AttributeError('No package specified')
        json = self._get_JSON(package_name)
        if len(json) == 0:
            print('Package not found')
            raise AttributeError
        hyperlink = json["info"]['home_page']
        name = json['info']['name']
        if 'github' in hyperlink:
            git_repo_api = self.get_github_repo_by_name(hyperlink)
        else:
            print('Package does not have a GitHub Repo as an official homepage.\n')
            return None
            # git_repo_api = self.get_github_repo_by_search(name)
        # get info from the GitHub API
        if of == 'forks':
            response = requests.get(git_repo_api + 'forks')
        elif of == 'stars':
            response = requests.get(git_repo_api + 'stargazers')
        elif of == 'watchers':
            response = requests.get(git_repo_api + 'subscribers')
        else:
            print('{} is not a possible option for "of".\n If you think that it should be implemented, implement!'.format(of))
            raise AttributeError
        return self.get_len_response(response)
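    # Usage sketch (illustrative, not part of the original module): 'of' must be one
    # of 'forks', 'stars' or 'watchers', and the package's homepage must point at
    # GitHub; note that the GitHub API paginates results, so the count reflects only
    # a single response page, and unauthenticated calls are rate limited.
    #
    # >>> index = Index()
    # >>> index.get_git_stats(of="stars", package_name="requests")  # an integer, or None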
[docs]    def how_many_packages_version_py(self, n_sample=700):
        """
        print('This command can take a while, do you wish to continue? \n type Y or N')
        aux = input()
        aux = aux.capitalize()
        if aux == 'N':
            return None
        elif aux != 'Y':
            print('Please type Y for yes or N for no')
            self.how_many_packages_version_py()
        """
        n_sample = int(n_sample)
        list_of_all_packages = self.client.list_packages()
        rd.shuffle(list_of_all_packages)
        all_packages = self.get_multiple_JSONs(list_of_all_packages[:n_sample])
        count2master = 0
        count3master = 0
        both = 0
        unknown = 0
        for key, package in all_packages.items():
            if len(package) > 0:
                package_classifiers = package['info']['classifiers']
            else:
                continue
            pyt2 = any(['Python :: 2' in version_control for version_control in package_classifiers])
            pyt3 = any(['Python :: 3' in version_control for version_control in package_classifiers])
            if pyt2 and pyt3:
                both += 1
            elif pyt2 and not pyt3:
                count2master += 1
            elif pyt3 and not pyt2:
                count3master += 1
            else:
                unknown += 1
        count_final = [round((both / n_sample), 2) * 100,
                       round((count2master / n_sample), 2) * 100,
                       round((count3master / n_sample), 2) * 100,
                       round((unknown / n_sample), 2) * 100]
        return count_final
[docs]    def print_graphics(self, results):
        b, python2, python3, u = results
        string_to_print = 'Both 2.x and 3.x |{} {}%\nOnly Python 2.x.x|{} {}%\n' \
                          'Only Python 3.x.x|{} {}%\nUnknown          |{} {}%'.format(
                              '*' * int(b), b,
                              '*' * int(python2), python2,
                              '*' * int(python3), python3,
                              '*' * int(u), u)
        return string_to_print
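    # Usage sketch (illustrative, not part of the original module): combining the
    # sampling and the ASCII chart; sampling downloads one JSON record per package,
    # so a smaller n_sample (100 here, chosen arbitrarily) keeps the run short.
    #
    # >>> index = Index()
    # >>> results = index.how_many_packages_version_py(n_sample=100)
    # >>> print(index.print_graphics(results))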