diff --git a/project_tasks/src/main/scala/Bank.scala b/project_tasks/src/main/scala/Bank.scala index e3e43de..22c481d 100644 --- a/project_tasks/src/main/scala/Bank.scala +++ b/project_tasks/src/main/scala/Bank.scala @@ -2,8 +2,14 @@ class Bank(val allowedAttempts: Integer = 3) { private val transactionsQueue: TransactionQueue = new TransactionQueue() private val processedTransactions: TransactionQueue = new TransactionQueue() + private var processingThreadsStarted = false; + private val processingThreads: List[Thread] = + (1 to 1).map(_ => new Thread { + override def run = processTransactions + }).toList def addTransactionToQueue(from: Account, to: Account, amount: Double): Unit = { + printf("[%s]: Added transaction to queue\n", Thread.currentThread().toString()) transactionsQueue.push(new Transaction( transactionsQueue, processedTransactions, @@ -13,36 +19,45 @@ class Bank(val allowedAttempts: Integer = 3) { 10, )) - Main.thread(processTransaction(transactionsQueue.pop())) - } - // TODO - // project task 2 - // create a new transaction object and put it in the queue - // spawn a thread that calls processTransactions - - // There are mixed instructions for this method. - // It's called `processTransactions`, indicating that it should - // process all lists, the part in the assigment pdf indicates this as well. - // However the comment below is written as if there is only one transaction to - // be processed, and the fact that `addTransactionToQueue` calls this method every - // time something is added, supports that theory as well. - // We just went with the most logical option... - private def processTransactions(trx: Transaction): Unit = { - // thread = Main.thread(trx) - // thread.join() - trx() - if (trx.status == TransactionStatus.PENDING && trx.attempt < trx.allowedAttemps) { - processTransactions(trx) - } else { - processedTransactions.push(trx); + if (!processingThreadsStarted) { + processingThreads.foreach(t => { + t.start + print("Starting processing thread\n") + }) + processingThreadsStarted = true; } } - // TODO - // project task 2 - // Function that pops a transaction from the queue - // and spawns a thread to execute the transaction. - // Finally do the appropriate thing, depending on whether - // the transaction succeeded or not + // TODO + // project task 2 + // create a new transaction object and put it in the queue + // spawn a thread that calls processTransactions + + // This function is a worker that continuously + // pops elements from the queue and processes them. + // Multiple of these can be run on separate threads. + private def processTransactions: Unit = { + if (transactionsQueue.isEmpty) { + Thread.sleep(50) + } else { + val trx = transactionsQueue.pop + + Main.thread(trx.run).join() + + if (trx.status == TransactionStatus.PENDING) { + transactionsQueue.push(trx); + } else { + processedTransactions.push(trx); + } + } + + processTransactions + } + // TODO + // project task 2 + // Function that pops a transaction from the queue + // and spawns a thread to execute the transaction. + // Finally do the appropriate thing, depending on whether + // the transaction succeeded or not def addAccount(initialBalance: Double): Account = { new Account(this, initialBalance) diff --git a/project_tasks/src/main/scala/Transaction.scala b/project_tasks/src/main/scala/Transaction.scala index 34f71fe..7868dec 100644 --- a/project_tasks/src/main/scala/Transaction.scala +++ b/project_tasks/src/main/scala/Transaction.scala @@ -16,16 +16,16 @@ class TransactionQueue { def pop: Transaction = {queue.synchronized(queue.dequeue())} // Return whether the queue is empty - def isEmpty: Boolean = {queue.synchronized(queue.isEmpty())} + def isEmpty: Boolean = {queue.synchronized(queue.isEmpty)} // Add new element to the back of the queue - def push(t: Transaction): Unit = {queue.synchronized(queue.push(t))} + def push(t: Transaction): Unit = {queue.synchronized(queue.enqueue(t))} // Return the first element from the queue without removing it - def peek: Transaction = {queue.synchronized(queue.front())} + def peek: Transaction = {queue.synchronized(queue.front)} // Return an iterator to allow you to iterate over the queue - def iterator: Iterator[Transaction] = {queue.synchronized(queue.iterator())} + def iterator: Iterator[Transaction] = {queue.synchronized(queue.iterator)} } class Transaction(val transactionsQueue: TransactionQueue, @@ -39,22 +39,31 @@ class Transaction(val transactionsQueue: TransactionQueue, var attempt = 0 override def run: Unit = { - - def doTransaction() = { - // TODO - project task 3 - // Extend this method to satisfy requirements. - from withdraw amount - to deposit amount - } - + def doTransaction(): Unit = { // TODO - project task 3 - // make the code below thread safe - if (status == TransactionStatus.PENDING) { - doTransaction - Thread.sleep(50) // you might want this to make more room for - // new transactions to be added to the queue + // Extend this method to satisfy requirements. + if (from.withdraw(amount).isRight) { + return } + if (to.deposit(amount).isRight) { + if (from.deposit(amount).isRight) print("oof") + return + } + status = TransactionStatus.SUCCESS } + + attempt += 1 + if (status != TransactionStatus.PENDING) { + return + } + from.synchronized(to.synchronized(doTransaction)) + + Thread.sleep(50) + + if (attempt >= allowedAttemps) { + status = TransactionStatus.FAILED + } + } }