Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F103060957
hdfs.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Feb 26, 23:15
Size
89 KB
Mime Type
text/x-c
Expires
Fri, Feb 28, 23:15 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
24471581
Attached To
R3704 elastic-yarn
hdfs.c
View Options
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "exception.h"
#include "hdfs.h"
#include "jni_helper.h"
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
/* Some frequently used Java paths */
#define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
#define HADOOP_PATH "org/apache/hadoop/fs/Path"
#define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
#define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
#define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
#define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
#define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
#define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
#define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
#define READ_OPTION "org/apache/hadoop/fs/ReadOption"
#define JAVA_VOID "V"
/* Macros for constructing method signatures */
#define JPARAM(X) "L" X ";"
#define JARRPARAM(X) "[L" X ";"
#define JMETHOD1(X, R) "(" X ")" R
#define JMETHOD2(X, Y, R) "(" X Y ")" R
#define JMETHOD3(X, Y, Z, R) "(" X Y Z")" R
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);
/**
* The C equivalent of org.apache.org.hadoop.FSData(Input|Output)Stream .
*/
enum hdfsStreamType
{
UNINITIALIZED = 0,
INPUT = 1,
OUTPUT = 2,
};
/**
* The 'file-handle' to a file in hdfs.
*/
struct hdfsFile_internal {
void* file;
enum hdfsStreamType type;
int flags;
};
int hdfsFileIsOpenForRead(hdfsFile file)
{
return (file->type == INPUT);
}
int hdfsFileGetReadStatistics(hdfsFile file,
struct hdfsReadStatistics **stats)
{
jthrowable jthr;
jobject readStats = NULL;
jvalue jVal;
struct hdfsReadStatistics *s = NULL;
int ret;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
if (file->type != INPUT) {
ret = EINVAL;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
"org/apache/hadoop/hdfs/client/HdfsDataInputStream",
"getReadStatistics",
"()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getReadStatistics failed");
goto done;
}
readStats = jVal.l;
s = malloc(sizeof(struct hdfsReadStatistics));
if (!s) {
ret = ENOMEM;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalBytesRead failed");
goto done;
}
s->totalBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalLocalBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
goto done;
}
s->totalLocalBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalShortCircuitBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
goto done;
}
s->totalShortCircuitBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalZeroCopyBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
goto done;
}
s->totalZeroCopyBytesRead = jVal.j;
*stats = s;
s = NULL;
ret = 0;
done:
destroyLocalReference(env, readStats);
free(s);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int64_t hdfsReadStatisticsGetRemoteBytesRead(
const struct hdfsReadStatistics *stats)
{
return stats->totalBytesRead - stats->totalLocalBytesRead;
}
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
{
free(stats);
}
int hdfsFileIsOpenForWrite(hdfsFile file)
{
return (file->type == OUTPUT);
}
int hdfsFileUsesDirectRead(hdfsFile file)
{
return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
}
void hdfsFileDisableDirectRead(hdfsFile file)
{
file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
int hdfsDisableDomainSocketSecurity(void)
{
jthrowable jthr;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, STATIC, NULL,
"org/apache/hadoop/net/unix/DomainSocket",
"disableBindPathValidation", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"DomainSocket#disableBindPathValidation");
return -1;
}
return 0;
}
/**
* hdfsJniEnv: A wrapper struct to be used as 'value'
* while saving thread -> JNIEnv* mappings
*/
typedef struct
{
JNIEnv* env;
} hdfsJniEnv;
/**
* Helper function to create a org.apache.hadoop.fs.Path object.
* @param env: The JNIEnv pointer.
* @param path: The file-path for which to construct org.apache.hadoop.fs.Path
* object.
* @return Returns a jobject on success and NULL on error.
*/
static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
jobject *out)
{
jthrowable jthr;
jstring jPathString;
jobject jPath;
//Construct a java.lang.String object
jthr = newJavaStr(env, path, &jPathString);
if (jthr)
return jthr;
//Construct the org.apache.hadoop.fs.Path object
jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
"(Ljava/lang/String;)V", jPathString);
destroyLocalReference(env, jPathString);
if (jthr)
return jthr;
*out = jPath;
return NULL;
}
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
jthrowable jthr;
jvalue jVal;
jstring jkey = NULL, jRet = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING)), jkey);
if (jthr)
goto done;
jRet = jVal.l;
jthr = newCStr(env, jRet, val);
done:
destroyLocalReference(env, jkey);
destroyLocalReference(env, jRet);
return jthr;
}
int hdfsConfGetStr(const char *key, char **val)
{
JNIEnv *env;
int ret;
jthrowable jthr;
jobject jConfiguration = NULL;
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetStr(%s): new Configuration", key);
goto done;
}
jthr = hadoopConfGetStr(env, jConfiguration, key, val);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetStr(%s): hadoopConfGetStr", key);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
void hdfsConfStrFree(char *val)
{
free(val);
}
static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
const char *key, int32_t *val)
{
jthrowable jthr = NULL;
jvalue jVal;
jstring jkey = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
return jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
jkey, (jint)(*val));
destroyLocalReference(env, jkey);
if (jthr)
return jthr;
*val = jVal.i;
return NULL;
}
int hdfsConfGetInt(const char *key, int32_t *val)
{
JNIEnv *env;
int ret;
jobject jConfiguration = NULL;
jthrowable jthr;
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetInt(%s): new Configuration", key);
goto done;
}
jthr = hadoopConfGetInt(env, jConfiguration, key, val);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsConfGetInt(%s): hadoopConfGetInt", key);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
if (ret)
errno = ret;
return ret;
}
struct hdfsBuilderConfOpt {
struct hdfsBuilderConfOpt *next;
const char *key;
const char *val;
};
struct hdfsBuilder {
int forceNewInstance;
const char *nn;
tPort port;
const char *kerbTicketCachePath;
const char *userName;
struct hdfsBuilderConfOpt *opts;
};
struct hdfsBuilder *hdfsNewBuilder(void)
{
struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
if (!bld) {
errno = ENOMEM;
return NULL;
}
return bld;
}
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
const char *val)
{
struct hdfsBuilderConfOpt *opt, *next;
opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
if (!opt)
return -ENOMEM;
next = bld->opts;
bld->opts = opt;
opt->next = next;
opt->key = key;
opt->val = val;
return 0;
}
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
struct hdfsBuilderConfOpt *cur, *next;
cur = bld->opts;
for (cur = bld->opts; cur; ) {
next = cur->next;
free(cur);
cur = next;
}
free(bld);
}
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
bld->forceNewInstance = 1;
}
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
bld->nn = nn;
}
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
bld->port = port;
}
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
bld->userName = userName;
}
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
const char *kerbTicketCachePath)
{
bld->kerbTicketCachePath = kerbTicketCachePath;
}
hdfsFS hdfsConnect(const char* host, tPort port)
{
struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld)
return NULL;
hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port);
return hdfsBuilderConnect(bld);
}
/** Always return a new FileSystem handle */
hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
{
struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld)
return NULL;
hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderSetForceNewInstance(bld);
return hdfsBuilderConnect(bld);
}
hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
{
struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld)
return NULL;
hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderSetUserName(bld, user);
return hdfsBuilderConnect(bld);
}
/** Always return a new FileSystem handle */
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
const char *user)
{
struct hdfsBuilder *bld = hdfsNewBuilder();
if (!bld)
return NULL;
hdfsBuilderSetNameNode(bld, host);
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderSetUserName(bld, user);
return hdfsBuilderConnect(bld);
}
/**
* Calculate the effective URI to use, given a builder configuration.
*
* If there is not already a URI scheme, we prepend 'hdfs://'.
*
* If there is not already a port specified, and a port was given to the
* builder, we suffix that port. If there is a port specified but also one in
* the URI, that is an error.
*
* @param bld The hdfs builder object
* @param uri (out param) dynamically allocated string representing the
* effective URI
*
* @return 0 on success; error code otherwise
*/
static int calcEffectiveURI(struct hdfsBuilder *bld, char ** uri)
{
const char *scheme;
char suffix[64];
const char *lastColon;
char *u;
size_t uriLen;
if (!bld->nn)
return EINVAL;
scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://";
if (bld->port == 0) {
suffix[0] = '\0';
} else {
lastColon = rindex(bld->nn, ':');
if (lastColon && (strspn(lastColon + 1, "0123456789") ==
strlen(lastColon + 1))) {
fprintf(stderr, "port %d was given, but URI '%s' already "
"contains a port!\n", bld->port, bld->nn);
return EINVAL;
}
snprintf(suffix, sizeof(suffix), ":%d", bld->port);
}
uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix);
u = malloc((uriLen + 1) * (sizeof(char)));
if (!u) {
fprintf(stderr, "calcEffectiveURI: out of memory");
return ENOMEM;
}
snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix);
*uri = u;
return 0;
}
static const char *maybeNull(const char *str)
{
return str ? str : "(NULL)";
}
static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
char *buf, size_t bufLen)
{
snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
"kerbTicketCachePath=%s, userName=%s",
bld->forceNewInstance, maybeNull(bld->nn), bld->port,
maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
return buf;
}
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
{
JNIEnv *env = 0;
jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
jstring jURIString = NULL, jUserString = NULL;
jvalue jVal;
jthrowable jthr = NULL;
char *cURI = 0, buf[512];
int ret;
jobject jRet = NULL;
struct hdfsBuilderConfOpt *opt;
//Get the JNIEnv* corresponding to current thread
env = getJNIEnv();
if (env == NULL) {
ret = EINTERNAL;
goto done;
}
// jConfiguration = new Configuration();
jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
// set configuration values
for (opt = bld->opts; opt; opt = opt->next) {
jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
goto done;
}
}
//Check what type of FileSystem the caller wants...
if (bld->nn == NULL) {
// Get a local filesystem.
if (bld->forceNewInstance) {
// fs = FileSytem#newInstanceLocal(conf);
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
JPARAM(HADOOP_LOCALFS)), jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
} else {
// fs = FileSytem#getLocal(conf);
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "getLocal",
JMETHOD1(JPARAM(HADOOP_CONF),
JPARAM(HADOOP_LOCALFS)),
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
}
} else {
if (!strcmp(bld->nn, "default")) {
// jURI = FileSystem.getDefaultUri(conf)
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"getDefaultUri",
"(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jURI = jVal.l;
} else {
// fs = FileSystem#get(URI, conf, ugi);
ret = calcEffectiveURI(bld, &cURI);
if (ret)
goto done;
jthr = newJavaStr(env, cURI, &jURIString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
"create", "(Ljava/lang/String;)Ljava/net/URI;",
jURIString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jURI = jVal.l;
}
if (bld->kerbTicketCachePath) {
jthr = hadoopConfSetStr(env, jConfiguration,
KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
}
jthr = newJavaStr(env, bld->userName, &jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
if (bld->forceNewInstance) {
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
"newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
JPARAM(HADOOP_FS)),
jURI, jConfiguration, jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
} else {
jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
jURI, jConfiguration, jUserString);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
jFS = jVal.l;
}
}
jRet = (*env)->NewGlobalRef(env, jFS);
if (!jRet) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsBuilderConnect(%s)",
hdfsBuilderToStr(bld, buf, sizeof(buf)));
goto done;
}
ret = 0;
done:
// Release unnecessary local references
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jFS);
destroyLocalReference(env, jURI);
destroyLocalReference(env, jCachePath);
destroyLocalReference(env, jURIString);
destroyLocalReference(env, jUserString);
free(cURI);
hdfsFreeBuilder(bld);
if (ret) {
errno = ret;
return NULL;
}
return (hdfsFS)jRet;
}
int hdfsDisconnect(hdfsFS fs)
{
// JAVA EQUIVALENT:
// fs.close()
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
int ret;
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Parameters
jobject jFS = (jobject)fs;
//Sanity check
if (fs == NULL) {
errno = EBADF;
return -1;
}
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"close", "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsDisconnect: FileSystem#close");
} else {
ret = 0;
}
(*env)->DeleteGlobalRef(env, jFS);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
/**
* Get the default block size of a FileSystem object.
*
* @param env The Java env
* @param jFS The FileSystem object
* @param jPath The path to find the default blocksize at
* @param out (out param) the default block size
*
* @return NULL on success; or the exception
*/
static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
jobject jPath, jlong *out)
{
jthrowable jthr;
jvalue jVal;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"), jPath);
if (jthr)
return jthr;
*out = jVal.j;
return NULL;
}
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blockSize)
{
/*
JAVA EQUIVALENT:
File f = new File(path);
FSData{Input|Output}Stream f{is|os} = fs.create(f);
return f{is|os};
*/
/* Get the JNIEnv* corresponding to current thread */
JNIEnv* env = getJNIEnv();
int accmode = flags & O_ACCMODE;
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
jstring jStrBufferSize = NULL, jStrReplication = NULL;
jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
jobject jFS = (jobject)fs;
jthrowable jthr;
jvalue jVal;
hdfsFile file = NULL;
int ret;
if (accmode == O_RDONLY || accmode == O_WRONLY) {
/* yay */
} else if (accmode == O_RDWR) {
fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
errno = ENOTSUP;
return NULL;
} else {
fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n", accmode);
errno = EINVAL;
return NULL;
}
if ((flags & O_CREAT) && (flags & O_EXCL)) {
fprintf(stderr, "WARN: hdfs does not truly support O_CREATE && O_EXCL\n");
}
/* The hadoop java api/signature */
const char* method = NULL;
const char* signature = NULL;
if (accmode == O_RDONLY) {
method = "open";
signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM));
} else if (flags & O_APPEND) {
method = "append";
signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM));
} else {
method = "create";
signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM));
}
/* Create an object of org.apache.hadoop.fs.Path */
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFile(%s): constructNewObjectOfPath", path);
goto done;
}
/* Get the Configuration object from the FileSystem object */
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getConf", JMETHOD1("", JPARAM(HADOOP_CONF)));
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFile(%s): FileSystem#getConf", path);
goto done;
}
jConfiguration = jVal.l;
jint jBufferSize = bufferSize;
jshort jReplication = replication;
jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
if (!jStrBufferSize) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
goto done;
}
jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");
if (!jStrReplication) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
goto done;
}
if (!bufferSize) {
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
jStrBufferSize, 4096);
if (jthr) {
ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)",
path);
goto done;
}
jBufferSize = jVal.i;
}
if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) {
if (!replication) {
jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
jStrReplication, 1);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFile(%s): Configuration#getInt(dfs.replication)",
path);
goto done;
}
jReplication = jVal.i;
}
}
/* Create and return either the FSDataInputStream or
FSDataOutputStream references jobject jStream */
// READ?
if (accmode == O_RDONLY) {
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
method, signature, jPath, jBufferSize);
} else if ((accmode == O_WRONLY) && (flags & O_APPEND)) {
// WRITE/APPEND?
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
method, signature, jPath);
} else {
// WRITE/CREATE
jboolean jOverWrite = 1;
jlong jBlockSize = blockSize;
if (jBlockSize == 0) {
jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
if (jthr) {
ret = EIO;
goto done;
}
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
method, signature, jPath, jOverWrite,
jBufferSize, jReplication, jBlockSize);
}
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature);
goto done;
}
jFile = jVal.l;
file = calloc(1, sizeof(struct hdfsFile_internal));
if (!file) {
fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path);
ret = ENOMEM;
goto done;
}
file->file = (*env)->NewGlobalRef(env, jFile);
if (!file->file) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsOpenFile(%s): NewGlobalRef", path);
goto done;
}
file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
file->flags = 0;
if ((flags & O_WRONLY) == 0) {
// Try a test read to see if we can do direct reads
char buf;
if (readDirect(fs, file, &buf, 0) == 0) {
// Success - 0-byte read should return 0
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
} else if (errno != ENOTSUP) {
// Unexpected error. Clear it, don't set the direct flag.
fprintf(stderr,
"hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
"for direct read compatibility\n", path, errno);
}
}
ret = 0;
done:
destroyLocalReference(env, jStrBufferSize);
destroyLocalReference(env, jStrReplication);
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jPath);
destroyLocalReference(env, jFile);
if (ret) {
if (file) {
if (file->file) {
(*env)->DeleteGlobalRef(env, file->file);
}
free(file);
}
errno = ret;
return NULL;
}
return file;
}
int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
int ret;
// JAVA EQUIVALENT:
// file.close
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Caught exception
jthrowable jthr;
//Sanity check
if (!file || file->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//The interface whose 'close' method to be called
const char* interface = (file->type == INPUT) ?
HADOOP_ISTRM : HADOOP_OSTRM;
jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface,
"close", "()V");
if (jthr) {
const char *interfaceShortName = (file->type == INPUT) ?
"FSDataInputStream" : "FSDataOutputStream";
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"%s#close", interfaceShortName);
} else {
ret = 0;
}
//De-allocate memory
(*env)->DeleteGlobalRef(env, file->file);
free(file);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsExists(hdfsFS fs, const char *path)
{
JNIEnv *env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jPath;
jvalue jVal;
jobject jFS = (jobject)fs;
jthrowable jthr;
if (path == NULL) {
errno = EINVAL;
return -1;
}
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsExists: constructNewObjectOfPath");
return -1;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsExists: invokeMethod(%s)",
JMETHOD1(JPARAM(HADOOP_PATH), "Z"));
return -1;
}
if (jVal.z) {
return 0;
} else {
errno = ENOENT;
return -1;
}
}
// Checks input file for readiness for reading.
static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
jobject* jInputStream)
{
*jInputStream = (jobject)(f ? f->file : NULL);
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
return 0;
}
tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
if (length == 0) {
return 0;
} else if (length < 0) {
errno = EINVAL;
return -1;
}
if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
return readDirect(fs, f, buffer, length);
}
// JAVA EQUIVALENT:
// byte [] bR = new byte[length];
// fis.read(bR);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Parameters
jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jbyteArray jbRarray;
jint noReadBytes = length;
jvalue jVal;
jthrowable jthr;
//Read the requisite bytes
jbRarray = (*env)->NewByteArray(env, length);
if (!jbRarray) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsRead: NewByteArray");
return -1;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM,
"read", "([B)I", jbRarray);
if (jthr) {
destroyLocalReference(env, jbRarray);
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsRead: FSDataInputStream#read");
return -1;
}
if (jVal.i < 0) {
// EOF
destroyLocalReference(env, jbRarray);
return 0;
} else if (jVal.i == 0) {
destroyLocalReference(env, jbRarray);
errno = EINTR;
return -1;
}
(*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
destroyLocalReference(env, jbRarray);
if ((*env)->ExceptionCheck(env)) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsRead: GetByteArrayRegion");
return -1;
}
return jVal.i;
}
// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
// JAVA EQUIVALENT:
// ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
// fis.read(bbuffer);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jvalue jVal;
jthrowable jthr;
//Read the requisite bytes
jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
if (bb == NULL) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"readDirect: NewDirectByteBuffer");
return -1;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
destroyLocalReference(env, bb);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"readDirect: FSDataInputStream#read");
return -1;
}
return (jVal.i < 0) ? 0 : jVal.i;
}
tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
void* buffer, tSize length)
{
JNIEnv* env;
jbyteArray jbRarray;
jvalue jVal;
jthrowable jthr;
if (length == 0) {
return 0;
} else if (length < 0) {
errno = EINVAL;
return -1;
}
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
// JAVA EQUIVALENT:
// byte [] bR = new byte[length];
// fis.read(pos, bR, 0, length);
jbRarray = (*env)->NewByteArray(env, length);
if (!jbRarray) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsPread: NewByteArray");
return -1;
}
jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM,
"read", "(J[BII)I", position, jbRarray, 0, length);
if (jthr) {
destroyLocalReference(env, jbRarray);
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsPread: FSDataInputStream#read");
return -1;
}
if (jVal.i < 0) {
// EOF
destroyLocalReference(env, jbRarray);
return 0;
} else if (jVal.i == 0) {
destroyLocalReference(env, jbRarray);
errno = EINTR;
return -1;
}
(*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
destroyLocalReference(env, jbRarray);
if ((*env)->ExceptionCheck(env)) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsPread: GetByteArrayRegion");
return -1;
}
return jVal.i;
}
tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
// JAVA EQUIVALENT
// byte b[] = str.getBytes();
// fso.write(b);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
jobject jOutputStream = f->file;
jbyteArray jbWarray;
jthrowable jthr;
if (length < 0) {
errno = EINVAL;
return -1;
}
//Error checking... make sure that this file is 'writable'
if (f->type != OUTPUT) {
fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
errno = EINVAL;
return -1;
}
if (length < 0) {
errno = EINVAL;
return -1;
}
if (length == 0) {
return 0;
}
//Write the requisite bytes into the file
jbWarray = (*env)->NewByteArray(env, length);
if (!jbWarray) {
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsWrite: NewByteArray");
return -1;
}
(*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
if ((*env)->ExceptionCheck(env)) {
destroyLocalReference(env, jbWarray);
errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsWrite(length = %d): SetByteArrayRegion", length);
return -1;
}
jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
HADOOP_OSTRM, "write", "([B)V", jbWarray);
destroyLocalReference(env, jbWarray);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsWrite: FSDataOutputStream#write");
return -1;
}
// Unlike most Java streams, FSDataOutputStream never does partial writes.
// If we succeeded, all the data was written.
return length;
}
int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
{
// JAVA EQUIVALENT
// fis.seek(pos);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type != INPUT) {
errno = EBADF;
return -1;
}
jobject jInputStream = f->file;
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jInputStream,
HADOOP_ISTRM, "seek", "(J)V", desiredPos);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsSeek(desiredPos=%" PRId64 ")"
": FSDataInputStream#seek", desiredPos);
return -1;
}
return 0;
}
tOffset hdfsTell(hdfsFS fs, hdfsFile f)
{
// JAVA EQUIVALENT
// pos = f.getPos();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Parameters
jobject jStream = f->file;
const char* interface = (f->type == INPUT) ?
HADOOP_ISTRM : HADOOP_OSTRM;
jvalue jVal;
jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jStream,
interface, "getPos", "()J");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsTell: %s#getPos",
((f->type == INPUT) ? "FSDataInputStream" :
"FSDataOutputStream"));
return -1;
}
return jVal.j;
}
int hdfsFlush(hdfsFS fs, hdfsFile f)
{
// JAVA EQUIVALENT
// fos.flush();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type != OUTPUT) {
errno = EBADF;
return -1;
}
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, f->file,
HADOOP_OSTRM, "flush", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFlush: FSDataInputStream#flush");
return -1;
}
return 0;
}
int hdfsHFlush(hdfsFS fs, hdfsFile f)
{
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type != OUTPUT) {
errno = EBADF;
return -1;
}
jobject jOutputStream = f->file;
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
HADOOP_OSTRM, "hflush", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsHFlush: FSDataOutputStream#hflush");
return -1;
}
return 0;
}
int hdfsHSync(hdfsFS fs, hdfsFile f)
{
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type != OUTPUT) {
errno = EBADF;
return -1;
}
jobject jOutputStream = f->file;
jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
HADOOP_OSTRM, "hsync", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsHSync: FSDataOutputStream#hsync");
return -1;
}
return 0;
}
int hdfsAvailable(hdfsFS fs, hdfsFile f)
{
// JAVA EQUIVALENT
// fis.available();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Sanity check
if (!f || f->type != INPUT) {
errno = EBADF;
return -1;
}
//Parameters
jobject jInputStream = f->file;
jvalue jVal;
jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
HADOOP_ISTRM, "available", "()I");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsAvailable: FSDataInputStream#available");
return -1;
}
return jVal.i;
}
static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS,
const char* dst, jboolean deleteSource)
{
//JAVA EQUIVALENT
// FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
// deleteSource = false, conf)
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
//Parameters
jobject jSrcFS = (jobject)srcFS;
jobject jDstFS = (jobject)dstFS;
jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
jthrowable jthr;
jvalue jVal;
int ret;
jthr = constructNewObjectOfPath(env, src, &jSrcPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
goto done;
}
jthr = constructNewObjectOfPath(env, dst, &jDstPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
goto done;
}
//Create the org.apache.hadoop.conf.Configuration object
jthr = constructNewObjectOfClass(env, &jConfiguration,
HADOOP_CONF, "()V");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl: Configuration constructor");
goto done;
}
//FileUtil#copy
jthr = invokeMethod(env, &jVal, STATIC,
NULL, "org/apache/hadoop/fs/FileUtil", "copy",
"(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
"Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
"ZLorg/apache/hadoop/conf/Configuration;)Z",
jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
jConfiguration);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
"FileUtil#copy", src, dst, deleteSource);
goto done;
}
if (!jVal.z) {
ret = EIO;
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jConfiguration);
destroyLocalReference(env, jSrcPath);
destroyLocalReference(env, jDstPath);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
}
int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
}
int hdfsDelete(hdfsFS fs, const char* path, int recursive)
{
// JAVA EQUIVALENT:
// Path p = new Path(path);
// bool retval = fs.delete(p, recursive);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
jthrowable jthr;
jobject jPath;
jvalue jVal;
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsDelete(path=%s): constructNewObjectOfPath", path);
return -1;
}
jboolean jRecursive = recursive ? JNI_TRUE : JNI_FALSE;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"delete", "(Lorg/apache/hadoop/fs/Path;Z)Z",
jPath, jRecursive);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsDelete(path=%s, recursive=%d): "
"FileSystem#delete", path, recursive);
return -1;
}
if (!jVal.z) {
errno = EIO;
return -1;
}
return 0;
}
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
{
// JAVA EQUIVALENT:
// Path old = new Path(oldPath);
// Path new = new Path(newPath);
// fs.rename(old, new);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
jthrowable jthr;
jobject jOldPath = NULL, jNewPath = NULL;
int ret = -1;
jvalue jVal;
jthr = constructNewObjectOfPath(env, oldPath, &jOldPath );
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsRename: constructNewObjectOfPath(%s)", oldPath);
goto done;
}
jthr = constructNewObjectOfPath(env, newPath, &jNewPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsRename: constructNewObjectOfPath(%s)", newPath);
goto done;
}
// Rename the file
// TODO: use rename2 here? (See HDFS-3592)
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename",
JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"),
jOldPath, jNewPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename",
oldPath, newPath);
goto done;
}
if (!jVal.z) {
errno = EIO;
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jOldPath);
destroyLocalReference(env, jNewPath);
return ret;
}
char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize)
{
// JAVA EQUIVALENT:
// Path p = fs.getWorkingDirectory();
// return p.toString()
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
jobject jPath = NULL;
jstring jPathString = NULL;
jobject jFS = (jobject)fs;
jvalue jVal;
jthrowable jthr;
int ret;
const char *jPathChars = NULL;
//FileSystem#getWorkingDirectory()
jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
HADOOP_FS, "getWorkingDirectory",
"()Lorg/apache/hadoop/fs/Path;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory");
goto done;
}
jPath = jVal.l;
if (!jPath) {
fprintf(stderr, "hdfsGetWorkingDirectory: "
"FileSystem#getWorkingDirectory returned NULL");
ret = -EIO;
goto done;
}
//Path#toString()
jthr = invokeMethod(env, &jVal, INSTANCE, jPath,
"org/apache/hadoop/fs/Path", "toString",
"()Ljava/lang/String;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetWorkingDirectory: Path#toString");
goto done;
}
jPathString = jVal.l;
jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL);
if (!jPathChars) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsGetWorkingDirectory: GetStringUTFChars");
goto done;
}
//Copy to user-provided buffer
ret = snprintf(buffer, bufferSize, "%s", jPathChars);
if (ret >= bufferSize) {
ret = ENAMETOOLONG;
goto done;
}
ret = 0;
done:
if (jPathChars) {
(*env)->ReleaseStringUTFChars(env, jPathString, jPathChars);
}
destroyLocalReference(env, jPath);
destroyLocalReference(env, jPathString);
if (ret) {
errno = ret;
return NULL;
}
return buffer;
}
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
{
// JAVA EQUIVALENT:
// fs.setWorkingDirectory(Path(path));
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
jthrowable jthr;
jobject jPath;
//Create an object of org.apache.hadoop.fs.Path
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsSetWorkingDirectory(%s): constructNewObjectOfPath",
path);
return -1;
}
//FileSystem#setWorkingDirectory()
jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"setWorkingDirectory",
"(Lorg/apache/hadoop/fs/Path;)V", jPath);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT,
"hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory",
path);
return -1;
}
return 0;
}
int hdfsCreateDirectory(hdfsFS fs, const char* path)
{
// JAVA EQUIVALENT:
// fs.mkdirs(new Path(path));
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
jobject jPath;
jthrowable jthr;
//Create an object of org.apache.hadoop.fs.Path
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsCreateDirectory(%s): constructNewObjectOfPath", path);
return -1;
}
//Create the directory
jvalue jVal;
jVal.z = 0;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z",
jPath);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY,
"hdfsCreateDirectory(%s): FileSystem#mkdirs", path);
return -1;
}
if (!jVal.z) {
// It's unclear under exactly which conditions FileSystem#mkdirs
// is supposed to return false (as opposed to throwing an exception.)
// It seems like the current code never actually returns false.
// So we're going to translate this to EIO, since there seems to be
// nothing more specific we can do with it.
errno = EIO;
return -1;
}
return 0;
}
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
{
// JAVA EQUIVALENT:
// fs.setReplication(new Path(path), replication);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
jthrowable jthr;
//Create an object of org.apache.hadoop.fs.Path
jobject jPath;
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsSetReplication(path=%s): constructNewObjectOfPath", path);
return -1;
}
//Create the directory
jvalue jVal;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z",
jPath, replication);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsSetReplication(path=%s, replication=%d): "
"FileSystem#setReplication", path, replication);
return -1;
}
if (!jVal.z) {
// setReplication returns false "if file does not exist or is a
// directory." So the nearest translation to that is ENOENT.
errno = ENOENT;
return -1;
}
return 0;
}
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
{
// JAVA EQUIVALENT:
// fs.setOwner(path, owner, group)
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
if (owner == NULL && group == NULL) {
return 0;
}
jobject jFS = (jobject)fs;
jobject jPath = NULL;
jstring jOwner = NULL, jGroup = NULL;
jthrowable jthr;
int ret;
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsChown(path=%s): constructNewObjectOfPath", path);
goto done;
}
jthr = newJavaStr(env, owner, &jOwner);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsChown(path=%s): newJavaStr(%s)", path, owner);
goto done;
}
jthr = newJavaStr(env, group, &jGroup);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsChown(path=%s): newJavaStr(%s)", path, group);
goto done;
}
//Create the directory
jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"setOwner", JMETHOD3(JPARAM(HADOOP_PATH),
JPARAM(JAVA_STRING), JPARAM(JAVA_STRING), JAVA_VOID),
jPath, jOwner, jGroup);
if (jthr) {
ret = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsChown(path=%s, owner=%s, group=%s): "
"FileSystem#setOwner", path, owner, group);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jPath);
destroyLocalReference(env, jOwner);
destroyLocalReference(env, jGroup);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsChmod(hdfsFS fs, const char* path, short mode)
{
int ret;
// JAVA EQUIVALENT:
// fs.setPermission(path, FsPermission)
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthrowable jthr;
jobject jPath = NULL, jPermObj = NULL;
jobject jFS = (jobject)fs;
// construct jPerm = FsPermission.createImmutable(short mode);
jshort jmode = mode;
jthr = constructNewObjectOfClass(env, &jPermObj,
HADOOP_FSPERM,"(S)V",jmode);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"constructNewObjectOfClass(%s)", HADOOP_FSPERM);
return -1;
}
//Create an object of org.apache.hadoop.fs.Path
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsChmod(%s): constructNewObjectOfPath", path);
goto done;
}
//Create the directory
jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"setPermission",
JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID),
jPath, jPermObj);
if (jthr) {
ret = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsChmod(%s): FileSystem#setPermission", path);
goto done;
}
ret = 0;
done:
destroyLocalReference(env, jPath);
destroyLocalReference(env, jPermObj);
if (ret) {
errno = ret;
return -1;
}
return 0;
}
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
{
// JAVA EQUIVALENT:
// fs.setTimes(src, mtime, atime)
jthrowable jthr;
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
//Create an object of org.apache.hadoop.fs.Path
jobject jPath;
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsUtime(path=%s): constructNewObjectOfPath", path);
return -1;
}
const tTime NO_CHANGE = -1;
jlong jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000);
jlong jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000);
jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
"setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID),
jPath, jmtime, jatime);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsUtime(path=%s): FileSystem#setTimes", path);
return -1;
}
return 0;
}
/**
* Zero-copy options.
*
* We cache the EnumSet of ReadOptions which has to be passed into every
* readZero call, to avoid reconstructing it each time. This cache is cleared
* whenever an element changes.
*/
struct hadoopRzOptions
{
JNIEnv *env;
int skipChecksums;
jobject byteBufferPool;
jobject cachedEnumSet;
};
struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
{
struct hadoopRzOptions *opts;
JNIEnv *env;
env = getJNIEnv();
if (!env) {
// Check to make sure the JNI environment is set up properly.
errno = EINTERNAL;
return NULL;
}
opts = calloc(1, sizeof(struct hadoopRzOptions));
if (!opts) {
errno = ENOMEM;
return NULL;
}
return opts;
}
static void hadoopRzOptionsClearCached(JNIEnv *env,
struct hadoopRzOptions *opts)
{
if (!opts->cachedEnumSet) {
return;
}
(*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
opts->cachedEnumSet = NULL;
}
int hadoopRzOptionsSetSkipChecksum(
struct hadoopRzOptions *opts, int skip)
{
JNIEnv *env;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return -1;
}
hadoopRzOptionsClearCached(env, opts);
opts->skipChecksums = !!skip;
return 0;
}
int hadoopRzOptionsSetByteBufferPool(
struct hadoopRzOptions *opts, const char *className)
{
JNIEnv *env;
jthrowable jthr;
jobject byteBufferPool = NULL;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return -1;
}
if (className) {
// Note: we don't have to call hadoopRzOptionsClearCached in this
// function, since the ByteBufferPool is passed separately from the
// EnumSet of ReadOptions.
jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
errno = EINVAL;
return -1;
}
}
if (opts->byteBufferPool) {
// Delete any previous ByteBufferPool we had.
(*env)->DeleteGlobalRef(env, opts->byteBufferPool);
}
opts->byteBufferPool = byteBufferPool;
return 0;
}
void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
{
JNIEnv *env;
env = getJNIEnv();
if (!env) {
return;
}
hadoopRzOptionsClearCached(env, opts);
if (opts->byteBufferPool) {
(*env)->DeleteGlobalRef(env, opts->byteBufferPool);
opts->byteBufferPool = NULL;
}
free(opts);
}
struct hadoopRzBuffer
{
jobject byteBuffer;
uint8_t *ptr;
int32_t length;
int direct;
};
static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
struct hadoopRzOptions *opts, jobject *enumSet)
{
jthrowable jthr = NULL;
jobject enumInst = NULL, enumSetObj = NULL;
jvalue jVal;
if (opts->cachedEnumSet) {
// If we cached the value, return it now.
*enumSet = opts->cachedEnumSet;
goto done;
}
if (opts->skipChecksums) {
jthr = fetchEnumInstance(env, READ_OPTION,
"SKIP_CHECKSUMS", &enumInst);
if (jthr) {
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL,
"java/util/EnumSet", "of",
"(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
if (jthr) {
goto done;
}
enumSetObj = jVal.l;
} else {
jclass clazz = (*env)->FindClass(env, READ_OPTION);
if (!clazz) {
jthr = newRuntimeError(env, "failed "
"to find class for %s", READ_OPTION);
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL,
"java/util/EnumSet", "noneOf",
"(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
enumSetObj = jVal.l;
}
// create global ref
opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
if (!opts->cachedEnumSet) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
*enumSet = opts->cachedEnumSet;
jthr = NULL;
done:
(*env)->DeleteLocalRef(env, enumInst);
(*env)->DeleteLocalRef(env, enumSetObj);
return jthr;
}
static int hadoopReadZeroExtractBuffer(JNIEnv *env,
const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
{
int ret;
jthrowable jthr;
jvalue jVal;
uint8_t *directStart;
void *mallocBuf = NULL;
jint position;
jarray array = NULL;
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "remaining", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
goto done;
}
buffer->length = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "position", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
goto done;
}
position = jVal.i;
directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
if (directStart) {
// Handle direct buffers.
buffer->ptr = directStart + position;
buffer->direct = 1;
ret = 0;
goto done;
}
// Handle indirect buffers.
// The JNI docs don't say that GetDirectBufferAddress throws any exceptions
// when it fails. However, they also don't clearly say that it doesn't. It
// seems safest to clear any pending exceptions here, to prevent problems on
// various JVMs.
(*env)->ExceptionClear(env);
if (!opts->byteBufferPool) {
fputs("hadoopReadZeroExtractBuffer: we read through the "
"zero-copy path, but failed to get the address of the buffer via "
"GetDirectBufferAddress. Please make sure your JVM supports "
"GetDirectBufferAddress.\n", stderr);
ret = ENOTSUP;
goto done;
}
// Get the backing array object of this buffer.
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "array", "()[B");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
goto done;
}
array = jVal.l;
if (!array) {
fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
stderr);
ret = EIO;
goto done;
}
mallocBuf = malloc(buffer->length);
if (!mallocBuf) {
fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
buffer->length);
ret = ENOMEM;
goto done;
}
(*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
goto done;
}
buffer->ptr = mallocBuf;
buffer->direct = 0;
ret = 0;
done:
free(mallocBuf);
(*env)->DeleteLocalRef(env, array);
return ret;
}
static int translateZCRException(JNIEnv *env, jthrowable exc)
{
int ret;
char *className = NULL;
jthrowable jthr = classNameOfObject(exc, env, &className);
if (jthr) {
fputs("hadoopReadZero: failed to get class name of "
"exception from read().\n", stderr);
destroyLocalReference(env, exc);
destroyLocalReference(env, jthr);
ret = EIO;
goto done;
}
if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
ret = EPROTONOSUPPORT;
goto done;
}
ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
"hadoopZeroCopyRead: ZeroCopyCursor#read failed");
done:
free(className);
return ret;
}
struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
struct hadoopRzOptions *opts, int32_t maxLength)
{
JNIEnv *env;
jthrowable jthr = NULL;
jvalue jVal;
jobject enumSet = NULL, byteBuffer = NULL;
struct hadoopRzBuffer* buffer = NULL;
int ret;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return NULL;
}
if (file->type != INPUT) {
fputs("Cannot read from a non-InputStream object!\n", stderr);
ret = EINVAL;
goto done;
}
buffer = calloc(1, sizeof(struct hadoopRzBuffer));
if (!buffer) {
ret = ENOMEM;
goto done;
}
jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
"(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
"Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
if (jthr) {
ret = translateZCRException(env, jthr);
goto done;
}
byteBuffer = jVal.l;
if (!byteBuffer) {
buffer->byteBuffer = NULL;
buffer->length = 0;
buffer->ptr = NULL;
} else {
buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
if (!buffer->byteBuffer) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hadoopReadZero: failed to create global ref to ByteBuffer");
goto done;
}
ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
if (ret) {
goto done;
}
}
ret = 0;
done:
(*env)->DeleteLocalRef(env, byteBuffer);
if (ret) {
if (buffer) {
if (buffer->byteBuffer) {
(*env)->DeleteGlobalRef(env, buffer->byteBuffer);
}
free(buffer);
}
errno = ret;
return NULL;
} else {
errno = 0;
}
return buffer;
}
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
{
return buffer->length;
}
const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
{
return buffer->ptr;
}
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
{
jvalue jVal;
jthrowable jthr;
JNIEnv* env;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return;
}
if (buffer->byteBuffer) {
jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
HADOOP_ISTRM, "releaseBuffer",
"(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopRzBufferFree: releaseBuffer failed: ");
// even on error, we have to delete the reference.
}
(*env)->DeleteGlobalRef(env, buffer->byteBuffer);
}
if (!buffer->direct) {
free(buffer->ptr);
}
memset(buffer, 0, sizeof(*buffer));
free(buffer);
}
char***
hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
{
// JAVA EQUIVALENT:
// fs.getFileBlockLoctions(new Path(path), start, length);
jthrowable jthr;
jobject jPath = NULL;
jobject jFileStatus = NULL;
jvalue jFSVal, jVal;
jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL;
jstring jHost = NULL;
char*** blockHosts = NULL;
int i, j, ret;
jsize jNumFileBlocks = 0;
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
jobject jFS = (jobject)fs;
//Create an object of org.apache.hadoop.fs.Path
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s): constructNewObjectOfPath", path);
goto done;
}
jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS,
HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)"
"Lorg/apache/hadoop/fs/FileStatus;", jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
"FileSystem#getFileStatus", path, start, length);
destroyLocalReference(env, jPath);
goto done;
}
jFileStatus = jFSVal.l;
//org.apache.hadoop.fs.FileSystem#getFileBlockLocations
jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
HADOOP_FS, "getFileBlockLocations",
"(Lorg/apache/hadoop/fs/FileStatus;JJ)"
"[Lorg/apache/hadoop/fs/BlockLocation;",
jFileStatus, start, length);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
"FileSystem#getFileBlockLocations", path, start, length);
goto done;
}
jBlockLocations = jVal.l;
//Figure out no of entries in jBlockLocations
//Allocate memory and add NULL at the end
jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations);
blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**));
if (blockHosts == NULL) {
ret = ENOMEM;
goto done;
}
if (jNumFileBlocks == 0) {
ret = 0;
goto done;
}
//Now parse each block to get hostnames
for (i = 0; i < jNumFileBlocks; ++i) {
jobject jFileBlock =
(*env)->GetObjectArrayElement(env, jBlockLocations, i);
if (!jFileBlock) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
"GetObjectArrayElement(%d)", path, start, length, i);
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC,
"getHosts", "()[Ljava/lang/String;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
"BlockLocation#getHosts", path, start, length);
goto done;
}
jFileBlockHosts = jVal.l;
if (!jFileBlockHosts) {
fprintf(stderr,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"):"
"BlockLocation#getHosts returned NULL", path, start, length);
ret = EINTERNAL;
goto done;
}
//Figure out no of hosts in jFileBlockHosts, and allocate the memory
jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts);
blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*));
if (!blockHosts[i]) {
ret = ENOMEM;
goto done;
}
//Now parse each hostname
const char *hostName;
for (j = 0; j < jNumBlockHosts; ++j) {
jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j);
if (!jHost) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
"NewByteArray", path, start, length);
goto done;
}
hostName =
(const char*)((*env)->GetStringUTFChars(env, jHost, NULL));
if (!hostName) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", "
"j=%d out of %d): GetStringUTFChars",
path, start, length, j, jNumBlockHosts);
goto done;
}
blockHosts[i][j] = strdup(hostName);
(*env)->ReleaseStringUTFChars(env, jHost, hostName);
if (!blockHosts[i][j]) {
ret = ENOMEM;
goto done;
}
destroyLocalReference(env, jHost);
jHost = NULL;
}
destroyLocalReference(env, jFileBlockHosts);
jFileBlockHosts = NULL;
}
ret = 0;
done:
destroyLocalReference(env, jPath);
destroyLocalReference(env, jFileStatus);
destroyLocalReference(env, jBlockLocations);
destroyLocalReference(env, jFileBlockHosts);
destroyLocalReference(env, jHost);
if (ret) {
if (blockHosts) {
hdfsFreeHosts(blockHosts);
}
return NULL;
}
return blockHosts;
}
void hdfsFreeHosts(char ***blockHosts)
{
int i, j;
for (i=0; blockHosts[i]; i++) {
for (j=0; blockHosts[i][j]; j++) {
free(blockHosts[i][j]);
}
free(blockHosts[i]);
}
free(blockHosts);
}
tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
{
// JAVA EQUIVALENT:
// fs.getDefaultBlockSize();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
//FileSystem#getDefaultBlockSize()
jvalue jVal;
jthrowable jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getDefaultBlockSize", "()J");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
return -1;
}
return jVal.j;
}
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
{
// JAVA EQUIVALENT:
// fs.getDefaultBlockSize(path);
jthrowable jthr;
jobject jFS = (jobject)fs;
jobject jPath;
tOffset blockSize;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath",
path);
return -1;
}
jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize);
(*env)->DeleteLocalRef(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetDefaultBlockSize(path=%s): "
"FileSystem#getDefaultBlockSize", path);
return -1;
}
return blockSize;
}
tOffset hdfsGetCapacity(hdfsFS fs)
{
// JAVA EQUIVALENT:
// FsStatus fss = fs.getStatus();
// return Fss.getCapacity();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
//FileSystem#getStatus
jvalue jVal;
jthrowable jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetCapacity: FileSystem#getStatus");
return -1;
}
jobject fss = (jobject)jVal.l;
jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
"getCapacity", "()J");
destroyLocalReference(env, fss);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetCapacity: FsStatus#getCapacity");
return -1;
}
return jVal.j;
}
tOffset hdfsGetUsed(hdfsFS fs)
{
// JAVA EQUIVALENT:
// FsStatus fss = fs.getStatus();
// return Fss.getUsed();
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jFS = (jobject)fs;
//FileSystem#getStatus
jvalue jVal;
jthrowable jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetUsed: FileSystem#getStatus");
return -1;
}
jobject fss = (jobject)jVal.l;
jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
"getUsed", "()J");
destroyLocalReference(env, fss);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetUsed: FsStatus#getUsed");
return -1;
}
return jVal.j;
}
static jthrowable
getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo)
{
jvalue jVal;
jthrowable jthr;
jobject jPath = NULL;
jstring jPathName = NULL;
jstring jUserName = NULL;
jstring jGroupName = NULL;
jobject jPermission = NULL;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "isDir", "()Z");
if (jthr)
goto done;
fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "getReplication", "()S");
if (jthr)
goto done;
fileInfo->mReplication = jVal.s;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "getBlockSize", "()J");
if (jthr)
goto done;
fileInfo->mBlockSize = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "getModificationTime", "()J");
if (jthr)
goto done;
fileInfo->mLastMod = jVal.j / 1000;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "getAccessTime", "()J");
if (jthr)
goto done;
fileInfo->mLastAccess = (tTime) (jVal.j / 1000);
if (fileInfo->mKind == kObjectKindFile) {
jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
HADOOP_STAT, "getLen", "()J");
if (jthr)
goto done;
fileInfo->mSize = jVal.j;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
"getPath", "()Lorg/apache/hadoop/fs/Path;");
if (jthr)
goto done;
jPath = jVal.l;
if (jPath == NULL) {
jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#"
"getPath returned NULL!");
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH,
"toString", "()Ljava/lang/String;");
if (jthr)
goto done;
jPathName = jVal.l;
const char *cPathName =
(const char*) ((*env)->GetStringUTFChars(env, jPathName, NULL));
if (!cPathName) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
fileInfo->mName = strdup(cPathName);
(*env)->ReleaseStringUTFChars(env, jPathName, cPathName);
jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
"getOwner", "()Ljava/lang/String;");
if (jthr)
goto done;
jUserName = jVal.l;
const char* cUserName =
(const char*) ((*env)->GetStringUTFChars(env, jUserName, NULL));
if (!cUserName) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
fileInfo->mOwner = strdup(cUserName);
(*env)->ReleaseStringUTFChars(env, jUserName, cUserName);
const char* cGroupName;
jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
"getGroup", "()Ljava/lang/String;");
if (jthr)
goto done;
jGroupName = jVal.l;
cGroupName = (const char*) ((*env)->GetStringUTFChars(env, jGroupName, NULL));
if (!cGroupName) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
fileInfo->mGroup = strdup(cGroupName);
(*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName);
jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
"getPermission",
"()Lorg/apache/hadoop/fs/permission/FsPermission;");
if (jthr)
goto done;
if (jVal.l == NULL) {
jthr = newRuntimeError(env, "%s#getPermission returned NULL!",
HADOOP_STAT);
goto done;
}
jPermission = jVal.l;
jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM,
"toShort", "()S");
if (jthr)
goto done;
fileInfo->mPermissions = jVal.s;
jthr = NULL;
done:
if (jthr)
hdfsFreeFileInfoEntry(fileInfo);
destroyLocalReference(env, jPath);
destroyLocalReference(env, jPathName);
destroyLocalReference(env, jUserName);
destroyLocalReference(env, jGroupName);
destroyLocalReference(env, jPermission);
destroyLocalReference(env, jPath);
return jthr;
}
static jthrowable
getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo)
{
// JAVA EQUIVALENT:
// fs.isDirectory(f)
// fs.getModificationTime()
// fs.getAccessTime()
// fs.getLength(f)
// f.getPath()
// f.getOwner()
// f.getGroup()
// f.getPermission().toShort()
jobject jStat;
jvalue jVal;
jthrowable jthr;
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
"exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"),
jPath);
if (jthr)
return jthr;
if (jVal.z == 0) {
*fileInfo = NULL;
return NULL;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
HADOOP_FS, "getFileStatus",
JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath);
if (jthr)
return jthr;
jStat = jVal.l;
*fileInfo = calloc(1, sizeof(hdfsFileInfo));
if (!*fileInfo) {
destroyLocalReference(env, jStat);
return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo");
}
jthr = getFileInfoFromStat(env, jStat, *fileInfo);
destroyLocalReference(env, jStat);
return jthr;
}
hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries)
{
// JAVA EQUIVALENT:
// Path p(path);
// Path []pathList = fs.listPaths(p)
// foreach path in pathList
// getFileInfo(path)
jthrowable jthr;
jobject jPath = NULL;
hdfsFileInfo *pathList = NULL;
jobjectArray jPathList = NULL;
jvalue jVal;
jsize jPathListSize = 0;
int ret;
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
jobject jFS = (jobject)fs;
//Create an object of org.apache.hadoop.fs.Path
jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsListDirectory(%s): constructNewObjectOfPath", path);
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus",
JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)),
jPath);
if (jthr) {
ret = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsListDirectory(%s): FileSystem#listStatus", path);
goto done;
}
jPathList = jVal.l;
//Figure out the number of entries in that directory
jPathListSize = (*env)->GetArrayLength(env, jPathList);
if (jPathListSize == 0) {
ret = 0;
goto done;
}
//Allocate memory
pathList = calloc(jPathListSize, sizeof(hdfsFileInfo));
if (pathList == NULL) {
ret = ENOMEM;
goto done;
}
//Save path information in pathList
jsize i;
jobject tmpStat;
for (i=0; i < jPathListSize; ++i) {
tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i);
if (!tmpStat) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)",
path, i, jPathListSize);
goto done;
}
jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]);
destroyLocalReference(env, tmpStat);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)",
path, i, jPathListSize);
goto done;
}
}
ret = 0;
done:
destroyLocalReference(env, jPath);
destroyLocalReference(env, jPathList);
if (ret) {
hdfsFreeFileInfo(pathList, jPathListSize);
errno = ret;
return NULL;
}
*numEntries = jPathListSize;
return pathList;
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
{
// JAVA EQUIVALENT:
// File f(path);
// fs.isDirectory(f)
// fs.lastModified() ??
// fs.getLength(f)
// f.getPath()
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
jobject jFS = (jobject)fs;
//Create an object of org.apache.hadoop.fs.Path
jobject jPath;
jthrowable jthr = constructNewObjectOfPath(env, path, &jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsGetPathInfo(%s): constructNewObjectOfPath", path);
return NULL;
}
hdfsFileInfo *fileInfo;
jthr = getFileInfo(env, jFS, jPath, &fileInfo);
destroyLocalReference(env, jPath);
if (jthr) {
errno = printExceptionAndFree(env, jthr,
NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
NOPRINT_EXC_UNRESOLVED_LINK,
"hdfsGetPathInfo(%s): getFileInfo", path);
return NULL;
}
if (!fileInfo) {
errno = ENOENT;
return NULL;
}
return fileInfo;
}
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo)
{
free(hdfsFileInfo->mName);
free(hdfsFileInfo->mOwner);
free(hdfsFileInfo->mGroup);
memset(hdfsFileInfo, 0, sizeof(hdfsFileInfo));
}
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
//Free the mName, mOwner, and mGroup
int i;
for (i=0; i < numEntries; ++i) {
hdfsFreeFileInfoEntry(hdfsFileInfo + i);
}
//Free entire block
free(hdfsFileInfo);
}
/**
* vim: ts=4: sw=4: et:
*/
Event Timeline
Log In to Comment