summaryrefslogtreecommitdiffstats
path: root/glusterfs-hadoop/0.20.2
diff options
context:
space:
mode:
authorM S Vishwanath Bhat <vishwanath@gluster.com>2012-02-24 13:18:56 +0530
committerVijay Bellur <vijay@gluster.com>2012-03-07 23:18:29 -0800
commit5fdd65f5f4f5df1d28b0fb4f7efed226d5db1b3c (patch)
tree377a94774c5cd9f55b16ba6fcd1c7b5ec51bfa3b /glusterfs-hadoop/0.20.2
parente1ab347720f25ed2e7db633a7202f7b873f4b90a (diff)
renaming hdfs -> glusterfs-hadoop
Change-Id: Ibb937af1231f6bbed9a2d4eaeabc6e9d4000887f BUG: 797064 Signed-off-by: M S Vishwanath Bhat <vishwanath@gluster.com> Reviewed-on: http://review.gluster.com/2811 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vijay@gluster.com>
Diffstat (limited to 'glusterfs-hadoop/0.20.2')
-rw-r--r--glusterfs-hadoop/0.20.2/conf/core-site.xml38
-rw-r--r--glusterfs-hadoop/0.20.2/pom.xml36
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java109
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java52
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java471
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java205
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java86
-rw-r--r--glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java492
-rw-r--r--glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java38
-rw-r--r--glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py212
10 files changed, 1739 insertions, 0 deletions
diff --git a/glusterfs-hadoop/0.20.2/conf/core-site.xml b/glusterfs-hadoop/0.20.2/conf/core-site.xml
new file mode 100644
index 00000000..d7f75fca
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/conf/core-site.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>fs.glusterfs.impl</name>
+ <value>org.apache.hadoop.fs.glusterfs.GlusterFileSystem</value>
+ </property>
+
+ <property>
+ <name>fs.default.name</name>
+ <value>glusterfs://192.168.1.36:9000</value>
+ </property>
+
+ <property>
+ <name>fs.glusterfs.volname</name>
+ <value>volume-dist-rep</value>
+ </property>
+
+ <property>
+ <name>fs.glusterfs.mount</name>
+ <value>/mnt/glusterfs</value>
+ </property>
+
+ <property>
+ <name>fs.glusterfs.server</name>
+ <value>192.168.1.36</value>
+ </property>
+
+ <property>
+ <name>quick.slave.io</name>
+ <value>Off</value>
+ </property>
+
+</configuration>
diff --git a/glusterfs-hadoop/0.20.2/pom.xml b/glusterfs-hadoop/0.20.2/pom.xml
new file mode 100644
index 00000000..fe661d40
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop.fs.glusterfs</groupId>
+ <artifactId>glusterfs</artifactId>
+ <packaging>jar</packaging>
+ <version>0.20.2-0.1</version>
+ <name>glusterfs</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.5</source>
+ <target>1.5</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java
new file mode 100644
index 00000000..e633b8aa
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.io.*;
+
+public class GlusterFSBrickClass {
+ String host;
+ String exportedFile;
+ long start;
+ long end;
+ boolean isChunked;
+ int stripeSize; // Stripe size in bytes
+ int nrStripes; // number of stripes
+ int switchCount; // for SR, DSR - number of replicas of each stripe
+ // -1 for others
+
+ public GlusterFSBrickClass (String brick, long start, long len, boolean flag,
+ int stripeSize, int nrStripes, int switchCount)
+ throws IOException {
+ this.host = brick2host(brick);
+ this.exportedFile = brick2file(brick);
+ this.start = start;
+ this.end = start + len;
+ this.isChunked = flag;
+ this.stripeSize = stripeSize;
+ this.nrStripes = nrStripes;
+ this.switchCount = switchCount;
+ }
+
+ public boolean isChunked () {
+ return isChunked;
+ }
+
+ public String brickIsLocal(String hostname) {
+ String path = null;
+ File f = null;
+ if (host.equals(hostname))
+ path = exportedFile;
+
+ return path;
+ }
+
+ public int[] getBrickNumberInTree(long start, int len) {
+ long end = len;
+ int startNodeInTree = ((int) (start / stripeSize)) % nrStripes;
+ int endNodeInTree = ((int) ((start + len) / stripeSize)) % nrStripes;
+
+ if (startNodeInTree != endNodeInTree) {
+ end = (start - (start % stripeSize)) + stripeSize;
+ end -= start;
+ }
+
+ return new int[] {startNodeInTree, endNodeInTree, (int) end};
+ }
+
+ public boolean brickHasFilePart(int nodeInTree, int nodeLoc) {
+ if (switchCount == -1)
+ return (nodeInTree == nodeLoc);
+
+ nodeInTree *= switchCount;
+ for (int i = nodeInTree; i < (nodeInTree + switchCount); i++) {
+ if (i == nodeLoc)
+ return true;
+ }
+
+ return false;
+ }
+
+ public String brick2host (String brick)
+ throws IOException {
+ String[] hf = null;
+
+ hf = brick.split(":");
+ if (hf.length != 2)
+ throw new IOException("Error getting hostname from brick");
+
+ return hf[0];
+ }
+
+ public String brick2file (String brick)
+ throws IOException {
+ String[] hf = null;
+
+ hf = brick.split(":");
+ if (hf.length != 2)
+ throw new IOException("Error getting hostname from brick");
+
+ return hf[1];
+ }
+
+}
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java
new file mode 100644
index 00000000..11454e63
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.io.*;
+
+public class GlusterFSBrickRepl {
+ private String[] replHost;
+ private long start;
+ private long len;
+ private int cnt;
+
+ GlusterFSBrickRepl(int replCount, long start, long len) {
+ this.replHost = new String[replCount];
+ this.start = start;
+ this.len = len;
+ this.cnt = 0;
+ }
+
+ public void addHost (String host) {
+ this.replHost[cnt++] = host;
+ }
+
+ public String[] getReplHosts () {
+ return this.replHost;
+ }
+
+ public long getStartLen () {
+ return this.start;
+ }
+
+ public long getOffLen () {
+ return this.len;
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java
new file mode 100644
index 00000000..18e9003b
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java
@@ -0,0 +1,471 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.net.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.io.*;
+import java.util.HashMap;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.BlockLocation;
+
+public class GlusterFSXattr {
+
+ public enum LAYOUT { D, S, R, DS, DR, SR, DSR }
+ public enum CMD { GET_HINTS, GET_REPLICATION, GET_BLOCK_SIZE, CHECK_FOR_QUICK_IO }
+
+ private static String hostname;
+
+ public GlusterFSXattr() { }
+
+ public static String brick2host (String brick)
+ throws IOException {
+ String[] hf = null;
+
+ hf = brick.split(":");
+ if (hf.length != 2) {
+ System.out.println("brick not of format hostname:path");
+ throw new IOException("Error getting hostname from brick");
+ }
+
+ return hf[0];
+ }
+
+ public static String brick2file (String brick)
+ throws IOException {
+ String[] hf = null;
+
+ hf = brick.split(":");
+ if (hf.length != 2) {
+ System.out.println("brick not of format hostname:path");
+ throw new IOException("Error getting hostname from brick");
+ }
+
+ return hf[1];
+ }
+
+ public static BlockLocation[] getPathInfo (String filename, long start, long len)
+ throws IOException {
+ HashMap<String, ArrayList<String>> vol = null;
+ HashMap<String, Integer> meta = new HashMap<String, Integer>();
+
+ vol = execGetFattr(filename, meta, CMD.GET_HINTS);
+
+ return getHints(vol, meta, start, len, null);
+ }
+
+ public static long getBlockSize (String filename)
+ throws IOException {
+ HashMap<String, ArrayList<String>> vol = null;
+ HashMap<String, Integer> meta = new HashMap<String, Integer>();
+
+ vol = execGetFattr(filename, meta, CMD.GET_BLOCK_SIZE);
+
+ if (!meta.containsKey("block-size"))
+ return 0;
+
+ return (long) meta.get("block-size");
+
+ }
+
+ public static short getReplication (String filename)
+ throws IOException {
+ HashMap<String, ArrayList<String>> vol = null;
+ HashMap<String, Integer> meta = new HashMap<String, Integer>();
+
+ vol = execGetFattr(filename, meta, CMD.GET_REPLICATION);
+
+ return (short) getReplicationFromLayout(vol, meta);
+
+ }
+
+ public static TreeMap<Integer, GlusterFSBrickClass> quickIOPossible (String filename, long start,
+ long len)
+ throws IOException {
+ String realpath = null;
+ HashMap<String, ArrayList<String>> vol = null;
+ HashMap<String, Integer> meta = new HashMap<String, Integer>();
+ TreeMap<Integer, GlusterFSBrickClass> hnts = new TreeMap<Integer, GlusterFSBrickClass>();
+
+ vol = execGetFattr(filename, meta, CMD.GET_HINTS);
+ getHints(vol, meta, start, len, hnts);
+
+ if (hnts.size() == 0)
+ return null; // BOOM !!
+
+ // DEBUG - dump hnts here
+ return hnts;
+ }
+
+ public static HashMap<String, ArrayList<String>> execGetFattr (String filename,
+ HashMap<String, Integer> meta,
+ CMD cmd)
+ throws IOException {
+ Process p = null;
+ BufferedReader brInput = null;
+ String s = null;
+ String cmdOut = null;
+ String getfattrCmd = null;
+ String xlator = null;
+ String enclosingXl = null;
+ String enclosingXlVol = null;
+ String key = null;
+ String layout = "";
+ int rcount = 0;
+ int scount = 0;
+ int dcount = 0;
+ int count = 0;
+
+ HashMap<String, ArrayList<String>> vol = new HashMap<String, ArrayList<String>>();
+
+ getfattrCmd = "getfattr -m . -n trusted.glusterfs.pathinfo " + filename;
+
+ p = Runtime.getRuntime().exec(getfattrCmd);
+ brInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
+
+ cmdOut = "";
+ while ( (s = brInput.readLine()) != null )
+ cmdOut += s;
+
+ /**
+ * TODO: Use a single regex for extracting posix paths as well
+ * as xlator counts for layout matching.
+ */
+
+ Pattern pattern = Pattern.compile("<(.*?)[:\\(](.*?)>");
+ Matcher matcher = pattern.matcher(cmdOut);
+
+ Pattern p_px = Pattern.compile(".*?:(.*)");
+ Matcher m_px;
+ String gibberish_path;
+
+ s = null;
+ while (matcher.find()) {
+ xlator = matcher.group(1);
+ if (xlator.equalsIgnoreCase("posix")) {
+ if (enclosingXl.equalsIgnoreCase("replicate"))
+ count = rcount;
+ else if (enclosingXl.equalsIgnoreCase("stripe"))
+ count = scount;
+ else if (enclosingXl.equalsIgnoreCase("distribute"))
+ count = dcount;
+ else
+ throw new IOException("Unknown Translator: " + enclosingXl);
+
+ key = enclosingXl + "-" + count;
+
+ if (vol.get(key) == null)
+ vol.put(key, new ArrayList<String>());
+
+ gibberish_path = matcher.group(2);
+
+ /* extract posix path from the gibberish string */
+ m_px = p_px.matcher(gibberish_path);
+ if (!m_px.find())
+ throw new IOException("Cannot extract posix path");
+
+ vol.get(key).add(m_px.group(1));
+ continue;
+ }
+
+ enclosingXl = xlator;
+ enclosingXlVol = matcher.group(2);
+
+ if (xlator.equalsIgnoreCase("replicate"))
+ if (rcount++ != 0)
+ continue;
+
+ if (xlator.equalsIgnoreCase("stripe")) {
+ if (scount++ != 0)
+ continue;
+
+
+ Pattern ps = Pattern.compile("\\[(\\d+)\\]");
+ Matcher ms = ps.matcher(enclosingXlVol);
+
+ if (ms.find()) {
+ if (((cmd == CMD.GET_BLOCK_SIZE) || (cmd == CMD.GET_HINTS))
+ && (meta != null))
+ meta.put("block-size", Integer.parseInt(ms.group(1)));
+ } else
+ throw new IOException("Cannot get stripe size");
+ }
+
+ if (xlator.equalsIgnoreCase("distribute"))
+ if (dcount++ != 0)
+ continue;
+
+ layout += xlator.substring(0, 1);
+ }
+
+ if ((dcount == 0) && (scount == 0) && (rcount == 0))
+ throw new IOException("Cannot get layout");
+
+ if (meta != null) {
+ meta.put("dcount", dcount);
+ meta.put("scount", scount);
+ meta.put("rcount", rcount);
+ }
+
+ vol.put("layout", new ArrayList<String>(1));
+ vol.get("layout").add(layout);
+
+ return vol;
+ }
+
+ static BlockLocation[] getHints (HashMap<String, ArrayList<String>> vol,
+ HashMap<String, Integer> meta,
+ long start, long len,
+ TreeMap<Integer, GlusterFSBrickClass> hnts)
+ throws IOException {
+ String brick = null;
+ String key = null;
+ boolean done = false;
+ int i = 0;
+ int counter = 0;
+ int stripeSize = 0;
+ long stripeStart = 0;
+ long stripeEnd = 0;
+ int nrAllocs = 0;
+ int allocCtr = 0;
+ BlockLocation[] result = null;
+ ArrayList<String> brickList = null;
+ ArrayList<String> stripedBricks = null;
+ Iterator<String> it = null;
+
+ String[] blks = null;
+ GlusterFSBrickRepl[] repl = null;
+ int dcount, scount, rcount;
+
+ LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0));
+ dcount = meta.get("dcount");
+ scount = meta.get("scount");
+ rcount = meta.get("rcount");
+
+ switch (l) {
+ case D:
+ key = "DISTRIBUTE-" + dcount;
+ brick = vol.get(key).get(0);
+
+ if (hnts == null) {
+ result = new BlockLocation[1];
+ result[0] = new BlockLocation(null, new String[] {brick2host(brick)}, start, len);
+ } else
+ hnts.put(0, new GlusterFSBrickClass(brick, start, len, false, -1, -1, -1));
+ break;
+
+ case R:
+ case DR:
+ /* just the name says it's striped - the volume isn't */
+ stripedBricks = new ArrayList<String>();
+
+ for (i = 1; i <= rcount; i++) {
+ key = "REPLICATE-" + i;
+ brickList = vol.get(key);
+ it = brickList.iterator();
+ while (it.hasNext()) {
+ stripedBricks.add(it.next());
+ }
+ }
+
+ nrAllocs = stripedBricks.size();
+ if (hnts == null) {
+ result = new BlockLocation[1];
+ blks = new String[nrAllocs];
+ }
+
+ for (i = 0; i < nrAllocs; i++) {
+ if (hnts == null)
+ blks[i] = brick2host(stripedBricks.get(i));
+ else
+ hnts.put(i, new GlusterFSBrickClass(stripedBricks.get(i), start, len, false, -1, -1, -1));
+ }
+
+ if (hnts == null)
+ result[0] = new BlockLocation(null, blks, start, len);
+
+ break;
+
+ case SR:
+ case DSR:
+ int rsize = 0;
+ ArrayList<ArrayList<String>> replicas = new ArrayList<ArrayList<String>>();
+
+ stripedBricks = new ArrayList<String>();
+
+ if (rcount == 0)
+ throw new IOException("got replicated volume with replication count 0");
+
+ for (i = 1; i <= rcount; i++) {
+ key = "REPLICATE-" + i;
+ brickList = vol.get(key);
+ it = brickList.iterator();
+ replicas.add(i - 1, new ArrayList<String>());
+ while (it.hasNext()) {
+ replicas.get(i - 1).add(it.next());
+ }
+ }
+
+ rsize = replicas.get(0).size();
+ stripeSize = meta.get("block-size");
+
+ nrAllocs = (int) (((len - start) / stripeSize) + 1);
+ if (hnts == null) {
+ result = new BlockLocation[nrAllocs];
+ repl = new GlusterFSBrickRepl[nrAllocs];
+ }
+
+ // starting stripe position
+ counter = (int) ((start / stripeSize) % rcount);
+ stripeStart = start;
+
+ key = null;
+ int currAlloc = 0;
+ boolean hntsDone = false;
+ while ((stripeStart < len) && !done) {
+ stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1;
+ if (stripeEnd > start + len) {
+ stripeEnd = start + len - 1;
+ done = true;
+ }
+
+ if (hnts == null)
+ repl[allocCtr] = new GlusterFSBrickRepl(rsize, stripeStart, (stripeEnd - stripeStart));
+
+ for (i = 0; i < rsize; i++) {
+ brick = replicas.get(counter).get(i);
+ currAlloc = (allocCtr * rsize) + i;
+
+ if (hnts == null)
+ repl[allocCtr].addHost(brick2host(brick));
+ else
+ if (currAlloc <= (rsize * rcount) - 1) {
+ hnts.put(currAlloc, new GlusterFSBrickClass(brick, stripeStart,
+ (stripeEnd - stripeStart),
+ true, stripeSize, rcount, rsize));
+ } else
+ hntsDone = true;
+ }
+
+ if (hntsDone)
+ break;
+
+ stripeStart = stripeEnd + 1;
+
+ allocCtr++;
+ counter++;
+
+ if (counter >= replicas.size())
+ counter = 0;
+ }
+
+ if (hnts == null)
+ for (int k = 0; k < nrAllocs; k++)
+ result[k] = new BlockLocation(null, repl[k].getReplHosts(), repl[k].getStartLen(), repl[k].getOffLen());
+
+ break;
+
+ case S:
+ case DS:
+ if (scount == 0)
+ throw new IOException("got striped volume with stripe count 0");
+
+ stripedBricks = new ArrayList<String>();
+ stripeSize = meta.get("block-size");
+
+ key = "STRIPE-" + scount;
+ brickList = vol.get(key);
+ it = brickList.iterator();
+ while (it.hasNext()) {
+ stripedBricks.add(it.next());
+ }
+
+ nrAllocs = (int) ((len - start) / stripeSize) + 1;
+ if (hnts == null)
+ result = new BlockLocation[nrAllocs];
+
+ // starting stripe position
+ counter = (int) ((start / stripeSize) % stripedBricks.size());
+ stripeStart = start;
+
+ key = null;
+ while ((stripeStart < len) && !done) {
+ brick = stripedBricks.get(counter);
+
+ stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1;
+ if (stripeEnd > start + len) {
+ stripeEnd = start + len - 1;
+ done = true;
+ }
+
+ if (hnts == null)
+ result[allocCtr] = new BlockLocation(null, new String[] {brick2host(brick)},
+ stripeStart, (stripeEnd - stripeStart));
+ else
+ if (allocCtr <= stripedBricks.size()) {
+ hnts.put(allocCtr, new GlusterFSBrickClass(brick, stripeStart, (stripeEnd - stripeStart),
+ true, stripeSize, stripedBricks.size(), -1));
+ } else
+ break;
+
+ stripeStart = stripeEnd + 1;
+
+ counter++;
+ allocCtr++;
+
+ if (counter >= stripedBricks.size())
+ counter = 0;
+ }
+
+ break;
+ }
+
+ return result;
+ }
+
+ /* TODO: use meta{dcount,scount,rcount} for checking */
+ public static int getReplicationFromLayout (HashMap<String, ArrayList<String>> vol,
+ HashMap<String, Integer> meta)
+ throws IOException {
+ int replication = 0;
+ LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0));
+
+ switch (l) {
+ case D:
+ case S:
+ case DS:
+ replication = 1;
+ break;
+
+ case R:
+ case DR:
+ case SR:
+ case DSR:
+ final String key = "REPLICATION-1";
+ replication = vol.get(key).size();
+ }
+
+ return replication;
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java
new file mode 100644
index 00000000..e92237ae
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java
@@ -0,0 +1,205 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.io.*;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+
+public class GlusterFUSEInputStream extends FSInputStream {
+ File f;
+ boolean lastActive;
+ long pos;
+ boolean closed;
+ String thisHost;
+ RandomAccessFile fuseInputStream;
+ RandomAccessFile fsInputStream;
+ GlusterFSBrickClass thisBrick;
+ int nodeLocation;
+ TreeMap<Integer, GlusterFSBrickClass> hnts;
+
+ public GlusterFUSEInputStream (File f, TreeMap<Integer, GlusterFSBrickClass> hnts,
+ String hostname) throws IOException {
+ this.f = f;
+ this.pos = 0;
+ this.closed = false;
+ this.hnts = hnts;
+ this.thisHost = hostname;
+ this.fsInputStream = null;
+ this.fuseInputStream = new RandomAccessFile(f.getPath(), "r");
+
+ this.lastActive = true; // true == FUSE, false == backed file
+
+ String directFilePath = null;
+ if (this.hnts != null) {
+ directFilePath = findLocalFile(f.getPath(), this.hnts);
+ if (directFilePath != null) {
+ this.fsInputStream = new RandomAccessFile(directFilePath, "r");
+ this.lastActive = !this.lastActive;
+ }
+ }
+ }
+
+ public String findLocalFile (String path, TreeMap<Integer, GlusterFSBrickClass> hnts) {
+ int i = 0;
+ String actFilePath = null;
+ GlusterFSBrickClass gfsBrick = null;
+
+ gfsBrick = hnts.get(0);
+
+ /* do a linear search for the matching host not worrying
+ about file stripes */
+ for (i = 0; i < hnts.size(); i++) {
+ gfsBrick = hnts.get(i);
+ actFilePath = gfsBrick.brickIsLocal(this.thisHost);
+ if (actFilePath != null) {
+ this.thisBrick = gfsBrick;
+ this.nodeLocation = i;
+ break;
+ }
+ }
+
+ return actFilePath;
+ }
+
+ public long getPos () throws IOException {
+ return pos;
+ }
+
+ public synchronized int available () throws IOException {
+ return (int) ((f.length()) - getPos());
+ }
+
+ public void seek (long pos) throws IOException {
+ fuseInputStream.seek(pos);
+ if (fsInputStream != null)
+ fsInputStream.seek(pos);
+ }
+
+ public boolean seekToNewSource (long pos) throws IOException {
+ return false;
+ }
+
+ public RandomAccessFile chooseStream (long start, int[] nlen)
+ throws IOException {
+ GlusterFSBrickClass gfsBrick = null;
+ RandomAccessFile in = fuseInputStream;
+ boolean oldActiveStream = lastActive;
+ lastActive = true;
+
+ if ((hnts != null) && (fsInputStream != null)) {
+ gfsBrick = hnts.get(0);
+ if (!gfsBrick.isChunked()) {
+ in = fsInputStream;
+ lastActive = false;
+ } else {
+ // find the current location in the tree and the amount of data it can serve
+ int[] nodeInTree = thisBrick.getBrickNumberInTree(start, nlen[0]);
+
+ // does this node hold the byte ranges we have been requested for ?
+ if ((nodeInTree[2] != 0) && thisBrick.brickHasFilePart(nodeInTree[0], nodeLocation)) {
+ in = fsInputStream;
+ nlen[0] = nodeInTree[2]; // the amount of data that can be read from the stripe
+ lastActive = false;
+ }
+ }
+ }
+
+ return in;
+ }
+
+ public synchronized int read () throws IOException {
+ int byteRead = 0;
+ RandomAccessFile in = null;
+
+ if (closed)
+ throw new IOException("Stream Closed.");
+
+ int[] nlen = { 1 };
+
+ in = chooseStream(getPos(), nlen);
+
+ byteRead = in.read();
+ if (byteRead >= 0) {
+ pos++;
+ syncStreams(1);
+ }
+
+ return byteRead;
+ }
+
+ public synchronized int read (byte buff[], int off, int len) throws
+ IOException {
+ int result = 0;
+ RandomAccessFile in = null;
+
+ if (closed)
+ throw new IOException("Stream Closed.");
+
+ int[] nlen = {len}; // hack to make len mutable
+ in = chooseStream(pos, nlen);
+
+ result = in.read(buff, off, nlen[0]);
+ if (result > 0) {
+ pos += result;
+ syncStreams(result);
+ }
+
+ return result;
+ }
+
+ /**
+ * TODO: use seek() insted of skipBytes(); skipBytes does I/O
+ */
+ public void syncStreams (int bytes) throws IOException {
+ if ((hnts != null) && (hnts.get(0).isChunked()) && (fsInputStream != null))
+ if (!this.lastActive)
+ fuseInputStream.skipBytes(bytes);
+ else
+ fsInputStream.skipBytes(bytes);
+ }
+
+ public synchronized void close () throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ super.close();
+ if (fsInputStream != null)
+ fsInputStream.close();
+ fuseInputStream.close();
+
+ closed = true;
+ }
+
+ // Not supported - mark () and reset ()
+
+ public boolean markSupported () {
+ return false;
+ }
+
+ public void mark (int readLimit) {}
+
+ public void reset () throws IOException {
+ throw new IOException("Mark/Reset not supported.");
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java
new file mode 100644
index 00000000..5192a0a5
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileSystem;
+
+public class GlusterFUSEOutputStream extends OutputStream {
+ File f;
+ long pos;
+ boolean closed;
+ OutputStream fuseOutputStream;
+
+ public GlusterFUSEOutputStream (String file, boolean append) throws
+ IOException {
+ this.f = new File(file); /* not needed ? */
+ this.pos = 0;
+ this.fuseOutputStream = new FileOutputStream(file, append);
+ this.closed = false;
+ }
+
+ public long getPos () throws IOException {
+ return pos;
+ }
+
+ public void write (int v) throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ byte[] b = new byte[1];
+ b[0] = (byte) v;
+
+ write(b, 0, 1);
+ }
+
+ public void write (byte b[]) throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ fuseOutputStream.write(b, 0, b.length);
+ pos += (long) b.length;
+ }
+
+ public void write (byte b[], int off, int len) throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ fuseOutputStream.write(b, off, len);
+ pos += (long) len;
+ }
+
+ public void flush () throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ fuseOutputStream.flush();
+ }
+
+ public void close () throws IOException {
+ if (closed)
+ throw new IOException("Stream closed.");
+
+ flush();
+ fuseOutputStream.close();
+ closed = true;
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java
new file mode 100644
index 00000000..b0501cce
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java
@@ -0,0 +1,492 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+/**
+ * Implements the Hadoop FileSystem Interface to allow applications to store
+ * files on GlusterFS and run Map/Reduce jobs on the data.
+ */
+
+package org.apache.hadoop.fs.glusterfs;
+
+import java.io.*;
+import java.net.*;
+
+import java.util.regex.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.util.TreeMap;
+
+/*
+ * This package provides interface for hadoop jobs (incl. Map/Reduce)
+ * to access files in GlusterFS backed file system via FUSE mount
+ */
+public class GlusterFileSystem extends FileSystem {
+
+ private FileSystem glusterFs = null;
+ private URI uri = null;
+ private Path workingDir = null;
+ private String glusterMount = null;
+ private boolean mounted = false;
+
+ /* for quick IO */
+ private boolean quickSlaveIO = false;
+
+ /* extended attribute class */
+ private GlusterFSXattr xattr = null;
+
+ /* hostname of this machine */
+ private static String hostname;
+
+ public GlusterFileSystem () {
+
+ }
+
+ public URI getUri () {
+ return uri;
+ }
+
+ public boolean FUSEMount (String volname, String server, String mount)
+ throws IOException, InterruptedException {
+ boolean ret = true;
+ int retVal = 0;
+ Process p = null;
+ String s = null;
+ String mountCmd = null;
+
+ mountCmd = "mount -t glusterfs " + server + ":" + "/" + volname + " " + mount;
+
+ try {
+ p = Runtime.getRuntime().exec(mountCmd);
+
+ retVal = p.waitFor();
+ if (retVal != 0)
+ ret = false;
+
+ } catch (IOException e) {
+ System.out.println ("Problem mounting FUSE mount on: " + mount);
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ return ret;
+ }
+
+ public void initialize (URI uri, Configuration conf) throws IOException {
+ boolean ret = false;
+ String volName = null;
+ String remoteGFSServer = null;
+ String needQuickRead = null;
+
+ if (this.mounted)
+ return;
+
+ System.out.println("Initializing GlusterFS");
+
+ try {
+ volName = conf.get("fs.glusterfs.volname", "");
+ glusterMount = conf.get("fs.glusterfs.mount", "");
+ remoteGFSServer = conf.get("fs.glusterfs.server", "");
+ needQuickRead = conf.get("quick.slave.io", "");
+
+ /*
+ * bail out if we do not have enough information to do a FUSE
+ * mount
+ */
+ if ( (volName.length() == 0) || (remoteGFSServer.length() == 0) ||
+ (glusterMount.length() == 0) )
+ System.exit (-1);
+
+ ret = FUSEMount(volName, remoteGFSServer, glusterMount);
+ if (!ret) {
+ System.out.println("Failed to initialize GlusterFS");
+ System.exit(-1);
+ }
+
+ if ((needQuickRead.length() != 0)
+ && (needQuickRead.equalsIgnoreCase("yes")
+ || needQuickRead.equalsIgnoreCase("on")
+ || needQuickRead.equals("1")))
+ this.quickSlaveIO = true;
+
+ this.mounted = true;
+ this.glusterFs = FileSystem.getLocal(conf);
+ this.workingDir = new Path(glusterMount);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+ this.xattr = new GlusterFSXattr();
+
+ InetAddress addr = InetAddress.getLocalHost();
+ this.hostname = addr.getHostName();
+
+ setConf(conf);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Unable to initialize GlusterFS");
+ System.exit(-1);
+ }
+ }
+
+ @Deprecated
+ public String getName () {
+ return getUri().toString();
+ }
+
+ public Path getWorkingDirectory () {
+ return this.workingDir;
+ }
+
+ public Path getHomeDirectory () {
+ return this.workingDir;
+ }
+
+ public Path makeAbsolute (Path path) {
+ String pth = path.toUri().getPath();
+ if (pth.startsWith(workingDir.toUri().getPath())) {
+ return path;
+ }
+
+ return new Path(workingDir + "/" + pth);
+ }
+
+ public void setWorkingDirectory (Path dir) {
+ this.workingDir = makeAbsolute(dir);
+ }
+
+ public boolean exists (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ return f.exists();
+ }
+
+ public boolean mkdirs (Path path, FsPermission permission
+ ) throws IOException {
+ boolean created = false;
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ if (f.exists()) {
+ System.out.println("Directory " + f.getPath() + " already exist");
+ return true;
+ }
+
+ return f.mkdirs();
+ }
+
+ @Deprecated
+ public boolean isDirectory (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ return f.isDirectory();
+ }
+
+ public boolean isFile (Path path) throws IOException {
+ return !isDirectory(path);
+ }
+
+ public Path[] listPaths (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File (absolute.toUri().getPath());
+ String relPath = path.toUri().getPath();
+ String[] fileList = null;
+ Path[] filePath = null;
+ int fileCnt = 0;
+
+ fileList = f.list();
+
+ filePath = new Path[fileList.length];
+
+ for (; fileCnt < fileList.length; fileCnt++) {
+ filePath[fileCnt] = new Path(relPath + "/" + fileList[fileCnt]);
+ }
+
+ return filePath;
+ }
+
+ public FileStatus[] listStatus (Path path) throws IOException {
+ int fileCnt = 0;
+ Path absolute = makeAbsolute(path);
+ String relpath = path.toUri().getPath();
+ String[] strFileList = null;
+ FileStatus[] fileStatus = null;
+ File f = new File(absolute.toUri().getPath());
+
+ if (!f.exists()) {
+ return null;
+ }
+
+ if (f.isFile())
+ return new FileStatus[] {
+ getFileStatus(path)
+ };
+
+ if (relpath.charAt(relpath.length() - 1) != '/')
+ relpath += "/";
+
+ strFileList = f.list();
+
+ fileStatus = new FileStatus[strFileList.length];
+
+ for (; fileCnt < strFileList.length; fileCnt++) {
+ fileStatus[fileCnt] = getFileStatusFromFileString(relpath + strFileList[fileCnt]);
+ }
+
+ return fileStatus;
+ }
+
+ public FileStatus getFileStatusFromFileString (String path)
+ throws IOException {
+ Path nPath = new Path(path);
+ return getFileStatus(nPath);
+ }
+
+ public FileStatus getFileStatus (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ if (!f.exists ())
+ throw new FileNotFoundException("File " + f.getPath() + " does not exist.");
+
+ if (f.isDirectory ())
+ return new FileStatus(0, true, 1, 0, f.lastModified(), path.makeQualified(this));
+ else
+ return new FileStatus(f.length(), false, 0, getDefaultBlockSize(),
+ f.lastModified(), path.makeQualified(this));
+
+ }
+
+ /*
+ * creates a new file in glusterfs namespace. internally the file
+ * descriptor is an instance of OutputStream class.
+ */
+ public FSDataOutputStream create (Path path, FsPermission permission,
+ boolean overwrite, int bufferSize,
+ short replication, long blockSize,
+ Progressable progress)
+ throws IOException {
+ Path absolute = makeAbsolute(path);
+ Path parent = null;
+ File f = null;
+ File fParent = null;
+ FSDataOutputStream glusterFileStream = null;
+
+ f = new File(absolute.toUri().getPath());
+
+ if (f.exists ()) {
+ if (overwrite)
+ f.delete ();
+ else
+ throw new IOException(f.getPath() + " already exist");
+ }
+
+ parent = path.getParent();
+ fParent = new File ((makeAbsolute(parent)).toUri().getPath());
+ if ((parent != null) && (fParent != null) && (!fParent.exists()))
+ if (!fParent.mkdirs())
+ throw new IOException("cannot create parent directory: " + fParent.getPath());
+
+ glusterFileStream = new FSDataOutputStream(new GlusterFUSEOutputStream
+ (f.getPath(), false));
+
+ return glusterFileStream;
+ }
+
+ /*
+ * open the file in read mode (internally the file descriptor is an
+ * instance of InputStream class).
+ *
+ * if quick read mode is set then read the file by by-passing FUSE
+ * if we are on same slave where the file exist
+ */
+ public FSDataInputStream open (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+ FSDataInputStream glusterFileStream = null;
+ TreeMap<Integer, GlusterFSBrickClass> hnts = null;
+
+ if (!f.exists())
+ throw new IOException("File " + f.getPath() + " does not exist.");
+
+ if (quickSlaveIO)
+ hnts = xattr.quickIOPossible(f.getPath(), 0, f.length());
+
+ glusterFileStream = new FSDataInputStream(new GlusterFUSEInputStream(f, hnts, hostname));
+ return glusterFileStream;
+ }
+
+ public FSDataInputStream open (Path path, int bufferSize) throws IOException {
+ return open(path);
+ }
+
+ public FSDataOutputStream append (Path f, int bufferSize, Progressable progress)
+ throws IOException {
+ throw new IOException ("append not supported (as yet).");
+ }
+
+ public boolean rename (Path src, Path dst) throws IOException {
+ Path absoluteSrc = makeAbsolute(src);
+ Path absoluteDst = makeAbsolute(dst);
+
+ File fSrc = new File(absoluteSrc.toUri().getPath());
+ File fDst = new File(absoluteDst.toUri().getPath());
+
+ if (fDst.isDirectory()) {
+ fDst = null;
+ String newPath = absoluteDst.toUri().getPath() + "/" + fSrc.getName();
+ fDst = new File(newPath);
+ }
+ return fSrc.renameTo(fDst);
+ }
+
+ @Deprecated
+ public boolean delete (Path path) throws IOException {
+ return delete(path, true);
+ }
+
+ public boolean delete (Path path, boolean recursive) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ if (f.isFile())
+ return f.delete();
+
+ FileStatus[] dirEntries = listStatus(absolute);
+ if ((!recursive) && (dirEntries != null) && (dirEntries.length != 0))
+ throw new IOException ("Directory " + path.toString() + " is not empty");
+
+ if (dirEntries != null)
+ for (int i = 0; i < dirEntries.length; i++)
+ delete(new Path(absolute, dirEntries[i].getPath()), recursive);
+
+ return f.delete();
+ }
+
+ @Deprecated
+ public long getLength (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ if (!f.exists())
+ throw new IOException(f.getPath() + " does not exist.");
+
+ return f.length();
+ }
+
+ @Deprecated
+ public short getReplication (Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ if (!f.exists())
+ throw new IOException(f.getPath() + " does not exist.");
+
+ return xattr.getReplication(f.getPath());
+ }
+
+ public short getDefaultReplication (Path path) throws IOException {
+ return getReplication(path);
+ }
+
+ public boolean setReplication (Path path, short replication)
+ throws IOException {
+ return true;
+ }
+
+ public long getBlockSize (Path path) throws IOException {
+ long blkSz;
+ Path absolute = makeAbsolute(path);
+ File f = new File(absolute.toUri().getPath());
+
+ blkSz = xattr.getBlockSize(f.getPath());
+ if (blkSz == 0)
+ blkSz = getLength(path);
+
+ return blkSz;
+ }
+
+ public long getDefaultBlockSize () {
+ return 1 << 26; /* default's from hdfs, kfs */
+ }
+
+ @Deprecated
+ public void lock (Path path, boolean shared) throws IOException {
+ }
+
+ @Deprecated
+ public void release (Path path) throws IOException {
+ }
+
+ public BlockLocation[] getFileBlockLocations (FileStatus file, long start, long len)
+ throws IOException {
+
+ Path absolute = makeAbsolute(file.getPath());
+ File f = new File(absolute.toUri().getPath());
+ BlockLocation[] result = null;
+
+ if (file == null)
+ return null;
+
+ result = xattr.getPathInfo(f.getPath(), start, len);
+ if (result == null) {
+ System.out.println("Problem getting destination host for file "
+ + f.getPath());
+ return null;
+ }
+
+ return result;
+ }
+
+ // getFileBlockLocations (FileStatus, long, long) is called by hadoop
+ public BlockLocation[] getFileBlockLocations (Path p, long start, long len)
+ throws IOException {
+ return null;
+ }
+
+ public void copyFromLocalFile (boolean delSrc, Path src, Path dst)
+ throws IOException {
+ FileUtil.copy(glusterFs, src, this, dst, delSrc, getConf());
+ }
+
+ public void copyToLocalFile (boolean delSrc, Path src, Path dst)
+ throws IOException {
+ FileUtil.copy(this, src, glusterFs, dst, delSrc, getConf());
+ }
+
+ public Path startLocalOutput (Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return tmpLocalFile;
+ }
+
+ public void completeLocalOutput (Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ moveFromLocalFile(tmpLocalFile, fsOutputFile);
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java
new file mode 100644
index 00000000..21e188c5
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java
@@ -0,0 +1,38 @@
+package org.apache.hadoop.fs.glusterfs;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+ extends TestCase
+{
+ /**
+ * Create the test case
+ *
+ * @param testName name of the test case
+ */
+ public AppTest( String testName )
+ {
+ super( testName );
+ }
+
+ /**
+ * @return the suite of tests being tested
+ */
+ public static Test suite()
+ {
+ return new TestSuite( AppTest.class );
+ }
+
+ /**
+ * Rigourous Test :-)
+ */
+ public void testApp()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py b/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py
new file mode 100644
index 00000000..450e08fb
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py
@@ -0,0 +1,212 @@
+#!/usr/bin/python
+
+##
+ #
+ # Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ # This file is part of GlusterFS.
+ #
+ # Licensed under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ # implied. See the License for the specific language governing
+ # permissions and limitations under the License.
+ #
+ ##
+
+import getopt
+import glob
+import sys, os
+import shutil
+import subprocess, shlex
+
+def usage():
+ print "usage: python build-deploy-jar.py [-b/--build] -d/--dir <hadoop-home> [-c/--core] [-m/--mapred] [-h/--henv]"
+
+def addSlash(s):
+ if not (s[-1] == '/'):
+ s = s + '/'
+
+ return s
+
+def whereis(program):
+ abspath = None
+ for path in (os.environ.get('PATH', '')).split(':'):
+ abspath = os.path.join(path, program)
+ if os.path.exists(abspath) and not os.path.isdir(abspath):
+ return abspath
+
+ return None
+
+def getLatestJar(targetdir):
+ latestJar = None
+ glusterfsJar = glob.glob(targetdir + "*.jar")
+ if len(glusterfsJar) == 0:
+ print "No GlusterFS jar file found in %s ... exiting" % (targetdir)
+ return None
+
+ # pick up the latest jar file - just in case ...
+ stat = latestJar = None
+ ctime = 0
+
+ for jar in glusterfsJar:
+ stat = os.stat(jar)
+ if stat.st_ctime > ctime:
+ latestJar = jar
+ ctime = stat.st_ctime
+
+ return latestJar
+
+# build the glusterfs hadoop plugin using maven
+def build_jar():
+ location = whereis('mvn')
+
+ if location == None:
+ print "Cannot find maven to build glusterfs hadoop jar"
+ print "please install maven or if it's already installed then fix your PATH environ"
+ return None
+
+ # do a clean packaging
+ targetdir = "./target/"
+ if os.path.exists(targetdir) and os.path.isdir(targetdir):
+ print "Cleaning up directories ... [ " + targetdir + " ]"
+ shutil.rmtree(targetdir)
+
+ print "Building glusterfs jar ..."
+ process = subprocess.Popen(['package'], shell=True,
+ executable=location, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ try:
+ (pout, perr) = process.communicate()
+ except:
+ process.wait()
+ if not process.returncode == 0:
+ print "Building glusterfs jar failed"
+ return None
+
+ latestJar = getLatestJar(targetdir)
+ return latestJar
+
+def rcopy(f, host, libdir):
+ print " * doing remote copy to host %s" % (host)
+ scpCmd = "scp %s %s:%s" % (f, host, libdir)
+
+ os.system(scpCmd);
+
+def deployInSlave(f, confdir, libdir, cc, cm, he):
+ slavefile = confdir + "slaves"
+
+ ccFile = confdir + "core-site.xml"
+ cmFile = confdir + "mapred-site.xml"
+ heFile = confdir + "hadoop-env.sh"
+
+ sf = open(slavefile, 'r')
+ for host in sf:
+ host = host.rstrip('\n')
+ print " >>> Deploying %s on %s ..." % (os.path.basename(f), host)
+ rcopy(f, host, libdir)
+
+ if cc:
+ print " >>> Deploying [%s] on %s ..." % (os.path.basename(ccFile), host)
+ rcopy(ccFile, host, confdir)
+
+ if cm:
+ print " >>> Deploying [%s] on %s ..." % (os.path.basename(cmFile), host)
+ rcopy(cmFile, host, confdir)
+
+ if he:
+ print " >>> Deploying [%s] on %s ..." % (os.path.basename(heFile), host)
+ rcopy(heFile, host, confdir);
+
+ print "<<< Done\n"
+
+ sf.close()
+
+def deployInMaster(f, confdir, libdir):
+ import socket
+ masterfile = confdir + "masters"
+
+ mf = open(masterfile, 'r')
+ for host in mf:
+ host = host.rstrip('\n')
+ print " >>> Deploying %s on %s ..." % (os.path.basename(f), host)
+ h = host
+ try:
+ socket.inet_aton(host)
+ h = socket.getfqdn(host)
+ except socket.error:
+ # host is not a ip adddress
+ pass
+
+ if h == socket.gethostname() or h == 'localhost':
+ # local cp
+ print " * doing local copy"
+ shutil.copy(f, libdir)
+ else:
+ # scp the file
+ rcopy(f, h, libdir)
+
+ print "<<< Done\n"
+
+ mf.close()
+
+if __name__ == '__main__':
+ opt = args = []
+ try:
+ opt, args = getopt.getopt(sys.argv[1:], "bd:cmh", ["build", "dir=", "core", "mapred", "henv"]);
+ except getopt.GetoptError, err:
+ print str(err)
+ usage()
+ sys.exit(1)
+
+ needbuild = hadoop_dir = copyCore = copyMapred = copyHadoopEnv = None
+
+ for k, v in opt:
+ if k in ("-b", "--build"):
+ needbuild = True
+ elif k in ("-d", "--dir"):
+ hadoop_dir = v
+ elif k in ("-c", "--core"):
+ copyCore = True
+ elif k in ("-m", "--mapred"):
+ copyMapred = True
+ elif k in ("-h", "--henv"):
+ copyHadoopEnv = True
+ else:
+ assert False, "unhandled option"
+
+ assert not hadoop_dir == None, "hadoop directory missing"
+
+ if needbuild:
+ jar = build_jar()
+ if jar == None:
+ sys.exit(1)
+ else:
+ jar = getLatestJar('./target/')
+ if jar == None:
+ print "Maybe you want to build it ? -b option"
+ sys.exit(1)
+
+ print ""
+ print "*** Deploying %s *** " % (jar)
+
+ # copy jar to local hadoop distribution (master)
+ hadoop_home = addSlash(hadoop_dir)
+ if not (os.path.exists(hadoop_home) and os.path.isdir(hadoop_home)):
+ print "path " + hadoop_home + " does not exist or is not adiretory";
+ sys.exit(1);
+
+ hadoop_conf = hadoop_home + "conf/"
+ hadoop_lib = hadoop_home + "lib/"
+
+ print " >>> Scanning hadoop master file for host(s) to deploy"
+ deployInMaster(jar, hadoop_conf, hadoop_lib)
+
+ print ""
+ print " >>> Scanning hadoop slave file for host(s) to deploy"
+ deployInSlave(jar, hadoop_conf, hadoop_lib, copyCore, copyMapred, copyHadoopEnv)