/*
 * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cats
package effect
package laws

import cats.effect.concurrent.Deferred
import cats.syntax.all._
import cats.laws._

import scala.concurrent.Promise

trait ConcurrentEffectLaws[F[_]] extends ConcurrentLaws[F] with EffectLaws[F] {
  implicit def F: ConcurrentEffect[F]

  def runAsyncRunCancelableCoherence[A](fa: F[A]) = {
    val fa1 = IO.async[A] { cb =>
      F.runAsync(fa)(r => IO(cb(r))).unsafeRunSync()
    }
    val fa2 = IO.cancelable[A] { cb =>
      F.toIO(F.runCancelable(fa)(r => IO(cb(r))).unsafeRunSync())
    }
    fa1 <-> fa2
  }

  def runCancelableIsSynchronous[A] = {
    val lh = Deferred.uncancelable[F, Unit].flatMap { latch =>
      val spawned = Promise[Unit]()
      // Never ending task
      val ff = F.cancelable[A] { _ =>
        spawned.success(()); latch.complete(())
      }
      // Execute, then cancel
      val token = F.delay(F.runCancelable(ff)(_ => IO.unit).unsafeRunSync()).flatMap { cancel =>
        // Waiting for the task to start before cancelling it
        Async.fromFuture(F.pure(spawned.future)) >> cancel
      }
      F.liftIO(F.runAsync(token)(_ => IO.unit).toIO) *> latch.get
    }
    lh <-> F.unit
  }

  def runCancelableStartCancelCoherence[A](a: A) = {
    // Cancellation via runCancelable
    val f1: F[A] = for {
      effect1 <- Deferred.uncancelable[F, A]
      latch <- F.delay(Promise[Unit]())
      never = F.cancelable[A] { _ =>
        latch.success(()); effect1.complete(a)
      }
      cancel <- F.liftIO(F.runCancelable(never)(_ => IO.unit).toIO)
      // Waiting for the task to start before cancelling it
      _ <- Async.fromFuture(F.pure(latch.future)) // TODO get rid of this, IO, and Future here
      _ <- cancel
      result <- effect1.get
    } yield result

    // Cancellation via start.flatMap(_.cancel)
    val f2: F[A] = for {
      effect2 <- Deferred.uncancelable[F, A]
      // Using a latch to ensure that the task started
      latch <- Deferred.uncancelable[F, Unit]
      never = F.bracket(latch.complete(()))(_ => F.never[Unit])(_ => effect2.complete(a))
      fiber <- F.start(never)
      // Waiting for the task to start before cancelling it
      _ <- latch.get
      _ <- F.start(fiber.cancel)
      result <- effect2.get
    } yield result

    f1 <-> f2
  }

  def toIORunCancelableConsistency[A](fa: F[A]) =
    ConcurrentEffect.toIOFromRunCancelable(fa) <-> F.toIO(fa)
}

object ConcurrentEffectLaws {
  def apply[F[_]](implicit F0: ConcurrentEffect[F], contextShift0: ContextShift[F]): ConcurrentEffectLaws[F] =
    new ConcurrentEffectLaws[F] {
      val F = F0
      val contextShift = contextShift0
    }
}