selenium/parallel/python/main.py

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
import time
import pickle
import urllib.request
import urllib.error
import base64
class Browser():
    def __init__(self, url):
        self.url = url
        self.driver = self.make_driver()
        # Load the page immediately: callers (Todays_Auctions, Auction, the
        # Car.extract_* helpers) query elements right after constructing a
        # Browser and never call open() themselves.
        self.open()

    def __del__(self):
        try:
            self.driver.close()
        except Exception:
            pass

    def make_driver(self):
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-extensions')
        # chromedriver occasionally fails to start; retry a few times before giving up
        last = None
        for i in range(5):
            try:
                driver = webdriver.Chrome('/usr/bin/chromedriver', chrome_options=chrome_options)
                return driver
            except Exception as e:
                time.sleep(1)
                last = e
        raise last

    def open(self, url=None):
        if not url:
            url = self.url
        self.driver.get(url)

    # `foo` takes `elem` (a Browser or WebElement) as its only argument, like
    # `lambda x: x.find_element_by_id("id")`
    @staticmethod
    def until(elem, foo):
        import datetime
        now = datetime.datetime.now()
        while (datetime.datetime.now() - now).total_seconds() < 30:
            if foo(elem):
                return
            time.sleep(1)
        raise Exception("until failed")

    # ("//form[1]")
    # ("//form[@id='loginForm']")
    def find_xpath(self, xpath):
        return self.driver.find_elements_by_xpath(xpath)

    # find_element_by_partial_link_text('Conti')
    def find_links(self, prefix):
        elements = self.driver.find_elements_by_partial_link_text(prefix)
        return [e.get_attribute("href") for e in elements]
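# A minimal usage sketch for Browser (illustrative only; not called anywhere in
# this file). The URL and link text are the ones Todays_Auctions uses below;
# __init__ already loads the page, so open() is only needed to navigate elsewhere.
def _example_browser_usage():
    b = Browser("https://www.copart.com/todaysAuction/")
    return b.find_links("Join Auction")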
class Todays_Auctions():
    def __init__(self):
        self.b = Browser("https://www.copart.com/todaysAuction/")

    def auctions_links(self):
        return self.b.find_links("Join Auction")
class Auction():
    def __init__(self, url):
        # The auction dashboard embeds the live-bidding UI in an iframe; open
        # the dashboard first, then re-point the browser at the iframe's src.
        self.b = Browser(url)
        iframe = self.b.find_xpath("//iframe")[0]
        self.url = iframe.get_attribute("src")
        self.b = Browser(self.url)

    def final_bids(self):
        # Generator: yields one Car per lot, captured just before the UI moves
        # on to the next lot.
        while True:
            yield self.wait_car()

    def wait_car(self):
        # Poll the page until the displayed lot changes, then return the last
        # snapshot of the previous lot (its final state, including the final bid).
        c = Car()
        c.extract(self.b)
        d = c.copy()
        while c == d:
            time.sleep(1)
            c = d.copy()
            d.extract(self.b)
        return c
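# Sketch of how the final_bids() generator is meant to be consumed; this mirrors
# the first main() below. `url` would come from Todays_Auctions.auctions_links().
def _example_follow_one_auction(url):
    auction = Auction(url)
    for car in auction.final_bids():
        print(car)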
class Car():
    def __init__(self):
        # get_attrs() lists the fields that extract()/copy()/__eq__ operate on;
        # images is kept separately because it is only downloaded on demand.
        for attr in self.get_attrs():
            setattr(self, attr, None)
        self.images = None
        self.watched = False

    def copy(self):
        c = Car()
        for attr in self.get_attrs():
            v = getattr(self, attr)
            setattr(c, attr, v)
        return c

    def __eq__(self, other):
        # Two snapshots are "the same lot" if everything except the live bid
        # (and the downloaded images) matches.
        for attr in [i for i in self.get_attrs() if i not in ["bid", "images"]]:
            a = getattr(self, attr)
            b = getattr(other, attr)
            if a != b:
                return False
        return True

    def get_attrs(self):
        return [
            "title",
            "bid",
            "details",
            "image_links",
            "watched",
        ]

    def __str__(self):
        out = []
        for attr in [i for i in self.get_attrs() if not i.startswith("image")]:
            out.append("{}:{}".format(attr, getattr(self, attr)))
        return ", ".join(out)
    def extract(self, browser):
        # For every attribute in get_attrs(), call the matching extract_<attr>()
        # method, retrying up to 3 times because the page loads asynchronously.
        last = None
        for attr in self.get_attrs():
            for i in range(3):
                try:
                    foo = getattr(self, "extract_{}".format(attr))
                    setattr(self, attr, foo(browser))
                    last = None
                    break
                except Exception as e:
                    last = e
                    time.sleep(1)
            if last:
                raise last
    def extract_bid(self, elem):
        # Wait for the bidding dialer widget, then poll its <text> node until it
        # shows a dollar amount.
        Browser.until(elem, lambda x: elem.find_xpath("//bidding-dialer-refactor"))
        elem = elem.find_xpath("//bidding-dialer-refactor")[0]

        def test(elem):
            e = elem.find_element_by_tag_name("text")
            elem.last = e.get_property("innerHTML").strip()
            return "$" in elem.last
        elem.last = None
        Browser.until(elem, test)
        return elem.last
    def extract_details(self, elem):
        # TODO: unfinished debugging code; it prints the first label/value pair
        # it finds and then exits the whole process.
        details = {}
        for i in ["primary", "secondary"]:
            Browser.until(elem, lambda x: elem.find_xpath("//lot-details-" + i + "-refactored"))
            e = elem.find_xpath("//lot-details-" + i + "-refactored//perfect-scrollbar")[0]
            e = e.find_element_by_class_name("ps-content")
            l = e.find_element_by_tag_name("label")
            # NOTE: WebElement.parent is the WebDriver, so this searches the
            # whole page rather than the label's parent node.
            v = l.parent.find_element_by_class_name("txtvalue").get_property("innerHTML")
            l = l.get_property("innerHTML")
            print(e)  # TODO
            print(l, v)
            exit(1)
        return elem.get_property("innerText")
    def extract_image_links(self, elem):
        # Wait for the image carousel, then collect the src of every <img>
        # inside its ngx-item slides.
        Browser.until(elem, lambda x: elem.find_xpath("//ngx-carousel"))
        elem = elem.find_xpath("//ngx-carousel")[0]

        def test(elem):
            last = []
            e = elem.find_elements_by_tag_name("ngx-item")
            if not e:
                return False
            for item in e:
                images = item.find_elements_by_tag_name("img")
                for image in images:
                    last.append(image.get_attribute("src"))
            elem.last = last
            return last
        elem.last = None
        Browser.until(elem, test)
        return elem.last
    def extract_title(self, elem):
        Browser.until(elem, lambda x: elem.find_xpath("//lot-header"))
        elem = elem.find_xpath("//lot-header")[0]
        elem = elem.find_element_by_tag_name("section")
        elem = elem.find_element_by_tag_name("div")
        elem = elem.find_element_by_tag_name("div")
        return elem.get_property("innerText")

    def extract_watched(self, elem):
        Browser.until(elem, lambda x: elem.find_xpath("//widget-header-sale"))
        elem = elem.find_xpath("//widget-header-sale")[0]
        elem = elem.find_elements_by_class_name("watchlist")
        return len(elem) > 0

    def extract_images(self):
        # Not driven by get_attrs()/extract(); call this manually after
        # image_links has been filled in to download the actual image bytes.
        images = []
        for link in self.image_links:
            images.append(Image(link))
        return images
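# Sketch: take a single snapshot of the lot currently displayed in an Auction's
# browser. This is the same pattern Auction.wait_car() uses internally; the
# helper name is illustrative only.
def _example_car_snapshot(auction):
    car = Car()
    car.extract(auction.b)
    return car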
class Image():
    def __init__(self, url):
        # Guess the file extension from the URL; fall back to jpg when the
        # "extension" is clearly not one (e.g. a long query string).
        self.ext = url.split(".")[-1]
        if len(self.ext) > 10:
            self.ext = "jpg"
        self.raw = urllib.request.urlopen(url).read()

    def save(self, path):
        if not path.endswith(self.ext):
            path += ".{}".format(self.ext)
        with open(path, "wb") as f:
            f.write(self.raw)
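# Sketch: download and save the first photo of a car whose image_links were
# already extracted; test() below does the same thing with sample data. The
# helper name and default path are illustrative, not part of the original flow.
def _example_save_first_image(car, path="./out"):
    images = car.extract_images()
    if images:
        images[0].save(path)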
class Store():
    # Thin client for a key/value HTTP service expected at localhost:21412:
    # values are pickled blobs PUT/GET at /<urlsafe-base64-of-key>, and a
    # special "LIST" key holds the list of stored keys.
    def __init__(self):
        self._list = self.remote_list()

    def set(self, key, value):
        # NOTE: keys are appended unconditionally, so repeated set() calls for
        # the same key duplicate entries in LIST.
        self._set(key, value)
        self._list.append(key)
        self._set("LIST", self._list)

    def _set(self, key, value):
        url = self.key_to_url(key)
        value = pickle.dumps(value)
        self.DO(url, method="PUT", body=value)

    def get(self, key):
        url = self.key_to_url(key)
        try:
            value = self.DO(url, method="GET")
            return pickle.loads(value)
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return None
            raise

    def key_to_url(self, key):
        return "http://localhost:21412/{}".format(self.clean_key(key))

    def clean_key(self, key):
        # Keys may contain spaces, slashes, etc.; URL-safe base64 keeps them
        # usable as a single path segment.
        return base64.urlsafe_b64encode(str.encode(key)).decode()

    def list(self, prefix=""):
        return [i for i in self._list if i.startswith(prefix)]

    def remote_list(self):
        value = self.get("LIST")
        if not value:
            return []
        return value

    def DO(self, url, method="GET", body=None):
        r = urllib.request.Request(url, method=method, data=body)
        resp = urllib.request.urlopen(r).read()
        return resp
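# Sketch of a Store round trip, assuming the HTTP key/value service this class
# talks to is listening on localhost:21412 (test() below exercises the same calls).
def _example_store_roundtrip():
    s = Store()
    s.set("hello", "world")
    print(s.get("hello"))  # -> "world"
    print(s.list("he"))    # keys starting with "he"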
# NOTE: main() is defined three times in this file; only the last definition
# (the chromedriver Service experiment at the bottom) actually runs. The
# earlier ones are kept as alternative entry points.
def main(args):
    print("todays auctions...")
    today = Todays_Auctions()
    store = Store()
    auctions = []
    print("auctions links....")
    for link in today.auctions_links():
        auctions.append(Auction(link))
        break
    for car in auctions[0].final_bids():
        print(car)
        store.set(str(car), car)
def test(args):
    # Manual smoke test for Car/Store/Image; not wired up to __main__.
    c = Car()
    s = Store()
    print(s.set("hello", "world"))
    print(s.get("hello"))
    c.title = "title of car"
    c.bid = "$1,100"
    c.image_links = ["https://www.gravatar.com/avatar/24a41cddd8faf69e3fbd0a778ba6fedf?s=32&d=identicon&r=PG&f=1"]
    c.watched = True
    c.images = c.extract_images()
    s.set(str(c), c)
    print(c)
    print(s.get(str(c)))
    print(type(s.get(str(c))))
    print(c.images)
    c.images[0].save("./out")
    print(s.list())
    print(s.list("ti"))
    print(s.list("it"))
    s.set("hi", "mom")
    print(s.list())
# Alternative main(): debugging entry point for extract_details() against a
# single auction; shadowed by the definition below.
def main(args):
    a = Auction("https://www.copart.com/auctionDashboard?auctionDetails=880-A")
    c = Car()
    s = Store()
    print(c.extract_details(a.b))
    return
    # unreachable while the early return above is in place
    s.set(str(c), c)
    print(c)
    print(s.list())
# Active main(): experiment with running chromedriver as a long-lived Service
# that is restarted whenever it stops being connectable.
def main(args):
    def new():
        # NOTE: service_args are handed to the chromedriver binary itself;
        # flags like --headless are normally Chrome options rather than
        # chromedriver options, so most of these are likely ignored.
        return webdriver.chrome.service.Service('/usr/bin/chromedriver', port=58080, service_args=[
            "--headless",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-extensions",
            "--disable-gpu",
            "--verbose",
        ])
    service = new()
    print(service.start())
    print(service.port)
    print(service.process)
    print(service.service_url)
    while True:
        if not service.is_connectable():
            service.stop()
            service = new()
            service.start()
        time.sleep(1)
    service.stop()  # unreachable: the loop above never exits
if __name__ == "__main__":
    from sys import argv
    main(argv[:])