本例使用 watchdog 监视目录中的新文件:当有新文件创建时,调用相应的解析脚本,将文件内容解析并入库。
目录:
-scripts
--脚本1.py
--脚本2.py
-tmp
--已处理的文件1,
--已处理的文件2
config.py
watchdog.py
watchdog.py 文件,该业务只监听新文件创建的事件:
# coding=utf8
"""Watch a directory and dispatch newly created files to parser scripts.

Only file-creation events are handled.  The base name of each new file is
matched against the regular expressions in ``parse_map``; on the first match
the corresponding module is loaded from ``./scripts/`` and its
``parse(path)`` function is called with the path of the new file.

NOTE(review): the accompanying text names this file ``watchdog.py``.  A script
with that name sits first on ``sys.path`` and would presumably shadow the
``watchdog`` package imported below -- TODO confirm and consider renaming.
"""
import imp
import logging
import os
import re
import sys
import time

import scripts.CONFIG
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
from watchdog.events import FileSystemEventHandler

# Directory searched for parser modules.
SCRIPTS_DIR = './scripts/'

# Regular expression (tested with re.match against the file's base name)
# -> name of the parser module in SCRIPTS_DIR that handles such files.
parse_map = {
    r'^test\.xlsx$': 'test',
    r'^emt_finance.*\.xlsx': 'emt_finance',
}


def _find_parser(file_name):
    """Return the module name of the first pattern matching *file_name*.

    Returns None when no pattern in ``parse_map`` matches.
    """
    for pattern, module_name in parse_map.items():
        if re.match(pattern, file_name):
            return module_name
    return None


def _load_parser(module_name):
    """Load *module_name* from SCRIPTS_DIR and return the module object.

    ``imp.find_module`` hands back an open file object which the caller is
    responsible for closing, hence the ``finally`` clause.
    """
    file_obj, pathname, description = imp.find_module(module_name,
                                                      [SCRIPTS_DIR])
    try:
        return imp.load_module(module_name, file_obj, pathname, description)
    finally:
        if file_obj is not None:
            file_obj.close()


class CreatedEventHandler(FileSystemEventHandler):
    """Event handler that runs the matching parser for every created file."""

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_created(self, event):
        """Dispatch the created file to its parser script, if one matches.

        Directory-creation events are ignored.  Any exception raised while
        loading or running the parser is logged and swallowed so that the
        observer thread keeps running.
        """
        if event.is_directory:
            return

        # normpath turns './name' into 'name' (what the previous [2:] slice
        # produced for the default watch path) without corrupting paths that
        # do not start with './'.
        path = os.path.normpath(event.src_path)
        print('--' + path)

        module_name = _find_parser(os.path.basename(path))
        if module_name is None:
            return

        try:
            # Dynamically load the parser module that matches this file.
            parser = _load_parser(module_name)
            print(' load module: ' + module_name)
            parser.parse(path)
        except Exception:
            logging.exception('failed to process %s with module %s',
                              path, module_name)


def main():
    """Watch the directory given as argv[1] (default '.') until Ctrl-C."""
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    observer = Observer()
    observer.schedule(CreatedEventHandler(), path, recursive=False)
    observer.start()
    print('Watching...')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    main()
解析脚本test.py
# FileName: test.py
# a simple test code
"""Parser script: load the rows of an .xlsx workbook into MySQL.

``parse(file)`` reads the first sheet of the workbook, writes every data row
to ``testtable`` with ``REPLACE INTO`` and then moves the workbook into the
``tmp/`` directory.
"""
import xlrd
import MySQLdb
import datetime
import os
import stat
import shutil

# Directory that receives workbooks once they have been processed.
ARCHIVE_DIR = 'tmp/'

# Excel stores a date as the number of days since 1899-12-30.
_EXCEL_EPOCH = datetime.date(1899, 12, 30)

_REPLACE_SQL = 'replace into testtable values(%s,%s,%s,%s,%s,%s)'


def _read_rows(path):
    """Return the rows of the first sheet of *path*, skipping row 0.

    The first cell of every row is converted from an Excel day count to a
    ``datetime.date``.  Row 0 is skipped (presumably a header row -- verify
    against the actual workbooks).
    """
    sheet = xlrd.open_workbook(path).sheets()[0]
    rows = []
    for index in range(1, sheet.nrows):
        row = sheet.row_values(index)
        row[0] = _EXCEL_EPOCH + datetime.timedelta(row[0])
        rows.append(row)
    return rows


def _store_rows(rows):
    """Write *rows* to ``testtable`` and commit.

    The cursor and the connection are closed even when a statement fails, in
    which case nothing is committed.
    """
    conn = MySQLdb.connect(config.mysql_host, config.mysql_user,
                           config.mysql_passwd, 'test', config.mysql_port)
    try:
        cur = conn.cursor()
        try:
            for row in rows:
                cur.execute(_REPLACE_SQL, row)
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()


def _archive(path):
    """Move *path* into ARCHIVE_DIR, replacing an older file of that name."""
    target = os.path.join(ARCHIVE_DIR, os.path.basename(path))
    if os.path.exists(target):
        os.chmod(target, stat.S_IWRITE)  # clear the read-only attribute
        os.remove(target)                # delete the stale copy
    shutil.move(path, ARCHIVE_DIR)


def parse(file):
    """Import the workbook *file* into MySQL and archive it.

    Files whose extension is not ``xlsx`` are skipped.  If the workbook
    cannot be read, or the database reports an error, the problem is printed
    and the file is left where it is so that no data is silently lost.
    Errors raised while moving the file are not caught here.
    """
    if file.split('.')[-1] != 'xlsx':
        print('---skip' + file)
        return

    try:
        values = _read_rows(file)
    except Exception as e:
        # Do not continue with partial data: storing it and archiving the
        # workbook would hide the failure.
        print(e)
        return

    try:
        _store_rows(values)
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        return

    print(' parse complete.')
    _archive(file)
    print(' move filt to temp: ' + file)
    print(' success!')


# The configuration module lives next to this script.  It is imported under a
# different name depending on whether the script is run directly or loaded
# by the watcher.
if __name__ == '__main__':
    import CONFIG as config
    print('==' * 10)
else:
    import scripts.CONFIG as config