-
-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathdeviantart_scratcher.py
executable file
·208 lines (176 loc) · 6.03 KB
/
deviantart_scratcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python
"""
This file is dedicated to obtain a .csv record report for DeviantArt
data.
"""
# Standard library
import os
import sys
import traceback
# Third-party
import pandas as pd
import requests
from dotenv import load_dotenv
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
sys.path.append(".")
# First-party/Local
import quantify # noqa: E402
# Setup paths, Date and LOGGER using quantify.setup()
(
PATH_REPO_ROOT,
PATH_WORK_DIR,
PATH_DOTENV,
DATETIME_TODAY,
LOGGER,
) = quantify.setup(__file__)
# Load environment variables
load_dotenv(PATH_DOTENV)
# Global Variable for API_KEYS indexing
API_KEYS_IND = 0
# Gets API_KEYS and PSE_KEY from .env file
API_KEYS = os.getenv("GOOGLE_API_KEYS").split(",")
PSE_KEY = os.getenv("PSE_KEY")
# Set up file path for CSV report
DATA_WRITE_FILE = os.path.join(
PATH_WORK_DIR,
f"/data_deviantart_"
f"{DATETIME_TODAY.year}_{DATETIME_TODAY.month}_{DATETIME_TODAY.day}.csv",
)
# Log the start of the script execution
LOGGER.info("Script execution started.")
def get_license_list():
"""
Provides the list of license from 2018's record of Creative Commons.
Returns:
- np.array:
An np array containing all license types that should be searched
via Programmable Search Engine (PSE).
"""
LOGGER.info("Retrieving list of license from Creative Commons' record.")
# Read license data from file
cc_license_data = pd.read_csv(
f"{PATH_REPO_ROOT}/legal-tool-paths.txt", header=None
)
# Define regex pattern to extract license types
license_pattern = r"((?:[^/]+/){2}(?:[^/]+)).*"
license_list = (
cc_license_data[0]
.str.extract(license_pattern, expand=False)
.dropna()
.unique()
)
return license_list[4:]
def get_request_url(license):
"""
Provides the API Endpoint URL for specified parameter combinations.
Args:
- license (str): A string representing the type of license. It's a
segment of the URL towards the license description.
Returns:
- str: The API Endpoint URL for the query specified by parameters.
"""
LOGGER.info(
f"Generating API Endpoint URL for specified license: {license}"
)
try:
api_key = API_KEYS[API_KEYS_IND]
return (
"https://customsearch.googleapis.com/customsearch/v1"
f"?key={api_key}&cx={PSE_KEY}"
"&q=_&relatedSite=deviantart.com"
f'&linkSite=creativecommons.org{license.replace("/", "%2F")}'
)
except Exception as e:
if isinstance(e, IndexError):
LOGGER.error("Depleted all API Keys provided")
else:
raise e
def get_response_elems(license):
"""
Provides the metadata for query of specified parameters
Args:
- license (str): A string representing the type of license.
It's a segment of the URL towards the license description. If not provided,
it defaults to None, indicating no assumption about the license type.
Returns:
- dict: A dictionary mapping metadata to its value provided from the API
query.
"""
LOGGER.info("Making a request to the API and handling potential retries.")
try:
# Make a request to the API and handle potential retries
request_url = get_request_url(license)
max_retries = Retry(
total=5,
backoff_factor=10,
status_forcelist=[403, 408, 500, 502, 503, 504],
# 429 is Quota Limit Exceeded, which will be handled alternatively
)
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=max_retries))
with session.get(request_url) as response:
response.raise_for_status()
search_data = response.json()
search_data_dict = {
"totalResults": search_data["searchInformation"]["totalResults"]
}
return search_data_dict
except Exception as e:
if isinstance(e, requests.exceptions.HTTPError):
# If quota limit exceeded, switch to the next API key
global API_KEYS_IND
API_KEYS_IND += 1
LOGGER.error("Changing API KEYS due to depletion of quota")
return get_response_elems(license)
else:
raise e
def set_up_data_file():
# Writes the header row to the file to contain DeviantArt data.
header_title = "LICENSE TYPE,Document Count"
with open(DATA_WRITE_FILE, "w") as f:
f.write(f"{header_title}\n")
def record_license_data(license_type):
"""Writes the row for LICENSE_TYPE to the file to contain DeviantArt data.
Args:
- license_type:
A string representing the type of license, and should be a segment
of its URL towards the license description. Alternatively, the
default None value stands for having no assumption about license
type.
"""
LOGGER.info(
"Writing the row for license type %s to contain DeviantArt data",
license_type,
)
data_log = (
f"{license_type},"
f"{get_response_elems(license_type)['totalResults']}"
)
with open(DATA_WRITE_FILE, "a") as f:
f.write(f"{data_log}\n")
def record_all_licenses():
"""
Records the data for all available license types listed in the license
list and writes this data into the DATA_WRITE_FILE, as specified by the
constant.
"""
# Gets the list of license types and record data for each license type
license_list = get_license_list()
for license_type in license_list:
record_license_data(license_type)
def main():
set_up_data_file()
record_all_licenses()
if __name__ == "__main__":
try:
main()
except SystemExit as e:
LOGGER.error(f"System exit with code: {e.code}")
sys.exit(e.code)
except KeyboardInterrupt:
LOGGER.info("(130) Halted via KeyboardInterrupt.")
sys.exit(130)
except Exception:
LOGGER.exception(f"(1) Unhandled exception: {traceback.format_exc()}")
sys.exit(1)