[7756] | 1 | import unittest |
---|
| 2 | import tempfile |
---|
| 3 | import os |
---|
| 4 | import shutil |
---|
[7745] | 5 | |
---|
[7780] | 6 | from anuga.utilities.file_utils import copy_code_files, get_all_swwfiles |
---|
| 7 | from anuga.utilities.file_utils import del_dir |
---|
[7820] | 8 | from sww_merge import sww_merge |
---|
[7745] | 9 | |
---|
[7756] | 10 | |
---|
class Test_FileUtils(unittest.TestCase):
    """Tests for copy_code_files, get_all_swwfiles, del_dir and sww_merge."""

    def test_copy_code_files(self):
        """Check that copy_code_files() copies every file it is given.

        The function is exercised with a single filename, with several
        filenames passed as separate arguments, and with a tuple of
        filenames.
        """

        def create_file(path):
            # Each test file simply contains its own path and a newline.
            with open(path, 'w') as fd:
                fd.write('%s\n' % path)

        # create working directories and test files
        work_dir = tempfile.mkdtemp()
        try:
            dst_dir = tempfile.mkdtemp(dir=work_dir)
            src_dir = tempfile.mkdtemp(dir=work_dir)

            names = ['file1', 'file2', 'file3', 'file4', 'file5']
            paths = [os.path.join(src_dir, name) for name in names]
            for path in paths:
                create_file(path)

            # exercise the copy function
            copy_code_files(dst_dir, paths[0])
            copy_code_files(dst_dir, paths[0], paths[1])
            copy_code_files(dst_dir, (paths[3], paths[4], paths[2]))

            # test that files were actually copied
            for name in names:
                copied = os.path.join(dst_dir, name)
                self.assertTrue(os.access(copied, os.F_OK),
                                '%s was not copied' % name)
        finally:
            # clean up even when a copy or an assertion fails
            shutil.rmtree(work_dir)

    def test_get_all_swwfiles(self):
        """get_all_swwfiles() must raise IOError for the invalid name
        'test.txt'.
        """
        # assertRaises fails the test if no IOError is raised; the previous
        # ``raise '<string>'`` is itself a TypeError on Python >= 2.6.
        self.assertRaises(IOError, get_all_swwfiles, '', 'test.txt')

    def test_get_all_swwfiles1(self):
        """get_all_swwfiles() must report the four 'test*.sww' files created
        in a temporary directory, by name without the '.sww' extension.
        """
        temp_dir = tempfile.mkdtemp('', 'sww_test')
        try:
            paths = []
            for _ in range(4):
                # mkstemp creates the file atomically, unlike the race-prone
                # and deprecated mktemp.
                fd, path = tempfile.mkstemp('.sww', 'test', temp_dir)
                with os.fdopen(fd, 'w') as fid:
                    fid.write('hello')
                paths.append(path)

            found = get_all_swwfiles(temp_dir, 'test')
        finally:
            # remove the directory even if get_all_swwfiles raised
            del_dir(temp_dir)

        for path in paths:
            # strip the 4-character '.sww' suffix
            base = os.path.basename(path)[:-4]
            self.assertTrue(base in found,
                            '%r missing from %r' % (base, found))

        self.assertEqual(len(found), 4)

    def test_merge_swwfiles(self):
        """Evolve two small domains and check that sww_merge() produces an
        output file from the resulting 'test1.sww' and 'test2.sww'.
        """
        from anuga.abstract_2d_finite_volumes.mesh_factory import rectangular, \
            rectangular_cross
        from anuga.shallow_water.shallow_water_domain import Domain
        # NOTE(review): SWW_file is not referenced in this test; the import is
        # kept in case it is relied on for side effects -- confirm and remove.
        from anuga.file.sww import SWW_file
        from anuga.abstract_2d_finite_volumes.generic_boundary_conditions import \
            Dirichlet_boundary

        Bd = Dirichlet_boundary([0.5, 0., 0.])

        # (domain name, mesh factory, mesh size, elevation, stage)
        setups = (
            ('test1', rectangular_cross, 2, 2, 5),
            ('test2', rectangular, 3, 3, 50),
        )
        sww_names = ['%s.sww' % setup[0] for setup in setups]
        outfile = 'test_out.sww'

        try:
            for name, make_mesh, size, elevation, stage in setups:
                # Create shallow water domain
                domain = Domain(*make_mesh(size, size))
                domain.set_name(name)
                domain.set_quantity('elevation', elevation)
                domain.set_quantity('stage', stage)
                domain.set_boundary({'left': Bd, 'right': Bd,
                                     'top': Bd, 'bottom': Bd})
                for t in domain.evolve(yieldstep=0.5, finaltime=1):
                    pass

            sww_merge(sww_names, outfile)
            self.assertTrue(os.access(outfile, os.F_OK))
        finally:
            # remove temp files, including the merged output, without letting
            # a missing file mask the real test failure
            for leftover in sww_names + [outfile]:
                if os.path.exists(leftover):
                    os.remove(leftover)
---|
| 147 | |
---|
| 148 | |
---|
| 149 | |
---|
[7745] | 150 | #------------------------------------------------------------- |
---|
| 151 | |
---|
if __name__ == "__main__":
    # Run every 'test*' method of Test_FileUtils when executed as a script.
    # TestLoader.loadTestsFromTestCase uses the same default 'test' prefix as
    # the deprecated unittest.makeSuite (removed in Python 3.13).
    suite = unittest.TestLoader().loadTestsFromTestCase(Test_FileUtils)
    runner = unittest.TextTestRunner()  # pass verbosity=2 for per-test output
    runner.run(suite)
---|