Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
41bfdc9
Added Processes to IONative
rahulrangers Jun 4, 2025
4385bb1
unguard ProcessSuit and add nowarn212
rahulrangers Jun 5, 2025
a67c869
fixed F.raiseError
rahulrangers Jun 5, 2025
634ecc7
removed posix_spawnp and used fork,chdir,execve
rahulrangers Jun 8, 2025
9af0b2e
converted 2 to UInt
rahulrangers Jun 8, 2025
1d904ff
removed unused imports
rahulrangers Jun 8, 2025
6a29252
removed PIDFD_NONBLOCK
rahulrangers Jun 13, 2025
0ef2cc5
added fallback
rahulrangers Jun 19, 2025
526f1db
refactored cleanup and exitvalue
rahulrangers Jul 12, 2025
953a825
return unit
rahulrangers Jul 12, 2025
f341ab1
format
rahulrangers Jul 13, 2025
42e7264
format
rahulrangers Jul 13, 2025
f1486b2
changed fallbackexitval
rahulrangers Jul 23, 2025
600671c
used guard_
rahulrangers Aug 7, 2025
7149372
change toULong to toUSize
rahulrangers Aug 7, 2025
b53f5d9
added type
rahulrangers Aug 7, 2025
a9b6ffd
added type
rahulrangers Aug 7, 2025
881c96d
added type
rahulrangers Aug 7, 2025
b007fbd
added explicit type
rahulrangers Aug 7, 2025
fad19f1
refactor createprocess
rahulrangers Aug 8, 2025
eaebb46
remove implicit z
rahulrangers Aug 8, 2025
4f6cf84
use Zone.acquire
rahulrangers Aug 8, 2025
8cba5c7
remove implicit
rahulrangers Aug 8, 2025
21b69b4
added implicit
rahulrangers Aug 8, 2025
40b2edf
changed fallback
rahulrangers Aug 8, 2025
818c423
Added Kqueue for macOS
rahulrangers Aug 11, 2025
43517d9
Added Kqueue for macOS
rahulrangers Aug 11, 2025
b03ac7e
blocking fallback
rahulrangers Aug 11, 2025
f3ddaeb
remove unused imports
rahulrangers Aug 11, 2025
2cd3b4a
Merge branch 'main' into native-process
mpilquist Aug 26, 2025
5c90b80
Merge branch 'typelevel:main' into native-process
rahulrangers Aug 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import fs2.io.CollectionCompat.*

import java.lang

private[process] trait ProcessesCompanionPlatform {
private[process] trait ProcessesCompanionJvmNative {
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {

def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Expand Down
26 changes: 26 additions & 0 deletions io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package process

private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative
66 changes: 34 additions & 32 deletions io/native/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.kernel.Resource
import cats.effect.kernel.Sync
import cats.syntax.all._
import fs2.io.internal.NativeUtil._

import java.io.OutputStream
import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -60,36 +59,7 @@ private[fs2] trait ioplatform extends iojvmnative {
/** Stream of bytes read asynchronously from standard input. */
def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] =
if (LinktimeInfo.isLinux || LinktimeInfo.isMac)
Stream
.resource {
Resource
.eval {
setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F]
}
.flatMap { poller =>
poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK)
}
}
.flatMap { handle =>
Stream.repeatEval {
handle
.pollReadRec(()) { _ =>
IO {
val buf = new Array[Byte](bufSize)
val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toUSize))
if (readed > 0)
Right(Some(Chunk.array(buf, 0, readed)))
else if (readed == 0)
Right(None)
else
Left(())
}
}
.to
}
}
.unNoneTerminate
.unchunks
readFd(STDIN_FILENO, bufSize)
else
readInputStream(Sync[F].blocking(System.in), bufSize, false)

Expand All @@ -107,7 +77,39 @@ private[fs2] trait ioplatform extends iojvmnative {
else
writeOutputStream(Sync[F].blocking(System.err), false)

private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in =>
private[fs2] def readFd[F[_]: Async: LiftIO](fd: Int, bufSize: Int): Stream[F, Byte] =
Stream
.resource {
Resource
.eval {
setNonBlocking(fd) *> fileDescriptorPoller[F]
}
.flatMap { poller =>
poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK)
}
}
.flatMap { handle =>
Stream.repeatEval {
handle
.pollReadRec(()) { _ =>
IO {
val buf = new Array[Byte](bufSize)
val readed = guard(read(fd, buf.atUnsafe(0), bufSize.toUSize))
if (readed > 0)
Right(Some(Chunk.array(buf, 0, readed)))
else if (readed == 0)
Right(None)
else
Left(())
}
}
.to
}
}
.unNoneTerminate
.unchunks

private[fs2] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in =>
Stream
.resource {
Resource
Expand Down
270 changes: 270 additions & 0 deletions io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package process

import cats.effect.kernel.{Async, Resource}
import cats.syntax.all.*
import scala.scalanative.unsafe.*
import scala.scalanative.unsigned.*
import scala.scalanative.libc.*
import scala.scalanative.posix.sys.wait.*
import scala.scalanative.posix.errno.{EPERM, ECHILD}
import scala.scalanative.meta.LinktimeInfo
import scala.scalanative.posix.unistd.*
import scala.scalanative.posix.signal.*
import java.io.IOException
import cats.effect.LiftIO
import cats.effect.IO
import org.typelevel.scalaccompat.annotation._
import fs2.io.internal.NativeUtil._
import cats.effect.unsafe.KqueueSystem.Kqueue

@extern
@nowarn212("cat=unused")
object LibC {
def pidfd_open(pid: CInt, flags: CInt): CInt = extern
}

private final case class NativeProcess(
pid: pid_t,
stdinFd: Int,
stdoutFd: Int,
stderrFd: Int,
pidfd: Option[Int] = None
)

private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative {

private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = {
val pathEnv = sys.env.getOrElse("PATH", "")
pathEnv
.split(':')
.find { dir =>
val full = s"$dir/$cmd"
access(toCString(full), X_OK) == 0
}
.map(dir => s"$dir/$cmd")
}

@inline private def closeAll(fds: Int*): Unit =
fds.foreach(close)

def pipeResource[F[_]](implicit F: Async[F]): Resource[F, (Int, Int)] =
Resource.make {
F.blocking {
val fds = stackalloc[CInt](2.toUInt)
guard_(pipe(fds))
(fds(0), fds(1))
}
} { case (r, w) => F.blocking { close(r); close(w); () } }

def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =
def forLiftIO[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =

if (LinktimeInfo.isMac || LinktimeInfo.isLinux) {
new UnsealedProcesses[F] {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {

val pipesResource: Resource[F, ((Int, Int), (Int, Int), (Int, Int))] =
for {
stdinPipe <- pipeResource[F]
stdoutPipe <- pipeResource[F]
stderrPipe <- pipeResource[F]
} yield (stdinPipe, stdoutPipe, stderrPipe)

def createProcess(
stdinPipe: (Int, Int),
stdoutPipe: (Int, Int),
stderrPipe: (Int, Int)
): F[NativeProcess] = F.blocking {
Zone.acquire { implicit z =>
val envMap =
if (process.inheritEnv)
sys.env ++ process.extraEnv
else process.extraEnv

val envp = stackalloc[CString]((envMap.size + 1).toUSize)
envMap.zipWithIndex.foreach { case ((k, v), i) =>
envp(i.toUSize) = toCString(s"$k=$v")
}
envp(envMap.size.toUSize) = null

val allArgs = process.command +: process.args
val argv = stackalloc[CString](allArgs.length.toUSize + 1.toUSize)
allArgs.zipWithIndex.foreach { case (arg, i) =>
argv(i.toUSize) = toCString(arg)
}
argv(allArgs.length.toUSize) = null

val executable =
if (process.command.startsWith("/"))
process.command
else
findExecutable(process.command).getOrElse(process.command)
val ret = guard(fork())
ret match {
case 0 =>
closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1)
guard_(dup2(stdinPipe._1, STDIN_FILENO))
guard_(dup2(stdoutPipe._2, STDOUT_FILENO))
guard_(dup2(stderrPipe._2, STDERR_FILENO))
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)

process.workingDirectory.foreach { dir =>
if ((dir != null) && (dir.toString != ".")) {
guard_(chdir(toCString(dir.toString)))
}
}

execve(toCString(executable), argv, envp)
_exit(127)
throw new AssertionError("unreachable")
case pid =>
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
val pidfd =
if (LinktimeInfo.isLinux) {
val fd = LibC.pidfd_open(pid, 0)
if (fd >= 0) Some(fd) else None
} else None
NativeProcess(
pid = pid,
stdinFd = stdinPipe._2,
stdoutFd = stdoutPipe._1,
stderrFd = stderrPipe._1,
pidfd
)
}
}
}

def cleanup(proc: NativeProcess): F[Unit] =
F.blocking {
closeAll(proc.stdinFd, proc.stdoutFd, proc.stderrFd)
val alive = {
val res = kill(proc.pid, 0)
res == 0 || errno.errno == EPERM
}
if (alive) {
kill(proc.pid, SIGKILL)
val status = stackalloc[CInt]()
waitpid(proc.pid, status, 0)
()
} else {
val status = stackalloc[CInt]()
waitpid(proc.pid, status, WNOHANG)
()
Comment on lines +171 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this else? Since it doesn't seem to use the status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are having else to remove the zombie process from the kernel space as given in this:
A child that terminates, but has not been waited for becomes a
"zombie".
https://man7.org/linux/man-pages/man2/waitpid.2.html

}
}

pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) =>
Resource
.make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup)
.flatMap { nativeProcess =>
nativeProcess.pidfd match {
case Some(pidfd) =>
for {
poller <- Resource.eval(fileDescriptorPoller[F])
handle <- poller.registerFileDescriptor(pidfd, true, false).mapK(LiftIO.liftK)
} yield (nativeProcess, Some(handle))
case None =>
Resource.pure((nativeProcess, None))
}
}
.map { case (nativeProcess, pollHandleOpt) =>
new UnsealedProcess[F] {
def isAlive: F[Boolean] = F.delay {
kill(nativeProcess.pid, 0) == 0 || errno.errno == EPERM
}

def exitValue: F[Int] =
if (LinktimeInfo.isLinux) {
(nativeProcess.pidfd, pollHandleOpt) match {
case (Some(_), Some(handle)) =>
handle
.pollReadRec(()) { _ =>
IO {
val statusPtr = stackalloc[CInt]()
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
if (result == nativeProcess.pid) {
val exitCode = WEXITSTATUS(!statusPtr)
Right(exitCode)
} else if (result == 0) {
Left(())
} else {
if (errno.errno == ECHILD)
throw new IOException("No such process")
else
throw new IOException(
s"waitpid failed with errno: ${errno.errno}"
)
}
}
}
.to
case _ =>
fallbackExitValue(nativeProcess.pid)
}
} else {
fileDescriptorPoller[F] match {
case kq: Kqueue =>
kq.awaitEvent(nativeProcess.pid, -5, 0x0005, 0x80000000).to.map(_.toInt)
case _ => fallbackExitValue(nativeProcess.pid)
}
}

def stdin: Pipe[F, Byte, Nothing] = { in =>
in
.through(writeFd(nativeProcess.stdinFd))
.onFinalize {
F.blocking {
close(nativeProcess.stdinFd)
}.void
}
}

def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
.onFinalize {
F.blocking {
close(nativeProcess.stdoutFd)
}.void
}

def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
.onFinalize {
F.blocking {
close(nativeProcess.stderrFd)
}.void
}
}
}
}
}

private def fallbackExitValue(pid: pid_t): F[Int] = F.delay {
val status = stackalloc[CInt]()
guard_(waitpid(pid, status, 0))
WEXITSTATUS(!status)
}

}
} else super.forAsync[F]
}
Loading
Loading