博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python监控目录和文件变化
阅读量:6946 次
发布时间:2019-06-27

本文共 6193 字,大约阅读时间需要 20 分钟。

一、os.listdir

# Poll a directory every ten seconds and report which entries were added or
# removed since the previous poll. Runs until interrupted.
import os
import time

path_to_watch = "."

# Baseline snapshot; each later poll is compared against the previous one.
before = set(os.listdir(path_to_watch))
while True:
    time.sleep(10)
    after = set(os.listdir(path_to_watch))
    # Set difference gives the names that appeared / disappeared. Sorting
    # makes the report order stable, since os.listdir order is arbitrary.
    added = sorted(after - before)
    removed = sorted(before - after)
    if added:
        print("Added: ", ", ".join(added))
    if removed:
        print("Removed: ", ", ".join(removed))
    before = after

二、FindFirstChangeNotification

# Report files added to / deleted from a directory, using the Win32
# FindFirstChangeNotification API to learn when the directory has changed.
import os

import win32con
import win32event
import win32file

path_to_watch = os.path.abspath(".")

#
# FindFirstChangeNotification sets up a handle for watching
#  file changes. The first parameter is the path to be
#  watched; the second is a boolean indicating whether the
#  directories underneath the one specified are to be watched;
#  the third is a list of flags as to what kind of changes to
#  watch for. We're just looking at file additions / deletions.
#
change_handle = win32file.FindFirstChangeNotification(
    path_to_watch,
    0,
    win32con.FILE_NOTIFY_CHANGE_FILE_NAME,
)

#
# Loop forever, listing any file changes. The WaitFor... will
#  time out every half a second allowing for keyboard interrupts
#  to terminate the loop.
#
try:
    old_path_contents = set(os.listdir(path_to_watch))
    while True:
        result = win32event.WaitForSingleObject(change_handle, 500)
        #
        # If the WaitFor... returned because of a notification (as
        #  opposed to timing out or some error) then look for the
        #  changes in the directory contents.
        #
        if result == win32con.WAIT_OBJECT_0:
            new_path_contents = set(os.listdir(path_to_watch))
            # Sorted so the report order does not depend on listdir order.
            added = sorted(new_path_contents - old_path_contents)
            deleted = sorted(old_path_contents - new_path_contents)
            if added:
                print("Added: ", ", ".join(added))
            if deleted:
                print("Deleted: ", ", ".join(deleted))
            old_path_contents = new_path_contents
            # Re-arm the handle so the next change signals it again.
            win32file.FindNextChangeNotification(change_handle)
finally:
    # Always release the notification handle, even on Ctrl-C.
    win32file.FindCloseChangeNotification(change_handle)

三、ReadDirectoryChangesW

 
# coding: utf8
# author: lcamry
#
# Print every change reported by ReadDirectoryChangesW for a directory tree,
# one "<path> <action>" line per change. Runs until interrupted.
import os

import win32con
import win32file

ACTIONS = {
    1: "Created",
    2: "Deleted",
    3: "Updated",
    4: "Renamed from something",
    5: "Renamed to something",
}
# Thanks to Claudio Grondi for the correct set of numbers
FILE_LIST_DIRECTORY = 0x0001

# Size in bytes of the result buffer handed to ReadDirectoryChangesW.
# NB Tim Juchcinski reports that he needed to up the buffer size to be
# sure of picking up all events when a large number of files were
# deleted at once.
BUFFER_SIZE = 1024

# Kinds of change to be notified about; built once, outside the loop.
NOTIFY_FILTER = (
    win32con.FILE_NOTIFY_CHANGE_FILE_NAME
    | win32con.FILE_NOTIFY_CHANGE_DIR_NAME
    | win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES
    | win32con.FILE_NOTIFY_CHANGE_SIZE
    | win32con.FILE_NOTIFY_CHANGE_LAST_WRITE
    | win32con.FILE_NOTIFY_CHANGE_SECURITY
)

path_to_watch = "."
hDir = win32file.CreateFile(
    path_to_watch,
    FILE_LIST_DIRECTORY,
    win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
    None,
    win32con.OPEN_EXISTING,
    win32con.FILE_FLAG_BACKUP_SEMANTICS,
    None,
)
try:
    while True:
        #
        # ReadDirectoryChangesW takes a previously-created
        # handle to a directory, a buffer size for results,
        # a flag to indicate whether to watch subtrees and
        # a filter of what changes to notify.
        #
        results = win32file.ReadDirectoryChangesW(
            hDir,
            BUFFER_SIZE,
            True,
            NOTIFY_FILTER,
            None,
            None,
        )
        for action, filename in results:
            full_filename = os.path.join(path_to_watch, filename)
            print(full_filename, ACTIONS.get(action, "Unknown"))
finally:
    # Release the directory handle when the loop is left for any reason.
    hDir.Close()

四、watchdog

 
# coding: utf8
# author: lcamry
#
# Watch a directory tree with watchdog and print one line per
# moved / created / deleted / modified event until Ctrl-C.
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# Directory tree handed to the observer below.
WATCH_PATH = "d:/dcm"


def _kind(event):
    """Return "directory" or "file" according to ``event.is_directory``."""
    return "directory" if event.is_directory else "file"


class FileEventHandler(FileSystemEventHandler):
    """Event handler that prints a one-line description of each event."""

    def on_moved(self, event):
        """Print the source and destination path of a move."""
        print("{0} moved from {1} to {2}".format(
            _kind(event), event.src_path, event.dest_path))

    def on_created(self, event):
        """Print the path of a newly created entry."""
        print("{0} created:{1}".format(_kind(event), event.src_path))

    def on_deleted(self, event):
        """Print the path of a deleted entry."""
        print("{0} deleted:{1}".format(_kind(event), event.src_path))

    def on_modified(self, event):
        """Print the path of a modified entry."""
        print("{0} modified:{1}".format(_kind(event), event.src_path))


if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # Third argument enables recursive watching of sub-directories.
    observer.schedule(event_handler, WATCH_PATH, True)
    observer.start()
    try:
        # Keep the main thread alive; events are delivered by the observer.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

五、Linux下pyinotify

# coding: utf8
# author: lcamry
#
# Watch a directory tree with pyinotify and log every create / delete /
# modify event through wlog().
import os
import sys

import pyinotify

# NOTE(review): wlog() is expected to come from this project-local module;
# the wildcard is kept because its other exports are not visible here.
from functions import *

WATCH_PATH = ''  # directory to watch

# Validate the configured path up front and stop if it is unusable.
if not WATCH_PATH:
    wlog('Error', "The WATCH_PATH setting MUST be set.")
    sys.exit()
if not os.path.exists(WATCH_PATH):
    wlog('Error', 'The watch path NOT exists, watching stop now: path=%s.' % (WATCH_PATH))
    sys.exit()
wlog('Watch status', 'Found watch path: path=%s.' % (WATCH_PATH))


class OnIOHandler(pyinotify.ProcessEvent):
    """Log each inotify event together with the full path it refers to."""

    def process_IN_CREATE(self, event):
        wlog('Action', "create file: %s " % os.path.join(event.path, event.name))

    def process_IN_DELETE(self, event):
        wlog('Action', "delete file: %s " % os.path.join(event.path, event.name))

    def process_IN_MODIFY(self, event):
        wlog('Action', "modify file: %s " % os.path.join(event.path, event.name))


def auto_compile(path='.'):
    """Watch *path* recursively and log events until interrupted.

    Newly created sub-directories are watched automatically
    (``auto_add=True``). The loop ends on KeyboardInterrupt, after the
    notifier has been stopped.

    Args:
        path: Directory to watch. Defaults to the current directory.
    """
    wm = pyinotify.WatchManager()
    mask = pyinotify.IN_CREATE | pyinotify.IN_DELETE | pyinotify.IN_MODIFY
    # A plain Notifier is used because the events are pumped by hand in the
    # loop below; a started ThreadedNotifier would run the same loop in its
    # own thread and both would read from the one inotify descriptor.
    notifier = pyinotify.Notifier(wm, OnIOHandler())
    wm.add_watch(path, mask, rec=True, auto_add=True)
    wlog('Start Watch', 'Start monitoring %s' % path)
    while True:
        try:
            notifier.process_events()
            if notifier.check_events():
                notifier.read_events()
        except KeyboardInterrupt:
            notifier.stop()
            break


if __name__ == "__main__":
    auto_compile(WATCH_PATH)

 

转载于:https://www.cnblogs.com/lcamry/p/8392376.html

你可能感兴趣的文章
Win10下安装Ubuntu16.04虚拟机并搭建TensorFlow1.3环境
查看>>
leetcode 108. Convert Sorted Array to Binary Search Tree
查看>>
【商城购物车】购物车逻辑
查看>>
PCIE协议解析 synopsys IP loopback 读书笔记(1)
查看>>
关于小程序你需要知道的事
查看>>
报表服务器无法打开与报表服务器数据库的连接。所有请求和处理都要求与数据库建立连接...
查看>>
4月第4周业务风控关注 | 网络犯罪经济每年1.5万亿美元 GDP居全球第12位
查看>>
idea中gitlab新创建分支查找不到的原因
查看>>
php调试时echo,print_r(),var_dump()的区别
查看>>
vue 作用域插槽
查看>>
tfs 2013 利用 web deploy 完成asp.net站点自动发布
查看>>
dom对象常用的属性和方法有哪些?
查看>>
C#遍历XmlDocument对象所有节点名称、类型、属性(Attribute)
查看>>
范畴论-一个单子(Monad)说白了不过就是自函子范畴上的一个幺半群而已
查看>>
Spring cloud系列之Zuul配置项中sensitiveHeaders和ignoredHeaders
查看>>
51单片机交通灯(定时器+38译码器+中断)
查看>>
vue 总结
查看>>
深入理解java虚拟机(二):java内存溢出实战
查看>>
31.QPainter-rotate()函数分析-文字旋转不倾斜,图片旋转实现等待
查看>>
直接通过Binder的onTransact完成跨进程通信
查看>>