001: #!/usr/bin/python
002: """
003: Usage: cask <action> [options...]
004: 
005: Actions:
006:   init     Initialize an empty cask, rooted at this dir.  (NOTE: Can create
007:            000/ dirs.  Create _logs dir?)
008:   migrate  <manifest> <input dir> <output cask dir>
009:            Given a manifest of files and checksums, use a series of "mv"
010:            operations to transform the input directory tree into a cask tree.
011:   status   Show cask state and total disk space used
012:            (algorithm: maintain DB of 4096 mtimes, and then only recalculate
013:            if mtime changed)
014:   check    Check that checksums match their named files.
015:            Check that there aren't any stray files.
016:            Check that everything is read-only / immutable.
017: """
018: # TODO:
019: # 
# - receive: parse the protocol emitted by 'ale send', and then update state.
021: #   - for small files, this could be better than scp?
022: #   - receive a tar file?
023: #     - <sha1> <contents>
024: #   - or you could use your own netstring thing
025: #
026: # - should there also be a 'lock' at the root?
027: #
028: # State: RANGES of ale row IDs?  In case a cask is offline or has to be
029: # repaired?  A remote cask might be down.  We always want to be able to accept
030: # backups somewhere.  We don't want to be blocked.
031: #
032: # If data is deleted, that is a new range.
033: #
034: # The ale side can receive a LIST of ranges from every cask.  And verify that
035: # their union adds up to the entire entry_log!
036: 
037: import json
038: import os
039: import sys
040: import time
041: 
042: import util
043: 
044: 
class RepoCorruptError(Exception):
  """Signals that an invariant of the cask repo has been violated."""
047: 
048: 
# ale_repo_name: Written on first push?  Or on init?  Init would be easier code.
CREATE_SQL = """\
CREATE TABLE cask_state(
  ale_repo_name   TEXT,    -- name of the ale we're associated with
  ale_counter     INTEGER  -- rowid of the ale entry_log table that we have
                           -- seen
);
"""
# NOTE: Could also have a file size cache table, so you don't have to
# recompute it on hundreds of thousands of files.

INIT_SQL = """\
INSERT INTO cask_state VALUES(?, 0);
"""


def CaskInit(cursor, ale_repo_name):
  """Create and seed the cask_state table with a single row, counter at 0.

  Args:
    cursor: a DB-API style cursor; each SQL string above holds exactly one
      statement, executed separately.
    ale_repo_name: name of the ale repo this cask is associated with.
  """
  # Fix: the 'seen' continuation line in CREATE_SQL previously lacked the
  # '--' marker, so it was parsed as part of the column's type name
  # ('INTEGER seen') rather than as a comment.
  # Hm doesn't apsw let you execute multiple statements at once?
  cursor.execute(CREATE_SQL)
  cursor.execute(INIT_SQL, (ale_repo_name,))
069: 
070: 
class NullHandler(object):
  """File-walk handler that ignores every callback (used for counting only)."""

  def OnFile(self, *unused_args):
    pass
077: 
078: 
class CheckHandler(object):
  """Check integrity of files.

  Files in a cask appear to be named by their checksum, with a '/' inserted
  (e.g. 'ab/cdef...'), so the expected digest is recovered from the relative
  path itself — TODO confirm against the tree layout produced by migrate.
  """

  def __init__(self):
    # NOTE(review): presumably a progress display; see util.FancyPrinter.
    self.printer = util.FancyPrinter()

  def OnFile(self, full_path, rel_path, lstat):
    """Called with each file in the cask.

    Args:
      full_path: path used to open the file.
      rel_path: path relative to the cask root; with '/' stripped it is
        compared against the file's hex digest.
      lstat: stat result for the file; only st_size is used here.

    Raises:
      RuntimeError: if the computed checksum does not match rel_path.
    """
    num_bytes = lstat.st_size
    self.printer.OnChecksumBegin(rel_path, num_bytes)
    with open(full_path) as f:
      sha1_bytes = util.ChecksumFile(f, self.printer)
    # Strip the directory separator to recover the expected hex digest
    # encoded in the file's path.
    expected = rel_path.replace('/', '')
    actual = sha1_bytes.encode('hex')  # Python 2 bytes-to-hex conversion
    if expected == actual:
      sys.stdout.write(' ' * 15)  # pad so 'OK' lines up past the progress text
      print 'OK'
    else:
      raise RuntimeError(
          'Corruption error: file %s has unexpected checksum %s' %
          (rel_path, actual))
100: 
101: 
# Make sure we don't count this
def SkipCask(name):
  """Return True if `name` is the cask's own metadata DB, not content."""
  return name in ('cask.sqlite3',)
105: 
106: 
def CaskStatus(cursor, cask_root):
  """Print a JSON record describing this cask's ale linkage and disk usage."""
  ale_repo_name = None
  ale_counter = None
  # cask_state should hold a single row; if there were more, the last wins.
  query = 'SELECT ale_repo_name, ale_counter FROM cask_state'
  for row in cursor.execute(query):
    ale_repo_name, ale_counter = row

  if ale_repo_name is None:
    raise RepoCorruptError('Expected ale_repo_name in cask_state table')

  # Walk the whole tree to measure space used.  For trees with many files,
  # caching file sizes in sqlite could make this cheaper.
  file_count, total_bytes = util.WalkTree(
      cask_root, '', NullHandler(), skip_func=SkipCask)

  record = {
      'aleRepoName': ale_repo_name,
      'aleCounter': ale_counter,
      'numFiles': file_count,
      'totalBytes': total_bytes,
      }
  print(json.dumps(record, indent=2))
134: 
135: 
def Check(cask_root):
  """Walk the cask tree and verify every file against its checksum name.

  Raises (via CheckHandler) on the first mismatch; prints timing to stderr.
  """
  # TODO: Take flags to narrow the scope instead of always checking
  # everything:
  #
  #   cask check --all
  #   cask check --new          # things that have NEVER been checked
  #   cask check --last '>30d'  # last checked more than 30 days ago
  started = time.time()

  file_count, _ = util.WalkTree(
      cask_root, '', CheckHandler(), skip_func=SkipCask)

  elapsed_secs = time.time() - started

  util.stderr('Found %d files.', file_count)
  # TODO: Share the MB/s reporting code with Freeze() in ale.py.
  util.stderr('%.1f seconds', elapsed_secs)
169: 
170: 
171: 
172: def main(argv):
173:   try:
174:     action = argv[1]
175:   except IndexError:
176:     action = 'help'
177: 
178:   if action == 'help':
179:     print __doc__
180: 
181:   elif action == 'init':
182:     #try:
183:     #  ale_repo_name = argv[2]
184:     #except IndexError:
185:     #  raise RuntimeError('Usage: cask init <ale name> [dir]')
186: 
187:     try:
188:       cask_root = argv[2]
189:     except IndexError:
190:       cask_root = os.getcwd()
191:     util.MakeDir(cask_root)
192: 
193:     db_name = os.path.join(cask_root, 'cask.sqlite3')
194:     if os.path.exists(db_name):
195:       util.stderr('%s already exists', db_name)
196:       return 1
197: 
198:     # TODO: Do it on first push!
199:     ale_repo_name = 'DUMMY'
200: 
201:     with util.SqliteCursor(db_name, create=True) as cursor:
202:       CaskInit(cursor, ale_repo_name)
203: 
204:     util.stderr('Initialized %s', cask_root)
205: 
206:   elif action == 'status':
207:     try:
208:       cask_root = argv[2]
209:     except IndexError:
210:       cask_root = os.getcwd()
211: 
212:     db_name = os.path.join(cask_root, 'cask.sqlite3')
213:     with util.SqliteCursor(db_name) as cursor:
214:       CaskStatus(cursor, cask_root)
215: 
216:   elif action == 'migrate':
217:     print 'TODO migrate'
218: 
219:   elif action == 'check':
220:     try:
221:       cask_root = argv[2]
222:     except IndexError:
223:       cask_root = os.getcwd()
224: 
225:     Check(cask_root)
226: 
227:   else:
228:     raise RuntimeError('Invalid action %r' % action)
229: 
230: 
if __name__ == '__main__':
  try:
    sys.exit(main(sys.argv))
  except RuntimeError as e:
    # 'except ... as' is valid in Python 2.6+ and Python 3, unlike the
    # removed 'except RuntimeError, e' form; the stderr write produces the
    # same 'FATAL: ...' line as the old 'print >>sys.stderr' statement.
    sys.stderr.write('FATAL: %s\n' % e)
    sys.exit(1)
237: