Description

This data migration has been performed on the staging environment and still needs to be applied to production. Beyond that specific task, it can serve as a general reference for performing a data migration on a ClickHouse database.

Keep the old data safe

RENAME TABLE subscriber_sources_activities TO subscriber_sources_activities_old;
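
The rename is a metadata-only operation, so it completes instantly and the data parts stay in place. Before going further, it may help to record the old table's row count for validating the copy later; this check is a suggested addition, not part of the original run:

-- Suggested check: note the row count for post-migration validation.
SELECT count() FROM statistics.subscriber_sources_activities_old;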

Create a new table with the new schema (it adds the page_view_count column)

CREATE TABLE statistics.subscriber_sources_activities
(
    `datetime` Date DEFAULT '1970-01-01',
    `key` String,
    `value` String,
    `channel_id` Int64,
    `sent_count` Int32,
    `delivered_count` Int32,
    `clicked_count` Int32,
    `closed_count` Int32,
    `subscribed_count` Int32,
    `unsubscribed_count` Int32,
    `page_view_count` Int32
)
ENGINE = SummingMergeTree((sent_count, delivered_count, clicked_count, closed_count, subscribed_count, unsubscribed_count, page_view_count))
PARTITION BY toYYYYMM(datetime)
ORDER BY (datetime, channel_id, key, value)
SETTINGS index_granularity = 8192
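
SummingMergeTree collapses rows that share the same ORDER BY key by summing the columns listed in the engine argument, but the collapsing happens during background merges, so unmerged duplicates can still be visible at read time. Queries should therefore aggregate explicitly rather than rely on merged state; a minimal sketch:

-- Rows may not be merged yet, so sum explicitly at query time.
SELECT
    datetime,
    channel_id,
    key,
    value,
    sum(sent_count) AS sent_count,
    sum(delivered_count) AS delivered_count,
    sum(page_view_count) AS page_view_count
FROM statistics.subscriber_sources_activities
GROUP BY datetime, channel_id, key, value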

Prepare the data migration INSERT statements (one per partition)

select concat('insert into subscriber_sources_activities (datetime, key, value, channel_id, sent_count, delivered_count, clicked_count, closed_count, subscribed_count, unsubscribed_count, page_view_count) select datetime, key, value, channel_id, sent_count, delivered_count, clicked_count, closed_count, subscribed_count, unsubscribed_count, toInt32(0) as page_view_count from subscriber_sources_activities_old where toYYYYMM(datetime)=',partition) as cmd
from system.parts
where database='statistics' and table='subscriber_sources_activities_old'
group by database, table, partition
order by partition
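
The query emits one INSERT per monthly partition, so the copy can be run, and if needed retried, in small chunks; note how the new page_view_count column is backfilled with zeros via toInt32(0). For a hypothetical partition 202301, a generated command would look like this (shown wrapped here for readability; the actual output is a single line):

insert into subscriber_sources_activities (datetime, key, value, channel_id, sent_count, delivered_count, clicked_count, closed_count, subscribed_count, unsubscribed_count, page_view_count)
select datetime, key, value, channel_id, sent_count, delivered_count, clicked_count, closed_count, subscribed_count, unsubscribed_count, toInt32(0) as page_view_count
from subscriber_sources_activities_old
where toYYYYMM(datetime)=202301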

Update stream and materialized view

  1. Drop the old stream
DROP TABLE statistics.subscriber_sources_activities_stream;
  2. Create a new stream
CREATE TABLE statistics.subscriber_sources_activities_stream
(
    `timestamp` Int32,
    `key` String,
    `value` String,
    `channel_id` Int64,
    `sent_count` Int32,
    `delivered_count` Int32,
    `clicked_count` Int32,
    `closed_count` Int32,
    `subscribed_count` Int32,
    `unsubscribed_count` Int32,
    `page_view_count` Int32
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092', kafka_topic_list = 'subscriber_sources_activities', kafka_format = 'AvroConfluent', kafka_skip_broken_messages = 0, kafka_group_name = 'statistics', kafka_num_consumers = 1
  3. Drop the old materialized view
DROP VIEW statistics.subscribers_source_activities_stream_buffer;
  4. Create a new materialized view
CREATE MATERIALIZED VIEW statistics.subscribers_source_activities_stream_buffer TO statistics.subscriber_sources_activities
(
    `datetime` DateTime('UTC'),
    `key` String,
    `value` String,
    `channel_id` Int64,
    `sent_count` Int32,
    `delivered_count` Int32,
    `clicked_count` Int32,
    `closed_count` Int32,
    `subscribed_count` Int32,
    `unsubscribed_count` Int32,
    `page_view_count` Int32
) AS
SELECT
    toStartOfDay(toDateTime64(timestamp, 0)) AS datetime,
    key,
    value,
    channel_id,
    sent_count,
    delivered_count,
    clicked_count,
    closed_count,
    subscribed_count,
    unsubscribed_count,
    page_view_count
FROM statistics.subscriber_sources_activities_stream
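
With the stream and materialized view recreated, the Kafka consumer resumes under the same kafka_group_name (so it continues from its committed offsets) and new events should start landing in the target table. A suggested sanity check, not part of the original notes:

-- Suggested check: confirm fresh events are arriving in the new table.
SELECT count()
FROM statistics.subscriber_sources_activities
WHERE datetime >= today()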

Apply the same steps to the page_events table (its new schema adds the subscriber_uuid column)

RENAME TABLE page_events TO page_events_old;
 
 
CREATE TABLE statistics.page_events
(
    `timestamp` Int32,
    `channel_id` Int64,
    `visit_count` Int32,
    `browser` String,
    `subscriber_uuid` String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(toDateTime64(timestamp, 0))
ORDER BY (timestamp, channel_id)
SETTINGS index_granularity = 8192
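
The partition key turns the raw Int32 Unix timestamp into a monthly bucket; the same expression is reused below to generate the per-partition INSERTs, so the two must stay in sync. A quick worked example (the timestamp value is illustrative):

-- Assuming a UTC server timezone: 1672531200 is 2023-01-01 00:00:00, so this returns 202301.
SELECT toYYYYMM(toDateTime64(1672531200, 0))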
 
 
select concat('insert into page_events (timestamp, channel_id, visit_count, browser, subscriber_uuid) select timestamp, channel_id, visit_count, browser, toString(\'\') as subscriber_uuid from page_events_old where toYYYYMM(toDateTime64(timestamp, 0))=',partition) as cmd 
from system.parts
where database='statistics' and table='page_events_old'
group by database, table, partition
order by partition
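
After the generated statements have run, comparing row counts between the new and old tables is a cheap completeness check (a suggested addition; exact equality assumes the new Kafka pipeline was not yet consuming during the copy):

-- Suggested check: counts should match once every partition is copied.
SELECT
    (SELECT count() FROM statistics.page_events) AS new_count,
    (SELECT count() FROM statistics.page_events_old) AS old_count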
 
 
DROP TABLE statistics.page_events_stream;
 
 
CREATE TABLE statistics.page_events_stream
(
    `timestamp` Int32,
    `channel_id` Int64,
    `visit_count` Int32,
    `browser` String,
    `subscriber_uuid` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-headless.kafka:9092', kafka_topic_list = 'page_events', kafka_format = 'AvroConfluent', kafka_skip_broken_messages = 0, kafka_group_name = 'statistics', kafka_num_consumers = 1
 
 
DROP VIEW statistics.page_events_stream_to_table;
 
 
CREATE MATERIALIZED VIEW statistics.page_events_stream_to_table TO statistics.page_events
(
    `timestamp` Int32,
    `channel_id` Int64,
    `visit_count` Int32,
    `browser` String,
    `subscriber_uuid` String
) AS
SELECT *
FROM statistics.page_events_stream
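
Once both pipelines are verified, the renamed tables can be dropped to reclaim disk space. This cleanup is an assumed follow-up step; the original notes stop at the materialized view:

-- Only after validating the migrated data and the new Kafka pipelines.
DROP TABLE statistics.subscriber_sources_activities_old;
DROP TABLE statistics.page_events_old;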