Skip to content

Commit 81b8b16

Browse files
author
Gerald Quintana
committed
Handle missing directories
1 parent 5920e40 commit 81b8b16

File tree

10 files changed

+169
-77
lines changed

10 files changed

+169
-77
lines changed

src/main/scala/com/coxautodata/OptionsParsing.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.coxautodata
22

3-
import java.net.URI
3+
import com.coxautodata.utils.DirectoryUtils.missingDirectoryAction
4+
import com.coxautodata.utils.{DirectoryUtils, MissingDirectoryAction}
45

6+
import java.net.URI
57
import org.apache.hadoop.conf.Configuration
68
import org.apache.hadoop.fs.Path
79

@@ -64,6 +66,10 @@ object OptionsParsing {
6466
.action((i, c) => c.copyOptions(_.copy(maxBytesPerTask = i)))
6567
.text("Maximum number of bytes to copy in a single Spark task")
6668

69+
opt[String]("onMissingDirectory")
70+
.action((s, c) => c.copyOptions(_.copy(missingDirectoryAction = missingDirectoryAction(s))))
71+
.text("Maximum number of bytes to copy in a single Spark task")
72+
6773
help("help").text("prints this usage text")
6874

6975
arg[String]("[source_path...] <target_path>")

src/main/scala/com/coxautodata/SparkDistCP.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ object SparkDistCP extends Logging {
9595
qualifiedDestinationPath.toUri,
9696
options.updateOverwritePathBehaviour,
9797
options.numListstatusThreads,
98-
options.filterNot
98+
options.filterNot,
99+
options.missingDirectoryAction
99100
)
100101
val destinationRDD = FileListUtils.getDestinationFiles(
101102
sparkSession.sparkContext,

src/main/scala/com/coxautodata/SparkDistCPOptions.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.coxautodata
22

3+
import com.coxautodata.utils.{FailMissingDirectoryAction, MissingDirectoryAction}
34
import org.apache.hadoop.conf.Configuration
45
import org.apache.hadoop.fs.Path
56

@@ -24,7 +25,8 @@ case class SparkDistCPOptions(
2425
filters: Option[URI] = SparkDistCPOptions.Defaults.filters,
2526
filterNot: List[Regex] = SparkDistCPOptions.Defaults.filterNot,
2627
numListstatusThreads: Int = SparkDistCPOptions.Defaults.numListstatusThreads,
27-
verbose: Boolean = SparkDistCPOptions.Defaults.verbose
28+
verbose: Boolean = SparkDistCPOptions.Defaults.verbose,
29+
missingDirectoryAction: MissingDirectoryAction = SparkDistCPOptions.Defaults.missingDirectoryAction
2830
) {
2931

3032
val updateOverwritePathBehaviour: Boolean =
@@ -93,6 +95,7 @@ object SparkDistCPOptions {
9395
val filterNot: List[Regex] = List.empty
9496
val numListstatusThreads: Int = 10
9597
val verbose: Boolean = false
98+
val missingDirectoryAction : MissingDirectoryAction = FailMissingDirectoryAction
9699
}
97100

98101
}

src/main/scala/com/coxautodata/objects/Accumulators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class Accumulators(sparkSession: SparkSession) extends Serializable {
9191
exceptionCount.value.asScala.toSeq
9292
.sortWith { case ((_, v1), (_, v2)) => v1 > v2 }
9393
.map { case (k, v) => s"$k: ${intFormatter.format(v)}" }
94-
.mkString("\n")
94+
.mkString(System.getProperty("line.separator"))
9595
}
9696

9797
val bytesCopied: LongAccumulator =

src/main/scala/com/coxautodata/utils/CopyUtils.scala

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,13 @@ object CopyUtils extends Logging {
131131
)
132132
} else {
133133
val result = Try {
134-
if (destFS.exists(destPath.getParent)) {
135-
destFS.mkdirs(destPath)
136-
DirectoryCopyResult(
137-
definition.source.getPath.toUri,
138-
definition.destination,
139-
CopyActionResult.Created
140-
)
141-
} else
142-
throw new FileNotFoundException(
143-
s"Parent folder [${destPath.getParent}] does not exist."
144-
)
134+
options.missingDirectoryAction.requireDirectory(destFS, destPath.getParent)
135+
options.missingDirectoryAction.createDirectory(destFS, destPath)
136+
DirectoryCopyResult(
137+
definition.source.getPath.toUri,
138+
definition.destination,
139+
CopyActionResult.Created
140+
)
145141
}
146142
.recover { case _: FileAlreadyExistsException =>
147143
DirectoryCopyResult(
@@ -193,9 +189,9 @@ object CopyUtils extends Logging {
193189
destFS,
194190
definition.destination,
195191
removeExisting = false,
196-
ignoreErrors = options.ignoreErrors,
192+
options = options,
197193
taskAttemptID
198-
)
194+
)
199195
case Failure(e) if options.ignoreErrors =>
200196
logError(
201197
s"Exception whilst getting destination file information [${definition.destination}]",
@@ -223,9 +219,9 @@ object CopyUtils extends Logging {
223219
destFS,
224220
definition.destination,
225221
removeExisting = true,
226-
ignoreErrors = options.ignoreErrors,
222+
options = options,
227223
taskAttemptID
228-
)
224+
)
229225
case Success(d) if options.update =>
230226
Try {
231227
filesAreIdentical(
@@ -269,7 +265,7 @@ object CopyUtils extends Logging {
269265
destFS,
270266
definition.destination,
271267
removeExisting = true,
272-
ignoreErrors = options.ignoreErrors,
268+
options = options,
273269
taskAttemptID
274270
)
275271
}
@@ -326,7 +322,7 @@ object CopyUtils extends Logging {
326322
destFS: FileSystem,
327323
dest: URI,
328324
removeExisting: Boolean,
329-
ignoreErrors: Boolean,
325+
options: SparkDistCPOptions,
330326
taskAttemptID: Long
331327
): FileCopyResult = {
332328

@@ -342,10 +338,7 @@ object CopyUtils extends Logging {
342338
var out: Option[FSDataOutputStream] = None
343339
try {
344340
in = Some(sourceFS.open(sourceFile.getPath))
345-
if (!destFS.exists(tempPath.getParent))
346-
throw new RuntimeException(
347-
s"Destination folder [${tempPath.getParent}] does not exist"
348-
)
341+
options.missingDirectoryAction.requireDirectory(destFS, tempPath.getParent)
349342
out = Some(destFS.create(tempPath, false))
350343
IOUtils.copyBytes(
351344
in.get,
@@ -354,6 +347,7 @@ object CopyUtils extends Logging {
354347
)
355348

356349
} catch {
350+
case e: FileNotFoundException => throw new RuntimeException(e.getMessage)
357351
case e: Throwable => throw e
358352
} finally {
359353
in.foreach(_.close())
@@ -397,7 +391,7 @@ object CopyUtils extends Logging {
397391
sourceFile.len,
398392
CopyActionResult.Copied
399393
)
400-
case Failure(e) if ignoreErrors =>
394+
case Failure(e) if options.ignoreErrors =>
401395
logError(
402396
s"Failed to copy file [${sourceFile.getPath}] to [$destPath]",
403397
e
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.coxautodata.utils
2+
3+
import com.coxautodata.objects.Logging
4+
import com.coxautodata.utils.DirectoryUtils.NO_FILES
5+
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
6+
7+
import java.io.FileNotFoundException
8+
9+
abstract sealed class MissingDirectoryAction extends Logging {
10+
def requireDirectory(destFS: FileSystem, destPath: Path): Unit
11+
12+
def createDirectory(destFS: FileSystem, destPath: Path): Unit = destFS.mkdirs(destPath)
13+
14+
def listFiles(destFS: FileSystem, destPath: Path): RemoteIterator[LocatedFileStatus] = {
15+
if (destFS.exists(destPath)) {
16+
destFS.listLocatedStatus(destPath)
17+
} else {
18+
logWarning(s"Folder [${destPath.getParent}] does not exist.")
19+
NO_FILES
20+
}
21+
}
22+
23+
}
24+
25+
case object CreateMissingDirectoryAction extends MissingDirectoryAction {
26+
override def requireDirectory(destFS: FileSystem, destPath: Path): Unit = {
27+
if (!destFS.exists(destPath)) {
28+
destFS.mkdirs(destPath)
29+
}
30+
}
31+
}
32+
33+
case object FailMissingDirectoryAction extends MissingDirectoryAction {
34+
override def requireDirectory(destFS: FileSystem, destPath: Path): Unit = {
35+
if (!destFS.exists(destPath)) {
36+
throw new FileNotFoundException(s"Folder [${destPath}] does not exist.")
37+
}
38+
}
39+
40+
override def listFiles(destFS: FileSystem, destPath: Path): RemoteIterator[LocatedFileStatus] = {
41+
if (destFS.exists(destPath)) {
42+
destFS.listLocatedStatus(destPath)
43+
} else {
44+
throw new FileNotFoundException(s"Folder [${destPath}] does not exist.")
45+
}
46+
}
47+
}
48+
49+
case object LogMissingDirectoryAction extends MissingDirectoryAction {
50+
override def requireDirectory(destFS: FileSystem, destPath: Path): Unit = {
51+
if (!destFS.exists(destPath)) {
52+
logWarning(s"Folder [${destPath.getParent}] does not exist.")
53+
}
54+
}
55+
56+
override def createDirectory(destFS: FileSystem, destPath: Path): Unit = {
57+
logDebug(s"Do not create folder [${destPath}].")
58+
}
59+
}
60+
61+
private class EmptyRemoteIterator[E] extends RemoteIterator[E] {
62+
override def hasNext: Boolean = false
63+
64+
override def next(): E = throw new UnsupportedOperationException()
65+
}
66+
67+
object DirectoryUtils {
68+
val NO_FILES: RemoteIterator[LocatedFileStatus] = new EmptyRemoteIterator[LocatedFileStatus]()
69+
70+
def missingDirectoryAction(name: String): MissingDirectoryAction = name.trim.toLowerCase match {
71+
case "create" => CreateMissingDirectoryAction
72+
case "log" => LogMissingDirectoryAction
73+
case "fail" => FailMissingDirectoryAction
74+
case _ => throw new IllegalArgumentException(s"Invalid missing directory option [$name]")
75+
}
76+
77+
}

src/main/scala/com/coxautodata/utils/FileListUtils.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ object FileListUtils extends Logging {
5454
path: Path,
5555
threads: Int,
5656
includePathRootInDependents: Boolean,
57-
filterNot: List[Regex]
57+
filterNot: List[Regex],
58+
missingDirectoryAction: MissingDirectoryAction
5859
): Seq[(SerializableFileStatus, Seq[SerializableFileStatus])] = {
5960

6061
assert(threads > 0, "Number of threads must be positive")
@@ -93,8 +94,7 @@ object FileListUtils extends Logging {
9394
)
9495
threadsWorking.put(uuid, true)
9596
try {
96-
localFS
97-
.listLocatedStatus(p._1)
97+
missingDirectoryAction.listFiles(localFS, p._1)
9898
.foreach {
9999
case l if l.isSymlink =>
100100
throw new RuntimeException(s"Link [$l] is not supported")
@@ -171,7 +171,8 @@ object FileListUtils extends Logging {
171171
destinationURI: URI,
172172
updateOverwritePathBehaviour: Boolean,
173173
numListstatusThreads: Int,
174-
filterNot: List[Regex]
174+
filterNot: List[Regex],
175+
missingDirectoryAction: MissingDirectoryAction
175176
): RDD[KeyedCopyDefinition] = {
176177
val sourceRDD = sourceURIs
177178
.map { sourceURI =>
@@ -184,7 +185,8 @@ object FileListUtils extends Logging {
184185
new Path(sourceURI),
185186
numListstatusThreads,
186187
!updateOverwritePathBehaviour,
187-
filterNot
188+
filterNot,
189+
missingDirectoryAction
188190
)
189191
)
190192
.map { case (f, d) =>
@@ -232,7 +234,8 @@ object FileListUtils extends Logging {
232234
destinationPath,
233235
options.numListstatusThreads,
234236
false,
235-
List.empty
237+
List.empty,
238+
options.missingDirectoryAction
236239
)
237240
)
238241
.map { case (f, _) => (f.getPath.toUri, f) }

0 commit comments

Comments
 (0)