@ -5,9 +5,10 @@ import copy
import platform
import shutil
import string
import threading
# import undetected_chromedriver as uc
from utils import detect_optimizable , download_image , extract_text_from_html , get_output_code , isnotnull , lowercase_tags_in_xpath , myMySQL , new_line , \
on_press_creator , on_release_creator , readCode , replace_field_values , send_email , split_text_by_lines , write_to_csv , write_to_excel , write_to_json
on_press_creator , on_release_creator , readCode , rename_downloaded_file , re place_field_values , send_email , split_text_by_lines , write_to_csv , write_to_excel , write_to_json
from myChrome import MyChrome
from threading import Thread , Event
from PIL import Image
@ -112,9 +113,13 @@ class BrowserThread(Thread):
self . print_and_log ( " Save Name for task ID " , id , " is: " , self . saveName )
if not os . path . exists ( " Data/Task_ " + str ( id ) ) :
os . mkdir ( " Data/Task_ " + str ( id ) )
if not os . path . exists ( " Data/Task_ " + str ( id ) + " / " + self . saveName ) :
os . mkdir ( " Data/Task_ " + str ( id ) + " / " +
self . saveName ) # 创建保存文件夹用来保存截图
self . downloadFolder = " Data/Task_ " + str ( id ) + " / " + self . saveName
if not os . path . exists ( self . downloadFolder ) :
os . mkdir ( self . downloadFolder ) # 创建保存文件夹用来保存截图和文件
if not os . path . exists ( self . downloadFolder + " /files " ) :
os . mkdir ( self . downloadFolder + " /files " )
if not os . path . exists ( self . downloadFolder + " /images " ) :
os . mkdir ( self . downloadFolder + " /images " )
self . getDataStep = 0
self . startSteps = 0
try :
@ -142,12 +147,21 @@ class BrowserThread(Thread):
self . print_and_log ( " Loading stealth.min.js " )
self . browser . execute_cdp_cmd ( ' Page.addScriptToEvaluateOnNewDocument ' , {
' source ' : js } ) # TMALL 反扒
self . browser . execute_cdp_cmd ( " Page.addScriptToEvaluateOnNewDocument " , {
" source " : """
Object . defineProperty ( navigator , ' webdriver ' , {
get : ( ) = > undefined
} )
"""
} )
WebDriverWait ( self . browser , 10 )
self . browser . command_executor . _commands [ " send_command " ] = ( " POST " , ' /session/$sessionId/chromium/send_command ' )
path = os . path . join ( os . path . abspath ( " ./ " ) , " Data " , " Task_ " + str ( self . id ) )
path = os . path . join ( os . path . abspath ( " ./ " ) , " Data " , " Task_ " + str ( self . id ) , self . saveName , " files " )
self . paramss = { ' cmd ' : ' Page.setDownloadBehavior ' , ' params ' : { ' behavior ' : ' allow ' , ' downloadPath ' : path } }
self . browser . execute ( " send_command " , self . paramss ) # 下载地址改变
self . browser . execute ( " send_command " , self . paramss ) # 下载目录改变
self . monitor_event = threading . Event ( )
self . monitor_thread = threading . Thread ( target = rename_downloaded_file , args = ( path , self . monitor_event ) ) #path后面的逗号不能省略,是元组固定写法
self . monitor_thread . start ( )
# self.browser.get('about:blank')
self . procedure = service [ " graph " ] # 程序执行流程
try :
@ -187,12 +201,19 @@ class BrowserThread(Thread):
self . links = list ( filter ( isnotnull , service [ " url " ] ) ) # 要执行的link
self . OUTPUT = [ ] # 采集的数据
try :
self . dataWriteMode = service [ " dataWriteMode " ] # 数据写入模式,1为追加,2为覆盖
self . dataWriteMode = service [ " dataWriteMode " ] # 数据写入模式,1为追加,2为覆盖,3为重命名文件
except :
self . dataWriteMode = 1
if self . outputFormat == " csv " or self . outputFormat == " txt " or self . outputFormat == " xlsx " or self . outputFormat == " json " :
if self . dataWriteMode == 2 and os . path . exists ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' . ' + self . outputFormat ) :
os . remove ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' . ' + self . outputFormat )
if os . path . exists ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' . ' + self . outputFormat ) :
if self . dataWriteMode == 2 :
os . remove ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' . ' + self . outputFormat )
elif self . dataWriteMode == 3 :
i = 2
while os . path . exists ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' _ ' + str ( i ) + ' . ' + self . outputFormat ) :
i = i + 1
self . saveName = self . saveName + ' _ ' + str ( i )
self . print_and_log ( " 文件已存在,已重命名为 " , self . saveName )
self . writeMode = 1 # 写入模式,0为新建,1为追加
if self . outputFormat == " csv " or self . outputFormat == " txt " or self . outputFormat == " xlsx " :
if not os . path . exists ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName + ' . ' + self . outputFormat ) :
@ -521,7 +542,7 @@ class BrowserThread(Thread):
" / " , len ( self . links ) )
self . executeNode ( 0 )
self . urlId = self . urlId + 1
files = os . listdir ( " Data/Task_ " + str ( self . id ) + " / " + self . saveName )
# files = os.listdir("Data/Task_" + str(self.id) + "/" + self.saveName )
# 如果目录为空,则删除该目录
# if not files:
# os.rmdir("Data/Task_" + str(self.id) + "/" + self.saveName)
@ -544,6 +565,7 @@ class BrowserThread(Thread):
shutil . rmtree ( self . option [ " tmp_user_data_folder " ] )
except :
pass
self . monitor_event . set ( )
self . print_and_log ( " 清理完成!|Clean up completed! " )
self . print_and_log ( " 您现在可以安全的关闭此窗口了。|You can safely close this window now. " )
@ -768,6 +790,8 @@ class BrowserThread(Thread):
elif int ( codeMode ) == 5 :
try :
code = readCode ( code )
# global_namespace = globals().copy()
# global_namespace["self"] = self
output = exec ( code )
self . recordLog ( " 执行下面的代码: " + code )
self . recordLog ( " Execute the following code: " + code )
@ -847,6 +871,23 @@ class BrowserThread(Thread):
self . print_and_log ( " 根据设置的自定义操作,任务已刷新页面|Task refreshed page according to custom operation " )
elif codeMode == 9 : # 发送邮件
send_email ( node [ " parameters " ] [ " emailConfig " ] )
elif codeMode == 10 : # 清空所有字段值
self . clearOutputParameters ( )
elif codeMode == 11 : # 生成新的数据行
line = new_line ( self . outputParameters ,
self . maxViewLength , self . outputParametersRecord )
self . OUTPUT . append ( line )
elif codeMode == 12 : # 退出程序
self . print_and_log ( " 根据设置的自定义操作,任务已退出|Task exited according to custom operation " )
self . saveData ( exit = True )
self . browser . quit ( )
self . print_and_log ( " 正在清理临时用户目录……|Cleaning up temporary user directory... " )
try :
shutil . rmtree ( self . option [ " tmp_user_data_folder " ] )
except :
pass
self . print_and_log ( " 清理完成!|Clean up completed! " )
os . _exit ( 0 )
else : # 0 1 5 6
output = self . execute_code (
codeMode , code , max_wait_time , iframe = params [ " iframe " ] )
@ -1106,7 +1147,25 @@ class BrowserThread(Thread):
self . recordLog (
" 判断条件内所有条件分支的条件都不满足|None of the conditions in the judgment condition are met " )
def handleHistory ( self , node , xpath , thisHistoryURL , thisHistoryLength , index , element = None , elements = None ) :
def handleHistory ( self , node , xpath , thisHandle , thisHistoryURL , thisHistoryLength , index , element = None , elements = None ) :
try :
changed_handle = self . browser . current_window_handle != thisHandle
except : # 如果网页被意外关闭了的情况下
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
changed_handle = self . browser . window_handles [ - 1 ] != thisHandle
if changed_handle : # 如果执行完一次循环之后标签页的位置发生了变化
try :
while True : # 一直关闭窗口直到当前标签页
self . browser . close ( ) # 关闭使用完的标签页
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
if self . browser . current_window_handle == thisHandle :
break
except Exception as e :
self . print_and_log ( " 关闭标签页发生错误: " , e )
self . print_and_log (
" Error occurred while closing tab: " , e )
if self . history [ " index " ] != thisHistoryLength and self . history [ " handle " ] == self . browser . current_window_handle : # 如果执行完一次循环之后历史记录发生了变化,注意当前页面的判断
difference = thisHistoryLength - self . history [ " index " ] # 计算历史记录变化差值
self . browser . execute_script ( ' history.go( ' + str ( difference ) + ' ) ' ) # 回退历史记录
@ -1132,12 +1191,13 @@ class BrowserThread(Thread):
if self . browser . current_url == thisHistoryURL or ti > thisHistoryLength : # 如果执行完一次循环之后网址发生了变化
break
time . sleep ( 2 )
if element == None : # 不固定元素列表
element = self . browser . find_elements ( By . XPATH , xpath , iframe = node [ " parameters " ] [ " iframe " ] )
else : # 固定元素列表
element = self . browser . find_element ( By . XPATH , xpath , iframe = node [ " parameters " ] [ " iframe " ] )
# if index > 0:
# index -= 1 # 如果是data:开头的网址,就要重试一次
if xpath != " " :
if element == None : # 不固定元素列表
element = self . browser . find_elements ( By . XPATH , xpath , iframe = node [ " parameters " ] [ " iframe " ] )
else : # 固定元素列表
element = self . browser . find_element ( By . XPATH , xpath , iframe = node [ " parameters " ] [ " iframe " ] )
# if index > 0:
# index -= 1 # 如果是data:开头的网址,就要重试一次
else :
if element == None :
element = elements
@ -1321,25 +1381,7 @@ class BrowserThread(Thread):
if self . BREAK :
self . BREAK = False
break
try :
changed_handle = self . browser . current_window_handle != thisHandle
except : # 如果网页被意外关闭了的情况下
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
changed_handle = self . browser . window_handles [ - 1 ] != thisHandle
if changed_handle : # 如果执行完一次循环之后标签页的位置发生了变化
try :
while True : # 一直关闭窗口直到当前标签页
self . browser . close ( ) # 关闭使用完的标签页
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
if self . browser . current_window_handle == thisHandle :
break
except Exception as e :
self . print_and_log ( " 关闭标签页发生错误: " , e )
self . print_and_log (
" Error occurred while closing tab: " , e )
index , elements = self . handleHistory ( node , xpath , thisHistoryURL , thisHistoryLength , index , elements = elements )
index , elements = self . handleHistory ( node , xpath , thisHandle , thisHistoryURL , thisHistoryLength , index , elements = elements )
if int ( node [ " parameters " ] [ " breakMode " ] ) > 0 : # 如果设置了退出循环的脚本条件
output = self . execute_code ( int (
node [ " parameters " ] [ " breakMode " ] ) - 1 , node [ " parameters " ] [ " breakCode " ] ,
@ -1381,25 +1423,7 @@ class BrowserThread(Thread):
if self . BREAK :
self . BREAK = False
break
try :
changed_handle = self . browser . current_window_handle != thisHandle
except : # 如果网页被意外关闭了的情况下
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
changed_handle = self . browser . window_handles [ - 1 ] != thisHandle
if changed_handle : # 如果执行完一次循环之后标签页的位置发生了变化
try :
while True : # 一直关闭窗口直到当前标签页
self . browser . close ( ) # 关闭使用完的标签页
self . browser . switch_to . window (
self . browser . window_handles [ - 1 ] )
if self . browser . current_window_handle == thisHandle :
break
except Exception as e :
self . print_and_log ( " 关闭标签页发生错误: " , e )
self . print_and_log (
" Error occurred while closing tab: " , e )
index , element = self . handleHistory ( node , path , thisHistoryURL , thisHistoryLength , index , element = element )
index , element = self . handleHistory ( node , path , thisHandle , thisHistoryURL , thisHistoryLength , index , element = element )
except NoSuchElementException :
self . print_and_log ( " Loop element not found: " , path )
self . print_and_log ( " 找不到循环元素: " , path )
@ -1447,6 +1471,7 @@ class BrowserThread(Thread):
code = get_output_code ( output )
if code < = 0 :
break
index , _ = self . handleHistory ( node , " " , thisHandle , thisHistoryURL , thisHistoryLength , index )
elif int ( node [ " parameters " ] [ " loopType " ] ) == 4 : # 固定网址列表
# tempList = node["parameters"]["textList"].split("\r\n")
urlList = list (
@ -1715,6 +1740,21 @@ class BrowserThread(Thread):
script = ' var result = document.evaluate(` ' + path + \
' `, document, null, XPathResult.ANY_TYPE, null);for(let i=0;i<arguments[0];i++){result.iterateNext();} result.iterateNext().click(); '
self . browser . execute_script ( script , str ( index ) ) # 用js的点击方法
elif click_way == 2 : # 双击
try :
actions = ActionChains ( self . browser ) # 实例化一个action对象
actions . double_click ( element ) . perform ( )
except Exception as e :
self . browser . execute_script ( " arguments[0].scrollIntoView(); " , element )
try :
actions = ActionChains ( self . browser ) # 实例化一个action对象
actions . double_click ( element ) . perform ( )
except Exception as e :
self . print_and_log ( f " Selenium双击元素{path}失败,将尝试使用JavaScript双击 " )
self . print_and_log ( f " Failed to double click element {path} with Selenium, will try to double click with JavaScript " )
script = ' var result = document.evaluate(` ' + path + \
' `, document, null, XPathResult.ANY_TYPE, null);for(let i=0;i<arguments[0];i++){result.iterateNext();} result.iterateNext().click(); '
self . browser . execute_script ( script , str ( index ) ) # 用js的点击方法
self . recordLog ( " 点击元素|Click element: " + path )
except TimeoutException :
self . print_and_log (
@ -1797,7 +1837,6 @@ class BrowserThread(Thread):
self . print_and_log ( " History Length Error " )
self . history [ " index " ] = 0
self . scrollDown ( param ) # 根据参数配置向下滚动
# rt.end()
def get_content ( self , p , element ) :
content = " "
@ -1824,7 +1863,7 @@ class BrowserThread(Thread):
downloadPic = 0
if downloadPic == 1 :
download_image ( self , content , " Data/Task_ " +
str ( self . id ) + " / " + self . saveName + " / " , element )
str ( self . id ) + " / " + self . saveName + " /images " , element )
else : # 普通节点
if p [ " splitLine " ] == 1 :
text = extract_text_from_html ( element . get_attribute ( ' outerHTML ' ) )
@ -1853,7 +1892,7 @@ class BrowserThread(Thread):
downloadPic = 0
if downloadPic == 1 :
download_image ( self , content , " Data/Task_ " +
str ( self . id ) + " / " + self . saveName + " / " , element )
str ( self . id ) + " / " + self . saveName + " /images " , element )
else :
command = ' var arr = []; \
var content = arguments [ 0 ] ; \
@ -1965,6 +2004,8 @@ class BrowserThread(Thread):
content = element . get_attribute ( attribute_name )
except :
content = " "
elif p [ " contentType " ] == 15 : # 常量值
content = p [ " JS " ]
if content == None :
content = " "
return content
@ -2208,7 +2249,7 @@ if __name__ == '__main__':
" server_address " : " http://localhost:8074 " ,
" keyboard " : True , # 是否监听键盘输入
" pause_key " : " p " , # 暂停键
" version " : " 0.6.0 " ,
" version " : " 0.6.2 " ,
}
c = Config ( config )
print ( c )
@ -2283,7 +2324,9 @@ if __name__ == '__main__':
options . add_argument (
" --disable-blink-features=AutomationControlled " ) # TMALL 反扒
# 阻止http -> https的重定向
options . add_argument ( " --disable-features=CrossSiteDocumentBlockingIfIsolating,CrossSiteDocumentBlockingAlways,IsolateOrigins,site-per-process " )
options . add_argument ( " --disable-web-security " ) # 禁用同源策略
options . add_argument ( ' -ignore-certificate-errors ' )
options . add_argument ( ' -ignore -ssl-errors ' )
@ -2370,8 +2413,8 @@ if __name__ == '__main__':
cloudflare = 0
if cloudflare == 0 :
options . add_argument ( ' log-level=3 ' ) # 隐藏日志
path = os . path . join ( os . path . abspath ( " ./ " ) , " Data " , " Task_ " + str ( id ) )
print ( " Data path:" , path )
path = os . path . join ( os . path . abspath ( " ./ " ) , " Data " , " Task_ " + str ( id ) , " files " )
print ( " 文件下载路径|File Download path:" , path )
options . add_experimental_option ( " prefs " , {
# 设置文件下载路径
" download.default_directory " : path ,