[Gluster-users] GlusterFS and inotify (more info)

Harshavardhana harsha at gluster.com
Tue Jan 26 13:18:31 UTC 2010


Hi Larry,

    May I know which GlusterFS version you are using? Can you try loading
    stat-prefetch on the client side, above write-behind?
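
    For reference, a minimal, untested sketch of what that client-side
    stacking could look like, reusing the writebehind volume name from the
    client volfile quoted below (illustrative only, not a verified
    configuration):

    volume statprefetch
      type performance/stat-prefetch
      subvolumes writebehind     # stat-prefetch loaded above write-behind
    end-volume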

--
Harshavardhana
Gluster - http://www.gluster.com


On Tue, Jan 26, 2010 at 6:41 PM, Larry Bates <larry.bates at vitalesafe.com>wrote:

>  Sure if you want to take a look.
>
>
>
> -Larry
>
>
>
> Two servers (gfs01 and gfs02) are configured with the following volume files
> (they differ only by server names):
>
>
>
> volume vol1
>   type storage/posix                          # POSIX FS translator
>   option directory /mnt/glusterfs/vol1        # Export this directory
>   option background-unlink yes                # unlink in background
> end-volume
>
> volume vol2
>   type storage/posix
>   option directory /mnt/glusterfs/vol2
>   option background-unlink yes
> end-volume
>
> volume vol3
>   type storage/posix
>   option directory /mnt/glusterfs/vol3
>   option background-unlink yes
> end-volume
>
> volume vol4
>   type storage/posix
>   option directory /mnt/glusterfs/vol4
>   option background-unlink yes
> end-volume
>
> volume vol5
>   type storage/posix
>   option directory /mnt/glusterfs/vol5
>   option background-unlink yes
> end-volume
>
> volume vol6
>   type storage/posix
>   option directory /mnt/glusterfs/vol6
>   option background-unlink yes
> end-volume
>
> volume vol7
>   type storage/posix
>   option directory /mnt/glusterfs/vol7
>   option background-unlink yes
> end-volume
>
> volume vol8
>   type storage/posix
>   option directory /mnt/glusterfs/vol8
>   option background-unlink yes
> end-volume
>
> volume iot1
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol1
> end-volume
>
> volume iot2
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol2
> end-volume
>
> volume iot3
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol3
> end-volume
>
> volume iot4
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol4
> end-volume
>
> volume iot5
>   type performance/io-threads
>   option thread-count 8
>   subvolumes vol5
> end-volume
>
> volume iot6
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol6
> end-volume
>
> volume iot7
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol7
> end-volume
>
> volume iot8
>   type performance/io-threads
>   option thread-count 4
>   subvolumes vol8
> end-volume
>
> volume gfs01brick1
>   type features/locks
>   subvolumes iot1
> end-volume
>
> volume gfs01brick2
>   type features/locks
>   subvolumes iot2
> end-volume
>
> volume gfs01brick3
>   type features/locks
>   subvolumes iot3
> end-volume
>
> volume gfs01brick4
>   type features/locks
>   subvolumes iot4
> end-volume
>
> volume gfs01brick5
>   type features/locks
>   subvolumes iot5
> end-volume
>
> volume gfs01brick6
>   type features/locks
>   subvolumes iot6
> end-volume
>
> volume gfs01brick7
>   type features/locks
>   subvolumes iot7
> end-volume
>
> volume gfs01brick8
>   type features/locks
>   subvolumes iot8
> end-volume
>
> ## Add network serving capability to volumes
> volume server
>   type protocol/server
>   option transport-type tcp                    # For TCP/IP transport
>   #
>   # Expose all the bricks to the clients
>   #
>   subvolumes gfs01brick1 gfs01brick2 gfs01brick3 gfs01brick4 gfs01brick5 gfs01brick6 gfs01brick7 gfs01brick8
>   option auth.addr.gfs01brick1.allow 10.0.0.*
>   option auth.addr.gfs01brick2.allow 10.0.0.*
>   option auth.addr.gfs01brick3.allow 10.0.0.*
>   option auth.addr.gfs01brick4.allow 10.0.0.*
>   option auth.addr.gfs01brick5.allow 10.0.0.*
>   option auth.addr.gfs01brick6.allow 10.0.0.*
>   option auth.addr.gfs01brick7.allow 10.0.0.*
>   option auth.addr.gfs01brick8.allow 10.0.0.*
> end-volume
>
>
>
> Client config:
>
>
>
> #
> # Add client feature and attach to remote subvolumes of gfs01
> #
> volume gfs01vol1
>   type protocol/client
>   option transport-type tcp            # for TCP/IP transport
>   option remote-host gfs01             # IP/DNS address of the remote volume
>   option remote-subvolume gfs01brick1  # name of the remote volume
>   option transport.socket.nodelay on   # undocumented option for speed
> end-volume
>
> volume gfs01vol2
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick2
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol3
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick3
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol4
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick4
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol5
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick5
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol6
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick6
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol7
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick7
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs01vol8
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs01
>   option remote-subvolume gfs01brick8
>   option transport.socket.nodelay on
> end-volume
>
> #
> # Add client feature and attach to remote subvolumes of gfs02
> #
> volume gfs02vol1
>   type protocol/client
>   option transport-type tcp            # for TCP/IP transport
>   option remote-host gfs02             # IP/DNS address of the remote volume
>   option remote-subvolume gfs02brick1  # name of the remote volume
>   option transport.socket.nodelay on   # undocumented option for speed
> end-volume
>
> volume gfs02vol2
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick2
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol3
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick3
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol4
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick4
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol5
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick5
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol6
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick6
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol7
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick7
>   option transport.socket.nodelay on
> end-volume
>
> volume gfs02vol8
>   type protocol/client
>   option transport-type tcp
>   option remote-host gfs02
>   option remote-subvolume gfs02brick8
>   option transport.socket.nodelay on
> end-volume
>
> #
> # Replicate volumes
> #
> volume afr-vol1
>   type cluster/replicate
>   subvolumes gfs01vol1 gfs02vol1
> end-volume
>
> volume afr-vol2
>   type cluster/replicate
>   subvolumes gfs01vol2 gfs02vol2
> end-volume
>
> volume afr-vol3
>   type cluster/replicate
>   subvolumes gfs01vol3 gfs02vol3
> end-volume
>
> volume afr-vol4
>   type cluster/replicate
>   subvolumes gfs01vol4 gfs02vol4
> end-volume
>
> volume afr-vol5
>   type cluster/replicate
>   subvolumes gfs01vol5 gfs02vol5
> end-volume
>
> volume afr-vol6
>   type cluster/replicate
>   subvolumes gfs01vol6 gfs02vol6
> end-volume
>
> volume afr-vol7
>   type cluster/replicate
>   subvolumes gfs01vol7 gfs02vol7
> end-volume
>
> volume afr-vol8
>   type cluster/replicate
>   subvolumes gfs01vol8 gfs02vol8
> end-volume
>
> #
> # Distribute files across bricks
> #
> volume dht-vol
>   type cluster/distribute
>   subvolumes afr-vol1 afr-vol2 afr-vol3 afr-vol4 afr-vol5 afr-vol6 afr-vol7 afr-vol8
>   option min-free-disk 2%               # 2% of 1.8Tb volumes is 36Gb
> end-volume
>
> #
> # Writebehind performance addition
> #
> volume writebehind
>   type performance/write-behind
>   subvolumes dht-vol
>   option flush-behind on    # default value is 'off'
>   option cache-size 3MB
> end-volume
>
>
>
> The script is below, but you would also need a number of the helper modules
> it imports in order to run it yourself.
>
>
>
> import os
> import sys
> import time
> import getopt
> import signal
> import shutil
> import ConfigParser
> from ConfigParser import NoOptionError
> #
> # Get helpers
> #
> from loggerClass import loggerClass, loggerClassMixin
> import VESutils
> from VESutils import signon, getFromINI, blobpathfromblobid, blobidfromblobpath
> from VESutils import getAccountIdFromBlobId, getMemberInfo
> from VESutils import epochtime2S3time
> from VESutils import elapsedTimeToString
> from fmt_wcommas import FMC
> from singleinstance import singleinstance
> from globalconfig import globalinifilename
> #
> # Get S3 connection class
> #
> from boto.s3.connection import S3Connection, S3ResponseError
> from boto.exception import S3ResponseError, S3CreateError, S3DataError
>
> import pyinotify
> #from pyinotify import WatchManager, Notifier, ProcessEvent
> from xattr import xattr
> #
> # Get postgreSQL DBI interface
> #
> import psycopg2
>
> class Watcher(pyinotify.ProcessEvent, loggerClassMixin):
>     '''
>     default maximum for max_user_watches is 8192 (FC5)
>     set by logging in as root and entering following command:
>
>     sysctl -w fs.inotify.max_user_watches=65536
>     '''
>
>     def __init__(self, conn, bucket,
>                  logf = None, _trace = False, _debug = 0, _dryrun = False):
>
>         '''
>         MemberInfo - dictionary-like object holding cached websafe.members info
>         bucket = S3 bucket instance where new files are to be uploaded
>         '''
>         self.conn = conn
>         self.bucket = bucket
>         self._trace = _trace
>         self._debug = _debug
>         self._dryrun = _dryrun
>         #self.BACKUPS = '/mnt/BACKUPS/blobdata'
>         self.BACKUPS = None
>         #
>         # Cache members_info as I go to save DB accesses
>         #
>         self.members_info = dict()
>         #
>         # If user passed loggerClass instance use it, otherwise logging will
>         # go to screen (as provided for by loggerClassMixin.LOGF).
>         #
>         if logf is not None:
>             self.logf = logf
>
>         pyinotify.ProcessEvent.__init__(self)
>         self.len_eventq = 0
>         self.progress_count = 0
>
>     def process_PROGRESS(self, notifier):
>         len_eventq = len(notifier._eventq)
>         LOGF = self.LOGF
>         LM = "PROGRESS"
>         if self._trace:
>             LOGF("T", LM, "Entering")
>             LOGF("I", LM, "len_eventq=%s" % FMC(len_eventq))
>
>         self.progress_count += 1
>         #
>         # If eventq is shorter than last time, notifier didn't call
>         # .read_events() so I might need to do it here.  This code needs a
>         # second look because there are LONG pauses currently.
>         #
>         if len_eventq < self.len_eventq:
>             #
>             # It is too expensive to update the eventq on every file that
>             # is processed, so I will do it on every 1000th file as a
>             # compromise.
>             #
>             if (self.progress_count % 1000) == 0:
>                 notifier.read_events()
>
>         self.len_eventq = len_eventq
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def process_IN_Q_OVERFLOW(self, event):
>         '''
>         process_IN_Q_OVERFLOW - this is fired when events queue overflows.
>         '''
>         LM = 'process_IN_Q_OVERFLOW'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         LOGF("E", LM, "Queue overflow, set max_queued_events higher")
>         LOGF("E", LM, "sudo /sbin/sysctl -w fs.inotify.max_queued_events=#####")
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>         raise OverflowError('len_eventq= %s' % FMC(self.len_eventq))
>
>     def process_IN_MOVED_TO(self, event):
>         '''
>         process_IN_MOVED_TO - this is fired when upload .tmp file is moved to
>                               its final resting place.
>         '''
>         LM = 'process_IN_MOVED_TO'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         self._backup('M', event)
>         self._upload('M', event)
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def process_IN_DELETE(self, event):
>         LM = 'process_IN_DELETE'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         self._delete(event)
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def _delete(self, event):
>         LM = '_delete'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         src = os.path.join(event.path, event.name)
>         blob_id = blobidfromblobpath(src)
>         if self._debug:
>             LOGF("I", LM, "[%s] file=%s" % (event.name, src))
>
>         #
>         # Make sure a .tmp file didn't trigger this event
>         #
>         if event.name.endswith('.tmp') or \
>            event.name.startswith('.'):
>
>             if self._debug:
>                 LOGF("I", LM, ".tmp file, skipped")
>
>             if self._trace:
>                 LOGF("T", LM, "Leaving")
>
>             return
>
>         #
>         # Also make sure that the file is a blob.  blobs have 28 character
>         # hex filenames.
>         #
>         if len(os.path.basename(event.name)) != 28:
>             if self._debug:
>                 LOGF("I", LM, "%s non-blob file deletion skipped" % event.name)
>             return
>
>         #
>         # Make sure file hasn't "disappeared"
>         #
>         if not os.path.exists(src):
>             LOGF("W", LM, "src=%s, disappeared, skipped" % src)
>             if self._trace:
>                 LOGF("T", LM, "Leaving")
>
>             return
>
>         #
>         # Get S3path and delete the blob from S3
>         #
>         S3path = blobpathfromblobid('/', blob_id)
>         result = self.bucket.delete_key(S3path)
>         #
>         # See if I've got the information in file_data
>         #
>         email = None
>         account_id = getAccountIdFromBlobId(self.conn, blob_id)
>         if account_id is not None:
>             member_info = self.members_info.get(account_id,
>                             getMemberInfo(self.conn, account_id = account_id)
>                             )
>
>             if member_info is not None:
>                 email = member_info['email']
>
>         else:
>             LOGF("W", LM, "blob_id=%s not found in file_data" % \
>                           blob_id)
>
>         LOGF("I", LM, "[D]blob_id=%s email=%s" % (blob_id, email))
>         #
>         # If we are keeping local backup, delete it also
>         #
>         if self.BACKUPS is not None:
>             dst = blobpathfromblobid(self.BACKUPS, blob_id)
>             try:
>                 os.unlink(dst)
>
>             except: # Ugly but it works!
>                 pass
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def process_IN_ATTRIB(self, event):
>         '''
>         process_IN_ATTRIB - this is fired when a blob file has an attribute
>                             changed.  Normally attributes won't change, but
>                             this can be used to trigger me to do an upload
>                             on a blob file after-the-fact and provides for a
>                             'self-healing' filesystem.
>         '''
>         LM = 'process_IN_ATTRIB'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         self._backup('A', event)
>         self._upload('A', event)
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def _backup(self, parent, event):
>         LM = '_backup'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         if self._debug > 1:
>             LOGF("D", LM, "event.path=%s, event.name=%s" % \
>                           (event.path, event.name))
>
>         if self.BACKUPS is not None:
>             src = os.path.join(event.path, event.name)
>             blob_id = blobidfromblobpath(src)
>             if self._debug:
>                 LOGF("D", "[%s]" % parent, "src=%s" % src)
>
>             #
>             # Make sure a non-blob (.tmp/hidden) file didn't trigger this event
>             #
>             if event.name.endswith('.tmp') or \
>                event.name.startswith('.') or \
>                len(os.path.basename(event.name)) != 28:
>
>                 if self._debug:
>                     LOGF("I", LM, "non-blob file, skipped")
>
>                 if self._trace:
>                     LOGF("T", LM, "Leaving")
>
>                 return
>
>             #
>             # Copy the file to backup folder (iff it doesn't exist)
>             #
>             dst = blobpathfromblobid(self.BACKUPS, blob_id)
>             if not os.path.exists(dst):
>                 try:
>                     shutil.copy2(src, dst)
>
>                 except:
>                     LOGF("E", LM, "%s->%s backup failed" % (src, dst))
>                     ##raise
>
>         else:
>             if self._debug:
>                 LOGF("E", LM, "BACKUPS currently disabled, skipped")
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
>     def _upload(self, parent, event):
>         LM = '_upload'
>         LOGF = self.LOGF
>         if self._trace:
>             LOGF("T", LM, "Entering")
>
>         if self._debug > 1:
>             LOGF("D", LM, "event.path=%s, event.name=%s" % \
>                           (event.path, event.name))
>
>         src = os.path.join(event.path, event.name)
>         if self._debug:
>             LOGF("D", "[%s]" % parent, "src=%s" % src)
>
>         #
>         # Make sure a .tmp file didn't trigger this event
>         #
>         if event.name.endswith('.tmp') or \
>            event.name.startswith('.'):
>
>             if self._debug:
>                 LOGF("I", LM, ".tmp file, skipped")
>
>             if self._trace:
>                 LOGF("T", LM, "Leaving")
>
>             return
>
>         #
>         # Also make sure that the file is a blob.  blobs have 28 character
>         # hex filenames.
>         #
>         if len(os.path.basename(event.name)) != 28:
>             if self._debug:
>                 LOGF("I", LM, "%s non-blob file skipped" % event.name)
>             return
>
>         #
>         # Make sure file hasn't "disappeared"
>         #
>         if not os.path.exists(src):
>             LOGF("W", LM, "src=%s, not found, skipped" % src)
>             if self._trace:
>                 LOGF("T", LM, "Leaving")
>
>             return
>
>         #
>         # See if I've got the information in file_data
>         #
>         blob_id = blobidfromblobpath(src)
>         email = None
>         account_id = getAccountIdFromBlobId(self.conn, blob_id)
>         if account_id is not None:
>             member_info = self.members_info.get(account_id,
>                             getMemberInfo(self.conn, account_id = account_id)
>                             )
>
>             if member_info is not None:
>                 email = member_info['email']
>
>         else:
>             LOGF("W", LM, "blob_id=%s not found in file_data" % \
>                           blob_id)
>
>         S3path = blobpathfromblobid('/', blob_id)
>         size = os.path.getsize(src)
>         #
>         # Create a new key instance for S3 and pass in the
>         # meta-information
>         #
>         k = self.bucket.new_key(key_name = S3path)
>         #
>         # Meta-data needed to restore a file from S3 to local filesystem
>         # (e.g. to set ctime, mtime properly to work with rsync)
>         #
>         ctime = os.path.getctime(src)
>         mtime = os.path.getmtime(src)
>         VESctime = epochtime2S3time(ctime)
>         VESmtime = epochtime2S3time(mtime)
>         if self._debug > 1:
>             LOGF("D", LM, "setting VESctime=%s" % VESctime)
>             LOGF("D", LM, "setting VESmtime=%s" % VESmtime)
>
>         k.set_metadata('ctime', VESctime)
>         k.set_metadata('mtime', VESmtime)
>         age = time.time() - ctime
>         LOGF("I", LM, "[%s]%-28s %s(%11s)[%s]" % \
>                       (parent,
>                        str(email)[:28],   # str() guards against email=None
>                        blob_id,
>                        FMC(size),
>                        elapsedTimeToString(age)
>                        )
>              )
>
>         if not self._dryrun:
>             #
>             # Upload the file to S3.  Use replace=False to short circuit
>             # the upload if the file already exists.  That way I'm not using
>             # upload bandwidth unnecessarily.
>             #
>             k.set_contents_from_filename(src, replace = False)
>             if self._trace:
>                 LOGF("I", LM, "done")
>
>         if self._trace:
>             LOGF("T", LM, "Leaving")
>
> def sigTERM(signal, frame):
>     global stopnow
>     stopnow = True
>
> def notifyCallback(notifier):
>     global _trace, LOGF
>     global stopnow
>     LM = 'notifyCallback'
>     if stopnow:
>         notifier.stop()
>
> def conlog(severity, LM, msg):
>     global _trace, _debug, _quiet, LOGF
>     if msg:
>         LOGF(severity, "%s-%s" % (LM, msg))
>
>     if not _quiet:
>         sys.stdout.write(msg + '\n')
>         sys.stdout.flush()
>
> pgm_ = 'monitorblobs'
> ver_ = '1.1.1.0'
> LM = 'main'
> #
> # V1.0.0.1 13 JUL 08 LAB Wrapped call to S3 to get HEAD after uploading (to
> #                        access last_modified meta-data) in while-try block
> #                        to work around boto issue #125)
> #
> # V1.0.0.2 13 JUL 08 LAB Added .strip() to arguments to work with
> #                        supervisord
> #
> # V1.0.0.3 14 JUL 08 LAB Added start program separator to logfile, fixed
> #                        typo (Xattrs instead of xattrs)
> #
> # V1.0.0.4 14 JUL 08 LAB Wrapped all boto calls inside _mexe framework
> #                        as a workaround to the bug #125.
> #
> # V1.0.0.5 15 JUL 08 LAB Removed the IN_MOVED_TO handler because every upload
> #                        triggered the IN_ATTRIB handler as well.
> #
> # V1.1.0.0 17 JUL 08 LAB Upgraded to V0.8 of pyinotify, replaced hard-coded
> #                        checking of the inotify variables with newly
> #                        provided SysCtlNotify class.
> #
> # V1.1.0.1 26 JUL 08 LAB Added hack to log len(_eventq) for debugging
> #
> # V1.1.0.2 29 JUL 08 LAB Added dbTableCache so I can display member info
> #                        in logfile as files are processed (unfinished until
> #                        fileid xattr is set from upload processing code).
> #
> # V1.1.0.3 07 MAR 09 LAB Introduced getFromINI and _<runtimeparm> naming,
> #                        removed _mexe from class because it is now inside boto
> #
> # V1.1.0.4 12 MAR 09 LAB Shortened some logging messages, put others under
> #                        _trace control
> #
> # V1.1.0.5 30 MAR 09 LAB Added code to _upload that skips files that start
> #                        with a period (.).  These files are generated when
> #                        doing an rsync recovery on a monitored branch.
> #
> # V1.1.0.6 11 DEC 09 LAB Introduced the VES_STORAGE_S3 environment variable
> #                        that points to bucket_name
> #
> # V1.1.0.7 27 DEC 09 LAB Added copy to backup method to Watcher class (this
> #                        is temporary because I'm moving to GlusterFS auto
> #                        replication).
> #
> # V1.1.0.8 04 JAN 10 LAB Cleaned up upload code, eliminated xattr handlers
> #
> # V1.1.0.9 10 JAN 10 LAB Better logging for _delete, _upload.  Cleaned up
> #                        code for making/skipping backups (which won't be
> #                        needed when I switch to GlusterFS).
> #
> # V1.1.1.0 23 JAN 10 LAB Introduced globalconfig, removed BACKUPS because
> #                        of cutover to GlusterFS
> #
> # Register the signal handler
> #
> signal.signal(signal.SIGTERM, sigTERM)
> stopnow = False
>
> PGM_usage = '''
>    Usage: %s [OPTIONS]
>    Monitor blobdata folders for changes and process files
>
>    -t, --trace            run in trace mode
>    -d, --debug=           run at debug level= (e.g. 1,2,3)
>    -q, --quiet            run in quiet mode (minimal output)
>    -D, --dryrun           dry run, no changes
>    -l, --logfilename=     specify the path/filename to .LOG file
>    -i, --inifilename=     specify the path/filename to .INI file
>    -h, --help             help (this screen)
>    -V, --version          output version information and stop
>     ''' % pgm_
>
> #
> # Get the options that the user may have passed on processor call line
> #
> short_args = "td:ql:i:hVD"
> long_args = ["trace", "debug=", "quiet", "logfilename=",
>              "inifilename=", "help", "version", "dryrun",
>              ]
>
> #
> # Process command line options
> #
> opts, args = getopt.gnu_getopt(sys.argv[1:], short_args, long_args)
> #
> # Set initial values for potential argv parameters
> #
> _trace = False
> _debug = 0
> _quiet = False
> _dryrun = False
> _logfilename = None
> _inifilename = None
> #
> # Parse command line options
> #
> for option, value in opts:
>     if option in ("-h", "--help"):
>         sys.exit(PGM_usage)
>
>     if option in ("-V", "--version"):
>         print '%s Version %s' % (pgm_, ver_)
>         sys.exit(0)
>
>     if   option in ("-t", "--trace"):       _trace = True
>     elif option in ("-d", "--debug"):       _debug = int(value)
>     elif option in ("-q", "--quiet"):       _quiet = 1
>     elif option in ("-D", "--dryrun"):      _dryrun = True
>     elif option in ("-l", "--logfilename"): _logfilename = value.strip()
>     elif option in ("-i", "--inifilename"): _inifilename = value.strip()
>
> #
> # If user didn't specify inifilename on processor call, use default
> #
> if _inifilename is None:
>     _inifilename = "%s.ini" % pgm_
>
> if not os.path.exists(globalinifilename):
>     msg = "inifilename=%s, not found, aborting" % globalinifilename
>     raise RuntimeError(msg)
>
> if not os.path.exists(_inifilename):
>     msg = "inifilename=%s, not found, aborting" % _inifilename
>     #print "%s-%s.%s" % ("E", LM, msg)
>     raise RuntimeError(msg)
>
> #
> # Create ConfigParser instance to read .INI information
> #
> INI = ConfigParser.ConfigParser()
> #
> # Read .INI file
> #
> INI.read([globalinifilename, _inifilename])
>
> _logfilename = getFromINI(INI.get, 'init', 'logfilename',
>                                    _logfilename, "%s.log" % pgm_)
>
> logf = loggerClass(_logfilename, 'monitor', 1<<26) #64Mb max
> logf.initsessionlog()
> LOGF = logf.writelines
> LOGF("I", ("------%s V%s begin" % (pgm_, ver_)).ljust(50, '-'))
> #
> # Make sure there isn't another copy of me running already
> #
> _pidPath = getFromINI(INI.get, 'init', 'pidPath', None, None)
> myapp = singleinstance(pgm_, _pidPath)
> if myapp.alreadyrunning():
>     msg = "%s already running, exiting" % pgm_
>     raise RuntimeError(msg)
>
> _trace  = getFromINI(INI.getboolean, 'init', 'trace', _trace, False)
> _debug  = getFromINI(INI.getint, 'init', 'debug', _debug, 0)
> _quiet  = getFromINI(INI.getboolean, 'init', 'quiet', _quiet, False)
> _dryrun = getFromINI(INI.getboolean, 'init', 'dryrun', _dryrun, False)
>
> signon(pgm_, ver_, _quiet=_quiet)
>
> #
> # More items to get from the .INI file (or environment)
> #
> _STORAGE = getFromINI(INI.get, 'init', 'storage', None, None)
> _bucketname = getFromINI(INI.get, 'init', 'bucketname', None, None)
>
> #
> # Get files from .INI to read AWS credentials from
> #
> _accessKeyFile = getFromINI(INI.get, 'init', 'accesskeyfile', None, None)
> _secretKeyFile = getFromINI(INI.get, 'init', 'secretkeyfile', None, None)
>
> if _debug:
>     conlog("I", LM, "-----Options".ljust(50, '-'))
>     conlog("I", LM, "trace..........%s" % _trace)
>     conlog("I", LM, "debug..........%i" % _debug)
>     conlog("I", LM, "quiet..........%s" % _quiet)
>     conlog("I", LM, "dryrun.........%s" % _dryrun)
>     conlog("I", LM, "STORAGE........%s" % _STORAGE)
>     conlog("I", LM, "pidPath........%s" % _pidPath)
>     conlog("I", LM, "bucketname.....%s" % _bucketname)
>     conlog("I", LM, "accessKeyFile..%s" % _accessKeyFile)
>     conlog("I", LM, "secretKeyFile..%s" % _secretKeyFile)
>
> _PWMfile  = getFromINI(INI.get, 'init', 'pwmfile', None, None)
> _host     = getFromINI(INI.get, 'database', 'host', None, None)
> _database = getFromINI(INI.get, 'database', 'database', None, None)
> _dbport   = getFromINI(INI.getint, 'database', 'port', None, None)
> _user     = getFromINI(INI.get, 'database', 'user', None, None)
>
> conlog("I", LM, "PWMfile........%s" % _PWMfile)
> conlog("I", LM, "host...........%s" % _host)
> conlog("I", LM, "database.......%s" % _database)
> conlog("I", LM, "dbport.........%i" % _dbport)
> conlog("I", LM, "user...........%s" % _user)
>
> if not _quiet:
>     print
>
> #
> # Get database password from file
> #
> _password = open(_PWMfile, 'r').readline().rstrip()
> conn = psycopg2.connect(host=_host, database=_database, port=_dbport,
>                         user=_user, password=_password)
> #
> # Get the AccessKey and SecretAccessKey info
> #
> aws_ak = open(_accessKeyFile, 'r').readline().rstrip()
> aws_sak = open(_secretKeyFile, 'r').readline().rstrip()
> #
> # Create an instance of boto S3Connection using these credentials
> #
> S3obj = S3Connection(aws_ak, aws_sak)
> if _trace:
>     conlog("T", LM, "S3 connection object created")
>     conlog("T", LM, "Retrieving bucketname=%s from S3" % _bucketname)
>
> bucket = S3obj.get_bucket(_bucketname)
> if _trace:
>     conlog("T", LM, "bucketname = %s, retrieved" % _bucketname)
>
> #
> # Watch for move/delete events
> #
> #mask = pyinotify.IN_DELETE | pyinotify.IN_ATTRIB | pyinotify.IN_Q_OVERFLOW
> mask = pyinotify.IN_ATTRIB
> mask |= pyinotify.IN_Q_OVERFLOW
> mask |= pyinotify.IN_MOVED_TO
> #
> # Create instance of WatchManager class and notifier class
> #
> wm = pyinotify.WatchManager()
> if _trace:
>     conlog("T", LM, "Creating Watcher instance")
>
> Wobj = Watcher(conn, bucket, logf = logf,
>                _trace = _trace, _debug = _debug, _dryrun = _dryrun)
>
> if _trace:
>     conlog("T", LM, "Watcher instance created")
>     conlog("T", LM, "Creating Notifier instance")
>
> notifier = pyinotify.Notifier(wm, Wobj)
>
> if _trace:
>     conlog("T", LM, "Notifier instance created")
>
> #
> # If I'm debugging, get a loggerClass instance into notifier class instance
> # to log _eventq depth.
> #
> if _debug:
>     notifier.LOGF = logf.writelines
>
> if not _quiet:
>     print
> #
> # Folders to watch (this way I won't watch any folders except the ones
> # that actually hold blobs 00-ff->00-ff).  This keeps me from watching
> # temp/junk folders that might accidentally get created.
> #
> ##flist = ['%02x' % i for i in xrange(0, 256)]  #00-ff
> flist = ['%02x' % i for i in xrange(int('00',16), int('ff',16) + 1)]
> conlog("I", LM, "Watchlist STORAGE..%s (recursive)" % _STORAGE)
> conlog("I", LM, "Registering folders to watch...")
> foldersRegistered = 0
> for n, i in enumerate(flist):
>     l = n + 1
>     if (n % 16) == 0:
>         if not _quiet:
>             if n != 0:
>                 sys.stdout.write('(recursive)\n')
>
>             sys.stdout.write("Watchlist adding....%s " % i)
>             sys.stdout.flush()
>
>     else:
>         if not _quiet:
>             sys.stdout.write("%s " % i)
>             sys.stdout.flush()
>
>     for j in flist:
>         watch_folder = os.path.join(_STORAGE, i, j)
>         wm.add_watch(watch_folder, mask)
>         foldersRegistered += 1
>
> if not _quiet:
>     sys.stdout.write('(recursive)\n')
>     print
>
> conlog("I", LM, "%s folder monitors registered" % FMC(foldersRegistered))
> if _trace:
>     conlog("T", LM, "Entering notifier.loop")
>
> try:
>     notifier.loop(callback = notifyCallback)
>
> except KeyboardInterrupt:
>     print "KeyboardInterrupt, stopping..."
>     stopnow = True
>     #
>     # destroy the inotify's instance on this interrupt (stop monitoring)
>     #
>     notifier.stop()
>
> del myapp
> if _dryrun:
>     conlog("I", LM, "WARNING-dryrun = True, nothing committed")
>
> Hope this helps.
>
>
>
> -Larry
>
>
>
>
>
> *From:* harshavardhanacool at gmail.com [mailto:harshavardhanacool at gmail.com]
> *On Behalf Of *Harshavardhana
> *Sent:* Tuesday, January 26, 2010 2:01 AM
> *To:* Larry Bates
> *Cc:* gluster-users at gluster.org
> *Subject:* Re: [Gluster-users] GlusterFS and inotify (more info)
>
>
>
> Hi Larry,
>
>     Can you share with us the volume files you are using with GlusterFS?.
> Also the scripts you are trying to run.
>
> Thanks
> --
> Harshavardhana
> Gluster - http://www.gluster.com
>
>  On Tue, Jan 26, 2010 at 3:31 AM, Larry Bates <larry.bates at vitalesafe.com>
> wrote:
>
> Well, it seems that I can register, but the registration of the subfolders is
> so slow that I thought it was not working.  I have subfolders 00-ff, each with
> subfolders 00-ff under them (64K folders in total).  Registering on normal
> storage took about 30 seconds.  Registering an inotify watcher
> (recursive=True) on the GlusterFS mount takes over 1 hr, 15 min!  Walking the
> tree and registering each folder individually takes 6 minutes.
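
A minimal sketch of that per-directory registration approach (essentially what
the monitorblobs script above already does), assuming pyinotify 0.8 and a
hypothetical /mnt/glusterfs mount point:

import os
import pyinotify

MOUNT = '/mnt/glusterfs'   # hypothetical mount point; substitute the real one
mask = pyinotify.IN_ATTRIB | pyinotify.IN_MOVED_TO | pyinotify.IN_Q_OVERFLOW
wm = pyinotify.WatchManager()

# Register each 00-ff/00-ff leaf folder explicitly instead of using
# rec=True on the mount point, which is what takes over an hour here.
hexdirs = ['%02x' % i for i in xrange(256)]
for i in hexdirs:
    for j in hexdirs:
        wm.add_watch(os.path.join(MOUNT, i, j), mask)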
>
> -Larry
>
> -----Original Message-----
> From: Larry Bates [mailto:larry.bates at vitalesafe.com]
> Sent: Monday, January 25, 2010 1:51 PM
> To: 'gluster-users at gluster.org'
> Subject: GlusterFS and inotify
>
> I recently moved my backend storage to GlusterFS V3.0 and, with one exception,
> everything is running great.  That exception is that I had a daemon using
> inotify that was watching my storage for new files.  Upon arrival, this
> watcher uploaded a copy of the file to Amazon S3.  This daemon had been
> running just fine for well over a year.  Since moving to GlusterFS, it appears
> that inotify doesn't work on GlusterFS volumes.  I don't know if it is a
> Gluster, FUSE, or some other problem.  inotify just refuses to allow me to
> register the top-level folder to be watched.  Before I spend a lot of time on
> this, I thought I'd bounce it off of the "experts" on this list.
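
One quick way to confirm whether the top-level registration is actually failing
(as opposed to just being slow), assuming pyinotify 0.8's convention that
add_watch returns a dict of path to watch descriptor, with negative values
indicating failure:

import pyinotify

wm = pyinotify.WatchManager()
# '/mnt/glusterfs' is a hypothetical mount point; substitute the real one.
result = wm.add_watch('/mnt/glusterfs', pyinotify.IN_ATTRIB, rec=False)
for path, wd in result.items():
    # a negative watch descriptor means the watch could not be added
    print path, ('OK' if wd >= 0 else 'FAILED (wd=%d)' % wd)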
>
> Anyone have any ideas?
>
> Thanks in advance,
> Larry Bates
> vitalEsafe, Inc.
>
> _______________________________________________
> Gluster-users mailing list
> Gluster-users at gluster.org
> http://gluster.org/cgi-bin/mailman/listinfo/gluster-users
>
>
>

