Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,11 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"org.scodec" %%% "scodec-bits" % "1.2.4",
"org.typelevel" %%% "cats-core" % "2.13.0",
"org.typelevel" %%% "cats-effect" % "3.7.0-RC1",
"org.typelevel" %%% "cats-mtl" % "1.6.0",
"org.typelevel" %%% "cats-effect-laws" % "3.7.0-RC1" % Test,
"org.typelevel" %%% "cats-effect-testkit" % "3.7.0-RC1" % Test,
"org.typelevel" %%% "cats-laws" % "2.13.0" % Test,
"org.typelevel" %%% "cats-mtl-laws" % "1.6.0" % Test,
"org.typelevel" %%% "discipline-munit" % "2.0.0" % Test,
"org.typelevel" %%% "munit-cats-effect" % "2.2.0-RC1" % Test,
"org.typelevel" %%% "scalacheck-effect-munit" % "2.1.0-RC1" % Test
Expand Down
40 changes: 39 additions & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import fs2.concurrent._
import fs2.internal._
import org.typelevel.scalaccompat.annotation._
import Pull.StreamPullOps
import cats.mtl.{LiftKind, LiftValue}

import java.util.concurrent.Flow.{Publisher, Processor, Subscriber}
import java.util.concurrent.Flow.{Processor, Publisher, Subscriber}

/** A stream producing output of type `O` and which may evaluate `F` effects.
*
Expand Down Expand Up @@ -5810,9 +5811,46 @@ object Stream extends StreamLowPriority {
new Defer[Stream[F, *]] {
override def defer[A](fa: => Stream[F, A]): Stream[F, A] = Stream.empty ++ fa
}

implicit def liftKindInstance[F[_]](implicit F: Applicative[F]): LiftKind[F, Stream[F, *]] =
liftKindImpl(F)

implicit def liftValueFromResourceInstance[F[_]](implicit
F: MonadCancel[F, ?]
): LiftValue[Resource[F, *], Stream[F, *]] =
liftValueFromResourceImpl(implicitly)
}

private[fs2] trait StreamLowPriority {
implicit def monadInstance[F[_]]: Monad[Stream[F, *]] =
new Stream.StreamMonad[F]

protected[this] def liftKindImpl[F[_]](F: Applicative[F]): LiftKind[F, Stream[F, *]] =
new LiftKind[F, Stream[F, *]] {
val applicativeF: Applicative[F] = F
val applicativeG: Applicative[Stream[F, *]] = monadInstance
def apply[A](fa: F[A]): Stream[F, A] = Stream.eval(fa)
def limitedMapK[A](ga: Stream[F, A])(scope: F ~> F): Stream[F, A] =
ga.translate(scope)
}

implicit def liftKindComposedInstance[F[_], G[_]](implicit
inner: LiftKind[F, G]
): LiftKind[F, Stream[G, *]] =
inner.andThen(liftKindImpl(inner.applicativeG))

protected[this] def liftValueFromResourceImpl[F[_]](
applicativeResource: Applicative[Resource[F, *]]
)(implicit F: MonadCancel[F, ?]): LiftValue[Resource[F, *], Stream[F, *]] =
new LiftValue[Resource[F, *], Stream[F, *]] {
val applicativeF: Applicative[Resource[F, *]] = applicativeResource
val applicativeG: Applicative[Stream[F, *]] = monadInstance
def apply[A](fa: Resource[F, A]): Stream[F, A] = Stream.resource(fa)
}

implicit def liftValueFromResourceComposedInstance[F[_], G[_]](implicit
inner: LiftValue[F, Resource[G, *]],
G: MonadCancel[G, ?]
): LiftValue[F, Stream[G, *]] =
inner.andThen(liftValueFromResourceImpl(inner.applicativeG))
}
61 changes: 54 additions & 7 deletions core/shared/src/test/scala/fs2/StreamLawsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,39 @@

package fs2

import cats.Eq
import cats.effect.IO
import cats.{Applicative, Eq, ~>}
import cats.data.{IdT, OptionT}
import cats.effect.{Concurrent, IO, Ref, Resource}
import cats.effect.testkit.TestInstances
import cats.laws.discipline._
import cats.laws.discipline.arbitrary._
import cats.mtl.LiftValue
import cats.mtl.laws.discipline.{LiftKindTests, LiftValueTests}
import org.scalacheck.{Arbitrary, Gen}

class StreamLawsSuite extends Fs2Suite with TestInstances {
implicit val ticker: Ticker = Ticker()

implicit def eqStream[O: Eq]: Eq[Stream[IO, O]] =
Eq.instance((x, y) =>
Eq[IO[Vector[Either[Throwable, O]]]]
.eqv(x.attempt.compile.toVector, y.attempt.compile.toVector)
)
implicit def eqStream[F[_], O](implicit
F: Concurrent[F],
eqFVecEitherThrowO: Eq[F[Vector[Either[Throwable, O]]]]
): Eq[Stream[F, O]] =
Eq.by((_: Stream[F, O]).attempt.compile.toVector)

private[this] val counter: IO[Ref[IO, Int]] = IO.ref(0)

implicit val arbitraryScope: Arbitrary[IO ~> IO] =
Arbitrary {
Gen.const {
new (IO ~> IO) {
def apply[A](fa: IO[A]): IO[A] =
for {
ref <- counter
res <- ref.update(_ + 1) >> fa
} yield res
}
}
}

checkAll(
"MonadError[Stream[F, *], Throwable]",
Expand All @@ -50,4 +69,32 @@ class StreamLawsSuite extends Fs2Suite with TestInstances {
"Align[Stream[F, *]]",
AlignTests[Stream[IO, *]].align[Int, Int, Int, Int]
)
checkAll(
"LiftKind[IO, Stream[IO, *]",
LiftKindTests[IO, Stream[IO, *]].liftKind[Int, Int]
)
checkAll(
"LiftKind[IO, Stream[OptionT[IO, *], *]",
LiftKindTests[IO, Stream[OptionT[IO, *], *]].liftKind[Int, Int]
)
checkAll(
"LiftValue[Resource[IO, *], Stream[IO, *]",
LiftValueTests[Resource[IO, *], Stream[IO, *]].liftValue[Int, Int]
)
locally {
// this is a somewhat silly instance, but we need a
// `LiftValue[X, Resource[IO, *]]` instance where `X` is not `IO` because
// that already has a higher priority implicit instance
implicit val liftIdTResource: LiftValue[IdT[IO, *], Resource[IO, *]] =
new LiftValue[IdT[IO, *], Resource[IO, *]] {
val applicativeF: Applicative[IdT[IO, *]] = implicitly
val applicativeG: Applicative[Resource[IO, *]] = implicitly
def apply[A](fa: IdT[IO, A]): Resource[IO, A] =
Resource.eval(fa.value)
}
checkAll(
"LiftValue[IdT[IO, *], Stream[IO, *]] via Resource[IO, *]",
LiftValueTests[IdT[IO, *], Stream[IO, *]].liftValue[Int, Int]
)
}
}