/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "exception.h"
#include "hdfs.h"
#include "jni_helper.h"
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
/* Some frequently used Java paths */
#define HADOOP_CONF "org/apache/hadoop/conf/Configuration"
#define HADOOP_PATH "org/apache/hadoop/fs/Path"
#define HADOOP_LOCALFS "org/apache/hadoop/fs/LocalFileSystem"
#define HADOOP_FS "org/apache/hadoop/fs/FileSystem"
#define HADOOP_FSSTATUS "org/apache/hadoop/fs/FsStatus"
#define HADOOP_BLK_LOC "org/apache/hadoop/fs/BlockLocation"
#define HADOOP_DFS "org/apache/hadoop/hdfs/DistributedFileSystem"
#define HADOOP_ISTRM "org/apache/hadoop/fs/FSDataInputStream"
#define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
#define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
#define READ_OPTION "org/apache/hadoop/fs/ReadOption"
#define JAVA_VOID "V"
/* Macros for constructing method signatures */
#define JPARAM(X) "L" X ";"
#define JARRPARAM(X) "[L" X ";"
#define JMETHOD1(X, R) "(" X ")" R
#define JMETHOD2(X, Y, R) "(" X Y ")" R
#define JMETHOD3(X, Y, Z, R) "(" X Y Z ")" R
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);

static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);

/**
 * The C equivalent of org.apache.hadoop.fs.FSData(Input|Output)Stream .
 */
enum hdfsStreamType
{
    UNINITIALIZED = 0,
    INPUT = 1,
    OUTPUT = 2,
};

/**
 * The 'file-handle' to a file in hdfs.
 */
struct hdfsFile_internal {
    void* file;
    enum hdfsStreamType type;
    int flags;
};

int hdfsFileIsOpenForRead(hdfsFile file)
{
    return (file->type == INPUT);
}

int hdfsFileGetReadStatistics(hdfsFile file,
                              struct hdfsReadStatistics **stats)
{
    jthrowable jthr;
    jobject readStats = NULL;
    jvalue jVal;
    struct hdfsReadStatistics *s = NULL;
    int ret;
    JNIEnv* env = getJNIEnv();

    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }
    if (file->type != INPUT) {
        ret = EINVAL;
        goto done;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
                  "org/apache/hadoop/hdfs/client/HdfsDataInputStream",
                  "getReadStatistics",
                  "()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getReadStatistics failed");
        goto done;
    }
    readStats = jVal.l;
    s = malloc(sizeof(struct hdfsReadStatistics));
    if (!s) {
        ret = ENOMEM;
        goto done;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
                  "getTotalBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalBytesRead failed");
        goto done;
    }
    s->totalBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
                  "getTotalLocalBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalLocalBytesRead failed");
        goto done;
    }
    s->totalLocalBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
                  "getTotalShortCircuitBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalShortCircuitBytesRead failed");
        goto done;
    }
    s->totalShortCircuitBytesRead = jVal.j;
    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
                  "getTotalZeroCopyBytesRead", "()J");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
        goto done;
    }
    s->totalZeroCopyBytesRead = jVal.j;
    *stats = s;
    s = NULL;
    ret = 0;

done:
    destroyLocalReference(env, readStats);
    free(s);
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}
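
/*
 * Usage sketch (illustrative, not part of the library): sampling the read
 * statistics of an open input stream.  Assumes a connected `fs` and an
 * hdfsFile `file` opened with O_RDONLY; everything else is defined above or
 * in hdfs.h.
 *
 *     struct hdfsReadStatistics *stats = NULL;
 *     if (hdfsFileGetReadStatistics(file, &stats) == 0) {
 *         fprintf(stderr, "remote bytes read: %" PRId64 "\n",
 *                 hdfsReadStatisticsGetRemoteBytesRead(stats));
 *         hdfsFileFreeReadStatistics(stats);
 *     }
 *
 * On failure the function returns -1 with errno set (EINVAL for a
 * non-input stream, ENOMEM on allocation failure).
 */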
int64_t hdfsReadStatisticsGetRemoteBytesRead(
                            const struct hdfsReadStatistics *stats)
{
    return stats->totalBytesRead - stats->totalLocalBytesRead;
}

void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats)
{
    free(stats);
}

int hdfsFileIsOpenForWrite(hdfsFile file)
{
    return (file->type == OUTPUT);
}

int hdfsFileUsesDirectRead(hdfsFile file)
{
    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
}

void hdfsFileDisableDirectRead(hdfsFile file)
{
    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}

int hdfsDisableDomainSocketSecurity(void)
{
    jthrowable jthr;
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }
    jthr = invokeMethod(env, NULL, STATIC, NULL,
            "org/apache/hadoop/net/unix/DomainSocket",
            "disableBindPathValidation", "()V");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "DomainSocket#disableBindPathValidation");
        return -1;
    }
    return 0;
}

/**
 * hdfsJniEnv: A wrapper struct to be used as 'value'
 * while saving thread -> JNIEnv* mappings
 */
typedef struct
{
    JNIEnv* env;
} hdfsJniEnv;

/**
 * Helper function to create a org.apache.hadoop.fs.Path object.
 * @param env: The JNIEnv pointer.
 * @param path: The file-path for which to construct org.apache.hadoop.fs.Path
 * object.
 * @param out: (out param) The constructed org.apache.hadoop.fs.Path object.
 * @return NULL on success; the Java exception otherwise.
 */
static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
                                           jobject *out)
{
    jthrowable jthr;
    jstring jPathString;
    jobject jPath;

    //Construct a java.lang.String object
    jthr = newJavaStr(env, path, &jPathString);
    if (jthr)
        return jthr;
    //Construct the org.apache.hadoop.fs.Path object
    jthr = constructNewObjectOfClass(env, &jPath, "org/apache/hadoop/fs/Path",
        "(Ljava/lang/String;)V", jPathString);
    destroyLocalReference(env, jPathString);
    if (jthr)
        return jthr;
    *out = jPath;
    return NULL;
}

static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
        const char *key, char **val)
{
    jthrowable jthr;
    jvalue jVal;
    jstring jkey = NULL, jRet = NULL;

    jthr = newJavaStr(env, key, &jkey);
    if (jthr)
        goto done;
    jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
            HADOOP_CONF, "get", JMETHOD1(JPARAM(JAVA_STRING),
                                         JPARAM(JAVA_STRING)), jkey);
    if (jthr)
        goto done;
    jRet = jVal.l;
    jthr = newCStr(env, jRet, val);
done:
    destroyLocalReference(env, jkey);
    destroyLocalReference(env, jRet);
    return jthr;
}

int hdfsConfGetStr(const char *key, char **val)
{
    JNIEnv *env;
    int ret;
    jthrowable jthr;
    jobject jConfiguration = NULL;

    env = getJNIEnv();
    if (env == NULL) {
        ret = EINTERNAL;
        goto done;
    }
    jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetStr(%s): new Configuration", key);
        goto done;
    }
    jthr = hadoopConfGetStr(env, jConfiguration, key, val);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetStr(%s): hadoopConfGetStr", key);
        goto done;
    }
    ret = 0;
done:
    destroyLocalReference(env, jConfiguration);
    if (ret)
        errno = ret;
    return ret;
}

void hdfsConfStrFree(char *val)
{
    free(val);
}

static jthrowable hadoopConfGetInt(JNIEnv *env, jobject jConfiguration,
        const char *key, int32_t *val)
{
    jthrowable jthr = NULL;
    jvalue jVal;
    jstring jkey = NULL;

    jthr = newJavaStr(env, key, &jkey);
    if (jthr)
        return jthr;
    jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
            HADOOP_CONF, "getInt", JMETHOD2(JPARAM(JAVA_STRING), "I", "I"),
            jkey, (jint)(*val));
    destroyLocalReference(env, jkey);
    if (jthr)
        return jthr;
    *val = jVal.i;
    return NULL;
}

int hdfsConfGetInt(const char *key, int32_t *val)
{
    JNIEnv *env;
    int ret;
    jobject jConfiguration = NULL;
    jthrowable jthr;

    env = getJNIEnv();
    if (env == NULL) {
        ret = EINTERNAL;
        goto done;
    }
    jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetInt(%s): new Configuration", key);
        goto done;
    }
    jthr = hadoopConfGetInt(env, jConfiguration, key, val);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsConfGetInt(%s): hadoopConfGetInt", key);
        goto done;
    }
    ret = 0;
done:
    destroyLocalReference(env, jConfiguration);
    if (ret)
        errno = ret;
    return ret;
}
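
/*
 * Usage sketch (illustrative): reading configuration values through the
 * wrappers above.  Note that hadoopConfGetInt passes the incoming *val as the
 * default to Configuration#getInt, so callers pre-load the default.  The keys
 * are standard Hadoop settings; the variable names are placeholders.
 *
 *     int32_t ioBufSize = 4096;                     // default if key unset
 *     if (hdfsConfGetInt("io.file.buffer.size", &ioBufSize) == 0) {
 *         // ioBufSize now holds the configured value (or the default)
 *     }
 *
 *     char *tmpDir = NULL;
 *     if (hdfsConfGetStr("hadoop.tmp.dir", &tmpDir) == 0 && tmpDir) {
 *         // ... use tmpDir ...
 *         hdfsConfStrFree(tmpDir);
 *     }
 */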

struct hdfsBuilderConfOpt {
    struct hdfsBuilderConfOpt *next;
    const char *key;
    const char *val;
};

struct hdfsBuilder {
    int forceNewInstance;
    const char *nn;
    tPort port;
    const char *kerbTicketCachePath;
    const char *userName;
    struct hdfsBuilderConfOpt *opts;
};

struct hdfsBuilder *hdfsNewBuilder(void)
{
    struct hdfsBuilder *bld = calloc(1, sizeof(struct hdfsBuilder));
    if (!bld) {
        errno = ENOMEM;
        return NULL;
    }
    return bld;
}

int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
                          const char *val)
{
    struct hdfsBuilderConfOpt *opt, *next;

    opt = calloc(1, sizeof(struct hdfsBuilderConfOpt));
    if (!opt)
        return -ENOMEM;
    next = bld->opts;
    bld->opts = opt;
    opt->next = next;
    opt->key = key;
    opt->val = val;
    return 0;
}

void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
    struct hdfsBuilderConfOpt *cur, *next;

    for (cur = bld->opts; cur; ) {
        next = cur->next;
        free(cur);
        cur = next;
    }
    free(bld);
}

void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld)
{
    bld->forceNewInstance = 1;
}

void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
    bld->nn = nn;
}

void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
    bld->port = port;
}

void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
    bld->userName = userName;
}

void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
                                       const char *kerbTicketCachePath)
{
    bld->kerbTicketCachePath = kerbTicketCachePath;
}

hdfsFS hdfsConnect(const char* host, tPort port)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    return hdfsBuilderConnect(bld);
}

/** Always return a new FileSystem handle */
hdfsFS hdfsConnectNewInstance(const char* host, tPort port)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsBuilderSetForceNewInstance(bld);
    return hdfsBuilderConnect(bld);
}

hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char *user)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsBuilderSetUserName(bld, user);
    return hdfsBuilderConnect(bld);
}

/** Always return a new FileSystem handle */
hdfsFS hdfsConnectAsUserNewInstance(const char* host, tPort port,
        const char *user)
{
    struct hdfsBuilder *bld = hdfsNewBuilder();
    if (!bld)
        return NULL;
    hdfsBuilderSetNameNode(bld, host);
    hdfsBuilderSetNameNodePort(bld, port);
    hdfsBuilderSetForceNewInstance(bld);
    hdfsBuilderSetUserName(bld, user);
    return hdfsBuilderConnect(bld);
}
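
/*
 * Usage sketch (illustrative): the hdfsConnect* wrappers above are shorthand
 * for driving the builder directly.  The host, port and user here are
 * placeholders.
 *
 *     hdfsFS fs = hdfsConnect("namenode.example.com", 8020);
 *
 * is equivalent to:
 *
 *     struct hdfsBuilder *bld = hdfsNewBuilder();
 *     hdfsBuilderSetNameNode(bld, "namenode.example.com");
 *     hdfsBuilderSetNameNodePort(bld, 8020);
 *     hdfsFS fs = hdfsBuilderConnect(bld);   // consumes bld
 *
 * Passing "default" as the namenode makes hdfsBuilderConnect use
 * fs.defaultFS from the loaded configuration; passing NULL yields a
 * LocalFileSystem.  hdfsBuilderConnect always frees the builder, and
 * hdfsBuilderConfSetStr stores the key/val pointers without copying, so
 * they must remain valid until the connect call.
 */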

/**
 * Calculate the effective URI to use, given a builder configuration.
 *
 * If there is not already a URI scheme, we prepend 'hdfs://'.
 *
 * If there is not already a port specified, and a port was given to the
 * builder, we suffix that port.  If there is a port specified but also one in
 * the URI, that is an error.
 *
 * @param bld       The hdfs builder object
 * @param uri       (out param) dynamically allocated string representing the
 *                  effective URI
 *
 * @return          0 on success; error code otherwise
 */
static int calcEffectiveURI(struct hdfsBuilder *bld, char **uri)
{
    const char *scheme;
    char suffix[64];
    const char *lastColon;
    char *u;
    size_t uriLen;

    if (!bld->nn)
        return EINVAL;
    scheme = (strstr(bld->nn, "://")) ? "" : "hdfs://";
    if (bld->port == 0) {
        suffix[0] = '\0';
    } else {
        lastColon = rindex(bld->nn, ':');
        if (lastColon && (strspn(lastColon + 1, "0123456789") ==
                          strlen(lastColon + 1))) {
            fprintf(stderr, "port %d was given, but URI '%s' already "
                "contains a port!\n", bld->port, bld->nn);
            return EINVAL;
        }
        snprintf(suffix, sizeof(suffix), ":%d", bld->port);
    }

    uriLen = strlen(scheme) + strlen(bld->nn) + strlen(suffix);
    u = malloc((uriLen + 1) * (sizeof(char)));
    if (!u) {
        fprintf(stderr, "calcEffectiveURI: out of memory");
        return ENOMEM;
    }
    snprintf(u, uriLen + 1, "%s%s%s", scheme, bld->nn, suffix);
    *uri = u;
    return 0;
}
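
/*
 * Worked examples for calcEffectiveURI (hostnames are placeholders):
 *
 *   nn = "nn1.example.com",        port = 8020  ->  "hdfs://nn1.example.com:8020"
 *   nn = "hdfs://nn1.example.com", port = 0     ->  "hdfs://nn1.example.com"
 *   nn = "nn1.example.com:8020",   port = 9000  ->  EINVAL (port given twice)
 *
 * The all-digits test on the text after the last ':' is what distinguishes
 * an embedded port from the colon of a URI scheme.
 */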

static const char *maybeNull(const char *str)
{
    return str ? str : "(NULL)";
}

static const char *hdfsBuilderToStr(const struct hdfsBuilder *bld,
                                    char *buf, size_t bufLen)
{
    snprintf(buf, bufLen, "forceNewInstance=%d, nn=%s, port=%d, "
             "kerbTicketCachePath=%s, userName=%s",
             bld->forceNewInstance, maybeNull(bld->nn), bld->port,
             maybeNull(bld->kerbTicketCachePath), maybeNull(bld->userName));
    return buf;
}

hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld)
{
    JNIEnv *env = 0;
    jobject jConfiguration = NULL, jFS = NULL, jURI = NULL, jCachePath = NULL;
    jstring jURIString = NULL, jUserString = NULL;
    jvalue jVal;
    jthrowable jthr = NULL;
    char *cURI = 0, buf[512];
    int ret;
    jobject jRet = NULL;
    struct hdfsBuilderConfOpt *opt;

    //Get the JNIEnv* corresponding to current thread
    env = getJNIEnv();
    if (env == NULL) {
        ret = EINTERNAL;
        goto done;
    }

    //  jConfiguration = new Configuration();
    jthr = constructNewObjectOfClass(env, &jConfiguration, HADOOP_CONF, "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsBuilderConnect(%s)", hdfsBuilderToStr(bld, buf, sizeof(buf)));
        goto done;
    }
    // set configuration values
    for (opt = bld->opts; opt; opt = opt->next) {
        jthr = hadoopConfSetStr(env, jConfiguration, opt->key, opt->val);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsBuilderConnect(%s): error setting conf '%s' to '%s'",
                hdfsBuilderToStr(bld, buf, sizeof(buf)), opt->key, opt->val);
            goto done;
        }
    }

    //Check what type of FileSystem the caller wants...
    if (bld->nn == NULL) {
        // Get a local filesystem.
        if (bld->forceNewInstance) {
            // fs = FileSystem#newInstanceLocal(conf);
            jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
                    "newInstanceLocal", JMETHOD1(JPARAM(HADOOP_CONF),
                    JPARAM(HADOOP_LOCALFS)), jConfiguration);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jFS = jVal.l;
        } else {
            // fs = FileSystem#getLocal(conf);
            jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
                    "getLocal", JMETHOD1(JPARAM(HADOOP_CONF),
                    JPARAM(HADOOP_LOCALFS)), jConfiguration);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jFS = jVal.l;
        }
    } else {
        if (!strcmp(bld->nn, "default")) {
            // jURI = FileSystem.getDefaultUri(conf)
            jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
                    "getDefaultUri",
                    "(Lorg/apache/hadoop/conf/Configuration;)Ljava/net/URI;",
                    jConfiguration);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jURI = jVal.l;
        } else {
            // fs = FileSystem#get(URI, conf, ugi);
            ret = calcEffectiveURI(bld, &cURI);
            if (ret)
                goto done;
            jthr = newJavaStr(env, cURI, &jURIString);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jthr = invokeMethod(env, &jVal, STATIC, NULL, JAVA_NET_URI,
                    "create", "(Ljava/lang/String;)Ljava/net/URI;",
                    jURIString);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jURI = jVal.l;
        }

        if (bld->kerbTicketCachePath) {
            jthr = hadoopConfSetStr(env, jConfiguration,
                KERBEROS_TICKET_CACHE_PATH, bld->kerbTicketCachePath);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
        }
        jthr = newJavaStr(env, bld->userName, &jUserString);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsBuilderConnect(%s)",
                hdfsBuilderToStr(bld, buf, sizeof(buf)));
            goto done;
        }
        if (bld->forceNewInstance) {
            jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS,
                    "newInstance", JMETHOD3(JPARAM(JAVA_NET_URI),
                        JPARAM(HADOOP_CONF), JPARAM(JAVA_STRING),
                        JPARAM(HADOOP_FS)),
                    jURI, jConfiguration, jUserString);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jFS = jVal.l;
        } else {
            jthr = invokeMethod(env, &jVal, STATIC, NULL, HADOOP_FS, "get",
                    JMETHOD3(JPARAM(JAVA_NET_URI), JPARAM(HADOOP_CONF),
                        JPARAM(JAVA_STRING), JPARAM(HADOOP_FS)),
                    jURI, jConfiguration, jUserString);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
                goto done;
            }
            jFS = jVal.l;
        }
    }
    jRet = (*env)->NewGlobalRef(env, jFS);
    if (!jRet) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                    "hdfsBuilderConnect(%s)",
                    hdfsBuilderToStr(bld, buf, sizeof(buf)));
        goto done;
    }
    ret = 0;

done:
    // Release unnecessary local references
    destroyLocalReference(env, jConfiguration);
    destroyLocalReference(env, jFS);
    destroyLocalReference(env, jURI);
    destroyLocalReference(env, jCachePath);
    destroyLocalReference(env, jURIString);
    destroyLocalReference(env, jUserString);
    free(cURI);
    hdfsFreeBuilder(bld);

    if (ret) {
        errno = ret;
        return NULL;
    }
    return (hdfsFS)jRet;
}

int hdfsDisconnect(hdfsFS fs)
{
    // JAVA EQUIVALENT:
    //  fs.close()

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    int ret;

    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Parameters
    jobject jFS = (jobject)fs;

    //Sanity check
    if (fs == NULL) {
        errno = EBADF;
        return -1;
    }

    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
                     "close", "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsDisconnect: FileSystem#close");
    } else {
        ret = 0;
    }
    (*env)->DeleteGlobalRef(env, jFS);
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}

/**
 * Get the default block size of a FileSystem object.
 *
 * @param env       The Java env
 * @param jFS       The FileSystem object
 * @param jPath     The path to find the default blocksize at
 * @param out       (out param) the default block size
 *
 * @return          NULL on success; or the exception
 */
static jthrowable getDefaultBlockSize(JNIEnv *env, jobject jFS,
                                      jobject jPath, jlong *out)
{
    jthrowable jthr;
    jvalue jVal;

    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                 "getDefaultBlockSize", JMETHOD1(JPARAM(HADOOP_PATH), "J"),
                 jPath);
    if (jthr)
        return jthr;
    *out = jVal.j;
    return NULL;
}

hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                      int bufferSize, short replication, tSize blockSize)
{
    /*
      JAVA EQUIVALENT:
       File f = new File(path);
       FSData{Input|Output}Stream f{is|os} = fs.create(f);
       return f{is|os};
    */
    /* Get the JNIEnv* corresponding to current thread */
    JNIEnv* env = getJNIEnv();
    int accmode = flags & O_ACCMODE;

    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    jstring jStrBufferSize = NULL, jStrReplication = NULL;
    jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jvalue jVal;
    hdfsFile file = NULL;
    int ret;

    if (accmode == O_RDONLY || accmode == O_WRONLY) {
        /* yay */
    } else if (accmode == O_RDWR) {
        fprintf(stderr, "ERROR: cannot open an hdfs file in O_RDWR mode\n");
        errno = ENOTSUP;
        return NULL;
    } else {
        fprintf(stderr, "ERROR: cannot open an hdfs file in mode 0x%x\n",
                accmode);
        errno = EINVAL;
        return NULL;
    }

    if ((flags & O_CREAT) && (flags & O_EXCL)) {
        fprintf(stderr, "WARN: hdfs does not truly support O_CREAT && O_EXCL\n");
    }

    /* The hadoop java api/signature */
    const char* method = NULL;
    const char* signature = NULL;

    if (accmode == O_RDONLY) {
        method = "open";
        signature = JMETHOD2(JPARAM(HADOOP_PATH), "I", JPARAM(HADOOP_ISTRM));
    } else if (flags & O_APPEND) {
        method = "append";
        signature = JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_OSTRM));
    } else {
        method = "create";
        signature = JMETHOD2(JPARAM(HADOOP_PATH), "ZISJ", JPARAM(HADOOP_OSTRM));
    }

    /* Create an object of org.apache.hadoop.fs.Path */
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): constructNewObjectOfPath", path);
        goto done;
    }

    /* Get the Configuration object from the FileSystem object */
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "getConf", JMETHOD1("", JPARAM(HADOOP_CONF)));
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): FileSystem#getConf", path);
        goto done;
    }
    jConfiguration = jVal.l;

    jint jBufferSize = bufferSize;
    jshort jReplication = replication;
    jStrBufferSize = (*env)->NewStringUTF(env, "io.file.buffer.size");
    if (!jStrBufferSize) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
        goto done;
    }
    jStrReplication = (*env)->NewStringUTF(env, "dfs.replication");
    if (!jStrReplication) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, "OOM");
        goto done;
    }

    if (!bufferSize) {
        jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
                         HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
                         jStrBufferSize, 4096);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND |
                NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_UNRESOLVED_LINK,
                "hdfsOpenFile(%s): Configuration#getInt(io.file.buffer.size)",
                path);
            goto done;
        }
        jBufferSize = jVal.i;
    }

    if ((accmode == O_WRONLY) && (flags & O_APPEND) == 0) {
        if (!replication) {
            jthr = invokeMethod(env, &jVal, INSTANCE, jConfiguration,
                             HADOOP_CONF, "getInt", "(Ljava/lang/String;I)I",
                             jStrReplication, 1);
            if (jthr) {
                ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hdfsOpenFile(%s): Configuration#getInt(dfs.replication)",
                    path);
                goto done;
            }
            jReplication = jVal.i;
        }
    }

    /* Create and return either the FSDataInputStream or
       FSDataOutputStream references jobject jStream */

    // READ?
    if (accmode == O_RDONLY) {
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                       method, signature, jPath, jBufferSize);
    } else if ((accmode == O_WRONLY) && (flags & O_APPEND)) {
        // WRITE/APPEND?
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                       method, signature, jPath);
    } else {
        // WRITE/CREATE
        jboolean jOverWrite = 1;
        jlong jBlockSize = blockSize;

        if (jBlockSize == 0) {
            jthr = getDefaultBlockSize(env, jFS, jPath, &jBlockSize);
            if (jthr) {
                ret = EIO;
                goto done;
            }
        }
        jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                         method, signature, jPath, jOverWrite,
                         jBufferSize, jReplication, jBlockSize);
    }
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): FileSystem#%s(%s)", path, method, signature);
        goto done;
    }
    jFile = jVal.l;

    file = calloc(1, sizeof(struct hdfsFile_internal));
    if (!file) {
        fprintf(stderr, "hdfsOpenFile(%s): OOM create hdfsFile\n", path);
        ret = ENOMEM;
        goto done;
    }
    file->file = (*env)->NewGlobalRef(env, jFile);
    if (!file->file) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsOpenFile(%s): NewGlobalRef", path);
        goto done;
    }
    file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
    file->flags = 0;

    if ((flags & O_WRONLY) == 0) {
        // Try a test read to see if we can do direct reads
        char buf;
        if (readDirect(fs, file, &buf, 0) == 0) {
            // Success - 0-byte read should return 0
            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
        } else if (errno != ENOTSUP) {
            // Unexpected error. Clear it, don't set the direct flag.
            fprintf(stderr,
                "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
                "for direct read compatibility\n", path, errno);
        }
    }
    ret = 0;

done:
    destroyLocalReference(env, jStrBufferSize);
    destroyLocalReference(env, jStrReplication);
    destroyLocalReference(env, jConfiguration);
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jFile);
    if (ret) {
        if (file) {
            if (file->file) {
                (*env)->DeleteGlobalRef(env, file->file);
            }
            free(file);
        }
        errno = ret;
        return NULL;
    }
    return file;
}
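
/*
 * Usage sketch (illustrative; path and data are placeholders): creating a
 * file and writing one buffer through it.  Passing 0 for bufferSize,
 * replication and blockSize defers to the loaded configuration, as the
 * fallback logic in hdfsOpenFile above shows.
 *
 *     const char *msg = "hello, hdfs\n";
 *     hdfsFile out = hdfsOpenFile(fs, "/tmp/example.txt",
 *                                 O_WRONLY | O_CREAT, 0, 0, 0);
 *     if (!out) { ... errno is set ... }
 *     if (hdfsWrite(fs, out, msg, strlen(msg)) == -1) { ... }
 *     hdfsHFlush(fs, out);        // make the bytes visible to new readers
 *     hdfsCloseFile(fs, out);
 */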

int hdfsCloseFile(hdfsFS fs, hdfsFile file)
{
    int ret;
    // JAVA EQUIVALENT:
    //  file.close

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Caught exception
    jthrowable jthr;

    //Sanity check
    if (!file || file->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //The interface whose 'close' method to be called
    const char* interface = (file->type == INPUT) ?
        HADOOP_ISTRM : HADOOP_OSTRM;

    jthr = invokeMethod(env, NULL, INSTANCE, file->file, interface,
                     "close", "()V");
    if (jthr) {
        const char *interfaceShortName = (file->type == INPUT) ?
            "FSDataInputStream" : "FSDataOutputStream";
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "%s#close", interfaceShortName);
    } else {
        ret = 0;
    }

    //De-allocate memory
    (*env)->DeleteGlobalRef(env, file->file);
    free(file);

    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}

int hdfsExists(hdfsFS fs, const char *path)
{
    JNIEnv *env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jPath;
    jvalue  jVal;
    jobject jFS = (jobject)fs;
    jthrowable jthr;

    if (path == NULL) {
        errno = EINVAL;
        return -1;
    }
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsExists: constructNewObjectOfPath");
        return -1;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
            "exists", JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsExists: invokeMethod(%s)",
            JMETHOD1(JPARAM(HADOOP_PATH), "Z"));
        return -1;
    }
    if (jVal.z) {
        return 0;
    } else {
        errno = ENOENT;
        return -1;
    }
}

// Checks input file for readiness for reading.
static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
                       jobject* jInputStream)
{
    *jInputStream = (jobject)(f ? f->file : NULL);

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //Error checking... make sure that this file is 'readable'
    if (f->type != INPUT) {
        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    return 0;
}

tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    if (length == 0) {
        return 0;
    } else if (length < 0) {
        errno = EINVAL;
        return -1;
    }
    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
        return readDirect(fs, f, buffer, length);
    }

    // JAVA EQUIVALENT:
    //  byte [] bR = new byte[length];
    //  fis.read(bR);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Parameters
    jobject jInputStream;
    if (readPrepare(env, fs, f, &jInputStream) == -1) {
        return -1;
    }

    jbyteArray jbRarray;
    jint noReadBytes = length;
    jvalue jVal;
    jthrowable jthr;

    //Read the requisite bytes
    jbRarray = (*env)->NewByteArray(env, length);
    if (!jbRarray) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsRead: NewByteArray");
        return -1;
    }

    jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream, HADOOP_ISTRM,
                     "read", "([B)I", jbRarray);
    if (jthr) {
        destroyLocalReference(env, jbRarray);
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsRead: FSDataInputStream#read");
        return -1;
    }
    if (jVal.i < 0) {
        // EOF
        destroyLocalReference(env, jbRarray);
        return 0;
    } else if (jVal.i == 0) {
        destroyLocalReference(env, jbRarray);
        errno = EINTR;
        return -1;
    }
    (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
    destroyLocalReference(env, jbRarray);
    if ((*env)->ExceptionCheck(env)) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsRead: GetByteArrayRegion");
        return -1;
    }
    return jVal.i;
}
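
/*
 * Usage sketch (illustrative): a plain read loop over an input stream.  Per
 * the code above, hdfsRead returns 0 at EOF and -1 with errno == EINTR for a
 * transient zero-byte read, which callers should simply retry.
 *
 *     char buf[4096];
 *     for (;;) {
 *         tSize n = hdfsRead(fs, in, buf, sizeof(buf));
 *         if (n > 0)                { ... consume n bytes of buf ... }
 *         else if (n == 0)          break;        // EOF
 *         else if (errno == EINTR)  continue;     // transient, retry
 *         else                      { ... hard error ... break; }
 *     }
 */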
// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
    // JAVA EQUIVALENT:
    //  ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
    //  fis.read(bbuffer);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jInputStream;
    if (readPrepare(env, fs, f, &jInputStream) == -1) {
        return -1;
    }

    jvalue jVal;
    jthrowable jthr;

    //Read the requisite bytes
    jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
    if (bb == NULL) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "readDirect: NewDirectByteBuffer");
        return -1;
    }

    jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
        HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I", bb);
    destroyLocalReference(env, bb);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "readDirect: FSDataInputStream#read");
        return -1;
    }
    return (jVal.i < 0) ? 0 : jVal.i;
}

tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
                void* buffer, tSize length)
{
    JNIEnv* env;
    jbyteArray jbRarray;
    jvalue jVal;
    jthrowable jthr;

    if (length == 0) {
        return 0;
    } else if (length < 0) {
        errno = EINVAL;
        return -1;
    }
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Error checking... make sure that this file is 'readable'
    if (f->type != INPUT) {
        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    // JAVA EQUIVALENT:
    //  byte [] bR = new byte[length];
    //  fis.read(pos, bR, 0, length);
    jbRarray = (*env)->NewByteArray(env, length);
    if (!jbRarray) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsPread: NewByteArray");
        return -1;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, f->file, HADOOP_ISTRM,
                     "read", "(J[BII)I", position, jbRarray, 0, length);
    if (jthr) {
        destroyLocalReference(env, jbRarray);
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsPread: FSDataInputStream#read");
        return -1;
    }
    if (jVal.i < 0) {
        // EOF
        destroyLocalReference(env, jbRarray);
        return 0;
    } else if (jVal.i == 0) {
        destroyLocalReference(env, jbRarray);
        errno = EINTR;
        return -1;
    }
    (*env)->GetByteArrayRegion(env, jbRarray, 0, jVal.i, buffer);
    destroyLocalReference(env, jbRarray);
    if ((*env)->ExceptionCheck(env)) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsPread: GetByteArrayRegion");
        return -1;
    }
    return jVal.i;
}

tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
    // JAVA EQUIVALENT
    // byte b[] = str.getBytes();
    // fso.write(b);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    jobject jOutputStream = f->file;
    jbyteArray jbWarray;
    jthrowable jthr;

    if (length < 0) {
        errno = EINVAL;
        return -1;
    }

    //Error checking... make sure that this file is 'writable'
    if (f->type != OUTPUT) {
        fprintf(stderr, "Cannot write into a non-OutputStream object!\n");
        errno = EINVAL;
        return -1;
    }

    if (length == 0) {
        return 0;
    }
    //Write the requisite bytes into the file
    jbWarray = (*env)->NewByteArray(env, length);
    if (!jbWarray) {
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsWrite: NewByteArray");
        return -1;
    }
    (*env)->SetByteArrayRegion(env, jbWarray, 0, length, buffer);
    if ((*env)->ExceptionCheck(env)) {
        destroyLocalReference(env, jbWarray);
        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsWrite(length = %d): SetByteArrayRegion", length);
        return -1;
    }
    jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
            HADOOP_OSTRM, "write", "([B)V", jbWarray);
    destroyLocalReference(env, jbWarray);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsWrite: FSDataOutputStream#write");
        return -1;
    }
    // Unlike most Java streams, FSDataOutputStream never does partial writes.
    // If we succeeded, all the data was written.
    return length;
}

int hdfsSeek(hdfsFS fs, hdfsFile f, tOffset desiredPos)
{
    // JAVA EQUIVALENT
    //  fis.seek(pos);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type != INPUT) {
        errno = EBADF;
        return -1;
    }

    jobject jInputStream = f->file;
    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jInputStream,
            HADOOP_ISTRM, "seek", "(J)V", desiredPos);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsSeek(desiredPos=%" PRId64 ")"
            ": FSDataInputStream#seek", desiredPos);
        return -1;
    }
    return 0;
}

tOffset hdfsTell(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  pos = f.getPos();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type == UNINITIALIZED) {
        errno = EBADF;
        return -1;
    }

    //Parameters
    jobject jStream = f->file;
    const char* interface = (f->type == INPUT) ?
        HADOOP_ISTRM : HADOOP_OSTRM;
    jvalue jVal;
    jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jStream,
                     interface, "getPos", "()J");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsTell: %s#getPos",
            ((f->type == INPUT) ? "FSDataInputStream" :
                                  "FSDataOutputStream"));
        return -1;
    }
    return jVal.j;
}

int hdfsFlush(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  fos.flush();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type != OUTPUT) {
        errno = EBADF;
        return -1;
    }
    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, f->file,
                     HADOOP_OSTRM, "flush", "()V");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsFlush: FSDataOutputStream#flush");
        return -1;
    }
    return 0;
}

int hdfsHFlush(hdfsFS fs, hdfsFile f)
{
    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type != OUTPUT) {
        errno = EBADF;
        return -1;
    }

    jobject jOutputStream = f->file;
    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
                     HADOOP_OSTRM, "hflush", "()V");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsHFlush: FSDataOutputStream#hflush");
        return -1;
    }
    return 0;
}

int hdfsHSync(hdfsFS fs, hdfsFile f)
{
    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type != OUTPUT) {
        errno = EBADF;
        return -1;
    }

    jobject jOutputStream = f->file;
    jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream,
                     HADOOP_OSTRM, "hsync", "()V");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsHSync: FSDataOutputStream#hsync");
        return -1;
    }
    return 0;
}
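
/*
 * The three flush calls above offer increasing durability; this is a sketch
 * of the usual FSDataOutputStream semantics, not something enforced by this
 * file:
 *
 *   hdfsFlush  - flushes client-side buffers;
 *   hdfsHFlush - data becomes visible to new readers, but may still sit in
 *                datanode memory;
 *   hdfsHSync  - additionally asks datanodes to sync the data to disk.
 *
 *     hdfsWrite(fs, out, rec, recLen);
 *     if (atCommitPoint && hdfsHSync(fs, out) == -1) { ... errno is set ... }
 */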

int hdfsAvailable(hdfsFS fs, hdfsFile f)
{
    // JAVA EQUIVALENT
    //  fis.available();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Sanity check
    if (!f || f->type != INPUT) {
        errno = EBADF;
        return -1;
    }

    //Parameters
    jobject jInputStream = f->file;
    jvalue jVal;
    jthrowable jthr = invokeMethod(env, &jVal, INSTANCE, jInputStream,
                     HADOOP_ISTRM, "available", "()I");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsAvailable: FSDataInputStream#available");
        return -1;
    }
    return jVal.i;
}

static int hdfsCopyImpl(hdfsFS srcFS, const char* src, hdfsFS dstFS,
        const char* dst, jboolean deleteSource)
{
    //JAVA EQUIVALENT
    //  FileUtil#copy(srcFS, srcPath, dstFS, dstPath,
    //                 deleteSource = false, conf)

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    //Parameters
    jobject jSrcFS = (jobject)srcFS;
    jobject jDstFS = (jobject)dstFS;
    jobject jConfiguration = NULL, jSrcPath = NULL, jDstPath = NULL;
    jthrowable jthr;
    jvalue jVal;
    int ret;

    jthr = constructNewObjectOfPath(env, src, &jSrcPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsCopyImpl(src=%s): constructNewObjectOfPath", src);
        goto done;
    }
    jthr = constructNewObjectOfPath(env, dst, &jDstPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsCopyImpl(dst=%s): constructNewObjectOfPath", dst);
        goto done;
    }

    //Create the org.apache.hadoop.conf.Configuration object
    jthr = constructNewObjectOfClass(env, &jConfiguration,
                                     HADOOP_CONF, "()V");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsCopyImpl: Configuration constructor");
        goto done;
    }

    //FileUtil#copy
    jthr = invokeMethod(env, &jVal, STATIC,
            NULL, "org/apache/hadoop/fs/FileUtil", "copy",
            "(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
            "Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;"
            "ZLorg/apache/hadoop/conf/Configuration;)Z",
            jSrcFS, jSrcPath, jDstFS, jDstPath, deleteSource,
            jConfiguration);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsCopyImpl(src=%s, dst=%s, deleteSource=%d): "
            "FileUtil#copy", src, dst, deleteSource);
        goto done;
    }
    if (!jVal.z) {
        ret = EIO;
        goto done;
    }
    ret = 0;

done:
    destroyLocalReference(env, jConfiguration);
    destroyLocalReference(env, jSrcPath);
    destroyLocalReference(env, jDstPath);
    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}

int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
    return hdfsCopyImpl(srcFS, src, dstFS, dst, 0);
}

int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst)
{
    return hdfsCopyImpl(srcFS, src, dstFS, dst, 1);
}

int hdfsDelete(hdfsFS fs, const char* path, int recursive)
{
    // JAVA EQUIVALENT:
    //  Path p = new Path(path);
    //  bool retval = fs.delete(p, recursive);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jobject jPath;
    jvalue jVal;

    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsDelete(path=%s): constructNewObjectOfPath", path);
        return -1;
    }
    jboolean jRecursive = recursive ? JNI_TRUE : JNI_FALSE;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "delete", "(Lorg/apache/hadoop/fs/Path;Z)Z",
                     jPath, jRecursive);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsDelete(path=%s, recursive=%d): "
            "FileSystem#delete", path, recursive);
        return -1;
    }
    if (!jVal.z) {
        errno = EIO;
        return -1;
    }
    return 0;
}

int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath)
{
    // JAVA EQUIVALENT:
    //  Path old = new Path(oldPath);
    //  Path new = new Path(newPath);
    //  fs.rename(old, new);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jobject jOldPath = NULL, jNewPath = NULL;
    int ret = -1;
    jvalue jVal;

    jthr = constructNewObjectOfPath(env, oldPath, &jOldPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsRename: constructNewObjectOfPath(%s)", oldPath);
        goto done;
    }
    jthr = constructNewObjectOfPath(env, newPath, &jNewPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsRename: constructNewObjectOfPath(%s)", newPath);
        goto done;
    }

    // Rename the file
    // TODO: use rename2 here?  (See HDFS-3592)
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "rename",
                     JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_PATH), "Z"),
                     jOldPath, jNewPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsRename(oldPath=%s, newPath=%s): FileSystem#rename",
            oldPath, newPath);
        goto done;
    }
    if (!jVal.z) {
        errno = EIO;
        goto done;
    }
    ret = 0;

done:
    destroyLocalReference(env, jOldPath);
    destroyLocalReference(env, jNewPath);
    return ret;
}

char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize)
{
    // JAVA EQUIVALENT:
    //  Path p = fs.getWorkingDirectory();
    //  return p.toString()

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    jobject jPath = NULL;
    jstring jPathString = NULL;
    jobject jFS = (jobject)fs;
    jvalue jVal;
    jthrowable jthr;
    int ret;
    const char *jPathChars = NULL;

    //FileSystem#getWorkingDirectory()
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
                     HADOOP_FS, "getWorkingDirectory",
                     "()Lorg/apache/hadoop/fs/Path;");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetWorkingDirectory: FileSystem#getWorkingDirectory");
        goto done;
    }
    jPath = jVal.l;
    if (!jPath) {
        fprintf(stderr, "hdfsGetWorkingDirectory: "
            "FileSystem#getWorkingDirectory returned NULL");
        ret = EIO;
        goto done;
    }

    //Path#toString()
    jthr = invokeMethod(env, &jVal, INSTANCE, jPath,
                     "org/apache/hadoop/fs/Path", "toString",
                     "()Ljava/lang/String;");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetWorkingDirectory: Path#toString");
        goto done;
    }
    jPathString = jVal.l;
    jPathChars = (*env)->GetStringUTFChars(env, jPathString, NULL);
    if (!jPathChars) {
        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
            "hdfsGetWorkingDirectory: GetStringUTFChars");
        goto done;
    }

    //Copy to user-provided buffer
    ret = snprintf(buffer, bufferSize, "%s", jPathChars);
    if (ret >= bufferSize) {
        ret = ENAMETOOLONG;
        goto done;
    }
    ret = 0;

done:
    if (jPathChars) {
        (*env)->ReleaseStringUTFChars(env, jPathString, jPathChars);
    }
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jPathString);

    if (ret) {
        errno = ret;
        return NULL;
    }
    return buffer;
}

int hdfsSetWorkingDirectory(hdfsFS fs, const char* path)
{
    // JAVA EQUIVALENT:
    //  fs.setWorkingDirectory(Path(path));

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;
    jthrowable jthr;
    jobject jPath;

    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsSetWorkingDirectory(%s): constructNewObjectOfPath",
            path);
        return -1;
    }

    //FileSystem#setWorkingDirectory()
    jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
                     "setWorkingDirectory", "(Lorg/apache/hadoop/fs/Path;)V",
                     jPath);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, NOPRINT_EXC_ILLEGAL_ARGUMENT,
            "hdfsSetWorkingDirectory(%s): FileSystem#setWorkingDirectory",
            path);
        return -1;
    }
    return 0;
}

int hdfsCreateDirectory(hdfsFS fs, const char* path)
{
    // JAVA EQUIVALENT:
    //  fs.mkdirs(new Path(path));

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;
    jobject jPath;
    jthrowable jthr;

    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsCreateDirectory(%s): constructNewObjectOfPath", path);
        return -1;
    }

    //Create the directory
    jvalue jVal;
    jVal.z = 0;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "mkdirs", "(Lorg/apache/hadoop/fs/Path;)Z",
                     jPath);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK | NOPRINT_EXC_PARENT_NOT_DIRECTORY,
            "hdfsCreateDirectory(%s): FileSystem#mkdirs", path);
        return -1;
    }
    if (!jVal.z) {
        // It's unclear under exactly which conditions FileSystem#mkdirs
        // is supposed to return false (as opposed to throwing an exception.)
        // It seems like the current code never actually returns false.
        // So we're going to translate this to EIO, since there seems to be
        // nothing more specific we can do with it.
        errno = EIO;
        return -1;
    }
    return 0;
}

int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication)
{
    // JAVA EQUIVALENT:
    //  fs.setReplication(new Path(path), replication);

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;
    jthrowable jthr;

    //Create an object of org.apache.hadoop.fs.Path
    jobject jPath;
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsSetReplication(path=%s): constructNewObjectOfPath", path);
        return -1;
    }

    //Set the replication factor
    jvalue jVal;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "setReplication", "(Lorg/apache/hadoop/fs/Path;S)Z",
                     jPath, replication);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsSetReplication(path=%s, replication=%d): "
            "FileSystem#setReplication", path, replication);
        return -1;
    }
    if (!jVal.z) {
        // setReplication returns false "if file does not exist or is a
        // directory."  So the nearest translation to that is ENOENT.
        errno = ENOENT;
        return -1;
    }
    return 0;
}

int hdfsChown(hdfsFS fs, const char* path, const char *owner,
              const char *group)
{
    // JAVA EQUIVALENT:
    //  fs.setOwner(path, owner, group)

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    if (owner == NULL && group == NULL) {
        return 0;
    }

    jobject jFS = (jobject)fs;
    jobject jPath = NULL;
    jstring jOwner = NULL, jGroup = NULL;
    jthrowable jthr;
    int ret;

    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsChown(path=%s): constructNewObjectOfPath", path);
        goto done;
    }

    jthr = newJavaStr(env, owner, &jOwner);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsChown(path=%s): newJavaStr(%s)", path, owner);
        goto done;
    }
    jthr = newJavaStr(env, group, &jGroup);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsChown(path=%s): newJavaStr(%s)", path, group);
        goto done;
    }

    //Set the owner and group
    jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
            "setOwner", JMETHOD3(JPARAM(HADOOP_PATH), JPARAM(JAVA_STRING),
                JPARAM(JAVA_STRING), JAVA_VOID),
            jPath, jOwner, jGroup);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK,
            "hdfsChown(path=%s, owner=%s, group=%s): "
            "FileSystem#setOwner", path, owner, group);
        goto done;
    }
    ret = 0;

done:
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jOwner);
    destroyLocalReference(env, jGroup);

    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}

int hdfsChmod(hdfsFS fs, const char* path, short mode)
{
    int ret;
    // JAVA EQUIVALENT:
    //  fs.setPermission(path, FsPermission)

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jthrowable jthr;
    jobject jPath = NULL, jPermObj = NULL;
    jobject jFS = (jobject)fs;

    // construct jPerm = FsPermission.createImmutable(short mode);
    jshort jmode = mode;
    jthr = constructNewObjectOfClass(env, &jPermObj,
                HADOOP_FSPERM, "(S)V", jmode);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "constructNewObjectOfClass(%s)", HADOOP_FSPERM);
        return -1;
    }

    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsChmod(%s): constructNewObjectOfPath", path);
        goto done;
    }

    //Set the permission
    jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
            "setPermission",
            JMETHOD2(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FSPERM), JAVA_VOID),
            jPath, jPermObj);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK,
            "hdfsChmod(%s): FileSystem#setPermission", path);
        goto done;
    }
    ret = 0;

done:
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jPermObj);

    if (ret) {
        errno = ret;
        return -1;
    }
    return 0;
}
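
/*
 * Usage sketch (illustrative; the path, owner and mode are placeholders).
 * hdfsChmod takes a POSIX-style mode that the code above wraps in an
 * org.apache.hadoop.fs.permission.FsPermission; hdfsChown treats a NULL
 * owner or group as "leave unchanged" (both NULL is a no-op, per the early
 * return above).
 *
 *     hdfsCreateDirectory(fs, "/tmp/reports");
 *     hdfsChmod(fs, "/tmp/reports", 0750);
 *     hdfsChown(fs, "/tmp/reports", "alice", NULL);   // group unchanged
 */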

int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
{
    // JAVA EQUIVALENT:
    //  fs.setTimes(src, mtime, atime)

    jthrowable jthr;

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;

    //Create an object of org.apache.hadoop.fs.Path
    jobject jPath;
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsUtime(path=%s): constructNewObjectOfPath", path);
        return -1;
    }

    const tTime NO_CHANGE = -1;
    jlong jmtime = (mtime == NO_CHANGE) ? -1 : (mtime * (jlong)1000);
    jlong jatime = (atime == NO_CHANGE) ? -1 : (atime * (jlong)1000);

    jthr = invokeMethod(env, NULL, INSTANCE, jFS, HADOOP_FS,
            "setTimes", JMETHOD3(JPARAM(HADOOP_PATH), "J", "J", JAVA_VOID),
            jPath, jmtime, jatime);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK,
            "hdfsUtime(path=%s): FileSystem#setTimes", path);
        return -1;
    }
    return 0;
}

/**
 * Zero-copy options.
 *
 * We cache the EnumSet of ReadOptions which has to be passed into every
 * readZero call, to avoid reconstructing it each time.  This cache is cleared
 * whenever an element changes.
 */
struct hadoopRzOptions
{
    JNIEnv *env;
    int skipChecksums;
    jobject byteBufferPool;
    jobject cachedEnumSet;
};

struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
{
    struct hadoopRzOptions *opts;
    JNIEnv *env;

    env = getJNIEnv();
    if (!env) {
        // Check to make sure the JNI environment is set up properly.
        errno = EINTERNAL;
        return NULL;
    }
    opts = calloc(1, sizeof(struct hadoopRzOptions));
    if (!opts) {
        errno = ENOMEM;
        return NULL;
    }
    return opts;
}

static void hadoopRzOptionsClearCached(JNIEnv *env,
        struct hadoopRzOptions *opts)
{
    if (!opts->cachedEnumSet) {
        return;
    }
    (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
    opts->cachedEnumSet = NULL;
}

int hadoopRzOptionsSetSkipChecksum(
        struct hadoopRzOptions *opts, int skip)
{
    JNIEnv *env;
    env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return -1;
    }
    hadoopRzOptionsClearCached(env, opts);
    opts->skipChecksums = !!skip;
    return 0;
}

int hadoopRzOptionsSetByteBufferPool(
        struct hadoopRzOptions *opts, const char *className)
{
    JNIEnv *env;
    jthrowable jthr;
    jobject byteBufferPool = NULL;

    env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return -1;
    }

    if (className) {
        // Note: we don't have to call hadoopRzOptionsClearCached in this
        // function, since the ByteBufferPool is passed separately from the
        // EnumSet of ReadOptions.
        jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
            errno = EINVAL;
            return -1;
        }
    }
    if (opts->byteBufferPool) {
        // Delete any previous ByteBufferPool we had.
        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
    }
    opts->byteBufferPool = byteBufferPool;
    return 0;
}

void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
{
    JNIEnv *env;
    env = getJNIEnv();
    if (!env) {
        return;
    }
    hadoopRzOptionsClearCached(env, opts);
    if (opts->byteBufferPool) {
        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
        opts->byteBufferPool = NULL;
    }
    free(opts);
}
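
/*
 * Usage sketch (illustrative): preparing zero-copy read options.  The pool
 * class is Hadoop's stock ElasticByteBufferPool, given in JNI form as
 * constructNewObjectOfClass expects; the zero-copy read entry point itself
 * (hadoopReadZero, whose helpers follow) consumes these options.
 *
 *     struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
 *     if (!opts) { ... errno is set ... }
 *     hadoopRzOptionsSetSkipChecksum(opts, 1);
 *     hadoopRzOptionsSetByteBufferPool(opts,
 *             "org/apache/hadoop/io/ElasticByteBufferPool");
 *     ...
 *     hadoopRzOptionsFree(opts);
 */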

struct hadoopRzBuffer
{
    jobject byteBuffer;
    uint8_t *ptr;
    int32_t length;
    int direct;
};

static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
        struct hadoopRzOptions *opts, jobject *enumSet)
{
    jthrowable jthr = NULL;
    jobject enumInst = NULL, enumSetObj = NULL;
    jvalue jVal;

    if (opts->cachedEnumSet) {
        // If we cached the value, return it now.
        *enumSet = opts->cachedEnumSet;
        goto done;
    }
    if (opts->skipChecksums) {
        jthr = fetchEnumInstance(env, READ_OPTION,
                  "SKIP_CHECKSUMS", &enumInst);
        if (jthr) {
            goto done;
        }
        jthr = invokeMethod(env, &jVal, STATIC, NULL,
                "java/util/EnumSet", "of",
                "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
        if (jthr) {
            goto done;
        }
        enumSetObj = jVal.l;
    } else {
        jclass clazz = (*env)->FindClass(env, READ_OPTION);
        if (!clazz) {
            jthr = newRuntimeError(env, "failed "
                    "to find class for %s", READ_OPTION);
            goto done;
        }
        jthr = invokeMethod(env, &jVal, STATIC, NULL,
                "java/util/EnumSet", "noneOf",
                "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
        enumSetObj = jVal.l;
    }
    // create global ref
    opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
    if (!opts->cachedEnumSet) {
        jthr = getPendingExceptionAndClear(env);
        goto done;
    }
    *enumSet = opts->cachedEnumSet;
    jthr = NULL;
done:
    (*env)->DeleteLocalRef(env, enumInst);
    (*env)->DeleteLocalRef(env, enumSetObj);
    return jthr;
}

static int hadoopReadZeroExtractBuffer(JNIEnv *env,
        const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
{
    int ret;
    jthrowable jthr;
    jvalue jVal;
    uint8_t *directStart;
    void *mallocBuf = NULL;
    jint position;
    jarray array = NULL;

    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
                     "java/nio/ByteBuffer", "remaining", "()I");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
        goto done;
    }
    buffer->length = jVal.i;
    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
                     "java/nio/ByteBuffer", "position", "()I");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
        goto done;
    }
    position = jVal.i;
    directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
    if (directStart) {
        // Handle direct buffers.
        buffer->ptr = directStart + position;
        buffer->direct = 1;
        ret = 0;
        goto done;
    }
    // Handle indirect buffers.
    //
    // The JNI docs don't say that GetDirectBufferAddress throws any exceptions
    // when it fails.  However, they also don't clearly say that it doesn't.
    // It seems safest to clear any pending exceptions here, to prevent
    // problems on various JVMs.
    (*env)->ExceptionClear(env);
    if (!opts->byteBufferPool) {
        fputs("hadoopReadZeroExtractBuffer: we read through the "
                "zero-copy path, but failed to get the address of the buffer "
                "via GetDirectBufferAddress. Please make sure your JVM "
                "supports GetDirectBufferAddress.\n", stderr);
        ret = ENOTSUP;
        goto done;
    }
    // Get the backing array object of this buffer.
    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
                     "java/nio/ByteBuffer", "array", "()[B");
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
        goto done;
    }
    array = jVal.l;
    if (!array) {
        fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
              stderr);
        ret = EIO;
        goto done;
    }
    mallocBuf = malloc(buffer->length);
    if (!mallocBuf) {
        fprintf(stderr,
            "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
            buffer->length);
        ret = ENOMEM;
        goto done;
    }
    (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
    jthr = (*env)->ExceptionOccurred(env);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
        goto done;
    }
    buffer->ptr = mallocBuf;
    buffer->direct = 0;
    ret = 0;
    // Ownership of mallocBuf has moved into buffer->ptr; clear the local so
    // the cleanup below does not free the data we just handed to the caller.
    mallocBuf = NULL;

done:
    free(mallocBuf);
    (*env)->DeleteLocalRef(env, array);
    return ret;
}
static int translateZCRException(JNIEnv *env, jthrowable exc)
{
    int ret;
    char *className = NULL;
    jthrowable jthr = classNameOfObject(exc, env, &className);

    if (jthr) {
        fputs("hadoopReadZero: failed to get class name of "
                "exception from read().\n", stderr);
        destroyLocalReference(env, exc);
        destroyLocalReference(env, jthr);
        ret = EIO;
        goto done;
    }
    if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
        ret = EPROTONOSUPPORT;
        goto done;
    }
    ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
            "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
done:
    free(className);
    return ret;
}
struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
            struct hadoopRzOptions *opts, int32_t maxLength)
{
    JNIEnv *env;
    jthrowable jthr = NULL;
    jvalue jVal;
    jobject enumSet = NULL, byteBuffer = NULL;
    struct hadoopRzBuffer* buffer = NULL;
    int ret;

    env = getJNIEnv();
    if (!env) {
        errno = EINTERNAL;
        return NULL;
    }
    if (file->type != INPUT) {
        fputs("Cannot read from a non-InputStream object!\n", stderr);
        ret = EINVAL;
        goto done;
    }
    buffer = calloc(1, sizeof(struct hadoopRzBuffer));
    if (!buffer) {
        ret = ENOMEM;
        goto done;
    }
    jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
        goto done;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
        "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
        "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
    if (jthr) {
        ret = translateZCRException(env, jthr);
        goto done;
    }
    byteBuffer = jVal.l;
    if (!byteBuffer) {
        buffer->byteBuffer = NULL;
        buffer->length = 0;
        buffer->ptr = NULL;
    } else {
        buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
        if (!buffer->byteBuffer) {
            ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hadoopReadZero: failed to create global ref to ByteBuffer");
            goto done;
        }
        ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
        if (ret) {
            goto done;
        }
    }
    ret = 0;
done:
    (*env)->DeleteLocalRef(env, byteBuffer);
    if (ret) {
        if (buffer) {
            if (buffer->byteBuffer) {
                (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
            }
            free(buffer);
        }
        errno = ret;
        return NULL;
    } else {
        errno = 0;
    }
    return buffer;
}
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
{
    return buffer->length;
}

const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
{
    return buffer->ptr;
}
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
{
    jvalue jVal;
    jthrowable jthr;
    JNIEnv* env;

    env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return;
    }
    if (buffer->byteBuffer) {
        jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
                    HADOOP_ISTRM, "releaseBuffer",
                    "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
        if (jthr) {
            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "hadoopRzBufferFree: releaseBuffer failed: ");
            // even on error, we have to delete the reference.
        }
        (*env)->DeleteGlobalRef(env, buffer->byteBuffer);
    }
    if (!buffer->direct) {
        free(buffer->ptr);
    }
    memset(buffer, 0, sizeof(*buffer));
    free(buffer);
}
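
/*
 * Illustrative usage sketch (not part of the original libhdfs source): the
 * full zero-copy read cycle against an open input file.  hadoopReadZero
 * returns a zero-length buffer at end of file, and every successful call
 * must be paired with hadoopRzBufferFree so the stream can recycle the
 * underlying ByteBuffer.  The LIBHDFS_USAGE_SKETCHES guard is hypothetical
 * and only keeps the sketch out of the build.
 */
#ifdef LIBHDFS_USAGE_SKETCHES
static int exampleZeroCopyReadAll(hdfsFile file, struct hadoopRzOptions *opts)
{
    for (;;) {
        struct hadoopRzBuffer *rzBuf = hadoopReadZero(file, opts, 1024 * 1024);
        if (!rzBuf)
            return -1;                  // errno describes the failure
        int32_t len = hadoopRzBufferLength(rzBuf);
        if (len == 0) {                 // end of file
            hadoopRzBufferFree(file, rzBuf);
            return 0;
        }
        const void *data = hadoopRzBufferGet(rzBuf);
        fwrite(data, 1, len, stdout);   // consume the bytes somehow
        hadoopRzBufferFree(file, rzBuf);
    }
}
#endif /* LIBHDFS_USAGE_SKETCHES */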
char*** hdfsGetHosts(hdfsFS fs, const char *path, tOffset start,
                     tOffset length)
{
    // JAVA EQUIVALENT:
    //  fs.getFileBlockLocations(new Path(path), start, length);

    jthrowable jthr;
    jobject jPath = NULL;
    jobject jFileStatus = NULL;
    jvalue jFSVal, jVal;
    jobjectArray jBlockLocations = NULL, jFileBlockHosts = NULL;
    jstring jHost = NULL;
    char*** blockHosts = NULL;
    int i, j, ret;
    jsize jNumFileBlocks = 0;

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    jobject jFS = (jobject)fs;

    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s): constructNewObjectOfPath", path);
        goto done;
    }
    jthr = invokeMethod(env, &jFSVal, INSTANCE, jFS,
            HADOOP_FS, "getFileStatus", "(Lorg/apache/hadoop/fs/Path;)"
            "Lorg/apache/hadoop/fs/FileStatus;", jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, NOPRINT_EXC_FILE_NOT_FOUND,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                "FileSystem#getFileStatus", path, start, length);
        // jPath is released in the done block; destroying it here as well
        // would delete the same local reference twice.
        goto done;
    }
    jFileStatus = jFSVal.l;

    //org.apache.hadoop.fs.FileSystem#getFileBlockLocations
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
                     HADOOP_FS, "getFileBlockLocations",
                     "(Lorg/apache/hadoop/fs/FileStatus;JJ)"
                     "[Lorg/apache/hadoop/fs/BlockLocation;",
                     jFileStatus, start, length);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                "FileSystem#getFileBlockLocations", path, start, length);
        goto done;
    }
    jBlockLocations = jVal.l;

    //Figure out no of entries in jBlockLocations
    //Allocate memory and add NULL at the end
    jNumFileBlocks = (*env)->GetArrayLength(env, jBlockLocations);
    blockHosts = calloc(jNumFileBlocks + 1, sizeof(char**));
    if (blockHosts == NULL) {
        ret = ENOMEM;
        goto done;
    }
    if (jNumFileBlocks == 0) {
        ret = 0;
        goto done;
    }

    //Now parse each block to get hostnames
    for (i = 0; i < jNumFileBlocks; ++i) {
        jobject jFileBlock =
            (*env)->GetObjectArrayElement(env, jBlockLocations, i);
        if (!jFileBlock) {
            ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                "GetObjectArrayElement(%d)", path, start, length, i);
            goto done;
        }

        jthr = invokeMethod(env, &jVal, INSTANCE, jFileBlock, HADOOP_BLK_LOC,
                         "getHosts", "()[Ljava/lang/String;");
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                "BlockLocation#getHosts", path, start, length);
            goto done;
        }
        jFileBlockHosts = jVal.l;
        if (!jFileBlockHosts) {
            fprintf(stderr,
                "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                "BlockLocation#getHosts returned NULL", path, start, length);
            ret = EINTERNAL;
            goto done;
        }
        //Figure out no of hosts in jFileBlockHosts, and allocate the memory
        jsize jNumBlockHosts = (*env)->GetArrayLength(env, jFileBlockHosts);
        blockHosts[i] = calloc(jNumBlockHosts + 1, sizeof(char*));
        if (!blockHosts[i]) {
            ret = ENOMEM;
            goto done;
        }

        //Now parse each hostname
        const char *hostName;
        for (j = 0; j < jNumBlockHosts; ++j) {
            jHost = (*env)->GetObjectArrayElement(env, jFileBlockHosts, j);
            if (!jHost) {
                ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                    "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64"): "
                    "GetObjectArrayElement", path, start, length);
                goto done;
            }
            hostName =
                (const char*)((*env)->GetStringUTFChars(env, jHost, NULL));
            if (!hostName) {
                ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                    "hdfsGetHosts(path=%s, start=%"PRId64", length=%"PRId64", "
                    "j=%d out of %d): GetStringUTFChars",
                    path, start, length, j, jNumBlockHosts);
                goto done;
            }
            blockHosts[i][j] = strdup(hostName);
            (*env)->ReleaseStringUTFChars(env, jHost, hostName);
            if (!blockHosts[i][j]) {
                ret = ENOMEM;
                goto done;
            }
            destroyLocalReference(env, jHost);
            jHost = NULL;
        }
        destroyLocalReference(env, jFileBlockHosts);
        jFileBlockHosts = NULL;
    }
    ret = 0;

done:
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jFileStatus);
    destroyLocalReference(env, jBlockLocations);
    destroyLocalReference(env, jFileBlockHosts);
    destroyLocalReference(env, jHost);

    if (ret) {
        if (blockHosts) {
            hdfsFreeHosts(blockHosts);
        }
        return NULL;
    }
    return blockHosts;
}

void hdfsFreeHosts(char ***blockHosts)
{
    int i, j;
    for (i = 0; blockHosts[i]; i++) {
        for (j = 0; blockHosts[i][j]; j++) {
            free(blockHosts[i][j]);
        }
        free(blockHosts[i]);
    }
    free(blockHosts);
}
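
/*
 * Illustrative usage sketch (not part of the original libhdfs source):
 * walking the NULL-terminated host arrays returned by hdfsGetHosts.  The
 * outer array has one entry per block in [start, start + length); each entry
 * is itself a NULL-terminated list of datanode hostnames.  The
 * LIBHDFS_USAGE_SKETCHES guard is hypothetical and only keeps the sketch
 * out of the build.
 */
#ifdef LIBHDFS_USAGE_SKETCHES
static void examplePrintBlockHosts(hdfsFS fs, const char *path, tOffset len)
{
    char ***hosts = hdfsGetHosts(fs, path, 0, len);
    if (!hosts)
        return;                       // error already reported on stderr
    int i, j;
    for (i = 0; hosts[i]; i++) {
        for (j = 0; hosts[i][j]; j++) {
            printf("block %d replica %d on %s\n", i, j, hosts[i][j]);
        }
    }
    hdfsFreeHosts(hosts);             // frees every string and both arrays
}
#endif /* LIBHDFS_USAGE_SKETCHES */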
tOffset hdfsGetDefaultBlockSize(hdfsFS fs)
{
    // JAVA EQUIVALENT:
    //  fs.getDefaultBlockSize();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;

    //FileSystem#getDefaultBlockSize()
    jvalue jVal;
    jthrowable jthr;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "getDefaultBlockSize", "()J");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetDefaultBlockSize: FileSystem#getDefaultBlockSize");
        return -1;
    }
    return jVal.j;
}
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path)
{
    // JAVA EQUIVALENT:
    //  fs.getDefaultBlockSize(path);

    jthrowable jthr;
    jobject jFS = (jobject)fs;
    jobject jPath;
    tOffset blockSize;
    JNIEnv* env = getJNIEnv();

    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetDefaultBlockSize(path=%s): constructNewObjectOfPath",
            path);
        return -1;
    }
    jthr = getDefaultBlockSize(env, jFS, jPath, &blockSize);
    (*env)->DeleteLocalRef(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetDefaultBlockSize(path=%s): "
            "FileSystem#getDefaultBlockSize", path);
        return -1;
    }
    return blockSize;
}
tOffset hdfsGetCapacity(hdfsFS fs)
{
    // JAVA EQUIVALENT:
    //  FsStatus fss = fs.getStatus();
    //  return fss.getCapacity();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;

    //FileSystem#getStatus
    jvalue jVal;
    jthrowable jthr;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetCapacity: FileSystem#getStatus");
        return -1;
    }
    jobject fss = (jobject)jVal.l;
    jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
                     "getCapacity", "()J");
    destroyLocalReference(env, fss);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetCapacity: FsStatus#getCapacity");
        return -1;
    }
    return jVal.j;
}
tOffset hdfsGetUsed(hdfsFS fs)
{
    // JAVA EQUIVALENT:
    //  FsStatus fss = fs.getStatus();
    //  return fss.getUsed();

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return -1;
    }

    jobject jFS = (jobject)fs;

    //FileSystem#getStatus
    jvalue jVal;
    jthrowable jthr;
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS,
                     "getStatus", "()Lorg/apache/hadoop/fs/FsStatus;");
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetUsed: FileSystem#getStatus");
        return -1;
    }
    jobject fss = (jobject)jVal.l;
    jthr = invokeMethod(env, &jVal, INSTANCE, fss, HADOOP_FSSTATUS,
                     "getUsed", "()J");
    destroyLocalReference(env, fss);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetUsed: FsStatus#getUsed");
        return -1;
    }
    return jVal.j;
}
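
/*
 * Illustrative usage sketch (not part of the original libhdfs source):
 * hdfsGetCapacity and hdfsGetUsed both map onto FileSystem#getStatus, so a
 * simple utilization report is two calls plus error checks.  The
 * LIBHDFS_USAGE_SKETCHES guard is hypothetical and only keeps the sketch
 * out of the build.
 */
#ifdef LIBHDFS_USAGE_SKETCHES
static int examplePrintUtilization(hdfsFS fs)
{
    tOffset capacity = hdfsGetCapacity(fs);
    tOffset used = hdfsGetUsed(fs);
    if (capacity < 0 || used < 0)
        return -1;                    // errno was set by the failing call
    printf("used %" PRId64 " of %" PRId64 " bytes (%.1f%%)\n",
           (int64_t)used, (int64_t)capacity,
           capacity ? 100.0 * used / capacity : 0.0);
    return 0;
}
#endif /* LIBHDFS_USAGE_SKETCHES */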
static jthrowable
getFileInfoFromStat(JNIEnv *env, jobject jStat, hdfsFileInfo *fileInfo)
{
    jvalue jVal;
    jthrowable jthr;
    jobject jPath = NULL;
    jstring jPathName = NULL;
    jstring jUserName = NULL;
    jstring jGroupName = NULL;
    jobject jPermission = NULL;

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                     HADOOP_STAT, "isDir", "()Z");
    if (jthr)
        goto done;
    fileInfo->mKind = jVal.z ? kObjectKindDirectory : kObjectKindFile;

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                     HADOOP_STAT, "getReplication", "()S");
    if (jthr)
        goto done;
    fileInfo->mReplication = jVal.s;

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                     HADOOP_STAT, "getBlockSize", "()J");
    if (jthr)
        goto done;
    fileInfo->mBlockSize = jVal.j;

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                     HADOOP_STAT, "getModificationTime", "()J");
    if (jthr)
        goto done;
    fileInfo->mLastMod = jVal.j / 1000;

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                     HADOOP_STAT, "getAccessTime", "()J");
    if (jthr)
        goto done;
    fileInfo->mLastAccess = (tTime)(jVal.j / 1000);

    if (fileInfo->mKind == kObjectKindFile) {
        jthr = invokeMethod(env, &jVal, INSTANCE, jStat,
                         HADOOP_STAT, "getLen", "()J");
        if (jthr)
            goto done;
        fileInfo->mSize = jVal.j;
    }

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
                     "getPath", "()Lorg/apache/hadoop/fs/Path;");
    if (jthr)
        goto done;
    jPath = jVal.l;
    if (jPath == NULL) {
        jthr = newRuntimeError(env, "org.apache.hadoop.fs.FileStatus#"
            "getPath returned NULL!");
        goto done;
    }

    jthr = invokeMethod(env, &jVal, INSTANCE, jPath, HADOOP_PATH,
                     "toString", "()Ljava/lang/String;");
    if (jthr)
        goto done;
    jPathName = jVal.l;
    const char *cPathName =
        (const char*)((*env)->GetStringUTFChars(env, jPathName, NULL));
    if (!cPathName) {
        jthr = getPendingExceptionAndClear(env);
        goto done;
    }
    fileInfo->mName = strdup(cPathName);
    (*env)->ReleaseStringUTFChars(env, jPathName, cPathName);

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
                    "getOwner", "()Ljava/lang/String;");
    if (jthr)
        goto done;
    jUserName = jVal.l;
    const char *cUserName =
        (const char*)((*env)->GetStringUTFChars(env, jUserName, NULL));
    if (!cUserName) {
        jthr = getPendingExceptionAndClear(env);
        goto done;
    }
    fileInfo->mOwner = strdup(cUserName);
    (*env)->ReleaseStringUTFChars(env, jUserName, cUserName);

    const char *cGroupName;
    jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
                    "getGroup", "()Ljava/lang/String;");
    if (jthr)
        goto done;
    jGroupName = jVal.l;
    cGroupName =
        (const char*)((*env)->GetStringUTFChars(env, jGroupName, NULL));
    if (!cGroupName) {
        jthr = getPendingExceptionAndClear(env);
        goto done;
    }
    fileInfo->mGroup = strdup(cGroupName);
    (*env)->ReleaseStringUTFChars(env, jGroupName, cGroupName);

    jthr = invokeMethod(env, &jVal, INSTANCE, jStat, HADOOP_STAT,
            "getPermission",
            "()Lorg/apache/hadoop/fs/permission/FsPermission;");
    if (jthr)
        goto done;
    if (jVal.l == NULL) {
        jthr = newRuntimeError(env, "%s#getPermission returned NULL!",
            HADOOP_STAT);
        goto done;
    }
    jPermission = jVal.l;
    jthr = invokeMethod(env, &jVal, INSTANCE, jPermission, HADOOP_FSPERM,
                         "toShort", "()S");
    if (jthr)
        goto done;
    fileInfo->mPermissions = jVal.s;
    jthr = NULL;

done:
    if (jthr)
        hdfsFreeFileInfoEntry(fileInfo);
    // Each local reference is released exactly once here; the original
    // cleanup destroyed jPath a second time.
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jPathName);
    destroyLocalReference(env, jUserName);
    destroyLocalReference(env, jGroupName);
    destroyLocalReference(env, jPermission);
    return jthr;
}
static jthrowable
getFileInfo(JNIEnv *env, jobject jFS, jobject jPath, hdfsFileInfo **fileInfo)
{
    // JAVA EQUIVALENT:
    //  fs.isDirectory(f)
    //  fs.getModificationTime()
    //  fs.getAccessTime()
    //  fs.getLength(f)
    //  f.getPath()
    //  f.getOwner()
    //  f.getGroup()
    //  f.getPermission().toShort()
    jobject jStat;
    jvalue  jVal;
    jthrowable jthr;

    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_FS, "exists",
                     JMETHOD1(JPARAM(HADOOP_PATH), "Z"), jPath);
    if (jthr)
        return jthr;
    if (jVal.z == 0) {
        *fileInfo = NULL;
        return NULL;
    }
    jthr = invokeMethod(env, &jVal, INSTANCE, jFS,
            HADOOP_FS, "getFileStatus",
            JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_STAT)), jPath);
    if (jthr)
        return jthr;
    jStat = jVal.l;
    *fileInfo = calloc(1, sizeof(hdfsFileInfo));
    if (!*fileInfo) {
        destroyLocalReference(env, jStat);
        return newRuntimeError(env, "getFileInfo: OOM allocating hdfsFileInfo");
    }
    jthr = getFileInfoFromStat(env, jStat, *fileInfo);
    destroyLocalReference(env, jStat);
    return jthr;
}
hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries)
{
    // JAVA EQUIVALENT:
    //  Path p(path);
    //  Path []pathList = fs.listPaths(p)
    //  foreach path in pathList
    //    getFileInfo(path)

    jthrowable jthr;
    jobject jPath = NULL;
    hdfsFileInfo *pathList = NULL;
    jobjectArray jPathList = NULL;
    jvalue jVal;
    jsize jPathListSize = 0;
    int ret;

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    jobject jFS = (jobject)fs;

    //Create an object of org.apache.hadoop.fs.Path
    jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsListDirectory(%s): constructNewObjectOfPath", path);
        goto done;
    }

    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, HADOOP_DFS, "listStatus",
                     JMETHOD1(JPARAM(HADOOP_PATH), JARRPARAM(HADOOP_STAT)),
                     jPath);
    if (jthr) {
        ret = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK,
            "hdfsListDirectory(%s): FileSystem#listStatus", path);
        goto done;
    }
    jPathList = jVal.l;

    //Figure out the number of entries in that directory
    jPathListSize = (*env)->GetArrayLength(env, jPathList);
    if (jPathListSize == 0) {
        ret = 0;
        goto done;
    }

    //Allocate memory
    pathList = calloc(jPathListSize, sizeof(hdfsFileInfo));
    if (pathList == NULL) {
        ret = ENOMEM;
        goto done;
    }

    //Save path information in pathList
    jsize i;
    jobject tmpStat;
    for (i = 0; i < jPathListSize; ++i) {
        tmpStat = (*env)->GetObjectArrayElement(env, jPathList, i);
        if (!tmpStat) {
            ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
                "hdfsListDirectory(%s): GetObjectArrayElement(%d out of %d)",
                path, i, jPathListSize);
            goto done;
        }
        jthr = getFileInfoFromStat(env, tmpStat, &pathList[i]);
        destroyLocalReference(env, tmpStat);
        if (jthr) {
            ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                "hdfsListDirectory(%s): getFileInfoFromStat(%d out of %d)",
                path, i, jPathListSize);
            goto done;
        }
    }
    ret = 0;

done:
    destroyLocalReference(env, jPath);
    destroyLocalReference(env, jPathList);

    if (ret) {
        hdfsFreeFileInfo(pathList, jPathListSize);
        errno = ret;
        return NULL;
    }
    *numEntries = jPathListSize;
    return pathList;
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path)
{
    // JAVA EQUIVALENT:
    //  File f(path);
    //  fs.isDirectory(f)
    //  fs.lastModified() ??
    //  fs.getLength(f)
    //  f.getPath()

    //Get the JNIEnv* corresponding to current thread
    JNIEnv* env = getJNIEnv();
    if (env == NULL) {
        errno = EINTERNAL;
        return NULL;
    }

    jobject jFS = (jobject)fs;

    //Create an object of org.apache.hadoop.fs.Path
    jobject jPath;
    jthrowable jthr = constructNewObjectOfPath(env, path, &jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
            "hdfsGetPathInfo(%s): constructNewObjectOfPath", path);
        return NULL;
    }
    hdfsFileInfo *fileInfo;
    jthr = getFileInfo(env, jFS, jPath, &fileInfo);
    destroyLocalReference(env, jPath);
    if (jthr) {
        errno = printExceptionAndFree(env, jthr,
            NOPRINT_EXC_ACCESS_CONTROL | NOPRINT_EXC_FILE_NOT_FOUND |
            NOPRINT_EXC_UNRESOLVED_LINK,
            "hdfsGetPathInfo(%s): getFileInfo", path);
        return NULL;
    }
    if (!fileInfo) {
        errno = ENOENT;
        return NULL;
    }
    return fileInfo;
}
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo)
{
    free(hdfsFileInfo->mName);
    free(hdfsFileInfo->mOwner);
    free(hdfsFileInfo->mGroup);
    // Zero the whole struct, not just the first sizeof(pointer) bytes: the
    // parameter shadows the hdfsFileInfo type name, so the original
    // sizeof(hdfsFileInfo) measured the pointer rather than the entry.
    memset(hdfsFileInfo, 0, sizeof(*hdfsFileInfo));
}
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
    //Free the mName, mOwner, and mGroup
    int i;
    for (i = 0; i < numEntries; ++i) {
        hdfsFreeFileInfoEntry(hdfsFileInfo + i);
    }

    //Free entire block
    free(hdfsFileInfo);
}
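
/*
 * Illustrative usage sketch (not part of the original libhdfs source):
 * listing a directory and releasing the result.  hdfsListDirectory returns a
 * calloc'ed array of numEntries hdfsFileInfo structs that must be released
 * with hdfsFreeFileInfo (NULL means an empty directory or an error; check
 * errno).  A single entry from hdfsGetPathInfo is freed the same way with
 * numEntries == 1.  The LIBHDFS_USAGE_SKETCHES guard is hypothetical and
 * only keeps the sketch out of the build.
 */
#ifdef LIBHDFS_USAGE_SKETCHES
static void exampleListDirectory(hdfsFS fs, const char *dir)
{
    int numEntries = 0;
    hdfsFileInfo *entries = hdfsListDirectory(fs, dir, &numEntries);
    if (!entries)
        return;
    int i;
    for (i = 0; i < numEntries; i++) {
        printf("%c %10" PRId64 " %s\n",
               entries[i].mKind == kObjectKindDirectory ? 'd' : '-',
               (int64_t)entries[i].mSize, entries[i].mName);
    }
    hdfsFreeFileInfo(entries, numEntries);
}
#endif /* LIBHDFS_USAGE_SKETCHES */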
/**
* vim: ts=4: sw=4: et:
*/