summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Lindström <daniel.lindstrom@ericsson.com>2015-07-17 08:55:04 (GMT)
committerArun Raghavan <arun@centricular.com>2015-10-28 05:55:50 (GMT)
commit4125079b0eb86aadf2472617d220a6e9c6509d79 (patch)
tree005f932569858871d23d7d321d0b57f5173e1feb
parent5e9d8ce15b7b9460d6028b27b69dce1506a87514 (diff)
downloadopenwebrtc-gst-plugins-4125079b0eb86aadf2472617d220a6e9c6509d79.tar.gz
openwebrtc-gst-plugins-4125079b0eb86aadf2472617d220a6e9c6509d79.tar.xz
scream: Using g_async_queue instead of gst_data_queue
Using g_async_queue instead of gst_data_queue because the timeout function is needed.
-rw-r--r--gst/scream/gstscreamqueue.c26
-rw-r--r--gst/scream/gstscreamqueue.h3
2 files changed, 11 insertions, 18 deletions
diff --git a/gst/scream/gstscreamqueue.c b/gst/scream/gstscreamqueue.c
index 4dc36c3..40b9bf3 100644
--- a/gst/scream/gstscreamqueue.c
+++ b/gst/scream/gstscreamqueue.c
@@ -288,10 +288,7 @@ static void gst_scream_queue_init(GstScreamQueue *self)
(GstDataQueueEmptyCallback)data_queue_empty_cb, self);
self->number_of_approved_packets = 0;
- self->incoming_packets = gst_data_queue_new(
- (GstDataQueueCheckFullFunction)data_queue_check_full_cb,
- (GstDataQueueFullCallback)data_queue_full_cb,
- (GstDataQueueEmptyCallback)data_queue_empty_cb, self);
+ self->incoming_packets = g_async_queue_new();
self->priority = DEFAULT_PRIORITY;
self->pass_through = DEFAULT_PASS_THROUGH;
@@ -309,11 +306,11 @@ static void gst_scream_queue_finalize(GObject *object)
}
gst_object_unref(self->approved_packets);
- while (!gst_data_queue_is_empty(self->incoming_packets)) {
- gst_data_queue_pop(self->incoming_packets, &item);
+ while(item = (GstDataQueueItem *)g_async_queue_try_pop(self->incoming_packets)) {
item->destroy(item);
}
- gst_object_unref(self->incoming_packets);
+
+ g_async_queue_unref(self->incoming_packets);
g_hash_table_unref(self->streams);
g_hash_table_unref(self->adapted_stream_ids);
@@ -441,10 +438,8 @@ static GstFlowReturn gst_scream_queue_sink_chain(GstPad *pad, GstObject *parent,
goto end;
}
- if (G_UNLIKELY(!gst_data_queue_push(self->incoming_packets, (GstDataQueueItem *)rtp_item))) {
- g_log(G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "Failed pusing RTP packet to incoming packet queue. flushing?");
- ((GstDataQueueItem *)rtp_item)->destroy(rtp_item);
- }
+ g_async_queue_push(self->incoming_packets, (gpointer)rtp_item);
+
end:
flow_ret = GST_PAD_IS_FLUSHING(pad) ? GST_FLOW_FLUSHING : flow_ret;
return flow_ret;
@@ -526,9 +521,8 @@ static void gst_scream_queue_srcpad_loop(GstScreamQueue *self)
}
self->next_approve_time = time_now_us + time_until_next_approve;
- if (!gst_data_queue_pop(self->incoming_packets, (GstDataQueueItem **)&item)) {
- /* flushing */
- g_warning("Failed to pop from incoming packets queue. Flushing?");
+ item = (GstDataQueueItem *)g_async_queue_timeout_pop(self->incoming_packets, time_until_next_approve);
+ if (!item) {
goto end;
}
@@ -732,9 +726,7 @@ static void gst_scream_queue_incoming_feedback(GstScreamQueue *self, guint ssrc,
rtcp_item->timestamp = timestamp;
rtcp_item->qbit = qbit;
- if (G_UNLIKELY(!gst_data_queue_push(self->incoming_packets, (GstDataQueueItem *)rtcp_item))) {
- ((GstDataQueueItem *)rtcp_item)->destroy(rtcp_item);
- }
+ g_async_queue_push(self->incoming_packets, (gpointer)rtcp_item);
}
static guint64 get_gst_time_us(GstScreamQueue *self)
diff --git a/gst/scream/gstscreamqueue.h b/gst/scream/gstscreamqueue.h
index ad019d2..f2b6043 100644
--- a/gst/scream/gstscreamqueue.h
+++ b/gst/scream/gstscreamqueue.h
@@ -58,7 +58,8 @@ struct _GstScreamQueue {
GHashTable *adapted_stream_ids;
GHashTable *ignored_stream_ids;
- GstDataQueue *incoming_packets;
+ /*GstDataQueue *incoming_packets;*/
+ GAsyncQueue *incoming_packets;
GstDataQueue *approved_packets;
guint number_of_approved_packets;
guint64 next_approve_time;