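# Scraper sketch for Copart live auctions: a thin Selenium wrapper (Browser),
# page objects for the auction list (Todays_Auctions), a single auction
# (Auction) and a car lot (Car), plus a small pickle-over-HTTP Store client
# (assumed to talk to a key/value service on localhost:21412). Element and
# tag names below mirror the Copart page markup at the time of writing and
# may have changed since.
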
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
import time
import pickle
import urllib.request
import urllib.error
import base64


class Browser() :
    def __init__(self, url) :
        self.url = url
        self.driver = self.make_driver()

    def __del__(self) :
        try :
            self.driver.close()
        except Exception :
            pass

    def make_driver(self) :
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-extensions')
        # Retry a few times: chromedriver occasionally fails to start cleanly.
        last = None
        for i in range(5) :
            try :
                # Selenium 3-style constructor (matches the find_*_by_* calls used below).
                driver = webdriver.Chrome('/usr/bin/chromedriver', chrome_options=chrome_options)
                return driver
            except Exception as e :
                time.sleep(1)
                last = e
        raise last

    def open(self, url=None) :
        if not url :
            url = self.url
        self.driver.get(url)

    # foo takes the element (or driver) as its argument, e.g.
    # `lambda x: x.find_element_by_id("id")`; polls for up to 30 seconds.
    @staticmethod
    def until(elem, foo) :
        import datetime
        now = datetime.datetime.now()
        while (datetime.datetime.now() - now).total_seconds() < 30 :
            if foo(elem) :
                return
            time.sleep(1)
        raise Exception("until failed")

    # ("//form[1]")
    # ("//form[@id='loginForm']")
    def find_xpath(self, xpath) :
        return self.driver.find_elements_by_xpath(xpath)

    # find_element_by_partial_link_text('Conti')
    def find_links(self, prefix) :
        elements = self.driver.find_elements_by_partial_link_text(prefix)
        return [e.get_attribute("href") for e in elements]

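# Minimal usage sketch for Browser (illustrative URL and link text, not from
# this script):
#   b = Browser("https://example.com")
#   b.open()
#   hrefs = b.find_links("Read more")
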

class Todays_Auctions() :
    def __init__(self) :
        self.b = Browser("https://www.copart.com/todaysAuction/")

    def auctions_links(self) :
        return self.b.find_links("Join Auction")


class Auction() :
    def __init__(self, url) :
        # The dashboard page embeds the live auction UI in an iframe; load
        # that iframe's URL directly.
        self.b = Browser(url)
        iframe = self.b.find_xpath("//iframe")[0]
        self.url = iframe.get_attribute("src")
        self.b = Browser(self.url)

    def final_bids(self) :
        while True :
            yield self.wait_car()

    def wait_car(self) :
        # Poll until the lot on screen changes; the snapshot taken just before
        # the change is treated as that lot's final state.
        c = Car()
        c.extract(self.b)
        d = c.copy()
        while c == d :
            time.sleep(1)
            c = d.copy()
            d.extract(self.b)
        return c

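# Sketch of the intended flow (assumes the Copart pages still expose these
# links; see main_scrape below for the stored variant):
#   for link in Todays_Auctions().auctions_links() :
#       for car in Auction(link).final_bids() :
#           print(car)
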

class Car() :
    def __init__(self) :
        self.bid = None
        self.details = None
        self.image_links = None
        self.images = None
        self.title = None
        self.watched = False
        # get_attrs() lists the scraped fields; reset them all to None so a
        # fresh Car compares unequal to an extracted one.
        for attr in self.get_attrs() :
            setattr(self, attr, None)

    def copy(self) :
        c = Car()
        for attr in self.get_attrs() :
            v = getattr(self, attr)
            setattr(c, attr, v)
        return c

    def __eq__(self, other) :
        # The bid changes while a lot is still on screen, so ignore it (and
        # images) when deciding whether a new lot has appeared.
        for attr in [ i for i in self.get_attrs() if not i in ["bid", "images"] ] :
            a = getattr(self, attr)
            b = getattr(other, attr)
            if a != b :
                return False
        return True

    def get_attrs(self) :
        return [
            "title",
            "bid",
            "details",
            "image_links",
            "watched",
        ]

    def __str__(self) :
        out = []
        for attr in [ i for i in self.get_attrs() if not i.startswith("image") ] :
            out.append("{}:{}".format(attr, getattr(self, attr)))
        return ", ".join(out)

    def extract(self, browser) :
        # Call extract_<attr>() for every scraped field, retrying each one up
        # to three times before giving up.
        last = None
        for attr in self.get_attrs() :
            for i in range(3) :
                try :
                    foo = getattr(self, "extract_{}".format(attr))
                    setattr(self, attr, foo(browser))
                    last = None
                    break
                except Exception as e :
                    last = e
                    time.sleep(1)
            if last :
                raise last

    def extract_bid(self, elem) :
        Browser.until(elem, lambda x: elem.find_xpath("//bidding-dialer-refactor"))
        elem = elem.find_xpath("//bidding-dialer-refactor")[0]
        # Stash the dialer text on the element itself so the poll callback can
        # hand it back once a dollar amount shows up.
        def test(elem) :
            e = elem.find_element_by_tag_name("text")
            elem.last = e.get_property("innerHTML").strip()
            return "$" in elem.last
        elem.last = None
        Browser.until(elem, test)
        return elem.last

    def extract_details(self, elem) :
        # Gather label/value pairs from the primary and secondary lot-details
        # panels into a dict.
        details = {}
        for i in ["primary", "secondary"] :
            Browser.until(elem, lambda x: elem.find_xpath("//lot-details-"+i+"-refactored"))
            e = elem.find_xpath("//lot-details-"+i+"-refactored//perfect-scrollbar")[0]
            e = e.find_element_by_class_name("ps-content")
            for l in e.find_elements_by_tag_name("label") :
                # The value sits next to the label in the same row, so look it
                # up from the label's parent node rather than the whole page.
                row = l.find_element_by_xpath("..")
                v = row.find_element_by_class_name("txtvalue").get_property("innerHTML")
                details[l.get_property("innerHTML")] = v
        return details

    def extract_image_links(self, elem) :
        Browser.until(elem, lambda x: elem.find_xpath("//ngx-carousel"))
        elem = elem.find_xpath("//ngx-carousel")[0]
        def test(elem) :
            last = []
            e = elem.find_elements_by_tag_name("ngx-item")
            if not e :
                return False
            for item in e :
                images = item.find_elements_by_tag_name("img")
                for image in images :
                    last.append(image.get_attribute("src"))
            elem.last = last
            return last
        elem.last = None
        Browser.until(elem, test)
        return elem.last

    def extract_title(self, elem) :
        Browser.until(elem, lambda x: elem.find_xpath("//lot-header"))
        elem = elem.find_xpath("//lot-header")[0]
        elem = elem.find_element_by_tag_name("section")
        elem = elem.find_element_by_tag_name("div")
        elem = elem.find_element_by_tag_name("div")
        return elem.get_property("innerText")

    def extract_watched(self, elem) :
        Browser.until(elem, lambda x: elem.find_xpath("//widget-header-sale"))
        elem = elem.find_xpath("//widget-header-sale")[0]
        elem = elem.find_elements_by_class_name("watchlist")
        return len(elem) > 0

    def extract_images(self) :
        # Not listed in get_attrs(), so extract() never calls this; download
        # the linked images explicitly when needed.
        images = []
        for link in self.image_links :
            images.append(Image(link))
        return images

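# Sketch: Car fields are normally filled by extract(), e.g.
#   car = Car()
#   car.extract(auction.b)           # auction is an Auction instance
#   car.images = car.extract_images()
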

class Image() :
    def __init__(self, url) :
        # Guess the extension from the URL; fall back to jpg when the last
        # "."-separated chunk is clearly not an extension (query strings etc.).
        self.ext = url.split(".")[-1]
        if len(self.ext) > 10 :
            self.ext = "jpg"
        self.raw = urllib.request.urlopen(url).read()

    def save(self, path) :
        if not path.endswith(self.ext) :
            path += ".{}".format(self.ext)
        with open(path, "wb") as f :
            f.write(self.raw)

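# Sketch: Image("https://example.com/photo.jpg").save("./photo") would write
# ./photo.jpg (illustrative URL, not from this script).
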

class Store() :
    # Thin client for a key/value HTTP service expected on localhost:21412.
    # Values are pickled; the special "LIST" key tracks every stored key.
    def __init__(self) :
        self._list = self.remote_list()

    def set(self, key, value) :
        self._set(key, value)
        self._list.append(key)
        self._set("LIST", self._list)

    def _set(self, key, value) :
        url = self.key_to_url(key)
        value = pickle.dumps(value)
        self.DO(url, method="PUT", body=value)

    def get(self, key) :
        url = self.key_to_url(key)
        try :
            value = self.DO(url, method="GET")
            return pickle.loads(value)
        except urllib.error.HTTPError as e :
            if e.code == 404 :
                return None
            raise e

    def key_to_url(self, key) :
        return "http://localhost:21412/{}".format(self.clean_key(key))

    def clean_key(self, key) :
        # Keys become URL-safe path components.
        return base64.urlsafe_b64encode(str.encode(key)).decode()

    def list(self, prefix="") :
        return [i for i in self._list if i.startswith(prefix)]

    def remote_list(self) :
        value = self.get("LIST")
        if not value :
            return []
        return value

    def DO(self, url, method="GET", body=None) :
        r = urllib.request.Request(url, method=method, data=body)
        resp = urllib.request.urlopen(r).read()
        return resp

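# Sketch, assuming the key/value service is running on localhost:21412:
#   s = Store()
#   s.set("hello", "world")
#   s.get("hello")      # -> "world"
#   s.list("he")        # -> ["hello"]
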

def main_scrape(args) :
    # Alternative entry point: scrape today's auctions and store every final bid.
    print("todays auctions...")
    today = Todays_Auctions()
    store = Store()
    auctions = []
    print("auctions links....")
    for link in today.auctions_links() :
        auctions.append(Auction(link))
        break
    for car in auctions[0].final_bids() :
        print(car)
        store.set(str(car), car)


def test(args) :
    c = Car()
    s = Store()
    print(s.set("hello", "world"))
    print(s.get("hello"))
    c.title = "title of car"
    c.bid = "$1,100"
    c.image_links = ["https://www.gravatar.com/avatar/24a41cddd8faf69e3fbd0a778ba6fedf?s=32&d=identicon&r=PG&f=1"]
    c.watched = True
    c.images = c.extract_images()
    s.set(str(c), c)
    print(c)
    print(s.get(str(c)))
    print(type(s.get(str(c))))
    print(c.images)
    c.images[0].save("./out")
    print(s.list())
    print(s.list("ti"))
    print(s.list("it"))
    s.set("hi", "mom")
    print(s.list())


def main_details(args) :
    # Debug driver: dump the details panel for one specific auction.
    a = Auction("https://www.copart.com/auctionDashboard?auctionDetails=880-A")
    c = Car()
    s = Store()
    print(c.extract_details(a.b))
    return
    # Unreachable while the early return above is in place.
    s.set(str(c), c)
    print(c)
    print(s.list())


def main(args) :
    # Experiment: manage a chromedriver Service directly and restart it if it
    # stops being connectable.
    def new() :
        return webdriver.chrome.service.Service('/usr/bin/chromedriver', port=58080, service_args=[
            "--headless",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-extensions",
            "--disable-gpu",
            "--verbose",
        ])
    service = new()
    print(service.start())
    print(service.port)
    print(service.process)
    print(service.service_url)
    while True :
        if not service.is_connectable() :
            service.stop()
            service = new()
            service.start()
        time.sleep(1)
    # Unreachable: the loop above never exits on its own.
    service.stop()


if __name__ == "__main__" :
    from sys import argv
    main(argv[:])