2022-01-30 07:11:15 +01:00

255 lines
8.7 KiB
Python

import requests
import mimetypes
import os
import sys
import urllib.request
import unicodedata
import re
from pathlib import Path
from datetime import datetime
import time
import socket
# SYNOPSIS:
# To download posts from an artist:
# python3 grab.py mixppl
class bcolors:
    """ANSI terminal escape sequences used to colorize console output."""

    # Text attributes and the reset sequence.
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"

    # Bright foreground colors.
    FAIL = "\033[91m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    OKBLUE = "\033[94m"
    HEADER = "\033[95m"
    OKCYAN = "\033[96m"
def logMsg(msg, mode):
    """Print a colorized log line and append it to the artist's logfile.

    Reads the module-level ``artist_name`` to label the console line and to
    pick the logfile (``./logs/<slugified artist name>.txt``).

    Args:
        msg: The text to log.
        mode: One of ``"okdl"``, ``"okndl"``, ``"warn"`` or ``"err"``. It
            selects the console color and the prefix written to the logfile.
            Any other value is reported on the console and the message is
            still logged, uncolored, with an ``[UNKNOWN ]`` prefix.
    """
    styles = {
        "okdl": (bcolors.OKCYAN, "[OK_DL ]"),
        "okndl": (bcolors.OKBLUE, "[OK_NO_DL]"),
        "warn": (bcolors.WARNING, "[WARNING ]"),
        "err": (bcolors.FAIL, "[ERROR ]"),
    }

    if mode in styles:
        col, prefix = styles[mode]
    else:
        # Previously the placeholders stayed integers here, so the string
        # concatenation below raised TypeError. Complain, but keep logging.
        print(bcolors.FAIL + "SUPPLIED INVALID LOG MODE!!! USE EITHER okdl, okndl, warn, or err!" + bcolors.ENDC)
        col, prefix = "", "[UNKNOWN ]"

    timestamp = getCurrentTimestamp()

    # Log to console; reset the color afterwards so it does not bleed into
    # whatever the terminal prints next.
    print(col + f"[{timestamp}][{artist_name}]: " + msg + bcolors.ENDC)

    # Log to logfile (opened in append mode: existing file or a new one).
    # UTF-8 is explicit because project titles may contain characters the
    # platform's default encoding cannot represent.
    logfile_path = "./logs/" + slugify(artist_name) + ".txt"
    with open(logfile_path, "a", encoding="utf-8") as logfile:
        logfile.write(prefix + " " + "[" + timestamp + "]: " + msg + "\n")
def extensionFromUrl(url):
    """Return the file extension (without the dot) of the file a URL points to.

    Only the path component of the URL is inspected, so dots inside the query
    string or fragment (e.g. ``image.png?v=1.2``) no longer confuse the result.

    Args:
        url: The URL (or plain path) to inspect.

    Returns:
        The text after the last ``.`` of the final path segment, or an empty
        string when that segment contains no dot.
    """
    # Function-scope import: the file only imports urllib.request at the top.
    from urllib.parse import urlsplit

    path = urlsplit(url).path
    last_segment = path.rsplit('/', 1)[-1]
    _, dot, extension = last_segment.rpartition('.')
    # Without a dot, rpartition puts the whole segment into `extension`;
    # report "no extension" instead of returning the name itself.
    return extension if dot else ""
def slugify(value, allow_unicode=False):
    """
    Turn an arbitrary value into a filesystem/URL-friendly slug.

    Adapted from https://github.com/django/django/blob/master/django/utils/text.py

    The value is stringified and normalized (NFKC when 'allow_unicode' is
    True, otherwise NFKD with every non-ASCII character dropped), lowercased,
    stripped of anything that is not a word character, whitespace or hyphen,
    and runs of whitespace/hyphens are collapsed into a single hyphen.
    Leading and trailing hyphens and underscores are removed.
    """
    text = str(value)
    if not allow_unicode:
        ascii_bytes = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')
        text = ascii_bytes.decode('ascii')
    else:
        text = unicodedata.normalize('NFKC', text)

    cleaned = re.sub(r'[^\w\s-]', '', text.lower())
    collapsed = re.sub(r'[-\s]+', '-', cleaned)
    return collapsed.strip('-_')
def getCurrentTimestamp():
    """Return the current UTC time formatted as ``MM-DD-YYYY HH-MM``."""
    # Function-scope import: the file only imports `datetime` from datetime.
    from datetime import timezone

    # datetime.utcfromtimestamp() is deprecated since Python 3.12; an aware
    # UTC datetime formats to exactly the same string.
    return datetime.now(timezone.utc).strftime("%m-%d-%Y %H-%M")
def isPostAlreadySaved(post_id):
    """Tell whether a post id is recorded in the artist's index file.

    The index file is ``./already_saved/<slugified artist name>.txt`` (the
    artist comes from the module-level ``artist_name``) and holds one post id
    per line.

    Args:
        post_id: The post id to look up.

    Returns:
        True if a line of the index file equals ``post_id``; False if not, or
        if the index file does not exist yet.
    """
    idset_filename = "./already_saved/" + slugify(artist_name) + ".txt"

    try:
        # `with` closes the handle, which the previous version never did.
        with open(idset_filename, "r") as index_file:
            # Compare without the line terminator so that a final line lacking
            # a trailing newline is still recognized.
            return any(line.rstrip("\n") == post_id for line in index_file)
    except FileNotFoundError:
        # No index file yet means nothing has been saved for this artist.
        return False
def markPostAsSaved(post_id):
    """Append a post id to the artist's index file.

    The index file is ``./already_saved/<slugified artist name>.txt`` (the
    artist comes from the module-level ``artist_name``); it is created if it
    does not exist.

    Args:
        post_id: The post id to record, written on its own line.
    """
    idset_filename = "./already_saved/" + slugify(artist_name) + ".txt"

    # Append mode opens the existing file or creates it; `with` guarantees the
    # handle is closed even if the write fails.
    with open(idset_filename, "a") as index_file:
        index_file.write(post_id + "\n")
def downloadMedia(url, filename):
    """Download a URL to a local file using the image request headers.

    Args:
        url: The URL to fetch.
        filename: The local path the response body is written to.
    """
    # Install an opener that sends the module-level image_request_headers;
    # urlretrieve() uses the globally installed opener.
    opener = urllib.request.build_opener()
    opener.addheaders = image_request_headers
    urllib.request.install_opener(opener)

    # Use the `url` argument. The previous version read the global
    # `asset_image_url` instead and only worked because the caller happened
    # to keep that global in sync with the argument.
    urllib.request.urlretrieve(url, filename)
# Browser-like headers sent with the JSON requests to www.artstation.com.
# NOTE: a second 'authority' entry ('api.reddit.com') used to sit at the end of
# this dict and silently replaced the artstation value; it has been removed.
project_fetch_headers = {
    'authority': 'www.artstation.com',
    'pragma': 'no-cache',
    'cache-control': 'no-cache',
    'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'sec-fetch-site': 'none',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-user': '?1',
    'sec-fetch-dest': 'document',
    'accept-language': 'de-DE,de;q=0.9',
}
# Browser-like headers for image downloads, kept as a list of (name, value)
# pairs because that is the shape assigned to the urllib opener's `addheaders`.
# The pairs are produced from a dict literal; insertion order is preserved.
image_request_headers = list({
    'authority': 'cdna.artstation.com',
    'pragma': 'no-cache',
    'cache-control': 'no-cache',
    'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'sec-fetch-site': 'none',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-user': '?1',
    'sec-fetch-dest': 'document',
    'accept-language': 'de-DE,de;q=0.9',
}.items())
# 2 minute timeout in case something gets stuck.
socket.setdefaulttimeout(120)

# The artist name is the only (mandatory) command line argument.
if len(sys.argv) < 2:
    print("Usage: python3 grab.py <artist_name>", file=sys.stderr)
    sys.exit(1)

artist_name = str.lower(sys.argv[1])

# Create artist directory if it doesn't exist
artist_directory = "./downloads/" + slugify(artist_name) + "/"
Path(artist_directory).mkdir(parents=True, exist_ok=True)

# Create directory for already saved posts
Path("./already_saved/").mkdir(parents=True, exist_ok=True)

# Create directory for logging
Path("./logs/").mkdir(parents=True, exist_ok=True)

# Request project info for artist, page by page
lastPageReached = False
pageCounter = 1

try:
    while not lastPageReached:
        logMsg(f"Fetching page {pageCounter} of {artist_name}...", "okndl")

        # requests does not honor socket.setdefaulttimeout(), so the two
        # minute limit is passed explicitly.
        projects_data = requests.get(
            f"https://www.artstation.com/users/{artist_name}/projects.json?page={pageCounter}",
            headers=project_fetch_headers,
            timeout=120,
        )
        # Fail with a meaningful HTTP error instead of a JSON/KeyError later.
        projects_data.raise_for_status()
        projects = projects_data.json()["data"]
        page_num_projects = len(projects)

        # Each full page contains 50 projects. If it has less than 50, it is the last page
        lastPageReached = page_num_projects < 50

        if not lastPageReached:
            pageCounter = pageCounter + 1
            logMsg(f"Page contains {page_num_projects} projects...", "okndl")
        else:
            logMsg(f"Page contains {page_num_projects} projects... That's the last page!", "okndl")

        # For each project on this page
        for project in projects:
            project_name = project["title"]
            project_hash_id = project["hash_id"]

            logMsg(f"Found project '{project_name}' with id {project_hash_id}. Fetching more info about it...", "okndl")

            # Have we already downloaded this post?
            if isPostAlreadySaved(project_hash_id):
                logMsg(f"Skipping project '{project_name}' [{project_hash_id}] because it is already downloaded.", "okndl")
                continue

            # Fetch information about the project
            project_info = requests.get(
                f"https://www.artstation.com/projects/{project_hash_id}.json",
                headers=project_fetch_headers,
                timeout=120,
            )
            project_info.raise_for_status()
            assets = project_info.json()["assets"]

            # For each asset in the project (might be multiple images)
            for asset in assets:
                asset_type = asset["asset_type"]
                # Read the position up front: the skip message below needs it
                # too, and previously hit an unassigned (or stale) variable.
                asset_position = asset["position"]

                # If the asset is an image
                if asset_type == "image":
                    asset_image_url = asset["image_url"]

                    # Generate a download filename
                    filename = artist_directory + slugify(project_name[:60] + "_" + project_hash_id + "_" + str(asset_position)) + "." + extensionFromUrl(asset_image_url)

                    logMsg(f"Found image-asset for project '{project_name}' [{project_hash_id}] at position {asset_position}. Downloading to '{filename}'...", "okdl")

                    # Download it
                    downloadMedia(asset_image_url, filename)
                else:
                    # Nothing is downloaded here, so log it as a no-download event.
                    logMsg(f"Found non-image-asset for project '{project_name}' [{project_hash_id}] at position {asset_position}. Skipping...", "okndl")

            # After downloading all assets, mark the project as downloaded.
            markPostAsSaved(project_hash_id)

    logMsg(f"Finished all pages of {artist_name}... Total pages of this artist scanned: {pageCounter}", "okndl")

except (socket.timeout, requests.exceptions.Timeout):
    logMsg("Socket timeout of two minutes reached! We'll get 'em next time, boys!", "err")
except Exception as error:
    # Catch Exception rather than everything so Ctrl-C still stops the script,
    # and record what actually went wrong.
    logMsg(f"Failed for some reason! {type(error).__name__}: {error}", "err")