001: #!/usr/bin/python
002: """
003: Usage: ale <action> [options...]
004: 
005: Actions:
006:   init      Initialize an empty ale namespace, rooted at this dir
  freeze    Update the ale metadata DB from the file system state.
            After a file is checksummed, it is made read-only.
009:             TODO: accept arg for efficiency?
010:             --progress shows sha1sum progress
011:             --preview shows the files that haven't yet been frozen
  backup    Backup any new files, using the configuration file in
013:             .ale/config.txt.  Potentially invokes 'send' multiple times.
  send      Write a list of blobs to stdout.  Piped to 'cask receive'.
015:   scp       Alternative to ale send / cask receive, using plain scp (may incur
016:             connection latency)
017:   manifest  Print a manifest that can be used as input to "cask migrate".
018:             (The checksums are read from the metadata)
019: 
020: File Management
021:   mv        'ale mv' is a more efficient way to do Unix 'mv' and then
022:             'ale freeze'
023:   rm        'ale rm' is a more efficient way to do Unix 'rm' and then 'ale
024:              freeze'
025:   ls        List files in the repo (frozen files only; Unix 'ls' is for
026:             unfrozen files)
027: 
  du        Show total size of files, like Unix 'du' but faster.  Consults
            the metadata DB rather than the file system.
030:   space     Show total disk space used
031:             (algorithm: scan DB)
032: 
033:   log       Dump the entry log
034:   check     Compare files on disk with recorded checksums.
035: 
036:   config    Show/edit the config that lists backing casks
  casks     List casks?  Run "cask status" remotely, in parallel, via a
            shell script.
039:             --verbose will pass xargs --verbose to see the commands.
040:             NOTE: This could be another entry point:
041: 
042:             ale casks        # status by default - rowid/counter, space, if
043:                              # it's online/valid
044:             ale casks status # explicit
045:             ale casks stats  # checksum stats, i.e. last verified
046:             ale casks init   # could even recursively init according to the
047:                              # backup.cfg file!
048: 
  clean-casks  Print out a list of commands to delete old files?
               Probably log these to a text file?
051: 
052:   debug     Dump state for debugging purposes.
053: 
054: """
055: 
056: import errno
057: import os
058: import subprocess
059: import sys
060: import time
061: 
062: import util
063: 
064: 
065: class RepoCorruptError(Exception):
066:   """Raised when invariants on the ale repo are violated."""
067: 
068: 
069: # Don't use autoincrement, just use the row ID
070: # https://www.sqlite.org/autoinc.html
071: 
072: # NOTE:
073: # - maximum row ID can serve as a key for a cache of the current directory
074: # structure
075: 
076: 
077: CREATE_SQL = """\
-- NOTE: executescript() implicitly ends any open transaction here, which is
-- unfortunate!
079: 
080: -- Immutable log of files in the ale repo
081: CREATE TABLE entry_log(
  action      TEXT,    -- '+' or '-' for add/remove
  timestamp   INTEGER, -- timestamp of action
  rel_path    TEXT,    -- path within the ale namespace, e.g. 'foo/bar'
085:   num_bytes   INTEGER, -- file size
086:   mtime       INTEGER, -- file system mtime of the file, if add.  This is used
087:                        -- as a sanity check.
088:   sha1        BLOB     -- checksum of the file, if add
089: );
090: 
091: -- This whole table is for auditing purposes I guess?  Could get rid of it.
092: CREATE TABLE DEBUG_freeze_log(
093:   timestamp       INTEGER, -- timestamp of freeze, for UI only
094:   num_files       INTEGER, -- number of files frozen, for UI only
095:   total_bytes     INTEGER, -- total bytes, for UI only
096:   wall_time_secs  REAL,    -- how long it took to freeze
097:   ale_counter     INTEGER  -- latest rowid written to the metadata DB
098: );
099: 
100: -- Immutable log of successful backup jobs
101: CREATE TABLE backup_log(
102:   cask_host    TEXT,    -- id of cask
103:   cask_dir     TEXT,
104:   ale_counter  INTEGER, -- latest rowid backed up
105:   timestamp    INTEGER  -- time backed up, for UI only
106: );
107: 
108: BEGIN TRANSACTION;
109: """
110: 
111: def FindAleRootDir(current_dir):
112:   while True:
113:     ale_dir = os.path.join(current_dir, '.ale')
114:     if os.path.exists(ale_dir):
115:       return current_dir
116:     parent_dir = os.path.dirname(current_dir)
117:     if parent_dir == current_dir:  # reached /
118:       return None
119:     current_dir = parent_dir
120: 
121: 
122: def FindAleRepoOrDie():
123:   """
124:   Find the ale repo the current directory lives in.  An ale repo is rooted at a
125:   dir with an '.ale' subdirectory.
126:   """
127:   cwd = os.getcwd()
128:   ale_root = FindAleRootDir(cwd)
129:   if ale_root is None:
130:     raise RuntimeError("Couldn't find .ale directory (%s)" % cwd)
131:   db_name = os.path.join(ale_root, '.ale/metadata.sqlite3')
132:   return ale_root, db_name
133: 
134: 
135: def ParseConfig(f):
136:   """Parse the backup.cfg file."""
137:   config = []
138:   for line in f:
139:     line = line.strip()
140:     if not line:
141:       continue
142:     if line.startswith('#'):
143:       continue
    # Split into at most two fields, so cask_dir may contain spaces.
    cask_host, cask_dir = line.split(None, 1)
145:     config.append((cask_host, cask_dir))
146:   return config
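
# Example backup.cfg (hypothetical hosts/paths; the format is just what
# ParseConfig accepts): one "cask_host cask_dir" pair per line.  Blank lines
# and '#' comments are skipped, so offline casks can be commented out.
#
#   # primary cask
#   homer.local  /media/usb1/foo.cask
#   #old-box     /var/casks/foo.cask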
147: 
148: 
149: def ParseConfigOrDie(ale_root):
150:   config_path = os.path.join(ale_root, '.ale/backup.cfg')
151:   try:
152:     with open(config_path) as f:
153:       config = ParseConfig(f)
154:   except IOError as e:
155:     if e.errno == errno.ENOENT:
156:       raise RuntimeError('No casks configured (%s not found).' %
157:           config_path)
158:     else:
159:       raise
160:   return config
161: 
162: 
163: # mv algorithm:
164: # - search for rel_path that starts with the path.  Delete them all, and then
165: # add them back?
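#
# A minimal sketch of that algorithm (hypothetical; nothing calls this yet).
# It reuses the recorded size/mtime/sha1 so nothing is re-checksummed, and
# per the schema, '-' rows leave mtime/sha1 empty.


def MoveRows(db_state, old_path, new_path, timestamp):
  """Return entry_log rows that move old_path (a file or dir) to new_path."""
  rows = []
  prefix = old_path + '/'
  for rel_path in sorted(db_state):
    if rel_path == old_path or rel_path.startswith(prefix):
      num_bytes, mtime, sha1 = db_state[rel_path]
      new_rel_path = new_path + rel_path[len(old_path):]
      rows.append(('-', timestamp, rel_path, num_bytes, None, None))
      rows.append(('+', timestamp, new_rel_path, num_bytes, mtime,
                   buffer(sha1)))
  return rows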
166: 
167: 
168: def AleInit(cursor):
169:   cursor.executescript(CREATE_SQL)
170: 
171: 
172: def List(cursor, args):
173:   """List files that are known to the metadata DB."""
174:   db_state = ReconstructState(cursor)
175: 
  debug_unicode = False
  if debug_unicode:
    fmt = '%s %r'  # show raw bytes, to check that names are utf-8 (they are)
  else:
    fmt = '%s %s'
181: 
182:   # TODO: maybe ls -l shows mtime and size
183:   for name in sorted(db_state):
184:     num_bytes, mtime, sha1_bytes = db_state[name]
185:     print fmt % (sha1_bytes.encode('hex'), name)
186: 
187: 
188: class UpdateHandler(object):
189:   """
190:   Receives notice about files on the file system, compares it to existing
191:   state, and figures out which rows to write back into the database.
192:   """
193: 
194:   def __init__(self, db_state, printer):
195:     """
196:     Args:
197:       db_state: existing state
198:     """
199:     self.db_state = db_state
200:     self.printer = printer
201: 
202:     self.files_read = 0
203:     self.bytes_read = 0
204:     self.rows_to_insert = []
205: 
206:   def OnFile(self, full_path, rel_path, lstat):
207:     """Called with each file in the ale repo."""
208:     if rel_path in self.db_state:
209:       return  # do nothing
210: 
211:     num_bytes = lstat.st_size
212:     self.printer.OnChecksumBegin(rel_path, num_bytes)
213: 
214:     with open(full_path) as f:
215:       sha1_bytes = util.ChecksumFile(f, self.printer)
216: 
217:     # After the progress bar
218:     sys.stdout.write(' ' * 15)
219:     print sha1_bytes.encode('hex')
220: 
221:     # Whole second resolution is enough, and I think more portable.
222:     timestamp = int(time.time())  # no time zone for now
223:     mtime_int = int(lstat.st_mtime)
224: 
225:     row = ('+', timestamp, rel_path, num_bytes, mtime_int, buffer(sha1_bytes))
226:     self.rows_to_insert.append(row)
227: 
228:     self.files_read += 1
229:     self.bytes_read += num_bytes
230: 
231:   def GetResults(self):
232:     return self.rows_to_insert, self.files_read, self.bytes_read
233: 
234: 
235: def ReconstructState(cursor, rowid_range=None):
236:   """Play back the entry log and reconstruct the current files from it.
237: 
238:   Args:
239:     cursor: sqlite cursor
240:     rowid_range: An inclusive range as a tuple (min, max); or None if all rows
241:        should be returned
242: 
243:   Returns:
244:     A dict of { rel_path -> (num_bytes, mtime, sha1) }
245:   """
246:   # NOTE:
247:   # - rel_path should be normalized so it doesn't end with / ever
248: 
249:   base_query = """
250:     SELECT action, rel_path, num_bytes, mtime, sha1
251:     FROM entry_log
252:     """
  if rowid_range:
    # Range is inclusive on both sides, like [3, 5].
    query_str = base_query + 'WHERE ? <= rowid AND rowid <= ?'
    args = rowid_range
  else:
    query_str = base_query
    args = ()
260: 
261:   state = {}
262:   for action, rel_path, num_bytes, mtime, sha1 in cursor.execute(
263:       query_str, args):
264:     if action == '+':
265:       state[rel_path] = (num_bytes, mtime, str(sha1))
    elif action == '-':
      try:
        del state[rel_path]
      except KeyError:
        raise RepoCorruptError('Removed path %r was never added' % rel_path)
268:     else:
269:       raise RepoCorruptError('Invalid action %r' % action)
270:   return state
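
# Example: playing back a log containing
#   ('+', t1, 'a.txt', nb1, mt1, s1)
#   ('+', t2, 'b.txt', nb2, mt2, s2)
#   ('-', t3, 'a.txt', nb1, None, None)
# yields {'b.txt': (nb2, mt2, s2)}.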
271: 
272: 
def Freeze(cursor, ale_root, args):
  """Make the metadata.sqlite3 DB reflect the file system state.

  Most work is in checksumming the files.

  Right now we assume frozen files are never modified.
  """
  # TODO:
  # - don't checksum if the mtime matches that of the file in db_state.  If
  # the name matches but the mtime doesn't, raise an error?  Files should
  # not be modified.
  # - Make the files read-only
  # - Also need to handle deletions!  UpdateHandler should keep another dict
  # and compare it against db_state.  Set difference.  (See the
  # RowsForDeletions sketch after this function.)
287: 
288:   start_time = time.time()
289: 
290:   db_state = ReconstructState(cursor)
291: 
292:   # TODO: Parameterize this on whether sys.stdout.isatty()
293:   printer = util.FancyPrinter()
294: 
295:   handler = UpdateHandler(db_state, printer)
296:   file_count, total_bytes = util.WalkTree(ale_root, '', handler)
297: 
298:   wall_time_secs = time.time() - start_time
299: 
300:   new_rows, files_read, bytes_read = handler.GetResults()
301: 
302:   mb_read = float(bytes_read) / 1e6
303:   util.stderr('Found %d files.', file_count)
304:   util.stderr(
305:       'Checksummed %d files of %.1f MB in %.1f seconds (%.1f MB/s).',
306:       files_read, mb_read, wall_time_secs, mb_read/wall_time_secs)
307: 
308:   if not new_rows:
309:     util.stderr('No new files in repo.')
310:     return
311: 
312:   cursor.executemany(
313:       'INSERT INTO entry_log VALUES(?, ?, ?, ?, ?, ?);', new_rows)
314: 
315:   util.stderr('Added %d files to repo.', len(new_rows))
316: 
317:   ale_counter = None
318:   for (latest_rowid,) in cursor.execute('SELECT MAX(rowid) FROM entry_log'):
319:     ale_counter = latest_rowid
320:   assert ale_counter is not None  # we would have done an early return
321: 
322:   wall_time_secs = time.time() - start_time
  freeze_row = (
      int(start_time), file_count, total_bytes, wall_time_secs,
      ale_counter)
326: 
327:   cursor.execute(
328:       'INSERT INTO DEBUG_freeze_log VALUES(?, ?, ?, ?, ?);', freeze_row)
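

# A possible shape for the deletion handling in the TODO above (hypothetical
# sketch; util.WalkTree would need to report the set of rel_paths it
# visited): anything recorded in the DB but absent on disk gets a '-' row.
def RowsForDeletions(db_state, paths_on_disk, timestamp):
  rows = []
  for rel_path in sorted(set(db_state) - paths_on_disk):
    num_bytes, _, _ = db_state[rel_path]
    rows.append(('-', timestamp, rel_path, num_bytes, None, None))
  return rows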
329: 
330: 
331: def GetFilesToBackup(cursor, latest_rowid, last_backup_rowid):
332:   # Range is inclusive.  Don't include the last backed up row.
333:   rowid_range = (last_backup_rowid + 1, latest_rowid)
334:   db_state = ReconstructState(cursor, rowid_range=rowid_range)
335:   for rel_path, (_, _, sha1_bytes) in db_state.iteritems():
336:     yield rel_path, sha1_bytes
337: 
338: 
339: def Backup(cursor, ale_root, config):
340:   """
341:   TODO: 
342:   - 'cask status' for everything in the config file
343:   - figure out what to copy where
344:   - call 'ale scp' to copy them
345: 
346: 
347:   Model:
348:   - A sequence of casks backs an ale.
349:   - A cask repo belongs to a single ale repo
350: 
  Should I accept explicit cask args?  Will that change the state?
  Policy:
  - always choose the last one, except when it's full.  Then choose a new
    one based on the config file?  (See the ChooseCask sketch after this
    function.)

  - Keep state in the ale about the last rowid backed up
  - And then when you run "ale backup", print a message like "ale was last
    backed up to cask homer.local:foo.cask at 2/01/16 at 9:34pm".  And then
    make sure the latest_rowid in cask and ale matches.  Otherwise you have
    to print a "missing cask" message.
361:   """
362:   # Later algorithm:
363:   # - Parse the config file to get a list of cask (host, dir)
364:   # - Run 'cask space' in parallel to get (host, dir, space)
365:   #
366:   # - Yeah it would be convenient to get (space, rowid) for each cask.  And
367:   # then choose the biggest one?  And then back up since the biggest.
368:   #   - Example: Current ale row ID is 500
369:   #   - One cask has 100-400, and the other one has 400-450
  #   - Then you know you need to send rows 450-500
  #     - Do you care about deletions, or is that a separate "cask
  #     reclaim-space" step?
373:   #
374:   # What about offline casks?  I think you can comment them out of the
375:   # config file.
376: 
  # Get the maximum rowid in the entry log.
378:   latest_rowid = -1
379:   for (rowid,) in cursor.execute('SELECT MAX(rowid) FROM entry_log'):
380:     latest_rowid = rowid
381:   if latest_rowid == -1:
382:     raise RepoCorruptError('Missing rowid in entry_log table')
383:   print 'LATEST rowid', latest_rowid
384: 
  # TODO: Find the last TWO freeze counters?
  # Difference this vs. the last backup counter!
387: 
388:   # Now find the cask with the maximum ID.  That is the last one we wrote to.
389:   #
390:   # Write to that one, unless it's full!
  last_backup_rowid = None
  last_cask_host = None
  last_cask_dir = None
  # NOTE: with a lone MAX() aggregate, SQLite returns the other selected
  # columns from the row where the max occurs.
  for (last_cask_host, last_cask_dir, last_backup_rowid) in cursor.execute(
      """
      SELECT cask_host, cask_dir, MAX(ale_counter)
      FROM   backup_log
      """):
    pass
401: 
402:   if last_backup_rowid is None:
403:     last_backup_rowid = 0
404:     # Just use the first one for now
405:     cask_host, cask_dir = config[0]
406:     util.stderr('No backups yet!')
407:   else:
408:     cask_host = last_cask_host
409:     cask_dir = last_cask_dir
410: 
411:   print cask_host, cask_dir, last_backup_rowid
412: 
413:   path_sha1_pairs = list(
414:       GetFilesToBackup(cursor, latest_rowid, last_backup_rowid))
415: 
416:   Scp(ale_root, cask_host, cask_dir, path_sha1_pairs)
417: 
418:   new_ale_counter = latest_rowid
419:   backup_timestamp = int(time.time())
420:   backup_row = (cask_host, cask_dir, new_ale_counter, backup_timestamp)
421:   cursor.execute(
422:       'INSERT INTO backup_log VALUES(?, ?, ?, ?);', backup_row)
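

# Hypothetical sketch of the cask-choosing policy described in Backup's
# docstring (none of this exists yet; free_bytes would have to come from
# running 'cask space' on each host): keep using the last cask until it
# can't hold the new data, then fall through to the next cask in backup.cfg.
def ChooseCask(config, last_cask, free_bytes, bytes_needed):
  if last_cask is not None and free_bytes.get(last_cask, 0) >= bytes_needed:
    return last_cask
  for cask in config:  # each entry is a (cask_host, cask_dir) pair
    if cask != last_cask and free_bytes.get(cask, 0) >= bytes_needed:
      return cask
  raise RuntimeError('No cask has %d bytes free' % bytes_needed)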
423: 
424: 
425: def Scp(ale_root, cask_host, cask_dir, path_sha1_pairs):
  # Copy each file to a content-addressed path on the cask: the first 3 hex
  # chars of the sha1 name a directory, the rest name the file, e.g. sha1
  # da39a3... lands at CASK_DIR/da3/9a3...
427:   for rel_path, sha1_bytes in path_sha1_pairs:
428:     sha1_hex = sha1_bytes.encode('hex')
429:     dir_part = sha1_hex[:3]
430:     name_part = sha1_hex[3:]
431: 
432:     # Make the dir via SSH first.
433:     dest_dir = os.path.join(cask_dir, dir_part)
434:     ssh_argv = ['ssh', cask_host, 'mkdir', '-p', dest_dir]
435:     exit_code = subprocess.call(ssh_argv)
436:     if exit_code != 0:
437:       raise RuntimeError('%s failed with code %d' % (ssh_argv, exit_code))
438: 
439:     # Copy via SCP.
440:     src = os.path.join(ale_root, rel_path)
441:     dest = '%s:%s/%s/%s' % (cask_host, cask_dir, dir_part, name_part)
442:     scp_argv = ['scp', src, dest]
443: 
444:     exit_code = subprocess.call(scp_argv)
445:     if exit_code != 0:
446:       raise RuntimeError('%s failed with code %d' % (scp_argv, exit_code))
447: 
448: 
449: def AllCaskStatus(config):
450:   # Hm this should be a flag
451:   cask_cmd = '/home/andy/bin/cask'
452: 
453:   # TODO:
  # - Verify that they are attached to this 'ale' repo.
455:   # - Print the counter -- important state used for backup
456:   # - And then, number of files, space used, and last backup timestamp for
457:   # information
458:   #   - is the backup timestamp local or remote?  Maybe reconcile them
459:   #
460:   # Should you have a --json flag?  Probably
461:   for cask_host, cask_dir in config:
462:     argv = ['ssh', cask_host, cask_cmd, 'status', cask_dir]
    subprocess.check_call(argv)  # raises CalledProcessError on failure
466: 
467: 
468: def main(argv):
469:   try:
470:     action = argv[1]
471:   except IndexError:
472:     action = 'help'
473: 
474:   if action == 'help':
475:     print __doc__
476: 
477:   elif action == 'init':
478:     try:
479:       ale_root = argv[2]
480:     except IndexError:
481:       ale_root = os.getcwd()
482:     util.MakeDir(ale_root)
483: 
484:     ale_dir = os.path.join(ale_root, '.ale')
485:     if os.path.exists(ale_dir):
486:       util.stderr('%s already exists', ale_dir)
487:       return 1
488: 
489:     util.MakeDir(ale_dir)
490: 
491:     db_name = os.path.join(ale_dir, 'metadata.sqlite3')
492: 
493:     with util.SqliteCursor(db_name, create=True) as cursor:
494:       AleInit(cursor)
495: 
496:     util.stderr('Initialized %s', db_name)
497: 
498:   elif action == 'ls':
    # TODO: Turn this into a context manager, where __exit__ closes the
    # connection?  e.g. with AleMetadata() as conn: ...
504:     ale_root, db_name = FindAleRepoOrDie()
505:     with util.SqliteCursor(db_name) as cursor:
506:       List(cursor, argv[2:])
507: 
508:   elif action == 'freeze':
509:     ale_root, db_name = FindAleRepoOrDie()
510: 
511:     with util.SqliteCursor(db_name) as cursor:
512:       Freeze(cursor, ale_root, argv[2:])
513: 
514:   elif action == 'backup':
515:     ale_root, db_name = FindAleRepoOrDie()
516: 
517:     config = ParseConfigOrDie(ale_root)
518: 
519:     with util.SqliteCursor(db_name) as cursor:
520:       Backup(cursor, ale_root, config)
521: 
522:   elif action == 'scp':
    # Usage:
    #   ale scp CASK_HOST CASK_DIR [REL_PATH SHA1_HEX]...
525:     ale_root, db_name = FindAleRepoOrDie()
526: 
527:     cask_host = argv[2]
528:     cask_dir = argv[3]
529:     rest = argv[4:]
530:     n = len(rest)
    if n % 2 != 0:
      raise RuntimeError('scp takes an even number of args '
                         '(REL_PATH SHA1_HEX pairs)')
533:     path_sha1_pairs = []
534:     for i in xrange(n/2):
535:       m = i * 2
536:       name = rest[m]
537:       sha1_hex = rest[m+1]
538:       path_sha1_pairs.append((name, sha1_hex.decode('hex')))
539: 
540:     Scp(ale_root, cask_host, cask_dir, path_sha1_pairs)
541: 
542:   elif action == 'casks':
543:     ale_root, db_name = FindAleRepoOrDie()
544:     config = ParseConfigOrDie(ale_root)
545:     AllCaskStatus(config)
546: 
547:   elif action == 'debug':
548:     # Debugging only
549: 
550:     subaction = argv[2]
551: 
552:     ale_root, db_name = FindAleRepoOrDie()
553:     with util.SqliteCursor(db_name) as cursor:
554:       if subaction == 'freeze-log':
555:         for result in cursor.execute('SELECT * FROM DEBUG_freeze_log'):
556:           print result
557:       else:
558:         raise RuntimeError('Invalid debug action %r' % subaction)
559: 
560:   else:
561:     raise RuntimeError('Invalid action %r' % action)
562: 
563: 
564: if __name__ == '__main__':
565:   try:
566:     sys.exit(main(sys.argv))
567:   except RuntimeError, e:
568:     print >>sys.stderr, 'FATAL: %s' % e
569:     sys.exit(1)
570: