Read operation on HDFS
To read a file from HDFS, create a Configuration object, then obtain a FileSystem object by passing the configuration object to it. Add core-site.xml as a resource to the configuration object.
Note:- The Apache Hadoop documentation states that configurations are specified by resources, and that Hadoop by default loads two resources, in order, from the classpath: core-default.xml and core-site.xml.
Next, create a Path object from pathString (a fully qualified file name), open this file using the FileSystem object, and read it until end of file is reached.
public static void readHDFSFile(String srcFileLocation) throws IOException {
    /*
     * fully qualified name = HDFS location(ip address + port) +
     * fileLocation hdfs://192.168.213.133:54310/<fileLocation>
     */
    String pathString = "hdfs://192.168.213.133:54310/" + srcFileLocation;
    // Create configuration object - get config files from classpath
    Configuration config = new Configuration();
    /*
     * Add configuration file core-site.xml to configuration object.
     * core-site.xml is available in <Hadoop_HOME>/conf/core-site.xml
     */
    config.addResource(new Path(
            "/usr/local/hadoop2.8.1/etc/hadoop/core-site.xml"));
    FileSystem fs1 = null;
    try {
        // Create a FileSystem object passing configuration object config
        fs1 = FileSystem.get(config);
        // Create path object and check for its existence
        Path p = new Path(pathString);
        if (fs1.exists(p)) {
            BufferedReader br1 = new BufferedReader(new InputStreamReader(
                    fs1.open(p)));
            String line = br1.readLine();
            while (line != null) {
                System.out.println(line);
                line = br1.readLine();
            }
            br1.close();
        } else {
            System.out.println("File does not exist on HDFS");
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close file descriptors
        if (fs1 != null)
            fs1.close();
    }
}
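A call site for the method above might look like the following. The file path is illustrative (not from the original), and this assumes a running HDFS instance at the address hard-coded in readHDFSFile:

```java
public static void main(String[] args) throws IOException {
    // Illustrative path - substitute a file that actually exists on your cluster.
    // Prints the file's contents to stdout, or a message if it is missing.
    readHDFSFile("user/hduser/input.txt");
}
```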
Write operation on HDFS
In a write operation, we create a file in HDFS and copy content from a source file that resides in the local file system. Similar to the read operation, create a Configuration object followed by FileSystem and Path objects. Here we create the file only if it does not already exist on HDFS (there is also an option to overwrite it by passing the overwrite flag as true in the create method - fs1.create(path, overwrite)).
/*
 * Create file in HDFS.
 * srcFileLocation - source file(fully qualified) is in local file system
 * dstFileLocation - relative path with respect to node where HDFS exists
 */
public static void writeFileInHDFS(String dstFileLocation,
        String srcFileLocation) throws IOException {
    /*
     * fully qualified name = HDFS location(ip address + port) +
     * fileLocation hdfs://192.168.185.133:54230/<dstFileLocation>
     */
    String dstPathString = "hdfs://192.168.185.133:54230/"
            + dstFileLocation;
    // Create configuration object - get config files from classpath
    Configuration config = new Configuration();
    /*
     * Add configuration file core-site.xml to configuration object.
     * core-site.xml is available in <Hadoop_HOME>/conf/core-site.xml
     */
    config.addResource(new Path(
            "/usr/local/hadoop2.8.1/etc/hadoop/core-site.xml"));
    // Create a FileSystem object passing configuration object config
    FileSystem fs2 = null;
    FSDataOutputStream out = null;
    InputStream in = null;
    try {
        fs2 = FileSystem.get(config);
        File sourceFileObj = new File(srcFileLocation);
        // Only create the file on HDFS if the source file exists
        if (sourceFileObj.exists()) {
            // Create path object and check for its existence
            Path dstPathObj = new Path(dstPathString);
            // Create file on HDFS if it does not exist
            if (!fs2.exists(dstPathObj)) {
                System.out.println("-----Write operation in progress(check write "
                        + "permission on the given location in HDFS)----");
                out = fs2.create(dstPathObj);
                in = new BufferedInputStream(new FileInputStream(
                        sourceFileObj));
                byte[] b1 = new byte[1024];
                int numBytes = 0;
                while ((numBytes = in.read(b1)) > 0) {
                    out.write(b1, 0, numBytes);
                }
            } else {
                System.out.println("File already exists in HDFS !!");
                return;
            }
            // Verify the existence of the newly created file
            if (fs2.exists(dstPathObj)) {
                System.out.println("File created successfully in HDFS "
                        + fs2.getFileChecksum(dstPathObj));
            }
        } else {
            System.out.println("Source file does not exist in local file system !!");
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Close file descriptors
        if (in != null)
            in.close();
        if (out != null)
            out.close();
        if (fs2 != null)
            fs2.close();
    }
}
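As a side note, the manual byte-copy loop above can be replaced by Hadoop's built-in helper FileSystem.copyFromLocalFile, which opens, copies, and closes the streams internally. A minimal sketch, assuming the same core-site.xml setup and a running cluster (the local and HDFS paths here are illustrative):

```java
// Sketch: copy a local file into HDFS using a built-in helper instead of a manual loop.
Configuration config = new Configuration();
config.addResource(new Path("/usr/local/hadoop2.8.1/etc/hadoop/core-site.xml"));
// FileSystem implements Closeable, so try-with-resources closes it for us
try (FileSystem fs = FileSystem.get(config)) {
    // Illustrative paths - copyFromLocalFile(src, dst) does the read/write internally
    fs.copyFromLocalFile(new Path("/tmp/local.txt"),
            new Path("hdfs://192.168.185.133:54230/user/hduser/local.txt"));
}
```

Unlike the method above, copyFromLocalFile overwrites an existing destination by default, so keep the explicit fs2.exists check if create-only semantics are required.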