diff options
author | M S Vishwanath Bhat <vishwanath@gluster.com> | 2012-02-24 13:18:56 +0530 |
---|---|---|
committer | Vijay Bellur <vijay@gluster.com> | 2012-03-07 23:18:29 -0800 |
commit | 5fdd65f5f4f5df1d28b0fb4f7efed226d5db1b3c (patch) | |
tree | 377a94774c5cd9f55b16ba6fcd1c7b5ec51bfa3b /glusterfs-hadoop/0.20.2/src | |
parent | e1ab347720f25ed2e7db633a7202f7b873f4b90a (diff) |
renaming hdfs -> glusterfs-hadoop
Change-Id: Ibb937af1231f6bbed9a2d4eaeabc6e9d4000887f
BUG: 797064
Signed-off-by: M S Vishwanath Bhat <vishwanath@gluster.com>
Reviewed-on: http://review.gluster.com/2811
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vijay@gluster.com>
Diffstat (limited to 'glusterfs-hadoop/0.20.2/src')
7 files changed, 1453 insertions, 0 deletions
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java new file mode 100644 index 00000000000..e633b8aae80 --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java @@ -0,0 +1,109 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +public class GlusterFSBrickClass { + String host; + String exportedFile; + long start; + long end; + boolean isChunked; + int stripeSize; // Stripe size in bytes + int nrStripes; // number of stripes + int switchCount; // for SR, DSR - number of replicas of each stripe + // -1 for others + + public GlusterFSBrickClass (String brick, long start, long len, boolean flag, + int stripeSize, int nrStripes, int switchCount) + throws IOException { + this.host = brick2host(brick); + this.exportedFile = brick2file(brick); + this.start = start; + this.end = start + len; + this.isChunked = flag; + this.stripeSize = stripeSize; + this.nrStripes = nrStripes; + this.switchCount = switchCount; + } + + public boolean isChunked () { + return isChunked; + } + + public String brickIsLocal(String hostname) { + String path = null; + File f = null; + if (host.equals(hostname)) + path = exportedFile; + + return path; + } + + public int[] getBrickNumberInTree(long start, int len) { + long end = len; + int startNodeInTree = ((int) (start / stripeSize)) % nrStripes; + int endNodeInTree = ((int) ((start + len) / stripeSize)) % nrStripes; + + if (startNodeInTree != endNodeInTree) { + end = (start - (start % stripeSize)) + stripeSize; + end -= start; + } + + return new int[] {startNodeInTree, endNodeInTree, (int) end}; + } + + public boolean brickHasFilePart(int nodeInTree, int nodeLoc) { + if (switchCount == -1) + return (nodeInTree == nodeLoc); + + nodeInTree *= switchCount; + for (int i = nodeInTree; i < (nodeInTree + switchCount); i++) { + if (i == nodeLoc) + return true; + } + + return false; + } + + public String brick2host (String brick) + throws IOException { + String[] hf = null; + + hf = brick.split(":"); + if (hf.length != 2) + throw new IOException("Error getting hostname from brick"); + + return hf[0]; + } + + public String brick2file (String brick) + throws IOException { + String[] hf = null; + + hf = brick.split(":"); + if (hf.length != 2) + throw new IOException("Error getting hostname from brick"); + + return hf[1]; + } + +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java new file mode 100644 index 00000000000..11454e6368a --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java @@ -0,0 +1,52 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +public class GlusterFSBrickRepl { + private String[] replHost; + private long start; + private long len; + private int cnt; + + GlusterFSBrickRepl(int replCount, long start, long len) { + this.replHost = new String[replCount]; + this.start = start; + this.len = len; + this.cnt = 0; + } + + public void addHost (String host) { + this.replHost[cnt++] = host; + } + + public String[] getReplHosts () { + return this.replHost; + } + + public long getStartLen () { + return this.start; + } + + public long getOffLen () { + return this.len; + } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java new file mode 100644 index 00000000000..18e9003b43e --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java @@ -0,0 +1,471 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.net.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.*; +import java.util.HashMap; +import java.util.TreeMap; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.fs.BlockLocation; + +public class GlusterFSXattr { + + public enum LAYOUT { D, S, R, DS, DR, SR, DSR } + public enum CMD { GET_HINTS, GET_REPLICATION, GET_BLOCK_SIZE, CHECK_FOR_QUICK_IO } + + private static String hostname; + + public GlusterFSXattr() { } + + public static String brick2host (String brick) + throws IOException { + String[] hf = null; + + hf = brick.split(":"); + if (hf.length != 2) { + System.out.println("brick not of format hostname:path"); + throw new IOException("Error getting hostname from brick"); + } + + return hf[0]; + } + + public static String brick2file (String brick) + throws IOException { + String[] hf = null; + + hf = brick.split(":"); + if (hf.length != 2) { + System.out.println("brick not of format hostname:path"); + throw new IOException("Error getting hostname from brick"); + } + + return hf[1]; + } + + public static BlockLocation[] getPathInfo (String filename, long start, long len) + throws IOException { + HashMap<String, ArrayList<String>> vol = null; + HashMap<String, Integer> meta = new HashMap<String, Integer>(); + + vol = execGetFattr(filename, meta, CMD.GET_HINTS); + + return getHints(vol, meta, start, len, null); + } + + public static long getBlockSize (String filename) + throws IOException { + HashMap<String, ArrayList<String>> vol = null; + HashMap<String, Integer> meta = new HashMap<String, Integer>(); + + vol = execGetFattr(filename, meta, CMD.GET_BLOCK_SIZE); + + if (!meta.containsKey("block-size")) + return 0; + + return (long) meta.get("block-size"); + + } + + public static short getReplication (String filename) + throws IOException { + HashMap<String, ArrayList<String>> vol = null; + HashMap<String, Integer> meta = new HashMap<String, Integer>(); + + vol = execGetFattr(filename, meta, CMD.GET_REPLICATION); + + return (short) getReplicationFromLayout(vol, meta); + + } + + public static TreeMap<Integer, GlusterFSBrickClass> quickIOPossible (String filename, long start, + long len) + throws IOException { + String realpath = null; + HashMap<String, ArrayList<String>> vol = null; + HashMap<String, Integer> meta = new HashMap<String, Integer>(); + TreeMap<Integer, GlusterFSBrickClass> hnts = new TreeMap<Integer, GlusterFSBrickClass>(); + + vol = execGetFattr(filename, meta, CMD.GET_HINTS); + getHints(vol, meta, start, len, hnts); + + if (hnts.size() == 0) + return null; // BOOM !! + + // DEBUG - dump hnts here + return hnts; + } + + public static HashMap<String, ArrayList<String>> execGetFattr (String filename, + HashMap<String, Integer> meta, + CMD cmd) + throws IOException { + Process p = null; + BufferedReader brInput = null; + String s = null; + String cmdOut = null; + String getfattrCmd = null; + String xlator = null; + String enclosingXl = null; + String enclosingXlVol = null; + String key = null; + String layout = ""; + int rcount = 0; + int scount = 0; + int dcount = 0; + int count = 0; + + HashMap<String, ArrayList<String>> vol = new HashMap<String, ArrayList<String>>(); + + getfattrCmd = "getfattr -m . -n trusted.glusterfs.pathinfo " + filename; + + p = Runtime.getRuntime().exec(getfattrCmd); + brInput = new BufferedReader(new InputStreamReader(p.getInputStream())); + + cmdOut = ""; + while ( (s = brInput.readLine()) != null ) + cmdOut += s; + + /** + * TODO: Use a single regex for extracting posix paths as well + * as xlator counts for layout matching. + */ + + Pattern pattern = Pattern.compile("<(.*?)[:\\(](.*?)>"); + Matcher matcher = pattern.matcher(cmdOut); + + Pattern p_px = Pattern.compile(".*?:(.*)"); + Matcher m_px; + String gibberish_path; + + s = null; + while (matcher.find()) { + xlator = matcher.group(1); + if (xlator.equalsIgnoreCase("posix")) { + if (enclosingXl.equalsIgnoreCase("replicate")) + count = rcount; + else if (enclosingXl.equalsIgnoreCase("stripe")) + count = scount; + else if (enclosingXl.equalsIgnoreCase("distribute")) + count = dcount; + else + throw new IOException("Unknown Translator: " + enclosingXl); + + key = enclosingXl + "-" + count; + + if (vol.get(key) == null) + vol.put(key, new ArrayList<String>()); + + gibberish_path = matcher.group(2); + + /* extract posix path from the gibberish string */ + m_px = p_px.matcher(gibberish_path); + if (!m_px.find()) + throw new IOException("Cannot extract posix path"); + + vol.get(key).add(m_px.group(1)); + continue; + } + + enclosingXl = xlator; + enclosingXlVol = matcher.group(2); + + if (xlator.equalsIgnoreCase("replicate")) + if (rcount++ != 0) + continue; + + if (xlator.equalsIgnoreCase("stripe")) { + if (scount++ != 0) + continue; + + + Pattern ps = Pattern.compile("\\[(\\d+)\\]"); + Matcher ms = ps.matcher(enclosingXlVol); + + if (ms.find()) { + if (((cmd == CMD.GET_BLOCK_SIZE) || (cmd == CMD.GET_HINTS)) + && (meta != null)) + meta.put("block-size", Integer.parseInt(ms.group(1))); + } else + throw new IOException("Cannot get stripe size"); + } + + if (xlator.equalsIgnoreCase("distribute")) + if (dcount++ != 0) + continue; + + layout += xlator.substring(0, 1); + } + + if ((dcount == 0) && (scount == 0) && (rcount == 0)) + throw new IOException("Cannot get layout"); + + if (meta != null) { + meta.put("dcount", dcount); + meta.put("scount", scount); + meta.put("rcount", rcount); + } + + vol.put("layout", new ArrayList<String>(1)); + vol.get("layout").add(layout); + + return vol; + } + + static BlockLocation[] getHints (HashMap<String, ArrayList<String>> vol, + HashMap<String, Integer> meta, + long start, long len, + TreeMap<Integer, GlusterFSBrickClass> hnts) + throws IOException { + String brick = null; + String key = null; + boolean done = false; + int i = 0; + int counter = 0; + int stripeSize = 0; + long stripeStart = 0; + long stripeEnd = 0; + int nrAllocs = 0; + int allocCtr = 0; + BlockLocation[] result = null; + ArrayList<String> brickList = null; + ArrayList<String> stripedBricks = null; + Iterator<String> it = null; + + String[] blks = null; + GlusterFSBrickRepl[] repl = null; + int dcount, scount, rcount; + + LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0)); + dcount = meta.get("dcount"); + scount = meta.get("scount"); + rcount = meta.get("rcount"); + + switch (l) { + case D: + key = "DISTRIBUTE-" + dcount; + brick = vol.get(key).get(0); + + if (hnts == null) { + result = new BlockLocation[1]; + result[0] = new BlockLocation(null, new String[] {brick2host(brick)}, start, len); + } else + hnts.put(0, new GlusterFSBrickClass(brick, start, len, false, -1, -1, -1)); + break; + + case R: + case DR: + /* just the name says it's striped - the volume isn't */ + stripedBricks = new ArrayList<String>(); + + for (i = 1; i <= rcount; i++) { + key = "REPLICATE-" + i; + brickList = vol.get(key); + it = brickList.iterator(); + while (it.hasNext()) { + stripedBricks.add(it.next()); + } + } + + nrAllocs = stripedBricks.size(); + if (hnts == null) { + result = new BlockLocation[1]; + blks = new String[nrAllocs]; + } + + for (i = 0; i < nrAllocs; i++) { + if (hnts == null) + blks[i] = brick2host(stripedBricks.get(i)); + else + hnts.put(i, new GlusterFSBrickClass(stripedBricks.get(i), start, len, false, -1, -1, -1)); + } + + if (hnts == null) + result[0] = new BlockLocation(null, blks, start, len); + + break; + + case SR: + case DSR: + int rsize = 0; + ArrayList<ArrayList<String>> replicas = new ArrayList<ArrayList<String>>(); + + stripedBricks = new ArrayList<String>(); + + if (rcount == 0) + throw new IOException("got replicated volume with replication count 0"); + + for (i = 1; i <= rcount; i++) { + key = "REPLICATE-" + i; + brickList = vol.get(key); + it = brickList.iterator(); + replicas.add(i - 1, new ArrayList<String>()); + while (it.hasNext()) { + replicas.get(i - 1).add(it.next()); + } + } + + rsize = replicas.get(0).size(); + stripeSize = meta.get("block-size"); + + nrAllocs = (int) (((len - start) / stripeSize) + 1); + if (hnts == null) { + result = new BlockLocation[nrAllocs]; + repl = new GlusterFSBrickRepl[nrAllocs]; + } + + // starting stripe position + counter = (int) ((start / stripeSize) % rcount); + stripeStart = start; + + key = null; + int currAlloc = 0; + boolean hntsDone = false; + while ((stripeStart < len) && !done) { + stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1; + if (stripeEnd > start + len) { + stripeEnd = start + len - 1; + done = true; + } + + if (hnts == null) + repl[allocCtr] = new GlusterFSBrickRepl(rsize, stripeStart, (stripeEnd - stripeStart)); + + for (i = 0; i < rsize; i++) { + brick = replicas.get(counter).get(i); + currAlloc = (allocCtr * rsize) + i; + + if (hnts == null) + repl[allocCtr].addHost(brick2host(brick)); + else + if (currAlloc <= (rsize * rcount) - 1) { + hnts.put(currAlloc, new GlusterFSBrickClass(brick, stripeStart, + (stripeEnd - stripeStart), + true, stripeSize, rcount, rsize)); + } else + hntsDone = true; + } + + if (hntsDone) + break; + + stripeStart = stripeEnd + 1; + + allocCtr++; + counter++; + + if (counter >= replicas.size()) + counter = 0; + } + + if (hnts == null) + for (int k = 0; k < nrAllocs; k++) + result[k] = new BlockLocation(null, repl[k].getReplHosts(), repl[k].getStartLen(), repl[k].getOffLen()); + + break; + + case S: + case DS: + if (scount == 0) + throw new IOException("got striped volume with stripe count 0"); + + stripedBricks = new ArrayList<String>(); + stripeSize = meta.get("block-size"); + + key = "STRIPE-" + scount; + brickList = vol.get(key); + it = brickList.iterator(); + while (it.hasNext()) { + stripedBricks.add(it.next()); + } + + nrAllocs = (int) ((len - start) / stripeSize) + 1; + if (hnts == null) + result = new BlockLocation[nrAllocs]; + + // starting stripe position + counter = (int) ((start / stripeSize) % stripedBricks.size()); + stripeStart = start; + + key = null; + while ((stripeStart < len) && !done) { + brick = stripedBricks.get(counter); + + stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1; + if (stripeEnd > start + len) { + stripeEnd = start + len - 1; + done = true; + } + + if (hnts == null) + result[allocCtr] = new BlockLocation(null, new String[] {brick2host(brick)}, + stripeStart, (stripeEnd - stripeStart)); + else + if (allocCtr <= stripedBricks.size()) { + hnts.put(allocCtr, new GlusterFSBrickClass(brick, stripeStart, (stripeEnd - stripeStart), + true, stripeSize, stripedBricks.size(), -1)); + } else + break; + + stripeStart = stripeEnd + 1; + + counter++; + allocCtr++; + + if (counter >= stripedBricks.size()) + counter = 0; + } + + break; + } + + return result; + } + + /* TODO: use meta{dcount,scount,rcount} for checking */ + public static int getReplicationFromLayout (HashMap<String, ArrayList<String>> vol, + HashMap<String, Integer> meta) + throws IOException { + int replication = 0; + LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0)); + + switch (l) { + case D: + case S: + case DS: + replication = 1; + break; + + case R: + case DR: + case SR: + case DSR: + final String key = "REPLICATION-1"; + replication = vol.get(key).size(); + } + + return replication; + } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java new file mode 100644 index 00000000000..e92237aeea2 --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java @@ -0,0 +1,205 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; +import java.util.TreeMap; + +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + + +public class GlusterFUSEInputStream extends FSInputStream { + File f; + boolean lastActive; + long pos; + boolean closed; + String thisHost; + RandomAccessFile fuseInputStream; + RandomAccessFile fsInputStream; + GlusterFSBrickClass thisBrick; + int nodeLocation; + TreeMap<Integer, GlusterFSBrickClass> hnts; + + public GlusterFUSEInputStream (File f, TreeMap<Integer, GlusterFSBrickClass> hnts, + String hostname) throws IOException { + this.f = f; + this.pos = 0; + this.closed = false; + this.hnts = hnts; + this.thisHost = hostname; + this.fsInputStream = null; + this.fuseInputStream = new RandomAccessFile(f.getPath(), "r"); + + this.lastActive = true; // true == FUSE, false == backed file + + String directFilePath = null; + if (this.hnts != null) { + directFilePath = findLocalFile(f.getPath(), this.hnts); + if (directFilePath != null) { + this.fsInputStream = new RandomAccessFile(directFilePath, "r"); + this.lastActive = !this.lastActive; + } + } + } + + public String findLocalFile (String path, TreeMap<Integer, GlusterFSBrickClass> hnts) { + int i = 0; + String actFilePath = null; + GlusterFSBrickClass gfsBrick = null; + + gfsBrick = hnts.get(0); + + /* do a linear search for the matching host not worrying + about file stripes */ + for (i = 0; i < hnts.size(); i++) { + gfsBrick = hnts.get(i); + actFilePath = gfsBrick.brickIsLocal(this.thisHost); + if (actFilePath != null) { + this.thisBrick = gfsBrick; + this.nodeLocation = i; + break; + } + } + + return actFilePath; + } + + public long getPos () throws IOException { + return pos; + } + + public synchronized int available () throws IOException { + return (int) ((f.length()) - getPos()); + } + + public void seek (long pos) throws IOException { + fuseInputStream.seek(pos); + if (fsInputStream != null) + fsInputStream.seek(pos); + } + + public boolean seekToNewSource (long pos) throws IOException { + return false; + } + + public RandomAccessFile chooseStream (long start, int[] nlen) + throws IOException { + GlusterFSBrickClass gfsBrick = null; + RandomAccessFile in = fuseInputStream; + boolean oldActiveStream = lastActive; + lastActive = true; + + if ((hnts != null) && (fsInputStream != null)) { + gfsBrick = hnts.get(0); + if (!gfsBrick.isChunked()) { + in = fsInputStream; + lastActive = false; + } else { + // find the current location in the tree and the amount of data it can serve + int[] nodeInTree = thisBrick.getBrickNumberInTree(start, nlen[0]); + + // does this node hold the byte ranges we have been requested for ? + if ((nodeInTree[2] != 0) && thisBrick.brickHasFilePart(nodeInTree[0], nodeLocation)) { + in = fsInputStream; + nlen[0] = nodeInTree[2]; // the amount of data that can be read from the stripe + lastActive = false; + } + } + } + + return in; + } + + public synchronized int read () throws IOException { + int byteRead = 0; + RandomAccessFile in = null; + + if (closed) + throw new IOException("Stream Closed."); + + int[] nlen = { 1 }; + + in = chooseStream(getPos(), nlen); + + byteRead = in.read(); + if (byteRead >= 0) { + pos++; + syncStreams(1); + } + + return byteRead; + } + + public synchronized int read (byte buff[], int off, int len) throws + IOException { + int result = 0; + RandomAccessFile in = null; + + if (closed) + throw new IOException("Stream Closed."); + + int[] nlen = {len}; // hack to make len mutable + in = chooseStream(pos, nlen); + + result = in.read(buff, off, nlen[0]); + if (result > 0) { + pos += result; + syncStreams(result); + } + + return result; + } + + /** + * TODO: use seek() insted of skipBytes(); skipBytes does I/O + */ + public void syncStreams (int bytes) throws IOException { + if ((hnts != null) && (hnts.get(0).isChunked()) && (fsInputStream != null)) + if (!this.lastActive) + fuseInputStream.skipBytes(bytes); + else + fsInputStream.skipBytes(bytes); + } + + public synchronized void close () throws IOException { + if (closed) + throw new IOException("Stream closed."); + + super.close(); + if (fsInputStream != null) + fsInputStream.close(); + fuseInputStream.close(); + + closed = true; + } + + // Not supported - mark () and reset () + + public boolean markSupported () { + return false; + } + + public void mark (int readLimit) {} + + public void reset () throws IOException { + throw new IOException("Mark/Reset not supported."); + } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java new file mode 100644 index 00000000000..5192a0a56cb --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java @@ -0,0 +1,86 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.fs.FileSystem; + +public class GlusterFUSEOutputStream extends OutputStream { + File f; + long pos; + boolean closed; + OutputStream fuseOutputStream; + + public GlusterFUSEOutputStream (String file, boolean append) throws + IOException { + this.f = new File(file); /* not needed ? */ + this.pos = 0; + this.fuseOutputStream = new FileOutputStream(file, append); + this.closed = false; + } + + public long getPos () throws IOException { + return pos; + } + + public void write (int v) throws IOException { + if (closed) + throw new IOException("Stream closed."); + + byte[] b = new byte[1]; + b[0] = (byte) v; + + write(b, 0, 1); + } + + public void write (byte b[]) throws IOException { + if (closed) + throw new IOException("Stream closed."); + + fuseOutputStream.write(b, 0, b.length); + pos += (long) b.length; + } + + public void write (byte b[], int off, int len) throws IOException { + if (closed) + throw new IOException("Stream closed."); + + fuseOutputStream.write(b, off, len); + pos += (long) len; + } + + public void flush () throws IOException { + if (closed) + throw new IOException("Stream closed."); + + fuseOutputStream.flush(); + } + + public void close () throws IOException { + if (closed) + throw new IOException("Stream closed."); + + flush(); + fuseOutputStream.close(); + closed = true; + } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java new file mode 100644 index 00000000000..b0501cced27 --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java @@ -0,0 +1,492 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +/** + * Implements the Hadoop FileSystem Interface to allow applications to store + * files on GlusterFS and run Map/Reduce jobs on the data. + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; +import java.net.*; + +import java.util.regex.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.util.TreeMap; + +/* + * This package provides interface for hadoop jobs (incl. Map/Reduce) + * to access files in GlusterFS backed file system via FUSE mount + */ +public class GlusterFileSystem extends FileSystem { + + private FileSystem glusterFs = null; + private URI uri = null; + private Path workingDir = null; + private String glusterMount = null; + private boolean mounted = false; + + /* for quick IO */ + private boolean quickSlaveIO = false; + + /* extended attribute class */ + private GlusterFSXattr xattr = null; + + /* hostname of this machine */ + private static String hostname; + + public GlusterFileSystem () { + + } + + public URI getUri () { + return uri; + } + + public boolean FUSEMount (String volname, String server, String mount) + throws IOException, InterruptedException { + boolean ret = true; + int retVal = 0; + Process p = null; + String s = null; + String mountCmd = null; + + mountCmd = "mount -t glusterfs " + server + ":" + "/" + volname + " " + mount; + + try { + p = Runtime.getRuntime().exec(mountCmd); + + retVal = p.waitFor(); + if (retVal != 0) + ret = false; + + } catch (IOException e) { + System.out.println ("Problem mounting FUSE mount on: " + mount); + e.printStackTrace(); + System.exit(-1); + } + + return ret; + } + + public void initialize (URI uri, Configuration conf) throws IOException { + boolean ret = false; + String volName = null; + String remoteGFSServer = null; + String needQuickRead = null; + + if (this.mounted) + return; + + System.out.println("Initializing GlusterFS"); + + try { + volName = conf.get("fs.glusterfs.volname", ""); + glusterMount = conf.get("fs.glusterfs.mount", ""); + remoteGFSServer = conf.get("fs.glusterfs.server", ""); + needQuickRead = conf.get("quick.slave.io", ""); + + /* + * bail out if we do not have enough information to do a FUSE + * mount + */ + if ( (volName.length() == 0) || (remoteGFSServer.length() == 0) || + (glusterMount.length() == 0) ) + System.exit (-1); + + ret = FUSEMount(volName, remoteGFSServer, glusterMount); + if (!ret) { + System.out.println("Failed to initialize GlusterFS"); + System.exit(-1); + } + + if ((needQuickRead.length() != 0) + && (needQuickRead.equalsIgnoreCase("yes") + || needQuickRead.equalsIgnoreCase("on") + || needQuickRead.equals("1"))) + this.quickSlaveIO = true; + + this.mounted = true; + this.glusterFs = FileSystem.getLocal(conf); + this.workingDir = new Path(glusterMount); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + + this.xattr = new GlusterFSXattr(); + + InetAddress addr = InetAddress.getLocalHost(); + this.hostname = addr.getHostName(); + + setConf(conf); + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Unable to initialize GlusterFS"); + System.exit(-1); + } + } + + @Deprecated + public String getName () { + return getUri().toString(); + } + + public Path getWorkingDirectory () { + return this.workingDir; + } + + public Path getHomeDirectory () { + return this.workingDir; + } + + public Path makeAbsolute (Path path) { + String pth = path.toUri().getPath(); + if (pth.startsWith(workingDir.toUri().getPath())) { + return path; + } + + return new Path(workingDir + "/" + pth); + } + + public void setWorkingDirectory (Path dir) { + this.workingDir = makeAbsolute(dir); + } + + public boolean exists (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + return f.exists(); + } + + public boolean mkdirs (Path path, FsPermission permission + ) throws IOException { + boolean created = false; + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + if (f.exists()) { + System.out.println("Directory " + f.getPath() + " already exist"); + return true; + } + + return f.mkdirs(); + } + + @Deprecated + public boolean isDirectory (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + return f.isDirectory(); + } + + public boolean isFile (Path path) throws IOException { + return !isDirectory(path); + } + + public Path[] listPaths (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File (absolute.toUri().getPath()); + String relPath = path.toUri().getPath(); + String[] fileList = null; + Path[] filePath = null; + int fileCnt = 0; + + fileList = f.list(); + + filePath = new Path[fileList.length]; + + for (; fileCnt < fileList.length; fileCnt++) { + filePath[fileCnt] = new Path(relPath + "/" + fileList[fileCnt]); + } + + return filePath; + } + + public FileStatus[] listStatus (Path path) throws IOException { + int fileCnt = 0; + Path absolute = makeAbsolute(path); + String relpath = path.toUri().getPath(); + String[] strFileList = null; + FileStatus[] fileStatus = null; + File f = new File(absolute.toUri().getPath()); + + if (!f.exists()) { + return null; + } + + if (f.isFile()) + return new FileStatus[] { + getFileStatus(path) + }; + + if (relpath.charAt(relpath.length() - 1) != '/') + relpath += "/"; + + strFileList = f.list(); + + fileStatus = new FileStatus[strFileList.length]; + + for (; fileCnt < strFileList.length; fileCnt++) { + fileStatus[fileCnt] = getFileStatusFromFileString(relpath + strFileList[fileCnt]); + } + + return fileStatus; + } + + public FileStatus getFileStatusFromFileString (String path) + throws IOException { + Path nPath = new Path(path); + return getFileStatus(nPath); + } + + public FileStatus getFileStatus (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + if (!f.exists ()) + throw new FileNotFoundException("File " + f.getPath() + " does not exist."); + + if (f.isDirectory ()) + return new FileStatus(0, true, 1, 0, f.lastModified(), path.makeQualified(this)); + else + return new FileStatus(f.length(), false, 0, getDefaultBlockSize(), + f.lastModified(), path.makeQualified(this)); + + } + + /* + * creates a new file in glusterfs namespace. internally the file + * descriptor is an instance of OutputStream class. + */ + public FSDataOutputStream create (Path path, FsPermission permission, + boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progress) + throws IOException { + Path absolute = makeAbsolute(path); + Path parent = null; + File f = null; + File fParent = null; + FSDataOutputStream glusterFileStream = null; + + f = new File(absolute.toUri().getPath()); + + if (f.exists ()) { + if (overwrite) + f.delete (); + else + throw new IOException(f.getPath() + " already exist"); + } + + parent = path.getParent(); + fParent = new File ((makeAbsolute(parent)).toUri().getPath()); + if ((parent != null) && (fParent != null) && (!fParent.exists())) + if (!fParent.mkdirs()) + throw new IOException("cannot create parent directory: " + fParent.getPath()); + + glusterFileStream = new FSDataOutputStream(new GlusterFUSEOutputStream + (f.getPath(), false)); + + return glusterFileStream; + } + + /* + * open the file in read mode (internally the file descriptor is an + * instance of InputStream class). + * + * if quick read mode is set then read the file by by-passing FUSE + * if we are on same slave where the file exist + */ + public FSDataInputStream open (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + FSDataInputStream glusterFileStream = null; + TreeMap<Integer, GlusterFSBrickClass> hnts = null; + + if (!f.exists()) + throw new IOException("File " + f.getPath() + " does not exist."); + + if (quickSlaveIO) + hnts = xattr.quickIOPossible(f.getPath(), 0, f.length()); + + glusterFileStream = new FSDataInputStream(new GlusterFUSEInputStream(f, hnts, hostname)); + return glusterFileStream; + } + + public FSDataInputStream open (Path path, int bufferSize) throws IOException { + return open(path); + } + + public FSDataOutputStream append (Path f, int bufferSize, Progressable progress) + throws IOException { + throw new IOException ("append not supported (as yet)."); + } + + public boolean rename (Path src, Path dst) throws IOException { + Path absoluteSrc = makeAbsolute(src); + Path absoluteDst = makeAbsolute(dst); + + File fSrc = new File(absoluteSrc.toUri().getPath()); + File fDst = new File(absoluteDst.toUri().getPath()); + + if (fDst.isDirectory()) { + fDst = null; + String newPath = absoluteDst.toUri().getPath() + "/" + fSrc.getName(); + fDst = new File(newPath); + } + return fSrc.renameTo(fDst); + } + + @Deprecated + public boolean delete (Path path) throws IOException { + return delete(path, true); + } + + public boolean delete (Path path, boolean recursive) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + if (f.isFile()) + return f.delete(); + + FileStatus[] dirEntries = listStatus(absolute); + if ((!recursive) && (dirEntries != null) && (dirEntries.length != 0)) + throw new IOException ("Directory " + path.toString() + " is not empty"); + + if (dirEntries != null) + for (int i = 0; i < dirEntries.length; i++) + delete(new Path(absolute, dirEntries[i].getPath()), recursive); + + return f.delete(); + } + + @Deprecated + public long getLength (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + if (!f.exists()) + throw new IOException(f.getPath() + " does not exist."); + + return f.length(); + } + + @Deprecated + public short getReplication (Path path) throws IOException { + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + if (!f.exists()) + throw new IOException(f.getPath() + " does not exist."); + + return xattr.getReplication(f.getPath()); + } + + public short getDefaultReplication (Path path) throws IOException { + return getReplication(path); + } + + public boolean setReplication (Path path, short replication) + throws IOException { + return true; + } + + public long getBlockSize (Path path) throws IOException { + long blkSz; + Path absolute = makeAbsolute(path); + File f = new File(absolute.toUri().getPath()); + + blkSz = xattr.getBlockSize(f.getPath()); + if (blkSz == 0) + blkSz = getLength(path); + + return blkSz; + } + + public long getDefaultBlockSize () { + return 1 << 26; /* default's from hdfs, kfs */ + } + + @Deprecated + public void lock (Path path, boolean shared) throws IOException { + } + + @Deprecated + public void release (Path path) throws IOException { + } + + public BlockLocation[] getFileBlockLocations (FileStatus file, long start, long len) + throws IOException { + + Path absolute = makeAbsolute(file.getPath()); + File f = new File(absolute.toUri().getPath()); + BlockLocation[] result = null; + + if (file == null) + return null; + + result = xattr.getPathInfo(f.getPath(), start, len); + if (result == null) { + System.out.println("Problem getting destination host for file " + + f.getPath()); + return null; + } + + return result; + } + + // getFileBlockLocations (FileStatus, long, long) is called by hadoop + public BlockLocation[] getFileBlockLocations (Path p, long start, long len) + throws IOException { + return null; + } + + public void copyFromLocalFile (boolean delSrc, Path src, Path dst) + throws IOException { + FileUtil.copy(glusterFs, src, this, dst, delSrc, getConf()); + } + + public void copyToLocalFile (boolean delSrc, Path src, Path dst) + throws IOException { + FileUtil.copy(this, src, glusterFs, dst, delSrc, getConf()); + } + + public Path startLocalOutput (Path fsOutputFile, Path tmpLocalFile) + throws IOException { + return tmpLocalFile; + } + + public void completeLocalOutput (Path fsOutputFile, Path tmpLocalFile) + throws IOException { + moveFromLocalFile(tmpLocalFile, fsOutputFile); + } +} diff --git a/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java new file mode 100644 index 00000000000..21e188c52bc --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.fs.glusterfs; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} |