| author | M S Vishwanath Bhat <vishwanath@gluster.com> | 2012-02-24 13:18:56 +0530 |
|---|---|---|
| committer | Vijay Bellur <vijay@gluster.com> | 2012-03-07 23:18:29 -0800 |
| commit | 5fdd65f5f4f5df1d28b0fb4f7efed226d5db1b3c (patch) | |
| tree | 377a94774c5cd9f55b16ba6fcd1c7b5ec51bfa3b /glusterfs-hadoop/0.20.2 | |
| parent | e1ab347720f25ed2e7db633a7202f7b873f4b90a (diff) | |
renaming hdfs -> glusterfs-hadoop
Change-Id: Ibb937af1231f6bbed9a2d4eaeabc6e9d4000887f
BUG: 797064
Signed-off-by: M S Vishwanath Bhat <vishwanath@gluster.com>
Reviewed-on: http://review.gluster.com/2811
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vijay@gluster.com>
Diffstat (limited to 'glusterfs-hadoop/0.20.2')
10 files changed, 1739 insertions, 0 deletions
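Before the file-by-file diff, it may help to see how the configuration added in conf/core-site.xml is consumed. The sketch below is not part of the commit: it is a minimal, illustrative Hadoop 0.20.2 client, assuming the glusterfs jar built from the pom.xml in this change and the core-site.xml shown below are on the Hadoop classpath. The class name and file paths are made up; the glusterfs:// authority simply mirrors the sample fs.default.name value in the diff.

```java
// Illustrative only (not part of this commit): exercising the new
// GlusterFileSystem through the standard Hadoop FileSystem API.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlusterFsClientSketch {            // hypothetical class name
    public static void main(String[] args) throws Exception {
        // Picks up fs.glusterfs.impl, fs.glusterfs.volname, fs.glusterfs.mount,
        // fs.glusterfs.server and quick.slave.io from core-site.xml on the classpath.
        Configuration conf = new Configuration();

        // Authority mirrors the sample fs.default.name below; GlusterFileSystem.initialize()
        // will attempt the FUSE mount of the configured volume.
        FileSystem fs = FileSystem.get(URI.create("glusterfs://192.168.1.36:9000"), conf);

        FSDataOutputStream out = fs.create(new Path("/user/demo/hello.txt")); // path is illustrative
        out.writeUTF("written via glusterfs-hadoop");
        out.close();
        fs.close();
    }
}
```

The jar that registers this filesystem is built with `mvn package` against the pom.xml added below; tools/build-deploy-jar.py (also added in this commit) wraps that build and copies the jar and Hadoop config files to the hosts listed in the masters and slaves files.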
diff --git a/glusterfs-hadoop/0.20.2/conf/core-site.xml b/glusterfs-hadoop/0.20.2/conf/core-site.xml
new file mode 100644
index 00000000000..d7f75fca733
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/conf/core-site.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>fs.glusterfs.impl</name>
+    <value>org.apache.hadoop.fs.glusterfs.GlusterFileSystem</value>
+  </property>
+
+  <property>
+    <name>fs.default.name</name>
+    <value>glusterfs://192.168.1.36:9000</value>
+  </property>
+
+  <property>
+    <name>fs.glusterfs.volname</name>
+    <value>volume-dist-rep</value>
+  </property>
+
+  <property>
+    <name>fs.glusterfs.mount</name>
+    <value>/mnt/glusterfs</value>
+  </property>
+
+  <property>
+    <name>fs.glusterfs.server</name>
+    <value>192.168.1.36</value>
+  </property>
+
+  <property>
+    <name>quick.slave.io</name>
+    <value>Off</value>
+  </property>
+
+</configuration>
diff --git a/glusterfs-hadoop/0.20.2/pom.xml b/glusterfs-hadoop/0.20.2/pom.xml
new file mode 100644
index 00000000000..fe661d40fdc
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop.fs.glusterfs</groupId>
+  <artifactId>glusterfs</artifactId>
+  <packaging>jar</packaging>
+  <version>0.20.2-0.1</version>
+  <name>glusterfs</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>0.20.2</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.5</source>
+          <target>1.5</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java
new file mode 100644
index 00000000000..e633b8aae80
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickClass.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ * This file is part of GlusterFS.
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +public class GlusterFSBrickClass { +        String  host; +        String  exportedFile; +        long    start; +        long    end; +        boolean isChunked; +        int     stripeSize;     // Stripe size in bytes +        int     nrStripes;      // number of stripes +        int     switchCount;    // for SR, DSR - number of replicas of each stripe +                                // -1 for others + +        public GlusterFSBrickClass (String brick, long start, long len, boolean flag, +                                    int stripeSize, int nrStripes, int switchCount) +                throws IOException { +                this.host = brick2host(brick); +                this.exportedFile = brick2file(brick); +                this.start = start; +                this.end = start + len; +                this.isChunked = flag; +                this.stripeSize = stripeSize; +                this.nrStripes = nrStripes; +                this.switchCount = switchCount; +        } + +        public boolean isChunked () { +                return isChunked; +        } + +        public String brickIsLocal(String hostname) { +                String path = null; +                File f = null; +                if (host.equals(hostname)) +                        path = exportedFile; + +                return path; +        } + +        public int[] getBrickNumberInTree(long start, int len) { +                long end = len; +                int startNodeInTree = ((int) (start / stripeSize)) % nrStripes; +                int endNodeInTree = ((int) ((start + len) / stripeSize)) % nrStripes; + +                if (startNodeInTree != endNodeInTree) { +                        end = (start - (start % stripeSize)) + stripeSize; +                        end -= start; +                } + +                return new int[] {startNodeInTree, endNodeInTree, (int) end}; +        } + +        public boolean brickHasFilePart(int nodeInTree, int nodeLoc) { +                if (switchCount == -1) +                        return (nodeInTree == nodeLoc); + +                nodeInTree *= switchCount; +                for (int i = nodeInTree; i < (nodeInTree + switchCount); i++) { +                        if (i == nodeLoc) +                                return true; +                } + +                return false; +        } + +        public String brick2host (String brick) +                throws IOException { +                String[] hf = null; + +                hf = brick.split(":"); +                if (hf.length != 2) +                        throw new IOException("Error getting hostname from brick"); + +                return hf[0]; +        } + +        public String brick2file (String brick) +                throws IOException { +                String[] hf = null; + +                hf = brick.split(":"); +                if (hf.length != 2) +                        throw new IOException("Error getting hostname from brick"); + +                return hf[1]; +        } + +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java new file mode 100644 index 00000000000..11454e6368a --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSBrickRepl.java @@ -0,0 +1,52 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. 
<http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +public class GlusterFSBrickRepl { +        private String[] replHost; +        private long start; +        private long len; +        private int cnt; + +        GlusterFSBrickRepl(int replCount, long start, long len) { +                this.replHost = new String[replCount]; +                this.start = start; +                this.len = len; +                this.cnt = 0; +        } + +        public void addHost (String host) { +                this.replHost[cnt++] = host; +        } + +        public String[] getReplHosts () { +                return this.replHost; +        } + +        public long getStartLen () { +                return this.start; +        } + +        public long getOffLen () { +                return this.len; +        } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java new file mode 100644 index 00000000000..18e9003b43e --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFSXattr.java @@ -0,0 +1,471 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.net.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.io.*; +import java.util.HashMap; +import java.util.TreeMap; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.fs.BlockLocation; + +public class GlusterFSXattr { + +	public enum LAYOUT { D, S, R, DS, DR, SR, DSR } +        public enum CMD { GET_HINTS, GET_REPLICATION, GET_BLOCK_SIZE, CHECK_FOR_QUICK_IO } + +        private static String hostname; + +        public GlusterFSXattr() { } + +        public static String brick2host (String brick) +        throws IOException { +                String[] hf = null; + +                hf = brick.split(":"); +                if (hf.length != 2) { +                        System.out.println("brick not of format hostname:path"); +                        throw new IOException("Error getting hostname from brick"); +                } + +                return hf[0]; +        } + +        public static String brick2file (String brick) +        throws IOException { +                String[] hf = null; + +                hf = brick.split(":"); +                if (hf.length != 2) { +                        System.out.println("brick not of format hostname:path"); +                        throw new IOException("Error getting hostname from brick"); +                } + +                return hf[1]; +        } + +        public static BlockLocation[] getPathInfo (String filename, long start, long len) +                throws IOException { +                HashMap<String, ArrayList<String>> vol = null; +                HashMap<String, Integer> meta          = new HashMap<String, Integer>(); + +                vol = execGetFattr(filename, meta, CMD.GET_HINTS); + +                return getHints(vol, meta, start, len, null); +        } + +        public static long getBlockSize (String filename) +                throws IOException { +                HashMap<String, ArrayList<String>> vol = null; +                HashMap<String, Integer> meta          = new HashMap<String, Integer>(); + +                vol = execGetFattr(filename, meta, CMD.GET_BLOCK_SIZE); + +                if (!meta.containsKey("block-size")) +                        return 0; + +                return (long) meta.get("block-size"); + +        } + +        public static short getReplication (String filename) +                throws IOException { +                HashMap<String, ArrayList<String>> vol = null; +                HashMap<String, Integer> meta          = new HashMap<String, Integer>(); + +                vol = execGetFattr(filename, meta, CMD.GET_REPLICATION); + +                return (short) getReplicationFromLayout(vol, meta); + +        } + +        public static TreeMap<Integer, GlusterFSBrickClass> quickIOPossible (String filename, long start, +                                                                             long len) +                throws IOException { +                String                   realpath      = null; +                HashMap<String, ArrayList<String>> vol = null; +                HashMap<String, Integer> meta          = new HashMap<String, Integer>(); +                TreeMap<Integer, GlusterFSBrickClass> hnts = new TreeMap<Integer, GlusterFSBrickClass>(); + +                vol = execGetFattr(filename, meta, CMD.GET_HINTS); +                getHints(vol, meta, start, len, hnts); + +                if (hnts.size() == 0) +                        return null; // 
BOOM !! + +                // DEBUG - dump hnts here +                return hnts; +        } + +        public static HashMap<String, ArrayList<String>> execGetFattr (String filename, +                                                                       HashMap<String, Integer> meta, +                                                                       CMD cmd) +                throws IOException { +                Process        p              = null; +                BufferedReader brInput        = null; +                String         s              = null; +                String         cmdOut         = null; +                String         getfattrCmd    = null; +                String         xlator         = null; +                String         enclosingXl    = null; +                String         enclosingXlVol = null; +                String         key            = null; +                String         layout         = ""; +                int            rcount         = 0; +                int            scount         = 0; +                int            dcount         = 0; +                int            count          = 0; + +                HashMap<String, ArrayList<String>> vol = new HashMap<String, ArrayList<String>>(); + +                getfattrCmd = "getfattr -m . -n trusted.glusterfs.pathinfo " + filename; + +                p = Runtime.getRuntime().exec(getfattrCmd); +                brInput = new BufferedReader(new InputStreamReader(p.getInputStream())); + +                cmdOut = ""; +                while ( (s = brInput.readLine()) != null ) +                        cmdOut += s; + +                /** +                 * TODO: Use a single regex for extracting posix paths as well +                 * as xlator counts for layout matching. 
+                 */ + +                Pattern pattern = Pattern.compile("<(.*?)[:\\(](.*?)>"); +                Matcher matcher = pattern.matcher(cmdOut); + +                Pattern p_px = Pattern.compile(".*?:(.*)"); +                Matcher m_px; +                String gibberish_path; + +                s = null; +                while (matcher.find()) { +                        xlator = matcher.group(1); +                        if (xlator.equalsIgnoreCase("posix")) { +                                if (enclosingXl.equalsIgnoreCase("replicate")) +                                        count = rcount; +                                else if (enclosingXl.equalsIgnoreCase("stripe")) +                                        count = scount; +                                else if (enclosingXl.equalsIgnoreCase("distribute")) +                                        count = dcount; +                                else +                                        throw new IOException("Unknown Translator: " + enclosingXl); + +                                key = enclosingXl + "-" + count; + +                                if (vol.get(key) == null) +                                        vol.put(key, new ArrayList<String>()); + +                                gibberish_path = matcher.group(2); + +                                /* extract posix path from the gibberish string */ +                                m_px = p_px.matcher(gibberish_path); +                                if (!m_px.find()) +                                        throw new IOException("Cannot extract posix path"); + +                                vol.get(key).add(m_px.group(1)); +                                continue; +                        } + +                        enclosingXl = xlator; +                        enclosingXlVol = matcher.group(2); + +                        if (xlator.equalsIgnoreCase("replicate")) +                                if (rcount++ != 0) +                                        continue; + +                        if (xlator.equalsIgnoreCase("stripe")) { +                                if (scount++ != 0) +                                        continue; + + +                                Pattern ps = Pattern.compile("\\[(\\d+)\\]"); +                                Matcher ms = ps.matcher(enclosingXlVol); + +                                if (ms.find()) { +                                        if (((cmd == CMD.GET_BLOCK_SIZE) || (cmd == CMD.GET_HINTS)) +                                            && (meta != null)) +                                            meta.put("block-size", Integer.parseInt(ms.group(1))); +                                } else +                                        throw new IOException("Cannot get stripe size"); +                        } + +                        if (xlator.equalsIgnoreCase("distribute")) +                                if (dcount++ != 0) +                                        continue; + +                        layout += xlator.substring(0, 1); +                } + +                if ((dcount == 0) && (scount == 0) && (rcount == 0)) +                        throw new IOException("Cannot get layout"); + +                if (meta != null) { +                        meta.put("dcount", dcount); +                        meta.put("scount", scount); +                        meta.put("rcount", rcount); +                } + +                vol.put("layout", new ArrayList<String>(1)); +                
vol.get("layout").add(layout); + +                return vol; +        } + +        static BlockLocation[] getHints (HashMap<String, ArrayList<String>> vol, +                                         HashMap<String, Integer> meta, +                                         long start, long len, +                                         TreeMap<Integer, GlusterFSBrickClass> hnts) +                throws IOException { +                String            brick         = null; +                String            key           = null; +                boolean           done          = false; +                int               i             = 0; +                int               counter       = 0; +                int               stripeSize    = 0; +                long              stripeStart   = 0; +                long              stripeEnd     = 0; +                int               nrAllocs      = 0; +                int               allocCtr      = 0; +                BlockLocation[] result          = null; +                ArrayList<String> brickList     = null; +                ArrayList<String> stripedBricks = null; +                Iterator<String>  it            = null; + +                String[] blks = null; +                GlusterFSBrickRepl[] repl = null; +                int dcount, scount, rcount; + +                LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0)); +                dcount = meta.get("dcount"); +                scount = meta.get("scount"); +                rcount = meta.get("rcount"); + +                switch (l) { +                case D: +                        key = "DISTRIBUTE-" + dcount; +                        brick = vol.get(key).get(0); + +                        if (hnts == null) { +                                result = new BlockLocation[1]; +                                result[0] = new BlockLocation(null, new String[] {brick2host(brick)}, start, len); +                        } else +                                hnts.put(0, new GlusterFSBrickClass(brick, start, len, false, -1, -1, -1)); +                        break; + +                case R: +                case DR: +                        /* just the name says it's striped - the volume isn't */ +                        stripedBricks = new ArrayList<String>(); + +                        for (i = 1; i <= rcount; i++) { +                                key = "REPLICATE-" + i; +                                brickList = vol.get(key); +                                it = brickList.iterator(); +                                while (it.hasNext()) { +                                        stripedBricks.add(it.next()); +                                } +                        } + +                        nrAllocs = stripedBricks.size(); +                        if (hnts == null) { +                                result = new BlockLocation[1]; +                                blks = new String[nrAllocs]; +                        } + +                        for (i = 0; i < nrAllocs; i++) { +                                if (hnts == null) +                                        blks[i] = brick2host(stripedBricks.get(i)); +                                else +                                        hnts.put(i, new GlusterFSBrickClass(stripedBricks.get(i), start, len, false, -1, -1, -1)); +                        } + +                        if (hnts == null) +                                result[0] = new BlockLocation(null, blks, start, len); + +                        break; + 
+                case SR: +                case DSR: +                        int rsize = 0; +                        ArrayList<ArrayList<String>> replicas = new ArrayList<ArrayList<String>>(); + +                        stripedBricks = new ArrayList<String>(); + +                        if (rcount == 0) +                                throw new IOException("got replicated volume with replication count 0"); + +                        for (i = 1; i <= rcount; i++) { +                                key = "REPLICATE-" + i; +                                brickList = vol.get(key); +                                it = brickList.iterator(); +                                replicas.add(i - 1, new ArrayList<String>()); +                                while (it.hasNext()) { +                                        replicas.get(i - 1).add(it.next()); +                                } +                        } + +                        rsize = replicas.get(0).size(); +                        stripeSize = meta.get("block-size"); + +                        nrAllocs = (int) (((len - start) / stripeSize) + 1); +                        if (hnts == null) { +                                result = new BlockLocation[nrAllocs]; +                                repl = new GlusterFSBrickRepl[nrAllocs]; +                        } + +                        // starting stripe position +                        counter = (int) ((start / stripeSize) % rcount); +                        stripeStart = start; + +                        key = null; +                        int currAlloc = 0; +                        boolean hntsDone = false; +                        while ((stripeStart < len) && !done) { +                                stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1; +                                if (stripeEnd > start + len) { +                                        stripeEnd = start + len - 1; +                                        done = true; +                                } + +                                if (hnts == null) +                                        repl[allocCtr] = new GlusterFSBrickRepl(rsize, stripeStart, (stripeEnd - stripeStart)); + +                                for (i = 0; i < rsize; i++) { +                                        brick = replicas.get(counter).get(i); +                                        currAlloc = (allocCtr * rsize) + i; + +                                        if (hnts == null) +                                                repl[allocCtr].addHost(brick2host(brick)); +                                        else +                                                if (currAlloc <= (rsize * rcount) - 1) { +                                                        hnts.put(currAlloc, new GlusterFSBrickClass(brick, stripeStart, +                                                                                                    (stripeEnd - stripeStart), +                                                                                                    true, stripeSize, rcount, rsize)); +                                                } else +                                                        hntsDone = true; +                                } + +                                if (hntsDone) +                                        break; + +                                stripeStart = stripeEnd + 1; + +                                allocCtr++; +                                counter++; + +                 
               if (counter >= replicas.size()) +                                        counter = 0; +                        } + +                        if (hnts == null) +                                for (int k = 0; k < nrAllocs; k++) +                                        result[k] = new BlockLocation(null, repl[k].getReplHosts(), repl[k].getStartLen(), repl[k].getOffLen()); + +                        break; + +                case S: +                case DS: +                        if (scount == 0) +                                throw new IOException("got striped volume with stripe count 0"); + +                        stripedBricks = new ArrayList<String>(); +                        stripeSize = meta.get("block-size"); + +                        key = "STRIPE-" + scount; +                        brickList = vol.get(key); +                        it = brickList.iterator(); +                        while (it.hasNext()) { +                                stripedBricks.add(it.next()); +                        } + +                        nrAllocs = (int) ((len - start) / stripeSize) + 1; +                        if (hnts == null) +                                result = new BlockLocation[nrAllocs]; + +                        // starting stripe position +                        counter = (int) ((start / stripeSize) % stripedBricks.size()); +                        stripeStart = start; + +                        key = null; +                        while ((stripeStart < len) && !done) { +                                brick = stripedBricks.get(counter); + +                                stripeEnd = (stripeStart - (stripeStart % stripeSize)) + stripeSize - 1; +                                if (stripeEnd > start + len) { +                                        stripeEnd = start + len - 1; +                                        done = true; +                                } + +                                if (hnts == null) +                                        result[allocCtr] = new BlockLocation(null, new String[] {brick2host(brick)}, +                                                                             stripeStart, (stripeEnd - stripeStart)); +                                else +                                        if (allocCtr <= stripedBricks.size()) { +                                                hnts.put(allocCtr, new GlusterFSBrickClass(brick, stripeStart, (stripeEnd - stripeStart), +                                                                                           true, stripeSize, stripedBricks.size(), -1)); +                                        } else +                                                break; + +                                stripeStart = stripeEnd + 1; + +                                counter++; +                                allocCtr++; + +                                if (counter >= stripedBricks.size()) +                                        counter = 0; +                        } + +                        break; +                } + +                return result; +        } + +        /* TODO: use meta{dcount,scount,rcount} for checking */ +        public static int getReplicationFromLayout (HashMap<String, ArrayList<String>> vol, +                                                    HashMap<String, Integer> meta) +                throws IOException { +                int replication = 0; +                LAYOUT l = LAYOUT.valueOf(vol.get("layout").get(0)); + +                switch (l) { +              
  case D: +                case S: +                case DS: +                        replication = 1; +                        break; + +                case R: +                case DR: +                case SR: +                case DSR: +                        final String key = "REPLICATION-1"; +                        replication = vol.get(key).size(); +                } + +                return replication; +        } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java new file mode 100644 index 00000000000..e92237aeea2 --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEInputStream.java @@ -0,0 +1,205 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; +import java.util.TreeMap; + +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem; + + +public class GlusterFUSEInputStream extends FSInputStream { +        File                                  f; +        boolean                               lastActive; +        long                                  pos; +        boolean                               closed; +        String                                thisHost; +        RandomAccessFile                      fuseInputStream; +        RandomAccessFile                      fsInputStream; +        GlusterFSBrickClass                   thisBrick; +        int                                   nodeLocation; +        TreeMap<Integer, GlusterFSBrickClass> hnts; + +        public GlusterFUSEInputStream (File f, TreeMap<Integer, GlusterFSBrickClass> hnts, +                                       String hostname) throws IOException { +                this.f = f; +                this.pos = 0; +                this.closed = false; +                this.hnts = hnts; +                this.thisHost = hostname; +                this.fsInputStream = null; +                this.fuseInputStream = new RandomAccessFile(f.getPath(), "r"); + +                this.lastActive = true; // true == FUSE, false == backed file + +                String directFilePath = null; +                if (this.hnts != null) { +                        directFilePath = findLocalFile(f.getPath(), this.hnts); +                        if (directFilePath != null) { +                                this.fsInputStream = new RandomAccessFile(directFilePath, "r"); +                                this.lastActive = !this.lastActive; +                        } +                } +        } + +        public String findLocalFile (String path, TreeMap<Integer, GlusterFSBrickClass> hnts) { +                int i = 0; +                String actFilePath = null; +                GlusterFSBrickClass gfsBrick = null; 
+ +                gfsBrick = hnts.get(0); + +                /* do a linear search for the matching host not worrying +                   about file stripes */ +                for (i = 0; i < hnts.size(); i++) { +                        gfsBrick = hnts.get(i); +                        actFilePath = gfsBrick.brickIsLocal(this.thisHost); +                        if (actFilePath != null) { +                                this.thisBrick = gfsBrick; +                                this.nodeLocation = i; +                                break; +                        } +                } + +                return actFilePath; +        } + +        public long getPos () throws IOException { +                return pos; +        } + +        public synchronized int available () throws IOException { +                return (int) ((f.length()) - getPos()); +        } + +        public void seek (long pos) throws IOException { +                fuseInputStream.seek(pos); +                if (fsInputStream != null) +                        fsInputStream.seek(pos); +        } + +        public boolean seekToNewSource (long pos) throws IOException { +                return false; +        } + +        public RandomAccessFile chooseStream (long start, int[] nlen) +                throws IOException { +                GlusterFSBrickClass gfsBrick = null; +                RandomAccessFile in = fuseInputStream; +                boolean oldActiveStream = lastActive; +                lastActive = true; + +                if ((hnts != null) && (fsInputStream != null)) { +                        gfsBrick = hnts.get(0); +                        if (!gfsBrick.isChunked()) { +                                in = fsInputStream; +                                lastActive = false; +                        } else { +                                // find the current location in the tree and the amount of data it can serve +                                int[] nodeInTree = thisBrick.getBrickNumberInTree(start, nlen[0]); + +                                // does this node hold the byte ranges we have been requested for ? 
+                                if ((nodeInTree[2] != 0) && thisBrick.brickHasFilePart(nodeInTree[0], nodeLocation)) { +                                        in = fsInputStream; +                                        nlen[0] = nodeInTree[2]; // the amount of data that can be read from the stripe +                                        lastActive = false; +                                } +                        } +                } + +                return in; +        } + +        public synchronized int read () throws IOException { +                int byteRead = 0; +                RandomAccessFile in = null; + +                if (closed) +                        throw new IOException("Stream Closed."); + +                int[] nlen = { 1 }; + +                in = chooseStream(getPos(), nlen); + +                byteRead = in.read(); +                if (byteRead >= 0) { +                        pos++; +                        syncStreams(1); +                } + +                return byteRead; +        } + +        public synchronized int read (byte buff[], int off, int len) throws +                IOException { +                int result = 0; +                RandomAccessFile in = null; + +                if (closed) +                        throw new IOException("Stream Closed."); + +                int[] nlen = {len}; // hack to make len mutable +                in = chooseStream(pos, nlen); + +                result = in.read(buff, off, nlen[0]); +                if (result > 0) { +                        pos += result; +                        syncStreams(result); +                } + +                return result; +        } + +        /** +         * TODO: use seek() insted of skipBytes(); skipBytes does I/O +         */ +        public void syncStreams (int bytes) throws IOException { +                if ((hnts != null) && (hnts.get(0).isChunked()) && (fsInputStream != null)) +                        if (!this.lastActive) +                                fuseInputStream.skipBytes(bytes); +                        else +                                fsInputStream.skipBytes(bytes); +        } + +        public synchronized void close () throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                super.close(); +                if (fsInputStream != null) +                        fsInputStream.close(); +                fuseInputStream.close(); + +                closed = true; +        } + +        // Not supported - mark () and reset () + +        public boolean markSupported () { +                return false; +        } + +        public void mark (int readLimit) {} + +        public void reset () throws IOException { +                throw new IOException("Mark/Reset not supported."); +        } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java new file mode 100644 index 00000000000..5192a0a56cb --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java @@ -0,0 +1,86 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; + +import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.fs.FileSystem; + +public class GlusterFUSEOutputStream extends OutputStream { +        File         f; +        long         pos; +        boolean      closed; +        OutputStream fuseOutputStream; + +        public GlusterFUSEOutputStream (String file, boolean append) throws +                IOException { +                this.f = new File(file); /* not needed ? */ +                this.pos = 0; +                this.fuseOutputStream = new FileOutputStream(file, append); +                this.closed = false; +        } + +        public long getPos () throws IOException { +                return pos; +        } + +        public void write (int v) throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                byte[] b = new byte[1]; +                b[0] = (byte) v; + +                write(b, 0, 1); +        } + +        public void write (byte b[]) throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                fuseOutputStream.write(b, 0, b.length); +                pos += (long) b.length; +        } + +        public void write (byte b[], int off, int len) throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                fuseOutputStream.write(b, off, len); +                pos += (long) len; +        } + +        public void flush () throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                fuseOutputStream.flush(); +        } + +        public void close () throws IOException { +                if (closed) +                        throw new IOException("Stream closed."); + +                flush(); +                fuseOutputStream.close(); +                closed = true; +        } +} diff --git a/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java new file mode 100644 index 00000000000..b0501cced27 --- /dev/null +++ b/glusterfs-hadoop/0.20.2/src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java @@ -0,0 +1,492 @@ +/** + * + * Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com> + * This file is part of GlusterFS. + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ * + */ + +/** + * Implements the Hadoop FileSystem Interface to allow applications to store + * files on GlusterFS and run Map/Reduce jobs on the data. + */ + +package org.apache.hadoop.fs.glusterfs; + +import java.io.*; +import java.net.*; + +import java.util.regex.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import java.util.TreeMap; + +/* + * This package provides interface for hadoop jobs (incl. Map/Reduce) + * to access files in GlusterFS backed file system via FUSE mount + */ +public class GlusterFileSystem extends FileSystem { + +        private FileSystem glusterFs    = null; +        private URI        uri          = null; +        private Path       workingDir   = null; +        private String     glusterMount = null; +        private boolean    mounted      = false; + +        /* for quick IO */ +        private boolean quickSlaveIO = false; + +        /* extended attribute class */ +        private GlusterFSXattr xattr = null; + +        /* hostname of this machine */ +        private static String hostname; + +        public GlusterFileSystem () { + +        } + +        public URI getUri () { +                return uri; +        } + +        public boolean FUSEMount (String volname, String server, String mount) +                throws IOException, InterruptedException  { +                boolean        ret      = true; +                int            retVal   = 0; +                Process        p        = null; +                String         s        = null; +                String         mountCmd = null; + +                mountCmd = "mount -t glusterfs " + server + ":" + "/" + volname + " " + mount; + +                try { +                        p = Runtime.getRuntime().exec(mountCmd); + +                        retVal = p.waitFor(); +                        if (retVal != 0) +                                ret = false; + +                } catch (IOException e) { +                        System.out.println ("Problem mounting FUSE mount on: " + mount); +                        e.printStackTrace(); +                        System.exit(-1); +                } + +                return ret; +        } + +        public void initialize (URI uri, Configuration conf) throws IOException { +                boolean ret             = false; +                String  volName         = null; +                String  remoteGFSServer = null; +                String  needQuickRead   = null; + +                if (this.mounted) +                        return; + +                System.out.println("Initializing GlusterFS"); + +                try { +                        volName = conf.get("fs.glusterfs.volname", ""); +                        glusterMount = conf.get("fs.glusterfs.mount", ""); +                        remoteGFSServer = conf.get("fs.glusterfs.server", ""); +                        needQuickRead = conf.get("quick.slave.io", ""); + +                        /* +                         * bail out if we do not have enough information to do a FUSE +                         * mount +                         */ +                        if ( (volName.length() == 0) || 
(remoteGFSServer.length() == 0) || +                             (glusterMount.length() == 0) ) +                                System.exit (-1); + +                        ret = FUSEMount(volName, remoteGFSServer, glusterMount); +                        if (!ret) { +                                System.out.println("Failed to initialize GlusterFS"); +                                System.exit(-1); +                        } + +                        if ((needQuickRead.length() != 0) +                            && (needQuickRead.equalsIgnoreCase("yes") +                                || needQuickRead.equalsIgnoreCase("on") +                                || needQuickRead.equals("1"))) +                                this.quickSlaveIO = true; + +                        this.mounted = true; +                        this.glusterFs = FileSystem.getLocal(conf); +                        this.workingDir = new Path(glusterMount); +                        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + +                        this.xattr = new GlusterFSXattr(); + +                        InetAddress addr = InetAddress.getLocalHost(); +                        this.hostname = addr.getHostName(); + +                        setConf(conf); + +                } catch (Exception e) { +                        e.printStackTrace(); +                        System.out.println("Unable to initialize GlusterFS"); +                        System.exit(-1); +                } +        } + +        @Deprecated +        public String getName () { +                return getUri().toString(); +        } + +        public Path getWorkingDirectory () { +                return this.workingDir; +        } + +        public Path getHomeDirectory () { +                return this.workingDir; +        } + +        public Path makeAbsolute (Path path) { +                String pth = path.toUri().getPath(); +                if (pth.startsWith(workingDir.toUri().getPath())) { +                        return path; +                } + +                return new Path(workingDir + "/" + pth); +        } + +        public void setWorkingDirectory (Path dir) { +                this.workingDir = makeAbsolute(dir); +        } + +        public boolean exists (Path path) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                return f.exists(); +        } + +        public boolean mkdirs (Path path, FsPermission permission +                               ) throws IOException { +                boolean created  = false; +                Path    absolute = makeAbsolute(path); +                File    f        = new File(absolute.toUri().getPath()); + +                if (f.exists()) { +                        System.out.println("Directory " + f.getPath() + " already exist"); +                        return true; +                } + +                return f.mkdirs(); +        } + +        @Deprecated +        public boolean isDirectory (Path path) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                return f.isDirectory(); +        } + +        public boolean isFile (Path path) throws IOException { +                return !isDirectory(path); +        } + +        public Path[] listPaths (Path path) throws IOException { +                Path   absolute   = makeAbsolute(path); +                File   f      
    = new File (absolute.toUri().getPath()); +                String relPath    = path.toUri().getPath(); +                String[] fileList = null; +                Path[] filePath   = null; +                int    fileCnt    = 0; + +                fileList = f.list(); + +                filePath = new Path[fileList.length]; + +                for (; fileCnt < fileList.length; fileCnt++) { +                        filePath[fileCnt] = new Path(relPath + "/" + fileList[fileCnt]); +                } + +                return filePath; +        } + +        public FileStatus[] listStatus (Path path) throws IOException { +                int    fileCnt          = 0; +                Path   absolute         = makeAbsolute(path); +                String relpath          = path.toUri().getPath(); +                String[] strFileList    = null; +                FileStatus[] fileStatus = null; +                File   f                = new File(absolute.toUri().getPath()); + +                if (!f.exists()) { +                        return null; +                } + +                if (f.isFile()) +                        return new FileStatus[] { +                                getFileStatus(path) +                        }; + +                if (relpath.charAt(relpath.length() - 1) != '/') +                        relpath += "/"; + +                strFileList = f.list(); + +                fileStatus = new FileStatus[strFileList.length]; + +                for (; fileCnt < strFileList.length; fileCnt++) { +                        fileStatus[fileCnt] = getFileStatusFromFileString(relpath + strFileList[fileCnt]); +                } + +                return fileStatus; +        } + +        public FileStatus getFileStatusFromFileString (String path) +                throws IOException { +                Path nPath = new Path(path); +                return getFileStatus(nPath); +        } + +        public FileStatus getFileStatus (Path path) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                if (!f.exists ()) +                        throw new FileNotFoundException("File " + f.getPath() + " does not exist."); + +                if (f.isDirectory ()) +                        return new FileStatus(0, true, 1, 0, f.lastModified(), path.makeQualified(this)); +                else +                        return new FileStatus(f.length(), false, 0, getDefaultBlockSize(), +                                              f.lastModified(), path.makeQualified(this)); + +        } + +        /* +         * creates a new file in glusterfs namespace. internally the file +         * descriptor is an instance of OutputStream class. 
+         */ +        public FSDataOutputStream create (Path path, FsPermission permission, +                                          boolean overwrite, int bufferSize, +                                          short replication, long blockSize, +                                          Progressable progress) +        throws IOException { +                Path               absolute          = makeAbsolute(path); +                Path               parent            = null; +                File               f                 = null; +                File               fParent           = null; +                FSDataOutputStream glusterFileStream = null; + +                f = new File(absolute.toUri().getPath()); + +                if (f.exists ()) { +                        if (overwrite) +                                f.delete (); +                        else +                                throw new IOException(f.getPath() + " already exist"); +                } + +                parent = path.getParent(); +                fParent = new File ((makeAbsolute(parent)).toUri().getPath()); +                if ((parent != null) && (fParent != null) && (!fParent.exists())) +                        if (!fParent.mkdirs()) +                                throw new IOException("cannot create parent directory: " + fParent.getPath()); + +                glusterFileStream = new FSDataOutputStream(new GlusterFUSEOutputStream +                                                           (f.getPath(), false)); + +                return glusterFileStream; +        } + +        /* +         * open the file in read mode (internally the file descriptor is an +         * instance of InputStream class). +         * +         * if quick read mode is set then read the file by by-passing FUSE +         * if we are on same slave where the file exist +         */ +        public FSDataInputStream open (Path path) throws IOException { +                Path              absolute          = makeAbsolute(path); +                File              f                 = new File(absolute.toUri().getPath()); +                FSDataInputStream glusterFileStream = null; +                TreeMap<Integer, GlusterFSBrickClass> hnts = null; + +                if (!f.exists()) +                        throw new IOException("File " + f.getPath() + " does not exist."); + +                if (quickSlaveIO) +                        hnts = xattr.quickIOPossible(f.getPath(), 0, f.length()); + +                glusterFileStream = new FSDataInputStream(new GlusterFUSEInputStream(f, hnts, hostname)); +                return glusterFileStream; +        } + +        public FSDataInputStream open (Path path, int bufferSize) throws IOException { +                return open(path); +        } + +        public FSDataOutputStream append (Path f, int bufferSize, Progressable progress) +                throws IOException { +                throw new IOException ("append not supported (as yet)."); +        } + +        public boolean rename (Path src, Path dst) throws IOException { +                Path absoluteSrc = makeAbsolute(src); +                Path absoluteDst = makeAbsolute(dst); + +                File fSrc = new File(absoluteSrc.toUri().getPath()); +                File fDst = new File(absoluteDst.toUri().getPath()); + +                if (fDst.isDirectory()) { +                        fDst = null; +                        String newPath = absoluteDst.toUri().getPath() + "/" + fSrc.getName(); +                        fDst = 
new File(newPath); +                } +                return fSrc.renameTo(fDst); +        } + +        @Deprecated +        public boolean delete (Path path) throws IOException { +                return delete(path, true); +        } + +        public boolean delete (Path path, boolean recursive) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                if (f.isFile()) +                        return f.delete(); + +                FileStatus[] dirEntries = listStatus(absolute); +                if ((!recursive) && (dirEntries != null) && (dirEntries.length != 0)) +                        throw new IOException ("Directory " + path.toString() + " is not empty"); + +                if (dirEntries != null) +                        for (int i = 0; i < dirEntries.length; i++) +                                delete(new Path(absolute, dirEntries[i].getPath()), recursive); + +                return f.delete(); +        } + +        @Deprecated +        public long getLength (Path path) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                if (!f.exists()) +                        throw new IOException(f.getPath() + " does not exist."); + +                return f.length(); +        } + +        @Deprecated +        public short getReplication (Path path) throws IOException { +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                if (!f.exists()) +                        throw new IOException(f.getPath() + " does not exist."); + +                return xattr.getReplication(f.getPath()); +        } + +        public short getDefaultReplication (Path path) throws IOException { +                return getReplication(path); +        } + +        public boolean setReplication (Path path, short replication) +                throws IOException { +                return true; +        } + +        public long getBlockSize (Path path) throws IOException { +                long blkSz; +                Path absolute = makeAbsolute(path); +                File f = new File(absolute.toUri().getPath()); + +                blkSz = xattr.getBlockSize(f.getPath()); +                if (blkSz == 0) +                        blkSz = getLength(path); + +                return blkSz; +        } + +        public long getDefaultBlockSize () { +                return 1 << 26; /* default's from hdfs, kfs */ +        } + +        @Deprecated +        public void lock (Path path, boolean shared) throws IOException { +        } + +        @Deprecated +        public void release (Path path) throws IOException { +        } + +        public BlockLocation[] getFileBlockLocations (FileStatus file, long start, long len) +                throws IOException { + +                Path absolute          = makeAbsolute(file.getPath()); +                File f                 = new File(absolute.toUri().getPath()); +                BlockLocation[] result = null; + +                if (file == null) +                        return null; + +                result = xattr.getPathInfo(f.getPath(), start, len); +                if (result == null) { +                        System.out.println("Problem getting destination host for file " +                                           + f.getPath()); +                        return null; +                } + +        
+                return result;
+        }
+
+        // getFileBlockLocations (FileStatus, long, long) is called by hadoop
+        public BlockLocation[] getFileBlockLocations (Path p, long start, long len)
+                throws IOException {
+                return null;
+        }
+
+        public void copyFromLocalFile (boolean delSrc, Path src, Path dst)
+                throws IOException {
+                FileUtil.copy(glusterFs, src, this, dst, delSrc, getConf());
+        }
+
+        public void copyToLocalFile (boolean delSrc, Path src, Path dst)
+                throws IOException {
+                FileUtil.copy(this, src, glusterFs, dst, delSrc, getConf());
+        }
+
+        public Path startLocalOutput (Path fsOutputFile, Path tmpLocalFile)
+                throws IOException {
+                return tmpLocalFile;
+        }
+
+        public void completeLocalOutput (Path fsOutputFile, Path tmpLocalFile)
+                throws IOException {
+                moveFromLocalFile(tmpLocalFile, fsOutputFile);
+        }
+}
diff --git a/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java
new file mode 100644
index 00000000000..21e188c52bc
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/src/test/java/org/apache/hadoop/fs/glusterfs/AppTest.java
@@ -0,0 +1,38 @@
+package org.apache.hadoop.fs.glusterfs;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+    extends TestCase
+{
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public AppTest( String testName )
+    {
+        super( testName );
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite()
+    {
+        return new TestSuite( AppTest.class );
+    }
+
+    /**
+     * Rigorous Test :-)
+     */
+    public void testApp()
+    {
+        assertTrue( true );
+    }
+}
diff --git a/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py b/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py
new file mode 100644
index 00000000000..450e08fb0c6
--- /dev/null
+++ b/glusterfs-hadoop/0.20.2/tools/build-deploy-jar.py
@@ -0,0 +1,212 @@
+#!/usr/bin/python
+
+##
+ #
+ # Copyright (c) 2011 Gluster, Inc. <http://www.gluster.com>
+ # This file is part of GlusterFS.
+ #
+ # Licensed under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ # implied. See the License for the specific language governing
+ # permissions and limitations under the License.
+ #
+ ##
+
+import getopt
+import glob
+import sys, os
+import shutil
+import subprocess, shlex
+
+def usage():
+    print "usage: python build-deploy-jar.py [-b/--build] -d/--dir <hadoop-home> [-c/--core] [-m/--mapred] [-h/--henv]"
+
+def addSlash(s):
+    if not (s[-1] == '/'):
+        s = s + '/'
+
+    return s
+
+def whereis(program):
+    abspath = None
+    for path in (os.environ.get('PATH', '')).split(':'):
+        abspath = os.path.join(path, program)
+        if os.path.exists(abspath) and not os.path.isdir(abspath):
+            return abspath
+
+    return None
+
+def getLatestJar(targetdir):
+    latestJar = None
+    glusterfsJar = glob.glob(targetdir + "*.jar")
+    if len(glusterfsJar) == 0:
+        print "No GlusterFS jar file found in %s ... exiting" % (targetdir)
+        return None
+
+    # pick up the latest jar file - just in case ...
+    stat = latestJar = None
+    ctime = 0
+
+    for jar in glusterfsJar:
+        stat = os.stat(jar)
+        if stat.st_ctime > ctime:
+            latestJar = jar
+            ctime = stat.st_ctime
+
+    return latestJar
+
+# build the glusterfs hadoop plugin using maven
+def build_jar():
+    location = whereis('mvn')
+
+    if location == None:
+        print "Cannot find maven to build glusterfs hadoop jar"
+        print "please install maven, or if it is already installed, fix your PATH environment"
+        return None
+
+    # do a clean packaging
+    targetdir = "./target/"
+    if os.path.exists(targetdir) and os.path.isdir(targetdir):
+        print "Cleaning up directories ... [ " + targetdir + " ]"
+        shutil.rmtree(targetdir)
+
+    print "Building glusterfs jar ..."
+    process = subprocess.Popen(['package'], shell=True,
+                               executable=location, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    try:
+        (pout, perr) = process.communicate()
+    except:
+        process.wait()
+    if not process.returncode == 0:
+        print "Building glusterfs jar failed"
+        return None
+
+    latestJar = getLatestJar(targetdir)
+    return latestJar
+
+def rcopy(f, host, libdir):
+    print "   * doing remote copy to host %s" % (host)
+    scpCmd = "scp %s %s:%s" % (f, host, libdir)
+
+    os.system(scpCmd)
+
+def deployInSlave(f, confdir, libdir, cc, cm, he):
+    slavefile = confdir + "slaves"
+
+    ccFile = confdir + "core-site.xml"
+    cmFile = confdir + "mapred-site.xml"
+    heFile = confdir + "hadoop-env.sh"
+
+    sf = open(slavefile, 'r')
+    for host in sf:
+        host = host.rstrip('\n')
+        print "  >>> Deploying %s on %s ..." % (os.path.basename(f), host)
+        rcopy(f, host, libdir)
+
+        if cc:
+            print "  >>> Deploying [%s] on %s ..." % (os.path.basename(ccFile), host)
+            rcopy(ccFile, host, confdir)
+
+        if cm:
+            print "  >>> Deploying [%s] on %s ..." % (os.path.basename(cmFile), host)
+            rcopy(cmFile, host, confdir)
+
+        if he:
+            print "  >>> Deploying [%s] on %s ..." % (os.path.basename(heFile), host)
+            rcopy(heFile, host, confdir)
+
+        print "<<< Done\n"
+
+    sf.close()
+
+def deployInMaster(f, confdir, libdir):
+    import socket
+    masterfile = confdir + "masters"
+
+    mf = open(masterfile, 'r')
+    for host in mf:
+        host = host.rstrip('\n')
+        print "  >>> Deploying %s on %s ..." % (os.path.basename(f), host)
+        h = host
+        try:
+            socket.inet_aton(host)
+            h = socket.getfqdn(host)
+        except socket.error:
+            # host is not an IP address
+            pass
+
+        if h == socket.gethostname() or h == 'localhost':
+            # local cp
+            print "   * doing local copy"
+            shutil.copy(f, libdir)
+        else:
+            # scp the file
+            rcopy(f, h, libdir)
+
+        print "<<< Done\n"
+
+    mf.close()
+
+if __name__ == '__main__':
+    opt = args = []
+    try:
+        opt, args = getopt.getopt(sys.argv[1:], "bd:cmh", ["build", "dir=", "core", "mapred", "henv"])
+    except getopt.GetoptError, err:
+        print str(err)
+        usage()
+        sys.exit(1)
+
+    needbuild = hadoop_dir = copyCore = copyMapred = copyHadoopEnv = None
+
+    for k, v in opt:
+        if k in ("-b", "--build"):
+            needbuild = True
+        elif k in ("-d", "--dir"):
+            hadoop_dir = v
+        elif k in ("-c", "--core"):
+            copyCore = True
+        elif k in ("-m", "--mapred"):
+            copyMapred = True
+        elif k in ("-h", "--henv"):
+            copyHadoopEnv = True
+        else:
+            assert False, "unhandled option"
+
+    assert not hadoop_dir == None, "hadoop directory missing"
+
+    if needbuild:
+        jar = build_jar()
+        if jar == None:
+            sys.exit(1)
+    else:
+        jar = getLatestJar('./target/')
+        if jar == None:
+            print "Maybe you want to build it? Use the -b option"
+            sys.exit(1)
+
+    print ""
+    print "*** Deploying %s *** " % (jar)
+
+    # copy jar to local hadoop distribution (master)
+    hadoop_home = addSlash(hadoop_dir)
+    if not (os.path.exists(hadoop_home) and os.path.isdir(hadoop_home)):
+        print "path " + hadoop_home + " does not exist or is not a directory"
+        sys.exit(1)
+
+    hadoop_conf = hadoop_home + "conf/"
+    hadoop_lib = hadoop_home + "lib/"
+
+    print " >>> Scanning hadoop master file for host(s) to deploy"
+    deployInMaster(jar, hadoop_conf, hadoop_lib)
+
+    print ""
+    print " >>> Scanning hadoop slave file for host(s) to deploy"
+    deployInSlave(jar, hadoop_conf, hadoop_lib, copyCore, copyMapred, copyHadoopEnv)
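
For reference, a minimal sketch of how a Hadoop 0.20.2 client might exercise the copy and block-location methods added in this change, assuming the plugin jar is on the Hadoop classpath and the cluster's default filesystem is configured to use it. The class name, paths, and hosts below are illustrative only and are not part of this commit.

// Hypothetical smoke test; names and paths are assumptions for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlusterFsSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();        // reads core-site.xml from the conf dir
        FileSystem fs = FileSystem.get(conf);            // the configured default filesystem
        Path dst = new Path("/user/hadoop/sample.txt");  // hypothetical destination path

        // Should dispatch to the copyFromLocalFile() override shown in the diff above.
        fs.copyFromLocalFile(new Path("/tmp/sample.txt"), dst);

        // getFileBlockLocations(FileStatus, long, long) is the variant implemented above;
        // it may return null when block information cannot be determined.
        FileStatus st = fs.getFileStatus(dst);
        BlockLocation[] blocks = fs.getFileBlockLocations(st, 0, st.getLen());
        if (blocks != null)
            for (BlockLocation b : blocks)
                System.out.println(java.util.Arrays.toString(b.getHosts()));

        fs.close();
    }
}

The jar itself is pushed to the master and slave nodes with the build-deploy-jar.py tool above, e.g. python build-deploy-jar.py -b -d <hadoop-home>, per its usage() output.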
