Skip to content

Commit f141b0e

Browse files
committed
Add waitForAllTaskTermination
Fix segmentation fault on Carthage termination. Carthage/Carthage#474 (comment)
1 parent 3fedab1 commit f141b0e

1 file changed

Lines changed: 25 additions & 6 deletions

File tree

ReactiveTask/Task.swift

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public struct TaskDescription {
4343
self.environment = environment
4444
self.standardInput = standardInput
4545
}
46+
47+
/// A GCD group which to wait completion
48+
private static var group = dispatch_group_create()
49+
50+
/// wait for all task termination
51+
public static func waitForAllTaskTermination() {
52+
dispatch_group_wait(TaskDescription.group, DISPATCH_TIME_FOREVER)
53+
}
4654
}
4755

4856
extension TaskDescription: Printable {
@@ -63,6 +71,9 @@ private final class Pipe {
6371

6472
/// A GCD queue upon which to deliver I/O callbacks.
6573
let queue: dispatch_queue_t
74+
75+
/// A GCD group which to wait completion
76+
let group: dispatch_group_t
6677

6778
/// Creates an NSFileHandle corresponding to the `readFD`. The file handle
6879
/// will not automatically close the descriptor.
@@ -77,20 +88,21 @@ private final class Pipe {
7788
}
7889

7990
/// Initializes a pipe object using existing file descriptors.
80-
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t) {
91+
init(readFD: Int32, writeFD: Int32, queue: dispatch_queue_t, group: dispatch_group_t) {
8192
precondition(readFD >= 0)
8293
precondition(writeFD >= 0)
8394

8495
self.readFD = readFD
8596
self.writeFD = writeFD
8697
self.queue = queue
98+
self.group = group
8799
}
88100

89101
/// Instantiates a new descriptor pair.
90-
class func create(queue: dispatch_queue_t) -> Result<Pipe, ReactiveTaskError> {
102+
class func create(queue: dispatch_queue_t, _ group: dispatch_group_t) -> Result<Pipe, ReactiveTaskError> {
91103
var fildes: [Int32] = [ 0, 0 ]
92104
if pipe(&fildes) == 0 {
93-
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue))
105+
return .success(self(readFD: fildes[0], writeFD: fildes[1], queue: queue, group: group))
94106
} else {
95107
return .failure(.POSIXError(errno))
96108
}
@@ -109,6 +121,7 @@ private final class Pipe {
109121
/// anywhere else, as it may close unexpectedly.
110122
func transferReadsToProducer() -> SignalProducer<dispatch_data_t, ReactiveTaskError> {
111123
return SignalProducer { observer, disposable in
124+
dispatch_group_enter(self.group)
112125
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in
113126
if error == 0 {
114127
sendCompleted(observer)
@@ -119,6 +132,7 @@ private final class Pipe {
119132
}
120133

121134
close(self.readFD)
135+
dispatch_group_leave(self.group)
122136
}
123137

124138
dispatch_io_set_low_water(channel, 1)
@@ -154,6 +168,7 @@ private final class Pipe {
154168
/// Returns a producer that will complete or error.
155169
func writeDataFromProducer(producer: SignalProducer<NSData, NoError>) -> SignalProducer<(), ReactiveTaskError> {
156170
return SignalProducer { observer, disposable in
171+
dispatch_group_enter(self.group)
157172
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.writeFD, self.queue) { error in
158173
if error == 0 {
159174
sendCompleted(observer)
@@ -164,6 +179,7 @@ private final class Pipe {
164179
}
165180

166181
close(self.writeFD)
182+
dispatch_group_leave(self.group)
167183
}
168184

169185
producer.startWithSignal { signal, producerDisposable in
@@ -363,6 +379,7 @@ public func ignoreTaskData<T, Error>(signal: Signal<TaskEvent<T>, Error>) -> Sig
363379
public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> {
364380
return SignalProducer { observer, disposable in
365381
let queue = dispatch_queue_create(taskDescription.description, DISPATCH_QUEUE_SERIAL)
382+
let group = TaskDescription.group
366383

367384
let task = NSTask()
368385
task.launchPath = taskDescription.launchPath
@@ -379,7 +396,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
379396
var stdinProducer: SignalProducer<(), ReactiveTaskError> = .empty
380397

381398
if let input = taskDescription.standardInput {
382-
switch Pipe.create(queue) {
399+
switch Pipe.create(queue, group) {
383400
case let .Success(pipe):
384401
task.standardInput = pipe.value.readHandle
385402

@@ -392,11 +409,11 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
392409
}
393410
}
394411

395-
SignalProducer(result: Pipe.create(queue))
412+
SignalProducer(result: Pipe.create(queue, group))
396413
|> flatMap(.Concat) { stdoutPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
397414
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe)
398415

399-
return SignalProducer(result: Pipe.create(queue))
416+
return SignalProducer(result: Pipe.create(queue, group))
400417
|> flatMap(.Merge) { stderrPipe -> SignalProducer<TaskEvent<NSData>, ReactiveTaskError> in
401418
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)
402419

@@ -449,6 +466,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
449466
task.standardOutput = stdoutPipe.writeHandle
450467
task.standardError = stderrPipe.writeHandle
451468

469+
dispatch_group_enter(group)
452470
task.terminationHandler = { task in
453471
let terminationStatus = task.terminationStatus
454472
if terminationStatus == EXIT_SUCCESS {
@@ -469,6 +487,7 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
469487
}
470488
|> start(observer)
471489
}
490+
dispatch_group_leave(group)
472491
}
473492

474493
task.launch()

0 commit comments

Comments
 (0)